Posted to commits@flink.apache.org by mx...@apache.org on 2016/06/17 08:45:22 UTC

[08/10] flink git commit: [FLINK-3937] programmatic resuming of clusters

http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index a5b8af7..9130fdd 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -55,6 +56,7 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -77,9 +79,6 @@ public class YarnClusterClient extends ClusterClient {
 	// (HDFS) location of the files required to run on YARN. Needed here to delete them on shutdown.
 	private final Path sessionFilesDir;
 
-	/** The leader retrieval service for connecting to the cluster and finding the active leader. */
-	private final LeaderRetrievalService leaderRetrievalService;
-
 	//---------- Class internal fields -------------------
 
 	private final AbstractYarnClusterDescriptor clusterDescriptor;
@@ -92,6 +91,7 @@ public class YarnClusterClient extends ClusterClient {
 
 	private boolean isConnected = false;
 
+	private final boolean perJobCluster;
 
 	/**
 	 * Create a new Flink on YARN cluster.
@@ -101,6 +101,7 @@ public class YarnClusterClient extends ClusterClient {
 	 * @param appReport the YARN application ID
 	 * @param flinkConfig Flink configuration
 	 * @param sessionFilesDir Location of files required for YARN session
+	 * @param perJobCluster Indicator whether this cluster is only created for a single job and then shutdown
 	 * @throws IOException
 	 * @throws YarnException
 	 */
@@ -109,7 +110,8 @@ public class YarnClusterClient extends ClusterClient {
 		final YarnClient yarnClient,
 		final ApplicationReport appReport,
 		org.apache.flink.configuration.Configuration flinkConfig,
-		Path sessionFilesDir) throws IOException, YarnException {
+		Path sessionFilesDir,
+		boolean perJobCluster) throws IOException, YarnException {
 
 		super(flinkConfig);
 
@@ -122,18 +124,16 @@ public class YarnClusterClient extends ClusterClient {
 		this.applicationId = appReport;
 		this.appId = appReport.getApplicationId();
 		this.trackingURL = appReport.getTrackingUrl();
+		this.perJobCluster = perJobCluster;
 
+		/* The leader retrieval service for connecting to the cluster and finding the active leader. */
+		LeaderRetrievalService leaderRetrievalService;
 		try {
 			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
 		} catch (Exception e) {
 			throw new IOException("Could not create the leader retrieval service.", e);
 		}
 
-
-		if (isConnected) {
-			throw new IllegalStateException("Already connected to the cluster.");
-		}
-
 		// start application client
 		LOG.info("Start application client.");
 
@@ -182,28 +182,31 @@ public class YarnClusterClient extends ClusterClient {
 
 		isConnected = true;
 
-		logAndSysout("Waiting until all TaskManagers have connected");
+		if (perJobCluster) {
 
-		while(true) {
-			GetClusterStatusResponse status = getClusterStatus();
-			if (status != null) {
-				if (status.numRegisteredTaskManagers() < clusterDescriptor.getTaskManagerCount()) {
-					logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/"
-						+ clusterDescriptor.getTaskManagerCount() + ")");
+			logAndSysout("Waiting until all TaskManagers have connected");
+
+			while (true) {
+				GetClusterStatusResponse status = getClusterStatus();
+				if (status != null) {
+					if (status.numRegisteredTaskManagers() < clusterDescriptor.getTaskManagerCount()) {
+						logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/"
+							+ clusterDescriptor.getTaskManagerCount() + ")");
+					} else {
+						logAndSysout("All TaskManagers are connected");
+						break;
+					}
 				} else {
-					logAndSysout("All TaskManagers are connected");
-					break;
+					logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
 				}
-			} else {
-				logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
-			}
 
-			try {
-				Thread.sleep(500);
-			} catch (InterruptedException e) {
-				LOG.error("Interrupted while waiting for TaskManagers");
-				System.err.println("Thread is interrupted");
-				throw new IOException("Interrupted while waiting for TaskManagers", e);
+				try {
+					Thread.sleep(500);
+				} catch (InterruptedException e) {
+					LOG.error("Interrupted while waiting for TaskManagers");
+					System.err.println("Thread is interrupted");
+					throw new IOException("Interrupted while waiting for TaskManagers", e);
+				}
 			}
 		}
 	}
@@ -214,9 +217,12 @@ public class YarnClusterClient extends ClusterClient {
 		}
 		LOG.info("Disconnecting YarnClusterClient from ApplicationMaster");
 
-		if(!Runtime.getRuntime().removeShutdownHook(clientShutdownHook)) {
-			LOG.warn("Error while removing the shutdown hook. The YARN session might be killed unintentionally");
+		try {
+			Runtime.getRuntime().removeShutdownHook(clientShutdownHook);
+		} catch (IllegalStateException e) {
+			// we are already in the shutdown hook
 		}
+
 		// tell the actor to shut down.
 		applicationClient.tell(PoisonPill.getInstance(), applicationClient);
 
@@ -265,12 +271,30 @@ public class YarnClusterClient extends ClusterClient {
 
 	@Override
 	protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
-		if (isDetached()) {
-			JobSubmissionResult result = super.runDetached(jobGraph, classLoader);
+		if (perJobCluster) {
 			stopAfterJob(jobGraph.getJobID());
-			return result;
+		}
+
+		if (isDetached()) {
+			return super.runDetached(jobGraph, classLoader);
 		} else {
-			return super.run(jobGraph, classLoader);
+			try {
+				return super.run(jobGraph, classLoader);
+			} finally {
+				// show cluster status
+				List<String> msgs = getNewMessages();
+				if (msgs != null && msgs.size() > 1) {
+
+					logAndSysout("The following messages were created by the YARN cluster while running the Job:");
+					for (String msg : msgs) {
+						logAndSysout(msg);
+					}
+				}
+				if (getApplicationStatus() != ApplicationStatus.SUCCEEDED) {
+					logAndSysout("YARN cluster is in non-successful state " + getApplicationStatus());
+					logAndSysout("YARN Diagnostics: " + getDiagnostics());
+				}
+			}
 		}
 	}
 
@@ -298,8 +322,9 @@ public class YarnClusterClient extends ClusterClient {
 			throw new IllegalStateException("The cluster is not connected to the ApplicationMaster.");
 		}
 		if(hasBeenShutdown()) {
-			throw new RuntimeException("The YarnClusterClient has already been stopped");
+			return null;
 		}
+
 		Future<Object> clusterStatusOption = ask(applicationClient, YarnMessages.getLocalGetyarnClusterStatus(), akkaTimeout);
 		Object clusterStatus;
 		try {
@@ -417,32 +442,20 @@ public class YarnClusterClient extends ClusterClient {
 	@Override
 	public void finalizeCluster() {
 
-		if (!isConnected) {
-			throw new IllegalStateException("The cluster has been not been connected to the ApplicationMaster.");
-		}
-
-		if (isDetached()) {
-			// only disconnect if we are running detached
+		if (isDetached() || !perJobCluster) {
+			// only disconnect if we are not running a per job cluster
 			disconnect();
-			return;
+		} else {
+			shutdownCluster();
 		}
+	}
 
-		// show cluster status
-
-		List<String> msgs = getNewMessages();
-		if (msgs != null && msgs.size() > 1) {
+	public void shutdownCluster() {
 
-			logAndSysout("The following messages were created by the YARN cluster while running the Job:");
-			for (String msg : msgs) {
-				logAndSysout(msg);
-			}
-		}
-		if (getApplicationStatus() != ApplicationStatus.SUCCEEDED) {
-			logAndSysout("YARN cluster is in non-successful state " + getApplicationStatus());
-			logAndSysout("YARN Diagnostics: " + getDiagnostics());
+		if (!isConnected) {
+			throw new IllegalStateException("The cluster has not been connected to the ApplicationMaster.");
 		}
 
-
 		if(hasBeenShutDown.getAndSet(true)) {
 			return;
 		}
@@ -471,13 +484,30 @@ public class YarnClusterClient extends ClusterClient {
 			actorSystem.awaitTermination();
 		}
 
-		LOG.info("Deleting files in " + sessionFilesDir);
 		try {
-			FileSystem shutFS = FileSystem.get(hadoopConfig);
-			shutFS.delete(sessionFilesDir, true); // delete conf and jar file.
-			shutFS.close();
-		}catch(IOException e){
-			LOG.error("Could not delete the Flink jar and configuration files in HDFS..", e);
+			File propertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(flinkConfig);
+			if (propertiesFile.isFile()) {
+				if (propertiesFile.delete()) {
+					LOG.info("Deleted Yarn properties file at {}", propertiesFile.getAbsoluteFile().toString());
+				} else {
+					LOG.warn("Couldn't delete Yarn properties file at {}", propertiesFile.getAbsoluteFile().toString());
+				}
+			}
+		} catch (Exception e) {
+			LOG.warn("Exception while deleting the JobManager address file", e);
+		}
+
+		if (sessionFilesDir != null) {
+			LOG.info("Deleting files in " + sessionFilesDir);
+			try {
+				FileSystem shutFS = FileSystem.get(hadoopConfig);
+				shutFS.delete(sessionFilesDir, true); // delete conf and jar file.
+				shutFS.close();
+			} catch (IOException e) {
+				LOG.error("Could not delete the Flink jar and configuration files in HDFS..", e);
+			}
+		} else {
+			LOG.warn("Session file directory not set. Not deleting session files");
 		}
 
 		try {
@@ -571,7 +601,6 @@ public class YarnClusterClient extends ClusterClient {
 
 	@Override
 	public boolean isDetached() {
-		// either we have set detached mode using the general '-d' flag or using the Yarn CLI flag 'yd'
 		return super.isDetached() || clusterDescriptor.isDetachedMode();
 	}
 }
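
A note on the disconnect() change above: Runtime#removeShutdownHook throws IllegalStateException once the JVM is already shutting down, which is exactly the case when disconnect() runs from the client's own shutdown hook, so the boolean check is replaced by a try/catch. A minimal standalone sketch of that pattern (class and hook names are illustrative, not part of the commit):

public class ShutdownHookRemovalSketch {

	public static void main(String[] args) {
		Thread cleanupHook = new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println("cleaning up");
			}
		});
		Runtime.getRuntime().addShutdownHook(cleanupHook);

		try {
			// Succeeds while the JVM is still running normally.
			Runtime.getRuntime().removeShutdownHook(cleanupHook);
		} catch (IllegalStateException e) {
			// Thrown if shutdown is already in progress; tolerated silently,
			// mirroring the new behaviour of YarnClusterClient#disconnect().
		}
	}
}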

http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 43e7c7b..5f745b2 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -17,10 +17,12 @@
  */
 package org.apache.flink.yarn;
 
+
 /**
  * Default implementation of {@link AbstractYarnClusterDescriptor} which starts an {@link YarnApplicationMasterRunner}.
  */
 public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
+
 	@Override
 	protected Class<?> getApplicationMasterClass() {
 		return YarnApplicationMasterRunner.class;
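
YarnClusterDescriptor itself stays a one-method factory: it only tells the abstract descriptor which ApplicationMaster class to launch. Assuming, as the default implementation above suggests, that this is the only abstract method, a custom descriptor would look like the following sketch (package placement, class name and the placeholder AM runner are illustrative only):

package org.apache.flink.yarn;

public class CustomYarnClusterDescriptor extends AbstractYarnClusterDescriptor {

	/** Illustrative stand-in for an alternative ApplicationMaster entry point. */
	public static class CustomApplicationMasterRunner {
		public static void main(String[] args) {
			// ApplicationMaster bootstrap would go here.
		}
	}

	@Override
	protected Class<?> getApplicationMasterClass() {
		return CustomApplicationMasterRunner.class;
	}
}

Combined with the protected getClusterDescriptor() hook added to FlinkYarnSessionCli below, such a descriptor could be swapped in without touching the CLI logic.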

http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index fdcc858..5eca4f1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -28,11 +28,9 @@ import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CustomCommandLine;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -59,6 +57,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
+
 /**
  * Class handling the command line interface to the YARN session.
  */
@@ -97,8 +97,11 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	private final Option CONTAINER;
 	private final Option SLOTS;
 	private final Option DETACHED;
+	@Deprecated
 	private final Option STREAMING;
 	private final Option NAME;
+	
+	private final Options ALL_OPTIONS;
 
 	/**
 	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
@@ -118,7 +121,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 	public FlinkYarnSessionCli(String shortPrefix, String longPrefix, boolean acceptInteractiveInput) {
 		this.acceptInteractiveInput = acceptInteractiveInput;
-		
+
 		QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
 		APPLICATION_ID = new Option(shortPrefix + "id", longPrefix + "applicationId", true, "Attach to running YARN session");
 		QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
@@ -132,37 +135,24 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached");
 		STREAMING = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode");
 		NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
+		
+		ALL_OPTIONS = new Options();
+		ALL_OPTIONS.addOption(FLINK_JAR);
+		ALL_OPTIONS.addOption(JM_MEMORY);
+		ALL_OPTIONS.addOption(TM_MEMORY);
+		ALL_OPTIONS.addOption(CONTAINER);
+		ALL_OPTIONS.addOption(QUEUE);
+		ALL_OPTIONS.addOption(QUERY);
+		ALL_OPTIONS.addOption(SHIP_PATH);
+		ALL_OPTIONS.addOption(SLOTS);
+		ALL_OPTIONS.addOption(DYNAMIC_PROPERTIES);
+		ALL_OPTIONS.addOption(DETACHED);
+		ALL_OPTIONS.addOption(STREAMING);
+		ALL_OPTIONS.addOption(NAME);
+		ALL_OPTIONS.addOption(APPLICATION_ID);
 	}
 
-	/**
-	 * Attaches a new Yarn Client to running YARN application.
-	 *
-	 */
-	public AbstractFlinkYarnCluster attachFlinkYarnClient(CommandLine cmd) {
-		AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
-		if (flinkYarnClient == null) {
-			return null;
-		}
 
-		if (!cmd.hasOption(APPLICATION_ID.getOpt())) {
-			LOG.error("Missing required argument " + APPLICATION_ID.getOpt());
-			printUsage();
-			return null;
-		}
-
-		String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
-		GlobalConfiguration.loadConfiguration(confDirPath);
-		Configuration flinkConfiguration = GlobalConfiguration.getConfiguration();
-		flinkYarnClient.setFlinkConfiguration(flinkConfiguration);
-		flinkYarnClient.setConfigurationDirectory(confDirPath);
-
-		try {
-			return flinkYarnClient.attach(cmd.getOptionValue(APPLICATION_ID.getOpt()));
-		} catch (Exception e) {
-			LOG.error("Could not attach to YARN session", e);
-			return null;
-		}
-	}
 	/**
 	 * Resumes from a Flink Yarn properties file
 	 * @param flinkConfiguration The flink configuration
@@ -170,7 +160,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	 */
 	private boolean resumeFromYarnProperties(Configuration flinkConfiguration) {
 		// load the YARN properties
-		File propertiesFile = new File(getYarnPropertiesLocation(flinkConfiguration));
+		File propertiesFile = getYarnPropertiesLocation(flinkConfiguration);
 		if (!propertiesFile.exists()) {
 			return false;
 		}
@@ -209,7 +199,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			try {
 				jobManagerAddress = ClientUtils.parseHostPortAddress(address);
 				// store address in config from where it is retrieved by the retrieval service
-				CliFrontend.writeJobManagerAddressToConfig(flinkConfiguration, jobManagerAddress);
+				CliFrontend.setJobManagerAddressInConfig(flinkConfiguration, jobManagerAddress);
 			}
 			catch (Exception e) {
 				throw new RuntimeException("YARN properties contain an invalid entry for JobManager address.", e);
@@ -228,10 +218,9 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		return true;
 	}
 
-	public YarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine cmd) {
-
+	public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine cmd) {
 
-		YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor();
+		AbstractYarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor();
 
 		if (!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option!
 			LOG.error("Missing required argument {}", CONTAINER.getOpt());
@@ -343,19 +332,6 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		return yarnClusterDescriptor;
 	}
 
-	@Override
-	public YarnClusterClient createClient(String applicationName, CommandLine cmdLine) throws Exception {
-
-		YarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);
-
-		try {
-			return yarnClusterDescriptor.deploy();
-		} catch (Exception e) {
-			throw new RuntimeException("Error deploying the YARN cluster", e);
-		}
-
-	}
-
 	private void printUsage() {
 		System.out.println("Usage:");
 		HelpFormatter formatter = new HelpFormatter();
@@ -367,17 +343,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		formatter.printHelp(" ", req);
 
 		formatter.setSyntaxPrefix("   Optional");
-		Options opt = new Options();
-		opt.addOption(JM_MEMORY);
-		opt.addOption(TM_MEMORY);
-		opt.addOption(QUERY);
-		opt.addOption(QUEUE);
-		opt.addOption(SLOTS);
-		opt.addOption(DYNAMIC_PROPERTIES);
-		opt.addOption(DETACHED);
-		opt.addOption(STREAMING);
-		opt.addOption(NAME);
-		formatter.printHelp(" ", opt);
+		Options options = new Options();
+		addGeneralOptions(options);
+		addRunOptions(options);
+		formatter.printHelp(" ", options);
 	}
 
 	private static void writeYarnProperties(Properties properties, File propertiesFile) {
@@ -439,6 +408,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 					switch (command) {
 						case "quit":
 						case "stop":
+							yarnCluster.shutdownCluster();
 							break label;
 
 						case "help":
@@ -466,38 +436,62 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	}
 
 	@Override
-	public String getIdentifier() {
+	public boolean isActive(CommandLine commandLine, Configuration configuration) {
+		String jobManagerOption = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
+		boolean yarnJobManager = ID.equals(jobManagerOption);
+		return yarnJobManager || resumeFromYarnProperties(configuration);
+	}
+
+	@Override
+	public String getId() {
 		return ID;
 	}
 
-	public void addOptions(Options options) {
-		options.addOption(FLINK_JAR);
-		options.addOption(JM_MEMORY);
-		options.addOption(TM_MEMORY);
-		options.addOption(CONTAINER);
-		options.addOption(QUEUE);
-		options.addOption(QUERY);
-		options.addOption(SHIP_PATH);
-		options.addOption(SLOTS);
-		options.addOption(DYNAMIC_PROPERTIES);
-		options.addOption(DETACHED);
-		options.addOption(STREAMING);
-		options.addOption(NAME);
+	@Override
+	public void addRunOptions(Options baseOptions) {
+		for (Object option : ALL_OPTIONS.getOptions()) {
+			baseOptions.addOption((Option) option);
+		}
 	}
 
+	@Override
+	public void addGeneralOptions(Options baseOptions) {
+		baseOptions.addOption(APPLICATION_ID);
+	}
 
-	public void getYARNAttachCLIOptions(Options options) {
-		options.addOption(APPLICATION_ID);
+	@Override
+	public YarnClusterClient retrieveCluster(
+			CommandLine cmdLine,
+			Configuration config) throws UnsupportedOperationException {
+
+		// first check for an application id
+		if (cmdLine.hasOption(APPLICATION_ID.getOpt())) {
+			String applicationID = cmdLine.getOptionValue(APPLICATION_ID.getOpt());
+			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
+			yarnDescriptor.setFlinkConfiguration(config);
+			return yarnDescriptor.retrieve(applicationID);
+		// then try to load from yarn properties
+		} else if (resumeFromYarnProperties(config)) {
+			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
+			yarnDescriptor.setFlinkConfiguration(config);
+			return yarnDescriptor.retrieveFromConfig(config);
+		}
+
+		throw new UnsupportedOperationException("Could not resume a Yarn cluster.");
 	}
 
 	@Override
-	public ClusterClient retrieveCluster(Configuration config) throws Exception {
+	public YarnClusterClient createCluster(String applicationName, CommandLine cmdLine, Configuration config) {
+
+		AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);
+		yarnClusterDescriptor.setFlinkConfiguration(config);
 
-		if(resumeFromYarnProperties(config)) {
-			return new StandaloneClusterClient(config);
+		try {
+			return yarnClusterDescriptor.deploy();
+		} catch (Exception e) {
+			throw new RuntimeException("Error deploying the YARN cluster", e);
 		}
 
-		return null;
 	}
 
 	public int run(String[] args) {
@@ -505,7 +499,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		//	Command Line Options
 		//
 		Options options = new Options();
-		addOptions(options);
+		addGeneralOptions(options);
+		addRunOptions(options);
 
 		CommandLineParser parser = new PosixParser();
 		CommandLine cmd;
@@ -519,10 +514,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 		// Query cluster for metrics
 		if (cmd.hasOption(QUERY.getOpt())) {
-			YarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor();
+			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
 			String description;
 			try {
-				description = flinkYarnClient.getClusterDescription();
+				description = yarnDescriptor.getClusterDescription();
 			} catch (Exception e) {
 				System.err.println("Error while querying the YARN cluster for available resources: "+e.getMessage());
 				e.printStackTrace(System.err);
@@ -531,56 +526,61 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			System.out.println(description);
 			return 0;
 		} else if (cmd.hasOption(APPLICATION_ID.getOpt())) {
-			yarnCluster = attachFlinkYarnClient(cmd);
+
+			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
+			try {
+				yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(APPLICATION_ID.getOpt()));
+			} catch (Exception e) {
+				throw new RuntimeException("Could not retrieve existing Yarn application", e);
+			}
 
 			if (detachedMode) {
 				LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
 					"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
-					"yarn application -kill "+yarnCluster.getApplicationId());
+					"yarn application -kill "+yarnCluster.getClusterIdentifier());
+				yarnCluster.disconnect();
 			} else {
-				runInteractiveCli(yarnCluster);
-
-				if (!yarnCluster.hasBeenStopped()) {
-					LOG.info("Command Line Interface requested session shutdown");
-					yarnCluster.shutdown(false);
-				}
+				runInteractiveCli(yarnCluster, true);
 			}
 		} else {
 
-			YarnClusterDescriptor flinkYarnClient;
+			AbstractYarnClusterDescriptor yarnDescriptor;
 			try {
-				flinkYarnClient = createDescriptor(null, cmd);
+				yarnDescriptor = createDescriptor(null, cmd);
 			} catch (Exception e) {
 				System.err.println("Error while starting the YARN Client. Please check log output!");
 				return 1;
 			}
 
 			try {
-				yarnCluster = flinkYarnClient.deploy();
+				yarnCluster = yarnDescriptor.deploy();
 			} catch (Exception e) {
 				System.err.println("Error while deploying YARN cluster: "+e.getMessage());
 				e.printStackTrace(System.err);
 				return 1;
 			}
 			//------------------ ClusterClient deployed, handle connection details
-			String jobManagerAddress = yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + ":" + yarnCluster.getJobManagerAddress().getPort();
+			String jobManagerAddress =
+				yarnCluster.getJobManagerAddress().getAddress().getHostAddress() +
+					":" + yarnCluster.getJobManagerAddress().getPort();
+
 			System.out.println("Flink JobManager is now running on " + jobManagerAddress);
 			System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
 
 			// file that we write into the conf/ dir containing the jobManager address and the dop.
-			File yarnPropertiesFile = new File(getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration()));
+			File yarnPropertiesFile = getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration());
 
 			Properties yarnProps = new Properties();
 			yarnProps.setProperty(YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress);
-			if (flinkYarnClient.getTaskManagerSlots() != -1) {
+			if (yarnDescriptor.getTaskManagerSlots() != -1) {
 				String parallelism =
-						Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount());
+						Integer.toString(yarnDescriptor.getTaskManagerSlots() * yarnDescriptor.getTaskManagerCount());
 				yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, parallelism);
 			}
 			// add dynamic properties
-			if (flinkYarnClient.getDynamicPropertiesEncoded() != null) {
+			if (yarnDescriptor.getDynamicPropertiesEncoded() != null) {
 				yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING,
-						flinkYarnClient.getDynamicPropertiesEncoded());
+						yarnDescriptor.getDynamicPropertiesEncoded());
 			}
 			writeYarnProperties(yarnProps, yarnPropertiesFile);
 
@@ -592,21 +592,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 						"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
 						"yarn application -kill " + yarnCluster.getClusterIdentifier() + "\n" +
 						"Please also note that the temporary files of the YARN session in {} will not be removed.",
-						flinkYarnClient.getSessionFilesDir());
+						yarnDescriptor.getSessionFilesDir());
 				yarnCluster.disconnect();
 			} else {
 				runInteractiveCli(yarnCluster, acceptInteractiveInput);
-
-				if (!yarnCluster.hasBeenShutdown()) {
-					LOG.info("Command Line Interface requested session shutdown");
-					yarnCluster.shutdown();
-				}
-
-				try {
-					yarnPropertiesFile.delete();
-				} catch (Exception e) {
-					LOG.warn("Exception while deleting the JobManager address file", e);
-				}
 			}
 		}
 		return 0;
@@ -649,11 +638,16 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		}
 	}
 
-	private static String getYarnPropertiesLocation(Configuration conf) {
+	public static File getYarnPropertiesLocation(Configuration conf) {
 		String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
 		String currentUser = System.getProperty("user.name");
-		String propertiesFileLocation = conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
+		String propertiesFileLocation =
+			conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
+
+		return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
+	}
 
-		return propertiesFileLocation + File.separator + YARN_PROPERTIES_FILE + currentUser;
+	protected AbstractYarnClusterDescriptor getClusterDescriptor() {
+		return new YarnClusterDescriptor();
 	}
 }
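
Taken together, the reworked CustomCommandLine hooks (getId, isActive, addGeneralOptions/addRunOptions, retrieveCluster and createCluster) are what make the resuming programmatic: a caller can attach to a running YARN session without going through the interactive loop. A minimal sketch against the signatures above; the class name, the empty Configuration and the way arguments are passed in are assumptions for illustration, and a real caller would load flink-conf.yaml plus a proper Hadoop/YARN configuration:

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.yarn.YarnClusterClient;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;

public class ResumeYarnClusterSketch {

	public static void main(String[] args) throws Exception {
		// Prefixes chosen so the options read like the "-yid", "-yn" style flags
		// built in the FlinkYarnSessionCli constructor above.
		FlinkYarnSessionCli cli = new FlinkYarnSessionCli("y", "yarn", false);

		// Collect the CLI's own options and parse the caller-supplied arguments,
		// e.g. {"-yid", "<applicationId>"} to attach to a specific application.
		Options options = new Options();
		cli.addGeneralOptions(options);
		cli.addRunOptions(options);
		CommandLine cmd = new PosixParser().parse(options, args);

		// Empty configuration for brevity; retrieveCluster can still fall back to
		// the YARN properties file located via getYarnPropertiesLocation(conf).
		Configuration config = new Configuration();

		YarnClusterClient cluster;
		try {
			// Resolves "-yid" first, then tries the YARN properties file.
			cluster = cli.retrieveCluster(cmd, config);
		} catch (UnsupportedOperationException e) {
			System.err.println("No running YARN cluster could be resumed: " + e.getMessage());
			return;
		}

		System.out.println("JobManager web interface: " + cluster.getWebInterfaceURL());

		// Detach again without tearing the session down; shutdownCluster() would stop it.
		cluster.disconnect();
	}
}

The interactive path in run() above does essentially the same in detached mode: it retrieves the cluster, prints the "yarn application -kill" hint, and calls disconnect() rather than shutting the session down.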