Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/12 22:25:25 UTC

[04/14] flink git commit: [FLINK-8349] [flip6] Remove Yarn specific commands from YarnClusterDescriptor

[FLINK-8349] [flip6] Remove Yarn specific commands from YarnClusterDescriptor

Remove Yarn specific commands from YarnClusterDescriptor. This is a preparatory
step to make the FlinkYarnSessionCli work with the Flip-6 RestClusterClient.

This closes #5229.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/402499f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/402499f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/402499f0

Branch: refs/heads/master
Commit: 402499f06a4b590ac47df64ecc01055c06b0399b
Parents: 10e900b
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jan 11 23:25:58 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jan 12 16:14:03 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/client/cli/CliFrontend.java    |   9 +
 .../client/deployment/ClusterDescriptor.java    |   9 +
 .../Flip6StandaloneClusterDescriptor.java       |   6 +
 .../deployment/StandaloneClusterDescriptor.java |   6 +
 .../flink/client/program/ClusterClient.java     |  13 +-
 .../client/program/StandaloneClusterClient.java |   4 -
 .../client/program/rest/RestClusterClient.java  |   7 +-
 .../client/cli/util/DummyClusterDescriptor.java |   6 +
 .../YARNSessionCapacitySchedulerITCase.java     |  24 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |   9 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |  59 ++-
 .../yarn/AbstractYarnClusterDescriptor.java     |  21 +-
 .../apache/flink/yarn/YarnClusterClient.java    | 155 +------
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 415 ++++++++++---------
 .../yarn/cli/YarnApplicationStatusMonitor.java  |   3 +-
 15 files changed, 352 insertions(+), 394 deletions(-)
----------------------------------------------------------------------
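
The hunks below add a terminateCluster(String) method to the ClusterDescriptor
interface and call it from CliFrontend (and, for YARN, from FlinkYarnSessionCli).
As an illustration only, the following sketch mirrors the CliFrontend change:
the cluster is killed only if this client started it (no pre-existing cluster id
was passed in) and the client is not detached. The class and method names are
hypothetical; the descriptor, client, and exception types are those shown in the
diff.

import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.util.FlinkException;

/**
 * Hypothetical helper (not part of this commit) sketching how the new
 * ClusterDescriptor#terminateCluster(String) is meant to be used by a caller
 * that deployed the cluster itself.
 */
public final class TerminateClusterSketch {

	/**
	 * Terminates the cluster only if this client started it
	 * (preExistingClusterId == null) and is not running detached,
	 * then shuts down the client, mirroring the CliFrontend change.
	 */
	public static void shutDownIfOwned(
			ClusterDescriptor clusterDescriptor,
			ClusterClient client,
			String preExistingClusterId) {

		if (preExistingClusterId == null && !client.isDetached()) {
			try {
				// the cluster identifier is e.g. the YARN application id
				clusterDescriptor.terminateCluster(client.getClusterIdentifier());
			} catch (FlinkException e) {
				// best effort: a failed termination should not fail the caller
				System.err.println("Could not properly terminate the Flink cluster: " + e.getMessage());
			}
		}

		try {
			client.shutdown();
		} catch (Exception e) {
			System.err.println("Could not properly shut down the cluster client: " + e.getMessage());
		}
	}

	private TerminateClusterSketch() {}
}

For standalone clusters the new method throws UnsupportedOperationException, so
only descriptors that actually own cluster resources (YARN) implement a real
termination, as the per-descriptor hunks below show.
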


http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 453d086..847a5f8 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -232,6 +232,15 @@ public class CliFrontend {
 
 				executeProgram(program, client, userParallelism);
 			} finally {
+				if (clusterId == null && !client.isDetached()) {
+					// terminate the cluster only if we have started it before and if it's not detached
+					try {
+						clusterDescriptor.terminateCluster(client.getClusterIdentifier());
+					} catch (FlinkException e) {
+						LOG.info("Could not properly terminate the Flink cluster.", e);
+					}
+				}
+
 				try {
 					client.shutdown();
 				} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
index 07eeabc..b1f566c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -20,6 +20,7 @@ package org.apache.flink.client.deployment;
 
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
 
 /**
  * A descriptor to deploy a cluster (e.g. Yarn or Mesos) and return a Client for Cluster communication.
@@ -59,4 +60,12 @@ public interface ClusterDescriptor extends AutoCloseable {
 	ClusterClient deployJobCluster(
 		final ClusterSpecification clusterSpecification,
 		final JobGraph jobGraph);
+
+	/**
+	 * Terminates the cluster with the given cluster id.
+	 *
+	 * @param clusterId identifying the cluster to shut down
+	 * @throws FlinkException if the cluster could not be terminated
+	 */
+	void terminateCluster(String clusterId) throws FlinkException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
index a35a68b..70fd9f7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -62,6 +63,11 @@ public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor {
 	}
 
 	@Override
+	public void terminateCluster(String clusterId) throws FlinkException {
+		throw new UnsupportedOperationException("Cannot terminate a standalone Flip-6 cluster.");
+	}
+
+	@Override
 	public void close() throws Exception {
 		// nothing to do
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index ae25194..5638232 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
 
 /**
  * A deployment descriptor for an existing cluster.
@@ -61,6 +62,11 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor {
 	}
 
 	@Override
+	public void terminateCluster(String clusterId) throws FlinkException {
+		throw new UnsupportedOperationException("Cannot terminate a standalone cluster.");
+	}
+
+	@Override
 	public void close() throws Exception {
 		// nothing to do
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 7c05e3e..efa23fb 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -262,11 +262,7 @@ public abstract class ClusterClient {
 	 */
 	public void shutdown() throws Exception {
 		synchronized (this) {
-			try {
-				finalizeCluster();
-			} finally {
-				actorSystemLoader.shutdown();
-			}
+			actorSystemLoader.shutdown();
 
 			if (highAvailabilityServices != null) {
 				highAvailabilityServices.close();
@@ -938,7 +934,7 @@ public abstract class ClusterClient {
 	 * May return new messages from the cluster.
 	 * Messages can be for example about failed containers or container launch requests.
 	 */
-	protected abstract List<String> getNewMessages();
+	public abstract List<String> getNewMessages();
 
 	/**
 	 * Returns a string representation of the cluster.
@@ -946,11 +942,6 @@ public abstract class ClusterClient {
 	public abstract String getClusterIdentifier();
 
 	/**
-	 * Request the cluster to shut down or disconnect.
-	 */
-	protected abstract void finalizeCluster();
-
-	/**
 	 * Set the mode of this client (detached or blocking job execution).
 	 * @param isDetached If true, the client will submit programs detached via the {@code run} method
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 10e1bdd..1782a25 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -105,8 +105,4 @@ public class StandaloneClusterClient extends ClusterClient {
 			return super.run(jobGraph, classLoader);
 		}
 	}
-
-	@Override
-	protected void finalizeCluster() {}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 6b21cff..a6bff1a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -329,12 +329,7 @@ public class RestClusterClient extends ClusterClient {
 	}
 
 	@Override
-	protected List<String> getNewMessages() {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	protected void finalizeCluster() {
+	public List<String> getNewMessages() {
 		throw new UnsupportedOperationException();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
index d300055..2bb3ed0 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -56,6 +57,11 @@ public class DummyClusterDescriptor implements ClusterDescriptor {
 	}
 
 	@Override
+	public void terminateCluster(String clusterId) throws FlinkException {
+		throw new UnsupportedOperationException("DummyClusterDescriptor does not support cluster termination.");
+	}
+
+	@Override
 	public void close() {
 		// nothing to do
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 5bed22e..c806c5e 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
@@ -74,6 +75,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static junit.framework.TestCase.assertTrue;
 import static org.apache.flink.yarn.UtilsTest.addTestAppender;
 import static org.apache.flink.yarn.UtilsTest.checkForLogString;
 
@@ -98,7 +100,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	 * Test regular operation, including command line parameter parsing.
 	 */
 	@Test
-	public void testClientStartup() {
+	public void testClientStartup() throws IOException {
 		LOG.info("Starting testClientStartup()");
 		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
 						"-n", "1",
@@ -116,7 +118,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	 * The parallelism is requested at the YARN client (-ys).
 	 */
 	@Test
-	public void perJobYarnCluster() {
+	public void perJobYarnCluster() throws IOException {
 		LOG.info("Starting perJobYarnCluster()");
 		addTestAppender(JobClient.class, Level.INFO);
 		File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
@@ -145,7 +147,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	 * memory remains.
 	 */
 	@Test
-	public void perJobYarnClusterOffHeap() {
+	public void perJobYarnClusterOffHeap() throws IOException {
 		LOG.info("Starting perJobYarnCluster()");
 		addTestAppender(JobClient.class, Level.INFO);
 		File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
@@ -363,15 +365,19 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	 * target queue. With an error message, we can help users identifying the issue)
 	 */
 	@Test
-	public void testNonexistingQueueWARNmessage() {
+	public void testNonexistingQueueWARNmessage() throws IOException {
 		LOG.info("Starting testNonexistingQueueWARNmessage()");
 		addTestAppender(AbstractYarnClusterDescriptor.class, Level.WARN);
-		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+		try {
+			runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
 				"-t", flinkLibFolder.getAbsolutePath(),
 				"-n", "1",
 				"-jm", "768",
 				"-tm", "1024",
 				"-qu", "doesntExist"}, "to unknown queue: doesntExist", null, RunTypes.YARN_SESSION, 1);
+		} catch (Exception e) {
+			assertTrue(ExceptionUtils.findThrowableWithMessage(e, "to unknown queue: doesntExist").isPresent());
+		}
 		checkForLogString("The specified queue 'doesntExist' does not exist. Available queues");
 		LOG.info("Finished testNonexistingQueueWARNmessage()");
 	}
@@ -380,7 +386,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	 * Test per-job yarn cluster with the parallelism set at the CliFrontend instead of the YARN client.
 	 */
 	@Test
-	public void perJobYarnClusterWithParallelism() {
+	public void perJobYarnClusterWithParallelism() throws IOException {
 		LOG.info("Starting perJobYarnClusterWithParallelism()");
 		// write log messages to stdout as well, so that the runWithArgs() method
 		// is catching the log output
@@ -407,7 +413,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	 * Test a fire-and-forget job submission to a YARN cluster.
 	 */
 	@Test(timeout = 60000)
-	public void testDetachedPerJobYarnCluster() {
+	public void testDetachedPerJobYarnCluster() throws IOException {
 		LOG.info("Starting testDetachedPerJobYarnCluster()");
 
 		File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
@@ -423,7 +429,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	 * Test a fire-and-forget job submission to a YARN cluster.
 	 */
 	@Test(timeout = 60000)
-	public void testDetachedPerJobYarnClusterWithStreamingJob() {
+	public void testDetachedPerJobYarnClusterWithStreamingJob() throws IOException {
 		LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()");
 
 		File exampleJarLocation = new File("target/programs/StreamingWordCount.jar");
@@ -435,7 +441,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 		LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()");
 	}
 
-	private void testDetachedPerJobYarnClusterInternal(String job) {
+	private void testDetachedPerJobYarnClusterInternal(String job) throws IOException {
 		YarnClient yc = YarnClient.createYarnClient();
 		yc.init(YARN_CONFIGURATION);
 		yc.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index cc26350..15fe355 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -45,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
@@ -80,7 +81,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 	 * Test regular operation, including command line parameter parsing.
 	 */
 	@Test(timeout = 60000) // timeout after a minute.
-	public void testDetachedMode() throws InterruptedException {
+	public void testDetachedMode() throws InterruptedException, IOException {
 		LOG.info("Starting testDetachedMode()");
 		addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
 		Runner runner =
@@ -158,7 +159,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 	 * <p>This test validates through 666*2 cores in the "cluster".
 	 */
 	@Test
-	public void testQueryCluster() {
+	public void testQueryCluster() throws IOException {
 		LOG.info("Starting testQueryCluster()");
 		runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 totalCores 1332", null, RunTypes.YARN_SESSION, 0); // we have 666*2 cores.
 		LOG.info("Finished testQueryCluster()");
@@ -178,7 +179,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 	 */
 	@Ignore("The test is too resource consuming (8.5 GB of memory)")
 	@Test
-	public void testResourceComputation() {
+	public void testResourceComputation() throws IOException {
 		addTestAppender(AbstractYarnClusterDescriptor.class, Level.WARN);
 		LOG.info("Starting testResourceComputation()");
 		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
@@ -206,7 +207,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 	 */
 	@Ignore("The test is too resource consuming (8 GB of memory)")
 	@Test
-	public void testfullAlloc() {
+	public void testfullAlloc() throws IOException {
 		addTestAppender(AbstractYarnClusterDescriptor.class, Level.WARN);
 		LOG.info("Starting testfullAlloc()");
 		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),

http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index ed02892..eeda32d 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -61,6 +61,9 @@ import java.io.FileNotFoundException;
 import java.io.FileWriter;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
 import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.util.ArrayList;
@@ -86,6 +89,7 @@ public abstract class YarnTestBase extends TestLogger {
 
 	protected static final PrintStream ORIGINAL_STDOUT = System.out;
 	protected static final PrintStream ORIGINAL_STDERR = System.err;
+	private static final InputStream ORIGINAL_STDIN = System.in;
 
 	protected static final String TEST_CLUSTER_NAME_KEY = "flink-yarn-minicluster-name";
 
@@ -510,13 +514,18 @@ public abstract class YarnTestBase extends TestLogger {
 	/**
 	 * This method returns once the "startedAfterString" has been seen.
 	 */
-	protected Runner startWithArgs(String[] args, String startedAfterString, RunTypes type) {
+	protected Runner startWithArgs(String[] args, String startedAfterString, RunTypes type) throws IOException {
 		LOG.info("Running with args {}", Arrays.toString(args));
 
 		outContent = new ByteArrayOutputStream();
 		errContent = new ByteArrayOutputStream();
+		PipedOutputStream out = new PipedOutputStream();
+		PipedInputStream in = new PipedInputStream(out);
+		PrintStream stdinPrintStream = new PrintStream(out);
+
 		System.setOut(new PrintStream(outContent));
 		System.setErr(new PrintStream(errContent));
+		System.setIn(in);
 
 		final int startTimeoutSeconds = 60;
 
@@ -525,7 +534,8 @@ public abstract class YarnTestBase extends TestLogger {
 			flinkConfiguration,
 			CliFrontend.getConfigurationDirectoryFromEnv(),
 			type,
-			0);
+			0,
+			stdinPrintStream);
 		runner.setName("Frontend (CLI/YARN Client) runner thread (startWithArgs()).");
 		runner.start();
 
@@ -539,7 +549,7 @@ public abstract class YarnTestBase extends TestLogger {
 			}
 			// check if thread died
 			if (!runner.isAlive()) {
-				sendOutput();
+				resetStreamsAndSendOutput();
 				if (runner.getRunnerError() != null) {
 					throw new RuntimeException("Runner failed with exception.", runner.getRunnerError());
 				}
@@ -547,13 +557,13 @@ public abstract class YarnTestBase extends TestLogger {
 			}
 		}
 
-		sendOutput();
+		resetStreamsAndSendOutput();
 		Assert.fail("During the timeout period of " + startTimeoutSeconds + " seconds the " +
 				"expected string did not show up");
 		return null;
 	}
 
-	protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings, RunTypes type, int returnCode) {
+	protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings, RunTypes type, int returnCode) throws IOException {
 		runWithArgs(args, terminateAfterString, failOnStrings, type, returnCode, false);
 	}
 
@@ -566,13 +576,17 @@ public abstract class YarnTestBase extends TestLogger {
 	 * @param expectedReturnValue Expected return code from the runner.
 	 * @param checkLogForTerminateString  If true, the runner checks also the log4j logger for the terminate string
 	 */
-	protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnPatterns, RunTypes type, int expectedReturnValue, boolean checkLogForTerminateString) {
+	protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnPatterns, RunTypes type, int expectedReturnValue, boolean checkLogForTerminateString) throws IOException {
 		LOG.info("Running with args {}", Arrays.toString(args));
 
 		outContent = new ByteArrayOutputStream();
 		errContent = new ByteArrayOutputStream();
+		PipedOutputStream out = new PipedOutputStream();
+		PipedInputStream in = new PipedInputStream(out);
+		PrintStream stdinPrintStream = new PrintStream(out);
 		System.setOut(new PrintStream(outContent));
 		System.setErr(new PrintStream(errContent));
+		System.setIn(in);
 
 		// we wait for at most three minutes
 		final int startTimeoutSeconds = 180;
@@ -583,11 +597,13 @@ public abstract class YarnTestBase extends TestLogger {
 			flinkConfiguration,
 			CliFrontend.getConfigurationDirectoryFromEnv(),
 			type,
-			expectedReturnValue);
+			expectedReturnValue,
+			stdinPrintStream);
 		runner.start();
 
 		boolean expectedStringSeen = false;
 		boolean testPassedFromLog4j = false;
+		long shutdownTimeout = 30000L;
 		do {
 			sleep(1000);
 			String outContentString = outContent.toString();
@@ -597,9 +613,15 @@ public abstract class YarnTestBase extends TestLogger {
 					Pattern pattern = Pattern.compile(failOnString);
 					if (pattern.matcher(outContentString).find() || pattern.matcher(errContentString).find()) {
 						LOG.warn("Failing test. Output contained illegal string '" + failOnString + "'");
-						sendOutput();
+						resetStreamsAndSendOutput();
 						// stopping runner.
 						runner.sendStop();
+						// wait for the thread to stop
+						try {
+							runner.join(shutdownTimeout);
+						} catch (InterruptedException e) {
+							LOG.warn("Interrupted while stopping runner", e);
+						}
 						Assert.fail("Output contained illegal string '" + failOnString + "'");
 					}
 				}
@@ -622,7 +644,7 @@ public abstract class YarnTestBase extends TestLogger {
 				runner.sendStop();
 				// wait for the thread to stop
 				try {
-					runner.join(30000);
+					runner.join(shutdownTimeout);
 				}
 				catch (InterruptedException e) {
 					LOG.warn("Interrupted while stopping runner", e);
@@ -639,7 +661,7 @@ public abstract class YarnTestBase extends TestLogger {
 		}
 		while (runner.getRunnerError() == null && !expectedStringSeen && System.currentTimeMillis() < deadline);
 
-		sendOutput();
+		resetStreamsAndSendOutput();
 
 		if (runner.getRunnerError() != null) {
 			// this lets the test fail.
@@ -651,9 +673,10 @@ public abstract class YarnTestBase extends TestLogger {
 		LOG.info("Test was successful");
 	}
 
-	protected static void sendOutput() {
+	protected static void resetStreamsAndSendOutput() {
 		System.setOut(ORIGINAL_STDOUT);
 		System.setErr(ORIGINAL_STDERR);
+		System.setIn(ORIGINAL_STDIN);
 
 		LOG.info("Sending stdout content through logger: \n\n{}\n\n", outContent.toString());
 		LOG.info("Sending stderr content through logger: \n\n{}\n\n", errContent.toString());
@@ -668,6 +691,8 @@ public abstract class YarnTestBase extends TestLogger {
 		private final String configurationDirectory;
 		private final int expectedReturnValue;
 
+		private final PrintStream stdinPrintStream;
+
 		private RunTypes type;
 		private FlinkYarnSessionCli yCli;
 		private Throwable runnerError;
@@ -677,13 +702,15 @@ public abstract class YarnTestBase extends TestLogger {
 				org.apache.flink.configuration.Configuration configuration,
 				String configurationDirectory,
 				RunTypes type,
-				int expectedReturnValue) {
+				int expectedReturnValue,
+				PrintStream stdinPrintStream) {
 
 			this.args = args;
 			this.configuration = Preconditions.checkNotNull(configuration);
 			this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
 			this.type = type;
 			this.expectedReturnValue = expectedReturnValue;
+			this.stdinPrintStream = Preconditions.checkNotNull(stdinPrintStream);
 		}
 
 		@Override
@@ -697,8 +724,8 @@ public abstract class YarnTestBase extends TestLogger {
 							configurationDirectory,
 							"",
 							"",
-							false);
-						returnValue = yCli.run(args, configuration, configurationDirectory);
+							true);
+						returnValue = yCli.run(args);
 						break;
 					case CLI_FRONTEND:
 						try {
@@ -727,9 +754,7 @@ public abstract class YarnTestBase extends TestLogger {
 
 		/** Stops the Yarn session. */
 		public void sendStop() {
-			if (yCli != null) {
-				yCli.stop();
-			}
+			stdinPrintStream.println("stop");
 		}
 
 		public Throwable getRunnerError() {

http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 0a977df..4affb78 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
@@ -33,6 +34,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
@@ -330,7 +332,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	// -------------------------------------------------------------
 
 	@Override
-	public YarnClusterClient retrieve(String applicationID) {
+	public ClusterClient retrieve(String applicationID) {
 
 		try {
 			// check if required Hadoop environment variables are set. If not, warn user
@@ -393,6 +395,23 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 	}
 
+	@Override
+	public void terminateCluster(String clusterId) throws FlinkException {
+		try {
+			yarnClient.killApplication(ConverterUtils.toApplicationId(clusterId));
+		} catch (IOException | YarnException e) {
+			throw new FlinkException("Could not terminate cluster with id " + clusterId + '.', e);
+		}
+	}
+
+	public void terminateCluster(ApplicationId applicationId) throws FlinkException {
+		try {
+			yarnClient.killApplication(applicationId);
+		} catch (YarnException | IOException e) {
+			throw new FlinkException("Could not kill the Yarn Flink cluster with id " + applicationId + '.', e);
+		}
+	}
+
 	/**
 	 * This method will block until the ApplicationMaster/JobManager have been
 	 * deployed on YARN.

http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 5fb7f90..991b3b9 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -24,7 +24,6 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
@@ -33,8 +32,6 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
-import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -43,17 +40,15 @@ import akka.pattern.Patterns;
 import akka.util.Timeout;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import scala.Option;
 import scala.concurrent.Await;
@@ -67,8 +62,6 @@ public class YarnClusterClient extends ClusterClient {
 
 	private static final Logger LOG = LoggerFactory.getLogger(YarnClusterClient.class);
 
-	private Thread clientShutdownHook = new ClientShutdownHook();
-
 	//---------- Class internal fields -------------------
 
 	private final AbstractYarnClusterDescriptor clusterDescriptor;
@@ -80,8 +73,6 @@ public class YarnClusterClient extends ClusterClient {
 	private final ApplicationId appId;
 	private final String trackingURL;
 
-	private boolean isConnected = true;
-
 	/** Indicator whether this cluster has just been created. */
 	private final boolean newlyCreatedCluster;
 
@@ -120,32 +111,6 @@ public class YarnClusterClient extends ClusterClient {
 			flinkConfig,
 			actorSystemLoader,
 			highAvailabilityServices);
-
-		Runtime.getRuntime().addShutdownHook(clientShutdownHook);
-	}
-
-	/**
-	 * Disconnect from the Yarn cluster.
-	 */
-	public void disconnect() {
-
-		if (hasBeenShutDown.getAndSet(true)) {
-			return;
-		}
-
-		if (!isConnected) {
-			throw new IllegalStateException("Can not disconnect from an unconnected cluster.");
-		}
-
-		LOG.info("Disconnecting YarnClusterClient from ApplicationMaster");
-
-		try {
-			Runtime.getRuntime().removeShutdownHook(clientShutdownHook);
-		} catch (IllegalStateException e) {
-			// we are already in the shutdown hook
-		}
-
-		isConnected = false;
 	}
 
 	// -------------------------- Interaction with the cluster ------------------------
@@ -207,7 +172,7 @@ public class YarnClusterClient extends ClusterClient {
 
 	@Override
 	public String getClusterIdentifier() {
-		return "Yarn cluster with application id " + appReport.getApplicationId();
+		return ConverterUtils.toString(appReport.getApplicationId());
 	}
 
 	/**
@@ -215,13 +180,6 @@ public class YarnClusterClient extends ClusterClient {
 	 */
 	@Override
 	public GetClusterStatusResponse getClusterStatus() {
-		if (!isConnected) {
-			throw new IllegalStateException("The cluster is not connected to the cluster.");
-		}
-		if (hasBeenShutdown()) {
-			throw new IllegalStateException("The cluster has already been shutdown.");
-		}
-
 		try {
 			final Future<Object> clusterStatusOption =
 				getJobManagerGateway().ask(
@@ -236,15 +194,7 @@ public class YarnClusterClient extends ClusterClient {
 	@Override
 	public List<String> getNewMessages() {
 
-		if (hasBeenShutdown()) {
-			throw new RuntimeException("The YarnClusterClient has already been stopped");
-		}
-
-		if (!isConnected) {
-			throw new IllegalStateException("The cluster has been connected to the ApplicationMaster.");
-		}
-
-		List<String> ret = new ArrayList<String>();
+		List<String> ret = new ArrayList<>();
 		// get messages from ApplicationClient (locally)
 		while (true) {
 			Object result;
@@ -283,105 +233,6 @@ public class YarnClusterClient extends ClusterClient {
 		return ret;
 	}
 
-	// -------------------------- Shutdown handling ------------------------
-
-	private AtomicBoolean hasBeenShutDown = new AtomicBoolean(false);
-
-	/**
-	 * Shuts down or disconnects from the YARN cluster.
-	 */
-	@Override
-	public void finalizeCluster() {
-		if (isDetached() || !newlyCreatedCluster) {
-			disconnect();
-		} else {
-			shutdownCluster();
-		}
-	}
-
-	/**
-	 * Shuts down the Yarn application.
-	 */
-	public void shutdownCluster() {
-
-		if (hasBeenShutDown.getAndSet(true)) {
-			return;
-		}
-
-		if (!isConnected) {
-			throw new IllegalStateException("The cluster has been not been connected to the ApplicationMaster.");
-		}
-
-		try {
-			Runtime.getRuntime().removeShutdownHook(clientShutdownHook);
-		} catch (IllegalStateException e) {
-			// we are already in the shutdown hook
-		}
-
-		LOG.info("Sending shutdown request to the Application Master");
-		try {
-			Future<Object> response =
-				Patterns.ask(applicationClient.get(),
-					new YarnMessages.LocalStopYarnSession(ApplicationStatus.CANCELED,
-						"Flink YARN Client requested shutdown"),
-					new Timeout(akkaDuration));
-			Await.ready(response, akkaDuration);
-		} catch (Exception e) {
-			LOG.warn("Error while stopping YARN cluster.", e);
-		}
-
-		try {
-			File propertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(flinkConfig.getValue(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
-			if (propertiesFile.isFile()) {
-				if (propertiesFile.delete()) {
-					LOG.info("Deleted Yarn properties file at {}", propertiesFile.getAbsoluteFile().toString());
-				} else {
-					LOG.warn("Couldn't delete Yarn properties file at {}", propertiesFile.getAbsoluteFile().toString());
-				}
-			}
-		} catch (Exception e) {
-			LOG.warn("Exception while deleting the JobManager address file", e);
-		}
-
-		try {
-			ApplicationReport appReport = clusterDescriptor.getYarnClient().getApplicationReport(appId);
-
-			LOG.info("Application " + appId + " finished with state " + appReport
-				.getYarnApplicationState() + " and final state " + appReport
-				.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
-
-			if (appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState()
-				== YarnApplicationState.KILLED) {
-				LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics());
-				LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve "
-					+ "the full application log using this command:"
-					+ System.lineSeparator()
-					+ "\tyarn logs -applicationId " + appReport.getApplicationId()
-					+ System.lineSeparator()
-					+ "(It sometimes takes a few seconds until the logs are aggregated)");
-			}
-		} catch (Exception e) {
-			LOG.warn("Couldn't get final report", e);
-		}
-	}
-
-	public boolean hasBeenShutdown() {
-		return hasBeenShutDown.get();
-	}
-
-	private class ClientShutdownHook extends Thread {
-		@Override
-		public void run() {
-			LOG.info("Shutting down YarnClusterClient from the client shutdown hook");
-
-			try {
-				shutdown();
-			} catch (Throwable t) {
-				LOG.warn("Could not properly shut down the yarn cluster client.", t);
-			}
-		}
-	}
-
 	@Override
 	public boolean isDetached() {
 		return super.isDetached() || clusterDescriptor.isDetachedMode();

http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index d797f47..75a270d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -19,9 +19,11 @@
 package org.apache.flink.yarn.cli;
 
 import org.apache.flink.client.cli.AbstractCustomCommandLine;
+import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -36,20 +38,21 @@ import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
-import org.apache.flink.yarn.YarnClusterClient;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptorV2;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,8 +76,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 import static org.apache.flink.configuration.HighAvailabilityOptions.HA_CLUSTER_ID;
 
@@ -96,7 +101,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 
 	// YARN-session related constants
 	private static final String YARN_PROPERTIES_FILE = ".yarn-properties-";
-	static final String YARN_APPLICATION_ID_KEY = "applicationID";
+	private static final String YARN_APPLICATION_ID_KEY = "applicationID";
 	private static final String YARN_PROPERTIES_PARALLELISM = "parallelism";
 	private static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString";
 
@@ -148,8 +153,9 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 
 	private final String yarnApplicationIdFromYarnProperties;
 
+	private final String yarnPropertiesFileLocation;
+
 	//------------------------------------ Internal fields -------------------------
-	private YarnClusterClient yarnCluster;
 	private boolean detachedMode = false;
 
 	public FlinkYarnSessionCli(
@@ -181,7 +187,12 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 		tmMemory = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
 		container = new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN container to allocate (=Number of Task Managers)");
 		slots = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager");
-		dynamicproperties = new Option(shortPrefix + "D", true, "Dynamic properties");
+		dynamicproperties = Option.builder(shortPrefix + "D")
+			.argName("property=value")
+			.numberOfArgs(2)
+			.valueSeparator()
+			.desc("use value for given property")
+			.build();
 		detached = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached");
 		streaming = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode");
 		name = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
@@ -206,7 +217,8 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 		allOptions.addOption(flip6);
 
 		// try loading a potential yarn properties file
-		final File yarnPropertiesLocation = getYarnPropertiesLocation(configuration.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
+		this.yarnPropertiesFileLocation = configuration.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION);
+		final File yarnPropertiesLocation = getYarnPropertiesLocation(yarnPropertiesFileLocation);
 
 		yarnPropertiesFile = new Properties();
 
@@ -216,7 +228,8 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 			try (InputStream is = new FileInputStream(yarnPropertiesLocation)) {
 				yarnPropertiesFile.load(is);
 			} catch (IOException ioe) {
-				throw new FlinkException("Could not read the Yarn properties file " + yarnPropertiesLocation + '.');
+				throw new FlinkException("Could not read the Yarn properties file " + yarnPropertiesLocation +
+					". Please delete the file at " + yarnPropertiesLocation.getAbsolutePath() + '.', ioe);
 			}
 
 			yarnApplicationIdFromYarnProperties = yarnPropertiesFile.getProperty(YARN_APPLICATION_ID_KEY);
@@ -305,10 +318,21 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 			yarnClusterDescriptor.setQueue(cmd.getOptionValue(queue.getOpt()));
 		}
 
-		String[] dynamicProperties = null;
-		if (cmd.hasOption(dynamicproperties.getOpt())) {
-			dynamicProperties = cmd.getOptionValues(dynamicproperties.getOpt());
-		}
+		final Properties properties = cmd.getOptionProperties(dynamicproperties.getOpt());
+
+		String[] dynamicProperties = properties.stringPropertyNames().stream()
+			.flatMap(
+				(String key) -> {
+					final String value = properties.getProperty(key);
+
+					if (value != null) {
+						return Stream.of(key + dynamicproperties.getValueSeparator() + value);
+					} else {
+						return Stream.empty();
+					}
+				})
+			.toArray(String[]::new);
+
 		String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR);
 
 		yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
@@ -534,184 +558,168 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 		return effectiveConfiguration;
 	}
 
-	public int run(
-			String[] args,
-			Configuration configuration,
-			String configurationDirectory) {
+	public int run(String[] args) throws CliArgsException, FlinkException {
 		//
 		//	Command Line Options
 		//
-		Options options = new Options();
-		addGeneralOptions(options);
-		addRunOptions(options);
+		final CommandLine cmd = parseCommandLineOptions(args, true);
 
-		CommandLineParser parser = new PosixParser();
-		CommandLine cmd;
-		try {
-			cmd = parser.parse(options, args);
-		} catch (Exception e) {
-			System.out.println(e.getMessage());
-			printUsage();
-			return 1;
-		}
+		final AbstractYarnClusterDescriptor yarnClusterDescriptor = createClusterDescriptor(cmd);
 
-		// Query cluster for metrics
-		if (cmd.hasOption(query.getOpt())) {
-			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(
-				configuration,
-				configurationDirectory,
-				cmd.hasOption(flip6.getOpt()));
-			String description;
-			try {
-				description = yarnDescriptor.getClusterDescription();
-			} catch (Exception e) {
-				System.err.println("Error while querying the YARN cluster for available resources: " + e.getMessage());
-				e.printStackTrace(System.err);
-				return 1;
-			}
-			System.out.println(description);
-			return 0;
-		} else if (cmd.hasOption(applicationId.getOpt())) {
+		try {
+			// Query cluster for metrics
+			if (cmd.hasOption(query.getOpt())) {
+				final String description = yarnClusterDescriptor.getClusterDescription();
+				System.out.println(description);
+				return 0;
+			} else {
+				final ClusterClient clusterClient;
+				final ApplicationId yarnApplicationId;
 
-			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(
-				configuration,
-				configurationDirectory,
-				cmd.hasOption(flip6.getOpt()));
+				if (cmd.hasOption(applicationId.getOpt())) {
+					yarnApplicationId = ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt()));
 
-			//configure ZK namespace depending on the value passed
-			String zkNamespace = cmd.hasOption(zookeeperNamespace.getOpt()) ?
-									cmd.getOptionValue(zookeeperNamespace.getOpt())
-									: yarnDescriptor.getFlinkConfiguration()
-									.getString(HA_CLUSTER_ID, cmd.getOptionValue(applicationId.getOpt()));
-			LOG.info("Going to use the ZK namespace: {}", zkNamespace);
-			yarnDescriptor.getFlinkConfiguration().setString(HA_CLUSTER_ID, zkNamespace);
+					clusterClient = yarnClusterDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));
+				} else {
+					final ClusterSpecification clusterSpecification = getClusterSpecification(cmd);
 
-			try {
-				yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));
-			} catch (Exception e) {
-				throw new RuntimeException("Could not retrieve existing Yarn application", e);
-			}
+					clusterClient = yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
 
-			if (detachedMode) {
-				LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
-					"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
-					"yarn application -kill " + applicationId.getOpt());
-				yarnCluster.disconnect();
-			} else {
-				ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
-
-				try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
-						yarnDescriptor.getYarnClient(),
-						yarnCluster.getApplicationId(),
-						new ScheduledExecutorServiceAdapter(scheduledExecutorService))) {
-					runInteractiveCli(
-						yarnCluster,
-						yarnApplicationStatusMonitor,
-						true);
-				} finally {
-					// shut down the scheduled executor service
-					ExecutorUtils.gracefulShutdown(
-						1000L,
-						TimeUnit.MILLISECONDS,
-						scheduledExecutorService);
-				}
-			}
-		} else {
+					//------------------ ClusterClient deployed, handle connection details
+					yarnApplicationId = ConverterUtils.toApplicationId(clusterClient.getClusterIdentifier());
 
-			try (AbstractYarnClusterDescriptor yarnDescriptor = createClusterDescriptor(cmd)){
-				final ClusterSpecification clusterSpecification;
+					String jobManagerAddress =
+						clusterClient.getJobManagerAddress().getAddress().getHostName() +
+							':' + clusterClient.getJobManagerAddress().getPort();
 
-				try {
-					clusterSpecification = getClusterSpecification(cmd);
-				} catch (FlinkException e) {
-					System.err.println("Error while creating the cluster specification: " + e.getMessage());
-					e.printStackTrace();
-					return 1;
-				}
+					System.out.println("Flink JobManager is now running on " + jobManagerAddress);
+					System.out.println("JobManager Web Interface: " + clusterClient.getWebInterfaceURL());
 
-				try {
-					yarnCluster = yarnDescriptor.deploySessionCluster(clusterSpecification);
-				} catch (Exception e) {
-					System.err.println("Error while deploying YARN cluster: " + e.getMessage());
-					e.printStackTrace(System.err);
-					return 1;
-				}
-				//------------------ ClusterClient deployed, handle connection details
-				String jobManagerAddress =
-					yarnCluster.getJobManagerAddress().getAddress().getHostName() +
-						":" + yarnCluster.getJobManagerAddress().getPort();
-
-				System.out.println("Flink JobManager is now running on " + jobManagerAddress);
-				System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
-
-				// file that we write into the conf/ dir containing the jobManager address and the dop.
-				File yarnPropertiesFile = getYarnPropertiesLocation(configuration.getValue(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
-
-				Properties yarnProps = new Properties();
-				yarnProps.setProperty(YARN_APPLICATION_ID_KEY, yarnCluster.getApplicationId().toString());
-				if (clusterSpecification.getSlotsPerTaskManager() != -1) {
-					String parallelism =
-						Integer.toString(clusterSpecification.getSlotsPerTaskManager() * clusterSpecification.getNumberTaskManagers());
-					yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, parallelism);
-				}
-				// add dynamic properties
-				if (yarnDescriptor.getDynamicPropertiesEncoded() != null) {
-					yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING,
-						yarnDescriptor.getDynamicPropertiesEncoded());
+					writeYarnPropertiesFile(
+						yarnApplicationId,
+						clusterSpecification.getNumberTaskManagers() * clusterSpecification.getSlotsPerTaskManager(),
+						yarnClusterDescriptor.getDynamicPropertiesEncoded());
 				}
-				writeYarnProperties(yarnProps, yarnPropertiesFile);
-
-				//------------------ ClusterClient running, let user control it ------------
 
 				if (detachedMode) {
-					// print info and quit:
 					LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
 						"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
-						"yarn application -kill " + yarnCluster.getApplicationId());
-					yarnCluster.waitForClusterToBeReady();
-					yarnCluster.disconnect();
+						"yarn application -kill " + applicationId.getOpt());
 				} else {
+					ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
 
-					ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+					final YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
+						yarnClusterDescriptor.getYarnClient(),
+						yarnApplicationId,
+						new ScheduledExecutorServiceAdapter(scheduledExecutorService));
 
-					try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
-						yarnDescriptor.getYarnClient(),
-						yarnCluster.getApplicationId(),
-						new ScheduledExecutorServiceAdapter(scheduledExecutorService))) {
+					try {
 						runInteractiveCli(
-							yarnCluster,
+							clusterClient,
 							yarnApplicationStatusMonitor,
 							acceptInteractiveInput);
 					} finally {
+						try {
+							yarnApplicationStatusMonitor.close();
+						} catch (Exception e) {
+							LOG.info("Could not properly close the Yarn application status monitor.", e);
+						}
+
+						try {
+							clusterClient.shutdown();
+						} catch (Exception e) {
+							LOG.info("Could not properly shutdown cluster client.", e);
+						}
+
+						try {
+							yarnClusterDescriptor.terminateCluster(yarnApplicationId);
+						} catch (FlinkException e) {
+							LOG.info("Could not properly terminate the Flink cluster.", e);
+						}
+
 						// shut down the scheduled executor service
 						ExecutorUtils.gracefulShutdown(
 							1000L,
 							TimeUnit.MILLISECONDS,
 							scheduledExecutorService);
+
+						deleteYarnPropertiesFile();
+
+						try {
+							final ApplicationReport applicationReport = yarnClusterDescriptor
+								.getYarnClient()
+								.getApplicationReport(yarnApplicationId);
+
+							logFinalApplicationReport(applicationReport);
+						} catch (YarnException | IOException e) {
+							LOG.info("Could not log the final application report.", e);
+						}
 					}
 				}
-			} catch (FlinkException e) {
-				System.err.println("Error while deploying a Flink cluster: " + e.getMessage());
-				e.printStackTrace();
-				return 1;
+			}
+		} finally {
+			try {
+				yarnClusterDescriptor.close();
+			} catch (Exception e) {
+				LOG.info("Could not properly close the yarn cluster descriptor.", e);
 			}
 		}
+
 		return 0;
 	}
 
-	/**
-	 * Utility method for tests.
-	 */
-	public void stop() {
-		if (yarnCluster != null) {
-			LOG.info("Command line interface is shutting down the yarnCluster");
+	private void logFinalApplicationReport(ApplicationReport appReport) {
+		LOG.info("Application " + appReport.getApplicationId() + " finished with state " + appReport
+			.getYarnApplicationState() + " and final state " + appReport
+			.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
+
+		if (appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState()
+			== YarnApplicationState.KILLED) {
+			LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics());
+			LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve "
+				+ "the full application log using this command:"
+				+ System.lineSeparator()
+				+ "\tyarn logs -applicationId " + appReport.getApplicationId()
+				+ System.lineSeparator()
+				+ "(It sometimes takes a few seconds until the logs are aggregated)");
+		}
+	}
 
-			try {
-				yarnCluster.shutdown();
-			} catch (Throwable t) {
-				LOG.warn("Could not properly shutdown the yarn cluster.", t);
+	private void deleteYarnPropertiesFile() {
+		// try to clean up the old yarn properties file
+		try {
+			File propertiesFile = getYarnPropertiesLocation(yarnPropertiesFileLocation);
+			if (propertiesFile.isFile()) {
+				if (propertiesFile.delete()) {
+					LOG.info("Deleted Yarn properties file at {}", propertiesFile.getAbsoluteFile());
+				} else {
+					LOG.warn("Couldn't delete Yarn properties file at {}", propertiesFile.getAbsoluteFile());
+				}
 			}
+		} catch (Exception e) {
+			LOG.warn("Exception while deleting the Yarn properties file", e);
+		}
+	}
+
+	private void writeYarnPropertiesFile(
+			ApplicationId yarnApplicationId,
+			int parallelism,
+			@Nullable String dynamicProperties) {
+		// file that we write into the conf/ dir containing the jobManager address and the dop.
+		final File yarnPropertiesFile = getYarnPropertiesLocation(yarnPropertiesFileLocation);
+
+		Properties yarnProps = new Properties();
+		yarnProps.setProperty(YARN_APPLICATION_ID_KEY, yarnApplicationId.toString());
+		if (parallelism > 0) {
+			yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, Integer.toString(parallelism));
+		}
+
+		// add dynamic properties
+		if (dynamicProperties != null) {
+			yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, dynamicProperties);
 		}
+
+		writeYarnProperties(yarnProps, yarnPropertiesFile);
 	}
 
 	private void logAndSysout(String message) {
@@ -719,28 +727,64 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 		System.out.println(message);
 	}
 
-	public static void main(final String[] args) throws Exception {
+	public static Map<String, String> getDynamicProperties(String dynamicPropertiesEncoded) {
+		if (dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) {
+			Map<String, String> properties = new HashMap<>();
+
+			String[] propertyLines = dynamicPropertiesEncoded.split(YARN_DYNAMIC_PROPERTIES_SEPARATOR);
+			for (String propLine : propertyLines) {
+				if (propLine == null) {
+					continue;
+				}
+
+				int firstEquals = propLine.indexOf("=");
+
+				if (firstEquals >= 0) {
+					String key = propLine.substring(0, firstEquals).trim();
+					String value = propLine.substring(firstEquals + 1, propLine.length()).trim();
+
+					if (!key.isEmpty()) {
+						properties.put(key, value);
+					}
+				}
+			}
+			return properties;
+		}
+		else {
+			return Collections.emptyMap();
+		}
+	}
+
+	public static void main(final String[] args) {
 		final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
 
 		final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
 
-		final FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
-			flinkConfiguration,
-			configurationDirectory,
-			"",
-			""); // no prefix for the YARN session
+		int retCode;
+
+		try {
+			final FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
+				flinkConfiguration,
+				configurationDirectory,
+				"",
+				""); // no prefix for the YARN session
 
-		SecurityUtils.install(new SecurityConfiguration(flinkConfiguration));
+			SecurityUtils.install(new SecurityConfiguration(flinkConfiguration));
 
-		final int retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args, flinkConfiguration, configurationDirectory));
+			retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args));
+		} catch (CliArgsException e) {
+			retCode = handleCliArgsException(e);
+		} catch (Exception e) {
+			retCode = handleError(e);
+		}
 
 		System.exit(retCode);
 	}
 
 	private static void runInteractiveCli(
-		YarnClusterClient clusterClient,
-		YarnApplicationStatusMonitor yarnApplicationStatusMonitor,
-		boolean readConsoleInput) {
+			ClusterClient clusterClient,
+			YarnApplicationStatusMonitor yarnApplicationStatusMonitor,
+			boolean readConsoleInput) {
 		try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
 			boolean continueRepl = true;
 			int numTaskmanagers = 0;
@@ -799,7 +843,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 		}
 	}
 
-	private static void printClusterMessages(YarnClusterClient clusterClient) {
+	private static void printClusterMessages(ClusterClient clusterClient) {
 		final List<String> messages = clusterClient.getNewMessages();
 		if (!messages.isEmpty()) {
 			System.err.println("New messages from the YARN cluster: ");
@@ -819,8 +863,8 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 	 * @throws InterruptedException
 	 */
 	private static boolean repStep(
-		BufferedReader in,
-		boolean readConsoleInput) throws IOException, InterruptedException {
+			BufferedReader in,
+			boolean readConsoleInput) throws IOException, InterruptedException {
 
 		// wait until CLIENT_POLLING_INTERVAL is over or the user entered something.
 		long startTime = System.currentTimeMillis();
@@ -859,32 +903,25 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 		propertiesFile.setReadable(true, false); // readable for all.
 	}
 
-	public static Map<String, String> getDynamicProperties(String dynamicPropertiesEncoded) {
-		if (dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) {
-			Map<String, String> properties = new HashMap<>();
+	private static int handleCliArgsException(CliArgsException e) {
+		LOG.error("Could not parse the command line arguments.", e);
 
-			String[] propertyLines = dynamicPropertiesEncoded.split(YARN_DYNAMIC_PROPERTIES_SEPARATOR);
-			for (String propLine : propertyLines) {
-				if (propLine == null) {
-					continue;
-				}
+		System.out.println(e.getMessage());
+		System.out.println();
+		System.out.println("Use the help option (-h or --help) to get help on the command.");
+		return 1;
+	}
 
-				int firstEquals = propLine.indexOf("=");
+	private static int handleError(Exception e) {
+		LOG.error("Error while running the Flink Yarn session.", e);
 
-				if (firstEquals >= 0) {
-					String key = propLine.substring(0, firstEquals).trim();
-					String value = propLine.substring(firstEquals + 1, propLine.length()).trim();
+		System.err.println();
+		System.err.println("------------------------------------------------------------");
+		System.err.println(" The program finished with the following exception:");
+		System.err.println();
 
-					if (!key.isEmpty()) {
-						properties.put(key, value);
-					}
-				}
-			}
-			return properties;
-		}
-		else {
-			return Collections.emptyMap();
-		}
+		e.printStackTrace();
+		return 1;
 	}
 
 	public static File getYarnPropertiesLocation(@Nullable String yarnPropertiesFileLocation) {
@@ -902,7 +939,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 		return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
 	}
 
-	protected AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, String configurationDirectory, boolean flip6) {
+	private static AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, String configurationDirectory, boolean flip6) {
 		final YarnClient yarnClient = YarnClient.createYarnClient();
 		if (flip6) {
 			return new YarnClusterDescriptorV2(configuration, configurationDirectory, yarnClient);
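
As a quick usage sketch of the relocated getDynamicProperties helper shown above: the class name and configuration keys below are illustrative only, and the "@@" separator is assumed for YARN_DYNAMIC_PROPERTIES_SEPARATOR, whose definition lies outside these hunks.

import java.util.Map;

import org.apache.flink.yarn.cli.FlinkYarnSessionCli;

public class DynamicPropertiesExample {
	public static void main(String[] args) {
		// Encoded dynamic properties as handed from the CLI to the cluster descriptor;
		// "@@" is assumed to be the separator constant defined outside these hunks.
		final String encoded = "akka.ask.timeout=60s@@taskmanager.numberOfTaskSlots=4";

		final Map<String, String> properties = FlinkYarnSessionCli.getDynamicProperties(encoded);

		// Expected output:
		//   akka.ask.timeout -> 60s
		//   taskmanager.numberOfTaskSlots -> 4
		properties.forEach((key, value) -> System.out.println(key + " -> " + value));
	}
}

Entries without an '=' or with an empty key are silently skipped, matching the firstEquals and key.isEmpty() guards above.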

http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java
index 88d7747..f96b581 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java
@@ -59,7 +59,7 @@ public class YarnApplicationStatusMonitor implements AutoCloseable {
 
 		applicationStatusUpdateFuture = scheduledExecutor.scheduleWithFixedDelay(
 			this::updateApplicationStatus,
-			UPDATE_INTERVAL,
+			0L,
 			UPDATE_INTERVAL,
 			TimeUnit.MILLISECONDS);
 
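
The hunk above drops the initial delay of the status poll from UPDATE_INTERVAL to 0L so that the first poll fires immediately. A minimal stand-alone sketch of the scheduleWithFixedDelay semantics this relies on; the 1000 ms interval is an arbitrary stand-in for UPDATE_INTERVAL:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedDelayPollingExample {
	public static void main(String[] args) throws InterruptedException {
		final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

		// With an initial delay of 0L the first run fires immediately; each subsequent
		// run starts a fixed delay after the previous run has finished.
		executor.scheduleWithFixedDelay(
			() -> System.out.println("polling application status at " + System.currentTimeMillis()),
			0L,      // initial delay: poll right away instead of waiting one full interval
			1000L,   // stand-in for UPDATE_INTERVAL: delay between the end of one poll and the next
			TimeUnit.MILLISECONDS);

		Thread.sleep(3500L);
		executor.shutdownNow();
	}
}
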
@@ -83,6 +83,7 @@ public class YarnApplicationStatusMonitor implements AutoCloseable {
 				applicationReport = yarnClient.getApplicationReport(yarnApplicationId);
 			} catch (Exception e) {
 				LOG.info("Could not retrieve the Yarn application report for {}.", yarnApplicationId);
+				applicationStatus = ApplicationStatus.UNKNOWN;
 				return;
 			}