Posted to commits@flink.apache.org by mx...@apache.org on 2016/06/17 08:45:23 UTC

[09/10] flink git commit: [FLINK-3937] programmatic resuming of clusters

[FLINK-3937] programmatic resuming of clusters

- integrates with and extends the refactoring of FLINK-3667
- enables resuming from Yarn properties or a Yarn application id
- introduces additional StandaloneClusterDescriptor
- introduces DefaultCLI to get rid of standalone mode switches in CliFrontend
- various fixes and improvements
- remove legacy code from CliFrontend
- change activation code of CustomCommandLine interface
- use UnsupportedOperationException to signal unsupported operations (see the sketch after this list)
- remove all checked exceptions of type Exception
- fix logging and reduce verbosity of per-job clusters
- print 'id' argument in YarnSessionCli
- minor renaming of method names
- improve documentation
- deprecate streaming option
- extend CliFrontendYarnAddressConfigurationTest
- move loading of custom CLIs to CliFrontend
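
The retrieve-or-create flow referenced above is implemented by the CliFrontend/CustomCommandLine changes in the diff below. As a quick orientation, here is a minimal sketch (not part of the commit) of that flow; the helper class and method name (ClusterResolutionSketch, retrieveOrCreate) are assumptions for illustration, while the Flink types and the interface methods retrieveCluster/createCluster are taken from this change:

    import org.apache.commons.cli.CommandLine;
    import org.apache.flink.client.cli.CustomCommandLine;
    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.configuration.Configuration;

    // Hypothetical helper mirroring what CliFrontend#getClient does after this commit.
    public class ClusterResolutionSketch {

        /**
         * First tries to resume an existing cluster (e.g. from the Yarn properties
         * file or a Yarn application id); only if the active command-line does not
         * support resuming, deploys a new per-job cluster.
         */
        public static ClusterClient retrieveOrCreate(
                CustomCommandLine<?> activeCommandLine,
                CommandLine commandLine,
                Configuration config,
                String programName) {
            try {
                // "resuming not supported" is signalled via UnsupportedOperationException
                return activeCommandLine.retrieveCluster(commandLine, config);
            } catch (UnsupportedOperationException retrieveNotSupported) {
                // fall back to deploying a fresh cluster for this job
                return activeCommandLine.createCluster(
                    "Flink Application: " + programName, commandLine, config);
            }
        }
    }

The active command-line itself is selected by CliFrontend#getActiveCustomCommandLine, which asks each registered CustomCommandLine whether it isActive for the parsed options and falls back to the new DefaultCLI for standalone clusters.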

This closes #2085


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f4ac8522
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f4ac8522
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f4ac8522

Branch: refs/heads/master
Commit: f4ac852275da36ee33aa54ae9097293ccc981afa
Parents: 875d4d2
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Apr 25 16:28:51 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Jun 17 10:37:58 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 217 +++++------
 .../flink/client/cli/CliFrontendParser.java     | 225 ++++++------
 .../flink/client/cli/CustomCommandLine.java     |  34 +-
 .../org/apache/flink/client/cli/DefaultCLI.java |  77 ++++
 .../client/deployment/ClusterDescriptor.java    |  14 +-
 .../deployment/StandaloneClusterDescriptor.java |  56 +++
 .../flink/client/program/ClusterClient.java     |  16 +-
 .../CliFrontendAddressConfigurationTest.java    |  59 +--
 .../org/apache/flink/api/scala/FlinkShell.scala |  24 +-
 ...CliFrontendYarnAddressConfigurationTest.java | 360 +++++++++++++++----
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  10 +-
 .../yarn/AbstractYarnClusterDescriptor.java     | 220 ++++++++----
 .../apache/flink/yarn/YarnClusterClient.java    | 151 ++++----
 .../flink/yarn/YarnClusterDescriptor.java       |   2 +
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 226 ++++++------
 15 files changed, 1062 insertions(+), 629 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 3064f8d..a01ab53 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.client;
 
-import akka.actor.ActorSystem;
-
+import org.apache.commons.cli.CommandLine;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
@@ -30,6 +29,7 @@ import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.client.cli.InfoOptions;
 import org.apache.flink.client.cli.ListOptions;
 import org.apache.flink.client.cli.ProgramOptions;
@@ -39,7 +39,6 @@ import org.apache.flink.client.cli.StopOptions;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -56,7 +55,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
@@ -67,13 +65,12 @@ import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.Some;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -81,6 +78,8 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.text.SimpleDateFormat;
@@ -89,6 +88,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -102,9 +102,11 @@ import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepo
  */
 public class CliFrontend {
 
+	private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);
+
 	// actions
-	public static final String ACTION_RUN = "run";
-	public static final String ACTION_INFO = "info";
+	private static final String ACTION_RUN = "run";
+	private static final String ACTION_INFO = "info";
 	private static final String ACTION_LIST = "list";
 	private static final String ACTION_CANCEL = "cancel";
 	private static final String ACTION_STOP = "stop";
@@ -116,19 +118,24 @@ public class CliFrontend {
 	private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
 
 	// --------------------------------------------------------------------------------------------
+
+	private static final List<CustomCommandLine> customCommandLine = new LinkedList<>();
+
+	static {
+		/** command line interface of the YARN session, with a special initialization here
+		 *  to prefix all options with y/yarn. */
+		loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn");
+		customCommandLine.add(new DefaultCLI());
+	}
+
 	// --------------------------------------------------------------------------------------------
 
-	private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);
 
 
 	private final Configuration config;
 
 	private final FiniteDuration clientTimeout;
 
-	private final FiniteDuration lookupTimeout;
-
-	private ActorSystem actorSystem;
-
 	/**
 	 *
 	 * @throws Exception Thrown if the configuration directory was not found, the configuration could not be loaded
@@ -146,6 +153,8 @@ public class CliFrontend {
 		// load the configuration
 		LOG.info("Trying to load configuration file");
 		GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
+		System.setProperty("FLINK_CONF_DIR", configDirectory.getAbsolutePath());
+
 		this.config = GlobalConfiguration.getConfiguration();
 
 		try {
@@ -156,7 +165,6 @@ public class CliFrontend {
 		}
 
 		this.clientTimeout = AkkaUtils.getClientTimeout(config);
-		this.lookupTimeout = AkkaUtils.getLookupTimeout(config);
 	}
 
 
@@ -798,19 +806,20 @@ public class CliFrontend {
 	 *
 	 * @param options Command line options
 	 */
-	protected void updateConfig(CommandLineOptions options) {
-		if(options.getJobManagerAddress() != null){
-			if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
-				jobManagerAddress = CliFrontendParser.getFlinkYarnSessionCli()
-					.attachFlinkYarnClient(options.getCommandLine())
-					.getJobManagerAddress();
-			InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
-			writeJobManagerAddressToConfig(config, jobManagerAddress);
+	protected ClusterClient retrieveClient(CommandLineOptions options) {
+		CustomCommandLine customCLI = getActiveCustomCommandLine(options.getCommandLine());
+		try {
+			ClusterClient client = customCLI.retrieveCluster(options.getCommandLine(), config);
+			LOG.info("Using address {} to connect to JobManager.", client.getJobManagerAddressFromConfig());
+			return client;
+		} catch (Exception e) {
+			LOG.error("Couldn't retrieve {} cluster.", customCLI.getId(), e);
+			throw new IllegalConfigurationException("Couldn't retrieve client for cluster", e);
 		}
 	}
 
 	/**
-	 * Retrieves the {@link ActorGateway} for the JobManager. The JobManager address is retrieved
+	 * Retrieves the {@link ActorGateway} for the JobManager. The ClusterClient is retrieved
 	 * from the provided {@link CommandLineOptions}.
 	 *
 	 * @param options CommandLineOptions specifying the JobManager URL
@@ -818,92 +827,41 @@ public class CliFrontend {
 	 * @throws Exception
 	 */
 	protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception {
-		// overwrite config values with given command line options
-		updateConfig(options);
-
-		// start an actor system if needed
-		if (this.actorSystem == null) {
-			LOG.info("Starting actor system to communicate with JobManager");
-			try {
-				scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0);
-				this.actorSystem = AkkaUtils.createActorSystem(
-						config,
-						new Some<scala.Tuple2<String, Object>>(systemEndpoint));
-			}
-			catch (Exception e) {
-				throw new IOException("Could not start actor system to communicate with JobManager", e);
-			}
-
-			LOG.info("Actor system successfully started");
-		}
-
-		LOG.info("Trying to lookup the JobManager gateway");
-		// Retrieve the ActorGateway from the LeaderRetrievalService
-		LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config);
-
-		return LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, lookupTimeout);
+		return retrieveClient(options).getJobManagerGateway();
 	}
 
 	/**
-	 * Retrieves a {@link ClusterClient} object from the given command line options and other parameters.
-	 *
-	 * @param options Command line options which contain JobManager address
+	 * Creates a {@link ClusterClient} object from the given command line options and other parameters.
+	 * @param options Command line options
 	 * @param programName Program name
 	 * @throws Exception
 	 */
 	protected ClusterClient getClient(
 			CommandLineOptions options,
-			String programName)
-		throws Exception {
-		InetSocketAddress jobManagerAddress;
-
-		// try to get the JobManager address via command-line args
-		if (options.getJobManagerAddress() != null) {
+			String programName) throws Exception {
 
-			// Get the custom command-lines (e.g. Yarn/Mesos)
-			CustomCommandLine<?> activeCommandLine =
-				CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress());
+		// Get the custom command-line (e.g. Standalone/Yarn/Mesos)
+		CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
 
-			if (activeCommandLine != null) {
-				logAndSysout(activeCommandLine.getIdentifier() + " mode detected. Switching Log4j output to console");
-
-				// Default yarn application name to use, if nothing is specified on the command line
+		ClusterClient client;
+		try {
+			client = activeCommandLine.retrieveCluster(options.getCommandLine(), config);
+			logAndSysout("Cluster retrieved");
+		} catch (UnsupportedOperationException e) {
+			try {
 				String applicationName = "Flink Application: " + programName;
-
-				ClusterClient client = activeCommandLine.createClient(applicationName, options.getCommandLine());
-
+				client = activeCommandLine.createCluster(applicationName, options.getCommandLine(), config);
 				logAndSysout("Cluster started");
-				logAndSysout("JobManager web interface address " + client.getWebInterfaceURL());
-
-				return client;
-			} else {
-				// job manager address supplied on the command-line
-				LOG.info("Using address {} to connect to JobManager.", options.getJobManagerAddress());
-				jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
-				writeJobManagerAddressToConfig(config, jobManagerAddress);
-				return new StandaloneClusterClient(config);
-			}
-
-		// try to get the JobManager address via resuming of a cluster
-		} else {
-			for (CustomCommandLine cli : CliFrontendParser.getAllCustomCommandLine().values()) {
-				ClusterClient client = cli.retrieveCluster(config);
-				if (client != null) {
-					LOG.info("Using address {} to connect to JobManager.", client.getJobManagerAddressFromConfig());
-					return client;
-				}
+			} catch (UnsupportedOperationException e2) {
+				throw new IllegalConfigurationException(
+					"The JobManager address is neither provided at the command-line, " +
+						"nor configured in flink-conf.yaml.");
 			}
 		}
 
-		// read JobManager address from the config
-		if (config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) != null) {
-			return new StandaloneClusterClient(config);
-		// We tried hard but couldn't find a JobManager address
-		} else {
-			throw new IllegalConfigurationException(
-				"The JobManager address is neither provided at the command-line, " +
-					"nor configured in flink-conf.yaml.");
-		}
+		logAndSysout("Using address " + client.getJobManagerAddress() + " to connect to JobManager.");
+		logAndSysout("JobManager web interface address " + client.getWebInterfaceURL());
+		return client;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -917,7 +875,7 @@ public class CliFrontend {
 	 * @return The return code for the process.
 	 */
 	private int handleArgException(Exception e) {
-		LOG.error("Invalid command line arguments." + (e.getMessage() == null ? "" : e.getMessage()));
+		LOG.error("Invalid command line arguments. " + (e.getMessage() == null ? "" : e.getMessage()));
 
 		System.out.println(e.getMessage());
 		System.out.println();
@@ -1039,14 +997,6 @@ public class CliFrontend {
 		}
 	}
 
-	public void shutdown() {
-		ActorSystem sys = this.actorSystem;
-		if (sys != null) {
-			this.actorSystem = null;
-			sys.shutdown();
-		}
-	}
-
 	/**
 	 * Submits the job based on the arguments
 	 */
@@ -1070,7 +1020,8 @@ public class CliFrontend {
 	// --------------------------------------------------------------------------------------------
 
 	public static String getConfigurationDirectoryFromEnv() {
-		String location = System.getenv(ENV_CONFIG_DIRECTORY);
+		String envLocation = System.getenv(ENV_CONFIG_DIRECTORY);
+		String location = envLocation != null ? envLocation : System.getProperty(ENV_CONFIG_DIRECTORY);
 
 		if (location != null) {
 			if (new File(location).exists()) {
@@ -1102,9 +1053,65 @@ public class CliFrontend {
 	 * @param address Address to write to the configuration
 	 * @param config The config to write to
 	 */
-	public static void writeJobManagerAddressToConfig(Configuration config, InetSocketAddress address) {
+	public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
 		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostName());
 		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());
 	}
 
+	// --------------------------------------------------------------------------------------------
+	//  Custom command-line
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets the custom command-line for the arguments.
+	 * @param commandLine The input to the command-line.
+	 * @return custom command-line which is active (may only be one at a time)
+	 */
+	public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
+		for (CustomCommandLine cli : customCommandLine) {
+			if (cli.isActive(commandLine, config)) {
+				return cli;
+			}
+		}
+		throw new IllegalStateException("No command-line ran.");
+	}
+
+	/**
+	 * Retrieves the loaded custom command-lines.
+	 * @return An unmodifiable list of loaded custom command-lines.
+	 */
+	public static List<CustomCommandLine> getCustomCommandLineList() {
+		return Collections.unmodifiableList(customCommandLine);
+	}
+
+	/**
+	 * Loads a class from the classpath that implements the CustomCommandLine interface.
+	 * @param className The fully-qualified class name to load.
+	 * @param params The constructor parameters
+	 */
+	private static void loadCustomCommandLine(String className, Object... params) {
+
+		try {
+			Class<? extends CustomCommandLine> customCliClass =
+				Class.forName(className).asSubclass(CustomCommandLine.class);
+
+			// construct class types from the parameters
+			Class<?>[] types = new Class<?>[params.length];
+			for (int i = 0; i < params.length; i++) {
+				Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
+				types[i] = params[i].getClass();
+			}
+
+			Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);
+			final CustomCommandLine cli = constructor.newInstance(params);
+
+			customCommandLine.add(cli);
+
+		} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException
+			| InvocationTargetException e) {
+			LOG.warn("Unable to locate custom CLI class {}. " +
+				"Flink is not compiled with support for this class.", className, e);
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 9b935e8..c90793d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -24,16 +24,10 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
 
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.client.CliFrontend;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
 
 /**
  * A simple command line parser (based on Apache Commons CLI) that extracts command
@@ -44,16 +38,6 @@ public class CliFrontendParser {
 	private static final Logger LOG = LoggerFactory.getLogger(CliFrontendParser.class);
 
 
-	/** command line interface of the YARN session, with a special initialization here
-	 *  to prefix all options with y/yarn. */
-	private static final Map<String, CustomCommandLine> customCommandLine = new HashMap<>(1);
-
-	static {
-		// we could easily add more here in the future
-		loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn");
-	}
-
-
 	static final Option HELP_OPTION = new Option("h", "help", false,
 			"Show the help message for the CLI Frontend or the action.");
 
@@ -82,9 +66,8 @@ public class CliFrontendParser {
 	static final Option ARGS_OPTION = new Option("a", "arguments", true,
 			"Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
 
-	static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true,
+	public static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true,
 			"Address of the JobManager (master) to which to connect. " +
-			"Specify " + getCliIdentifierString() +" as the JobManager to deploy a cluster for the job. " +
 			"Use this flag to connect to a different JobManager than the one specified in the configuration.");
 
 	static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true,
@@ -146,6 +129,10 @@ public class CliFrontendParser {
 		options.addOption(HELP_OPTION);
 		// backwards compatibility: ignore verbose flag (-v)
 		options.addOption(new Option("v", "verbose", false, "This option is deprecated."));
+		// add general options of all CLIs
+		for (CustomCommandLine customCLI : CliFrontend.getCustomCommandLineList()) {
+			customCLI.addGeneralOptions(options);
+		}
 		return options;
 	}
 
@@ -158,11 +145,6 @@ public class CliFrontendParser {
 		options.addOption(LOGGING_OPTION);
 		options.addOption(DETACHED_OPTION);
 		options.addOption(SAVEPOINT_PATH_OPTION);
-
-		for (CustomCommandLine customCLI : customCommandLine.values()) {
-			customCLI.addOptions(options);
-		}
-
 		return options;
 	}
 
@@ -177,62 +159,85 @@ public class CliFrontendParser {
 	}
 
 	private static Options getRunOptions(Options options) {
-		Options o = getProgramSpecificOptions(options);
-		return getJobManagerAddressOption(o);
+		options = getProgramSpecificOptions(options);
+		options = getJobManagerAddressOption(options);
+		return addCustomCliOptions(options, true);
 	}
 
-	private static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
-		Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
-		return getJobManagerAddressOption(o);
-	}
 
 	private static Options getJobManagerAddressOption(Options options) {
 		options.addOption(ADDRESS_OPTION);
-		yarnSessionCLi.getYARNAttachCLIOptions(options);
-
 		return options;
 	}
 
 	private static Options getInfoOptions(Options options) {
 		options = getProgramSpecificOptions(options);
 		options = getJobManagerAddressOption(options);
-		return options;
+		return addCustomCliOptions(options, false);
+	}
+
+	private static Options getListOptions(Options options) {
+		options.addOption(RUNNING_OPTION);
+		options.addOption(SCHEDULED_OPTION);
+		options = getJobManagerAddressOption(options);
+		return addCustomCliOptions(options, false);
+	}
+
+	private static Options getCancelOptions(Options options) {
+		options = getJobManagerAddressOption(options);
+		return addCustomCliOptions(options, false);
+	}
+
+	private static Options getStopOptions(Options options) {
+		options = getJobManagerAddressOption(options);
+		return addCustomCliOptions(options, false);
+	}
+
+	private static Options getSavepointOptions(Options options) {
+		options = getJobManagerAddressOption(options);
+		options.addOption(SAVEPOINT_DISPOSE_OPTION);
+		return addCustomCliOptions(options, false);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Help
+	// --------------------------------------------------------------------------------------------
+
+	private static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
+		Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
+		return getJobManagerAddressOption(o);
 	}
 
+
 	private static Options getInfoOptionsWithoutDeprecatedOptions(Options options) {
 		options.addOption(CLASS_OPTION);
 		options.addOption(PARALLELISM_OPTION);
-		options = getJobManagerAddressOption(options);
 		return options;
 	}
 
-	private static Options getListOptions(Options options) {
+	private static Options getListOptionsWithoutDeprecatedOptions(Options options) {
 		options.addOption(RUNNING_OPTION);
 		options.addOption(SCHEDULED_OPTION);
 		options = getJobManagerAddressOption(options);
 		return options;
 	}
 
-	private static Options getCancelOptions(Options options) {
+	private static Options getCancelOptionsWithoutDeprecatedOptions(Options options) {
 		options = getJobManagerAddressOption(options);
 		return options;
 	}
 
-	private static Options getStopOptions(Options options) {
+	private static Options getStopOptionsWithoutDeprecatedOptions(Options options) {
 		options = getJobManagerAddressOption(options);
 		return options;
 	}
 
-	private static Options getSavepointOptions(Options options) {
+	private static Options getSavepointOptionsWithoutDeprecatedOptions(Options options) {
 		options = getJobManagerAddressOption(options);
 		options.addOption(SAVEPOINT_DISPOSE_OPTION);
 		return options;
 	}
 
-	// --------------------------------------------------------------------------------------------
-	//  Help
-	// --------------------------------------------------------------------------------------------
-
 	/**
 	 * Prints the help for the client.
 	 */
@@ -261,14 +266,7 @@ public class CliFrontendParser {
 		formatter.setSyntaxPrefix("  \"run\" action options:");
 		formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options()));
 
-		// prints options from all available command-line classes
-		for (Map.Entry<String, CustomCommandLine> entry: customCommandLine.entrySet()) {
-			formatter.setSyntaxPrefix("  Additional arguments if -m " + entry.getKey() + " is set:");
-			Options customOpts = new Options();
-			entry.getValue().addOptions(customOpts);
-			formatter.printHelp(" ", customOpts);
-			System.out.println();
-		}
+		printCustomCliOptions(formatter, true);
 
 		System.out.println();
 	}
@@ -282,10 +280,9 @@ public class CliFrontendParser {
 		System.out.println("\n  Syntax: info [OPTIONS] <jar-file> <arguments>");
 		formatter.setSyntaxPrefix("  \"info\" action options:");
 		formatter.printHelp(" ", getInfoOptionsWithoutDeprecatedOptions(new Options()));
-		formatter.setSyntaxPrefix("  Additional arguments if -m " + CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:");
-		Options yarnOpts = new Options();
-		yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts);
-		formatter.printHelp(" ", yarnOpts);
+
+		printCustomCliOptions(formatter, false);
+
 		System.out.println();
 	}
 
@@ -297,7 +294,10 @@ public class CliFrontendParser {
 		System.out.println("\nAction \"list\" lists running and scheduled programs.");
 		System.out.println("\n  Syntax: list [OPTIONS]");
 		formatter.setSyntaxPrefix("  \"list\" action options:");
-		formatter.printHelp(" ", getListOptions(new Options()));
+		formatter.printHelp(" ", getListOptionsWithoutDeprecatedOptions(new Options()));
+
+		printCustomCliOptions(formatter, false);
+
 		System.out.println();
 	}
 
@@ -309,7 +309,10 @@ public class CliFrontendParser {
 		System.out.println("\nAction \"stop\" stops a running program (streaming jobs only).");
 		System.out.println("\n  Syntax: stop [OPTIONS] <Job ID>");
 		formatter.setSyntaxPrefix("  \"stop\" action options:");
-		formatter.printHelp(" ", getStopOptions(new Options()));
+		formatter.printHelp(" ", getStopOptionsWithoutDeprecatedOptions(new Options()));
+
+		printCustomCliOptions(formatter, false);
+
 		System.out.println();
 	}
 
@@ -321,11 +324,10 @@ public class CliFrontendParser {
 		System.out.println("\nAction \"cancel\" cancels a running program.");
 		System.out.println("\n  Syntax: cancel [OPTIONS] <Job ID>");
 		formatter.setSyntaxPrefix("  \"cancel\" action options:");
-		formatter.printHelp(" ", getCancelOptions(new Options()));
-		formatter.setSyntaxPrefix("  Additional arguments if -m " + CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:");
-		Options yarnOpts = new Options();
-		yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts);
-		formatter.printHelp(" ", yarnOpts);
+		formatter.printHelp(" ", getCancelOptionsWithoutDeprecatedOptions(new Options()));
+
+		printCustomCliOptions(formatter, false);
+
 		System.out.println();
 	}
 
@@ -337,10 +339,50 @@ public class CliFrontendParser {
 		System.out.println("\nAction \"savepoint\" triggers savepoints for a running job or disposes existing ones.");
 		System.out.println("\n  Syntax: savepoint [OPTIONS] <Job ID>");
 		formatter.setSyntaxPrefix("  \"savepoint\" action options:");
-		formatter.printHelp(" ", getSavepointOptions(new Options()));
+		formatter.printHelp(" ", getSavepointOptionsWithoutDeprecatedOptions(new Options()));
+
+		printCustomCliOptions(formatter, false);
+
 		System.out.println();
 	}
 
+	/**
+	 * Adds custom cli options
+	 * @param options The options to add options to
+	 * @param runOptions Whether to include run options
+	 * @return Options with additions
+	 */
+	private static Options addCustomCliOptions(Options options, boolean runOptions) {
+		for (CustomCommandLine cli: CliFrontend.getCustomCommandLineList()) {
+			cli.addGeneralOptions(options);
+			if (runOptions) {
+				cli.addRunOptions(options);
+			}
+		}
+		return options;
+	}
+
+	/**
+	 * Prints custom cli options
+	 * @param formatter The formatter to use for printing
+	 * @param runOptions True if the run options should be printed, False to print only general options
+	 */
+	private static void printCustomCliOptions(HelpFormatter formatter, boolean runOptions) {
+		// prints options from all available command-line classes
+		for (CustomCommandLine cli: CliFrontend.getCustomCommandLineList()) {
+			if (cli.getId() != null) {
+				formatter.setSyntaxPrefix("  Options for " + cli.getId() + " mode:");
+				Options customOpts = new Options();
+				cli.addGeneralOptions(customOpts);
+				if (runOptions) {
+					cli.addRunOptions(customOpts);
+				}
+				formatter.printHelp(" ", customOpts);
+				System.out.println();
+			}
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Line Parsing
 	// --------------------------------------------------------------------------------------------
@@ -410,63 +452,4 @@ public class CliFrontendParser {
 		}
 	}
 
-	public static Map<String, CustomCommandLine> getAllCustomCommandLine() {
-		if (customCommandLine.isEmpty()) {
-			LOG.warn("No custom command-line classes were loaded.");
-		}
-		return Collections.unmodifiableMap(customCommandLine);
-	}
-
-	private static String getCliIdentifierString() {
-		StringBuilder builder = new StringBuilder();
-		boolean first = true;
-		for (String identifier : customCommandLine.keySet()) {
-			if (!first) {
-				builder.append(", ");
-			}
-			first = false;
-			builder.append("'").append(identifier).append("'");
-		}
-		return builder.toString();
-	}
-
-	/**
-	 * Gets the custom command-line for this identifier.
-	 * @param identifier The unique identifier for this command-line implementation.
-	 * @return CustomCommandLine or null if none was found
-	 */
-	public static CustomCommandLine getActiveCustomCommandLine(String identifier) {
-		return CliFrontendParser.getAllCustomCommandLine().get(identifier);
-	}
-
-	private static void loadCustomCommandLine(String className, Object... params) {
-
-		try {
-			Class<? extends CustomCommandLine> customCliClass =
-				Class.forName(className).asSubclass(CustomCommandLine.class);
-
-			// construct class types from the parameters
-			Class<?>[] types = new Class<?>[params.length];
-			for (int i = 0; i < params.length; i++) {
-				Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
-				types[i] = params[i].getClass();
-			}
-
-			Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);
-			final CustomCommandLine cli = constructor.newInstance(params);
-
-			String cliIdentifier = Preconditions.checkNotNull(cli.getIdentifier());
-			CustomCommandLine existing = customCommandLine.put(cliIdentifier, cli);
-
-			if (existing != null) {
-				throw new IllegalStateException("Attempted to register " + cliIdentifier +
-					" but there is already a command-line with this identifier.");
-			}
-		} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException
-			| InvocationTargetException e) {
-			LOG.warn("Unable to locate custom CLI class {}. " +
-				"Flink is not compiled with support for this class.", className, e);
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
index cd5e0e6..aecdc7c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -29,29 +29,47 @@ import org.apache.flink.configuration.Configuration;
 public interface CustomCommandLine<ClusterType extends ClusterClient> {
 
 	/**
-	 * Returns a unique identifier for this custom command-line.
-	 * @return An unique identifier string
+	 * Signals whether the custom command-line wants to execute or not
+	 * @param commandLine The command-line options
+	 * @param configuration The Flink configuration
+	 * @return True if the command-line wants to run, False otherwise
 	 */
-	String getIdentifier();
+	boolean isActive(CommandLine commandLine, Configuration configuration);
 
 	/**
-	 * Adds custom options to the existing options.
+	 * Gets the unique identifier of this CustomCommandLine
+	 * @return A unique identifier
+	 */
+	String getId();
+
+	/**
+	 * Adds custom options to the existing run options.
+	 * @param baseOptions The existing options.
+	 */
+	void addRunOptions(Options baseOptions);
+
+	/**
+	 * Adds custom options to the existing general options.
 	 * @param baseOptions The existing options.
 	 */
-	void addOptions(Options baseOptions);
+	void addGeneralOptions(Options baseOptions);
 
 	/**
 	 * Retrieves a client for a running cluster
+	 * @param commandLine The command-line parameters from the CliFrontend
 	 * @param config The Flink config
-	 * @return Client if a cluster could be retrieve, null otherwise
+	 * @return Client if a cluster could be retrieved
+	 * @throws UnsupportedOperationException if the operation is not supported
 	 */
-	ClusterClient retrieveCluster(Configuration config) throws Exception;
+	ClusterType retrieveCluster(CommandLine commandLine, Configuration config) throws UnsupportedOperationException;
 
 	/**
 	 * Creates the client for the cluster
 	 * @param applicationName The application name to use
 	 * @param commandLine The command-line options parsed by the CliFrontend
+	 * @param config The Flink config to use
 	 * @return The client to communicate with the cluster which the CustomCommandLine brought up.
+	 * @throws UnsupportedOperationException if the operation is not supported
 	 */
-	ClusterType createClient(String applicationName, CommandLine commandLine) throws Exception;
+	ClusterType createCluster(String applicationName, CommandLine commandLine, Configuration config) throws UnsupportedOperationException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
new file mode 100644
index 0000000..8bceed7
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.Configuration;
+
+import java.net.InetSocketAddress;
+
+import static org.apache.flink.client.CliFrontend.setJobManagerAddressInConfig;
+
+/**
+ * The default CLI which is used for interaction with standalone clusters.
+ */
+public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> {
+
+	@Override
+	public boolean isActive(CommandLine commandLine, Configuration configuration) {
+		// always active because we can try to read a JobManager address from the config
+		return true;
+	}
+
+	@Override
+	public String getId() {
+		return null;
+	}
+
+	@Override
+	public void addRunOptions(Options baseOptions) {
+	}
+
+	@Override
+	public void addGeneralOptions(Options baseOptions) {
+	}
+
+	@Override
+	public StandaloneClusterClient retrieveCluster(CommandLine commandLine, Configuration config) {
+
+		if (commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) {
+			String addressWithPort = commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt());
+			InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(addressWithPort);
+			setJobManagerAddressInConfig(config, jobManagerAddress);
+		}
+
+		StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
+		return descriptor.retrieve(null);
+	}
+
+	@Override
+	public StandaloneClusterClient createCluster(
+			String applicationName,
+			CommandLine commandLine,
+			Configuration config) throws UnsupportedOperationException {
+
+		StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
+		return descriptor.deploy();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
index cf0595b..59cece3 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -30,12 +30,20 @@ public interface ClusterDescriptor<ClientType extends ClusterClient> {
 	 * Returns a String containing details about the cluster (NodeManagers, available memory, ...)
 	 *
 	 */
-	String getClusterDescription() throws Exception;
+	String getClusterDescription();
+
+	/**
+	 * Retrieves an existing Flink Cluster.
+	 * @param applicationID The unique application identifier of the running cluster
+	 * @return Client for the cluster
+	 * @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation
+	 */
+	ClientType retrieve(String applicationID) throws UnsupportedOperationException;
 
 	/**
 	 * Triggers deployment of a cluster
 	 * @return Client for the cluster
-	 * @throws Exception
+	 * @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation
 	 */
-	ClientType deploy() throws Exception;
+	ClientType deploy() throws UnsupportedOperationException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
new file mode 100644
index 0000000..57ccc47
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+
+
+/**
+ * A deployment descriptor for an existing cluster
+ */
+public class StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterClient> {
+
+	private final Configuration config;
+
+	public StandaloneClusterDescriptor(Configuration config) {
+		this.config = config;
+	}
+
+	@Override
+	public String getClusterDescription() {
+		String host = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "");
+		int port = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+		return "Standalone cluster at " + host + ":" + port;
+	}
+
+	@Override
+	public StandaloneClusterClient retrieve(String applicationID) {
+		try {
+			return new StandaloneClusterClient(config);
+		} catch (Exception e) {
+			throw new RuntimeException("Couldn't retrieve standalone cluster", e);
+		}
+	}
+
+	@Override
+	public StandaloneClusterClient deploy() {
+		throw new UnsupportedOperationException("Can't deploy a standalone cluster.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index b56428d..def9578 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -76,7 +76,7 @@ import akka.actor.ActorSystem;
  */
 public abstract class ClusterClient {
 
-	private static final Logger LOG = LoggerFactory.getLogger(ClusterClient.class);
+	private final Logger LOG = LoggerFactory.getLogger(getClass());
 
 	/** The optimizer used in the optimization of batch programs */
 	final Optimizer compiler;
@@ -203,9 +203,9 @@ public abstract class ClusterClient {
 	 */
 	public InetSocketAddress getJobManagerAddressFromConfig() {
 		try {
-		String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-		int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
-		return new InetSocketAddress(hostName, port);
+			String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+			int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+			return new InetSocketAddress(hostName, port);
 		} catch (Exception e) {
 			throw new RuntimeException("Failed to retrieve JobManager address", e);
 		}
@@ -255,11 +255,13 @@ public abstract class ClusterClient {
 	}
 
 	public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
+		Logger log = LoggerFactory.getLogger(ClusterClient.class);
+
 		if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
-			LOG.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
+			log.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
 			p.setDefaultParallelism(parallelism);
 		}
-		LOG.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());
+		log.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());
 
 		return compiler.compile(p);
 	}
@@ -603,7 +605,7 @@ public abstract class ClusterClient {
 	 * @return ActorGateway of the current job manager leader
 	 * @throws Exception
 	 */
-	protected ActorGateway getJobManagerGateway() throws Exception {
+	public ActorGateway getJobManagerGateway() throws Exception {
 		LOG.info("Looking up JobManager");
 
 		return LeaderRetrievalUtils.retrieveLeaderGateway(

http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
index de85ca8..c6b1111 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
@@ -23,9 +23,13 @@ import static org.junit.Assert.fail;
 
 import static org.mockito.Mockito.*;
 
+import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CommandLineOptions;
 
+import org.apache.flink.client.cli.RunOptions;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -57,14 +61,12 @@ public class CliFrontendAddressConfigurationTest {
 	public void testValidConfig() {
 		try {
 			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+			RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
 
-			CommandLineOptions options = mock(CommandLineOptions.class);
-
-			frontend.updateConfig(options);
-			Configuration config = frontend.getConfiguration();
+			ClusterClient clusterClient = frontend.retrieveClient(options);
 
 			checkJobManagerAddress(
-					config,
+					clusterClient.getFlinkConfiguration(),
 					CliFrontendTestUtils.TEST_JOB_MANAGER_ADDRESS,
 					CliFrontendTestUtils.TEST_JOB_MANAGER_PORT);
 		}
@@ -74,43 +76,12 @@ public class CliFrontendAddressConfigurationTest {
 			}
 	}
 
-	@Test
-	public void testInvalidConfigAndNoOption() {
-		try {
+	@Test(expected = IllegalConfigurationException.class)
+	public void testInvalidConfigAndNoOption() throws Exception {
 			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
-			CommandLineOptions options = mock(CommandLineOptions.class);
-
-			frontend.updateConfig(options);
-			Configuration config = frontend.getConfiguration();
+			RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
 
-			checkJobManagerAddress(config, null, -1);
-
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testInvalidConfigAndOption() {
-		try {
-			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
-
-			CommandLineOptions options = mock(CommandLineOptions.class);
-			when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
-
-			frontend.updateConfig(options);
-			Configuration config = frontend.getConfiguration();
-
-			InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788);
-
-			checkJobManagerAddress(config, expectedAddress.getHostName(), expectedAddress.getPort());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+			frontend.retrieveClient(options);
 	}
 
 	@Test
@@ -118,12 +89,10 @@ public class CliFrontendAddressConfigurationTest {
 		try {
 			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
 
-			CommandLineOptions options = mock(CommandLineOptions.class);
-			when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
-
-			frontend.updateConfig(options);
+			RunOptions options = CliFrontendParser.parseRunCommand(new String[] {"-m", "10.221.130.22:7788"});
 
-			Configuration config = frontend.getConfiguration();
+			ClusterClient client = frontend.retrieveClient(options);
+			Configuration config = client.getFlinkConfiguration();
 
 			InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index 1a8870b..f3b3507 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -20,6 +20,7 @@ package org.apache.flink.api.scala
 
 import java.io._
 
+import org.apache.commons.cli.CommandLine
 import org.apache.flink.client.cli.CliFrontendParser
 import org.apache.flink.client.program.ClusterClient
 import org.apache.flink.client.CliFrontend
@@ -245,11 +246,13 @@ object FlinkShell {
     yarnConfig.queue.foreach((queue) => args ++= Seq("-yqu", queue.toString))
     yarnConfig.slots.foreach((slots) => args ++= Seq("-ys", slots.toString))
 
-    val customCLI = CliFrontendParser.getAllCustomCommandLine.get("yarn-cluster")
 
     val options = CliFrontendParser.parseRunCommand(args.toArray)
+    val frontend = new CliFrontend()
+    val config = frontend.getConfiguration
+    val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine)
 
-    val cluster = customCLI.createClient("Flink Scala Shell", options.getCommandLine)
+    val cluster = customCLI.createCluster("Flink Scala Shell", options.getCommandLine, config)
 
     val address = cluster.getJobManagerAddress.getAddress.getHostAddress
     val port = cluster.getJobManagerAddress.getPort
@@ -259,12 +262,21 @@ object FlinkShell {
 
   def fetchDeployedYarnClusterInfo() = {
 
-    // load configuration
-    val globalConfig = GlobalConfiguration.getConfiguration
 
-    val customCLI = CliFrontendParser.getAllCustomCommandLine.get("yarn-cluster")
+    val args = ArrayBuffer[String](
+      "-m", "yarn-cluster"
+    )
 
-    val cluster = customCLI.retrieveCluster(globalConfig)
+    val options = CliFrontendParser.parseRunCommand(args.toArray)
+    val frontend = new CliFrontend()
+    val config = frontend.getConfiguration
+    val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine)
+
+    val cluster = customCLI.retrieveCluster(options.getCommandLine, config)
+
+    if (cluster == null) {
+      throw new RuntimeException("Yarn Cluster could not be retrieved.")
+    }
 
     val jobManager = cluster.getJobManagerAddress
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index c6a1ade..217ad3d 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -18,27 +18,45 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.commons.cli.CommandLine;
 import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.junit.*;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
+import java.util.LinkedList;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests that verify that the CLI client picks up the correct address for the JobManager
@@ -80,8 +98,10 @@ public class CliFrontendYarnAddressConfigurationTest {
 
 	private static final String TEST_YARN_JOB_MANAGER_ADDRESS = "22.33.44.55";
 	private static final int TEST_YARN_JOB_MANAGER_PORT = 6655;
+	private static final ApplicationId TEST_YARN_APPLICATION_ID =
+		ApplicationId.newInstance(System.currentTimeMillis(), 42);
 
-	private static final String propertiesFile =
+	private static final String validPropertiesFile =
 		"jobManager=" + TEST_YARN_JOB_MANAGER_ADDRESS + ":" + TEST_YARN_JOB_MANAGER_PORT;
 
 
@@ -101,110 +121,292 @@ public class CliFrontendYarnAddressConfigurationTest {
 	 * Test that the CliFrontend is able to pick up the .yarn-properties file from a specified location.
 	 */
 	@Test
-	public void testYarnConfig() {
-		try {
-			File tmpFolder = temporaryFolder.newFolder();
-			String currentUser = System.getProperty("user.name");
+	public void testResumeFromYarnPropertiesFile() throws Exception {
 
-			// copy .yarn-properties-<username>
-			File testPropertiesFile = new File(tmpFolder, ".yarn-properties-"+currentUser);
-			Files.write(testPropertiesFile.toPath(), propertiesFile.getBytes(), StandardOpenOption.CREATE);
+		File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
 
-			// copy reference flink-conf.yaml to temporary test directory and append custom configuration path.
-			String confFile = flinkConf + "\nyarn.properties-file.location: " + tmpFolder;
-			File testConfFile = new File(tmpFolder.getAbsolutePath(), "flink-conf.yaml");
-			Files.write(testConfFile.toPath(), confFile.getBytes(), StandardOpenOption.CREATE);
+		// start CLI Frontend
+		TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
 
-			// start CLI Frontend
-			TestCLI frontend = new TestCLI(tmpFolder.getAbsolutePath());
+		RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
 
-			CommandLineOptions options = mock(CommandLineOptions.class);
+		frontend.retrieveClient(options);
+		checkJobManagerAddress(
+			frontend.getConfiguration(),
+			TEST_YARN_JOB_MANAGER_ADDRESS,
+			TEST_YARN_JOB_MANAGER_PORT);
 
-			frontend.getClient(options, "Program name");
+	}
 
-			frontend.updateConfig(options);
-			Configuration config = frontend.getConfiguration();
+	@Test(expected = IllegalConfigurationException.class)
+	public void testResumeFromYarnPropertiesFileWithFinishedApplication() throws Exception {
 
- 			checkJobManagerAddress(
-					config,
-					TEST_YARN_JOB_MANAGER_ADDRESS,
-					TEST_YARN_JOB_MANAGER_PORT);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
+
+		// start CLI Frontend
+		TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
+
+		RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
+
+		frontend.retrieveClient(options);
+		checkJobManagerAddress(
+			frontend.getConfiguration(),
+			TEST_YARN_JOB_MANAGER_ADDRESS,
+			TEST_YARN_JOB_MANAGER_PORT);
 	}
-	public static class TestCLI extends CliFrontend {
-		TestCLI(String configDir) throws Exception {
-			super(configDir);
-		}
 
-		@Override
-		public ClusterClient getClient(CommandLineOptions options, String programName) throws Exception {
-			return super.getClient(options, programName);
-		}
+	@Test(expected = IllegalConfigurationException.class)
+	public void testInvalidYarnPropertiesFile() throws Exception {
 
-		@Override
-		public void updateConfig(CommandLineOptions options) {
-			super.updateConfig(options);
-		}
+		File directoryPath = writeYarnPropertiesFile(invalidPropertiesFile);
+
+		TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
+
+		RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
+
+		frontend.retrieveClient(options);
+		Configuration config = frontend.getConfiguration();
+
+		checkJobManagerAddress(
+			config,
+			TEST_JOB_MANAGER_ADDRESS,
+			TEST_JOB_MANAGER_PORT);
 	}
 
+
 	@Test
-	public void testInvalidYarnConfig() {
-		try {
-			File tmpFolder = temporaryFolder.newFolder();
+	public void testResumeFromYarnID() throws Exception {
+		File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
 
-			// copy invalid .yarn-properties-<username>
-			File testPropertiesFile = new File(tmpFolder, ".yarn-properties");
-			Files.write(testPropertiesFile.toPath(), invalidPropertiesFile.getBytes(), StandardOpenOption.CREATE);
+		// start CLI Frontend
+		TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
 
-			// copy reference flink-conf.yaml to temporary test directory and append custom configuration path.
-			String confFile = flinkConf + "\nyarn.properties-file.location: " + tmpFolder;
-			File testConfFile = new File(tmpFolder.getAbsolutePath(), "flink-conf.yaml");
-			Files.write(testConfFile.toPath(), confFile.getBytes(), StandardOpenOption.CREATE);
+		RunOptions options =
+			CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
 
-			TestCLI cli = new TestCLI(tmpFolder.getAbsolutePath());
+		frontend.retrieveClient(options);
 
-			CommandLineOptions options = mock(CommandLineOptions.class);
+		checkJobManagerAddress(
+			frontend.getConfiguration(),
+			TEST_YARN_JOB_MANAGER_ADDRESS,
+			TEST_YARN_JOB_MANAGER_PORT);
+	}
 
-			cli.updateConfig(options);
+	@Test(expected = IllegalConfigurationException.class)
+	public void testResumeFromInvalidYarnID() throws Exception {
+		File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
 
-			Configuration config = cli.getConfiguration();
+		// start CLI Frontend
+		TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
 
-			checkJobManagerAddress(
-				config,
-				TEST_JOB_MANAGER_ADDRESS,
-				TEST_JOB_MANAGER_PORT);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		RunOptions options =
+			CliFrontendParser.parseRunCommand(new String[] {"-yid", ApplicationId.newInstance(0, 666).toString()});
+
+		frontend.retrieveClient(options);
+		checkJobManagerAddress(
+			frontend.getConfiguration(),
+			TEST_YARN_JOB_MANAGER_ADDRESS,
+			TEST_YARN_JOB_MANAGER_PORT);
+	}
+
+	@Test(expected = IllegalConfigurationException.class)
+	public void testResumeFromYarnIDWithFinishedApplication() throws Exception {
+		File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
+
+		// start CLI Frontend
+		TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
+
+		RunOptions options =
+			CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
+
+		frontend.retrieveClient(options);
+
+		checkJobManagerAddress(
+			frontend.getConfiguration(),
+			TEST_YARN_JOB_MANAGER_ADDRESS,
+			TEST_YARN_JOB_MANAGER_PORT);
 	}
 
 
 	@Test
-	public void testManualOptionsOverridesYarn() {
-		try {
-			File emptyFolder = temporaryFolder.newFolder();
-			TestCLI frontend = new TestCLI(emptyFolder.getAbsolutePath());
+	public void testYarnIDOverridesPropertiesFile() throws Exception {
+		File directoryPath = writeYarnPropertiesFile(invalidPropertiesFile);
+
+		// start CLI Frontend
+		TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
+
+		RunOptions options =
+			CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
+
+		frontend.retrieveClient(options);
+
+		checkJobManagerAddress(
+			frontend.getConfiguration(),
+			TEST_YARN_JOB_MANAGER_ADDRESS,
+			TEST_YARN_JOB_MANAGER_PORT);
+	}
+
+
+	@Test
+	public void testManualOptionsOverridesYarn() throws Exception {
+
+		File emptyFolder = temporaryFolder.newFolder();
+		File testConfFile = new File(emptyFolder.getAbsolutePath(), "flink-conf.yaml");
+		Files.createFile(testConfFile.toPath());
 
-			CommandLineOptions options = mock(CommandLineOptions.class);
-			when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
+		TestCLI frontend = new TestCLI(emptyFolder.getAbsolutePath());
 
-			frontend.updateConfig(options);
+		RunOptions options = CliFrontendParser.parseRunCommand(new String[] {"-m", "10.221.130.22:7788"});
 
-			Configuration config = frontend.getConfiguration();
+		frontend.retrieveClient(options);
 
-			InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788);
+		Configuration config = frontend.getConfiguration();
 
-			checkJobManagerAddress(config, expectedAddress.getHostName(), expectedAddress.getPort());
+		InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788);
+
+		checkJobManagerAddress(config, expectedAddress.getHostName(), expectedAddress.getPort());
+
+	}
+
+
+	///////////
+	// Utils //
+	///////////
+
+	private File writeYarnPropertiesFile(String contents) throws IOException {
+		File tmpFolder = temporaryFolder.newFolder();
+		String currentUser = System.getProperty("user.name");
+
+		// copy .yarn-properties-<username>
+		File testPropertiesFile = new File(tmpFolder, ".yarn-properties-"+currentUser);
+		Files.write(testPropertiesFile.toPath(), contents.getBytes(), StandardOpenOption.CREATE);
+
+		// copy reference flink-conf.yaml to temporary test directory and append custom configuration path.
+		String confFile = flinkConf + "\nyarn.properties-file.location: " + tmpFolder;
+		File testConfFile = new File(tmpFolder.getAbsolutePath(), "flink-conf.yaml");
+		Files.write(testConfFile.toPath(), confFile.getBytes(), StandardOpenOption.CREATE);
+
+		return tmpFolder.getAbsoluteFile();
+	}
+
+	private static class TestCLI extends CliFrontend {
+		TestCLI(String configDir) throws Exception {
+			super(configDir);
+		}
+
+		@Override
+		// make method public
+		public ClusterClient getClient(CommandLineOptions options, String programName) throws Exception {
+			return super.getClient(options, programName);
+		}
+
+		@Override
+		// make method public
+		public ClusterClient retrieveClient(CommandLineOptions options) {
+			return super.retrieveClient(options);
+		}
+	}
+
+
+	/**
+	 * Injects an extended FlinkYarnSessionCli that mocks the communication with Yarn.
+	 */
+	private static class CustomYarnTestCLI extends TestCLI {
+
+		// the default application status for yarn applications to be retrieved
+		private final FinalApplicationStatus finalApplicationStatus;
+
+		CustomYarnTestCLI(String configDir) throws Exception {
+			this(configDir, FinalApplicationStatus.UNDEFINED);
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
+
+		CustomYarnTestCLI(String configDir, FinalApplicationStatus finalApplicationStatus) throws Exception {
+			super(configDir);
+			this.finalApplicationStatus = finalApplicationStatus;
+		}
+
+		@Override
+		public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
+			// inject the testing FlinkYarnSessionCli
+			return new TestingYarnSessionCli();
+		}
+
+		/**
+		 * A FlinkYarnSessionCli that returns a modified cluster descriptor for testing.
+		 */
+		private class TestingYarnSessionCli extends FlinkYarnSessionCli {
+			TestingYarnSessionCli() {
+				super("y", "yarn");
+			}
+
+			@Override
+			// override cluster descriptor to replace the YarnClient
+			protected AbstractYarnClusterDescriptor getClusterDescriptor() {
+				return new TestingYarnClusterDescriptor();
+			}
+
+			/**
+			 * Replace the YarnClient for this test.
+			 */
+			private class TestingYarnClusterDescriptor extends YarnClusterDescriptor {
+
+				@Override
+				protected YarnClient getYarnClient() {
+					return new TestYarnClient();
+				}
+
+				@Override
+				protected YarnClusterClient createYarnClusterClient(
+						AbstractYarnClusterDescriptor descriptor,
+						YarnClient yarnClient,
+						ApplicationReport report,
+						Configuration flinkConfiguration,
+						Path sessionFilesDir,
+						boolean perJobCluster) throws IOException, YarnException {
+
+					return Mockito.mock(YarnClusterClient.class);
+				}
+
+
+				private class TestYarnClient extends YarnClientImpl {
+
+					private final List<ApplicationReport> reports = new LinkedList<>();
+
+					TestYarnClient() {
+						{   // a report of the Yarn application we want to resume from
+							ApplicationReport report = Mockito.mock(ApplicationReport.class);
+							Mockito.when(report.getHost()).thenReturn(TEST_YARN_JOB_MANAGER_ADDRESS);
+							Mockito.when(report.getRpcPort()).thenReturn(TEST_YARN_JOB_MANAGER_PORT);
+							Mockito.when(report.getApplicationId()).thenReturn(TEST_YARN_APPLICATION_ID);
+							Mockito.when(report.getFinalApplicationStatus()).thenReturn(finalApplicationStatus);
+							this.reports.add(report);
+						}
+						{   // a second report, just for noise
+							ApplicationReport report = Mockito.mock(ApplicationReport.class);
+							Mockito.when(report.getHost()).thenReturn("1.2.3.4");
+							Mockito.when(report.getRpcPort()).thenReturn(-123);
+							Mockito.when(report.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 0));
+							Mockito.when(report.getFinalApplicationStatus()).thenReturn(finalApplicationStatus);
+							this.reports.add(report);
+						}
+					}
+
+					@Override
+					public List<ApplicationReport> getApplications() throws YarnException, IOException {
+						return reports;
+					}
+
+					@Override
+					public ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException {
+						for (ApplicationReport report : reports) {
+							if (report.getApplicationId().equals(appId)) {
+								return report;
+							}
+						}
+						throw new YarnException();
+					}
+				}
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index c842bdc..f71dd63 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -23,7 +23,6 @@ import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 
-import org.apache.flink.client.CliFrontend;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.test.util.TestBaseUtils;
 
@@ -37,8 +36,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties;
-
 public class FlinkYarnSessionCliTest {
 
 	@Rule
@@ -53,9 +50,10 @@ public class FlinkYarnSessionCliTest {
 		fakeConf.createNewFile();
 		map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath());
 		TestBaseUtils.setEnv(map);
-		Options options = new Options();
 		FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", false);
-		cli.addOptions(options);
+		Options options = new Options();
+		cli.addGeneralOptions(options);
+		cli.addRunOptions(options);
 
 		CommandLineParser parser = new PosixParser();
 		CommandLine cmd = null;
@@ -66,7 +64,7 @@ public class FlinkYarnSessionCliTest {
 			Assert.fail("Parsing failed with " + e.getMessage());
 		}
 
-		YarnClusterDescriptor flinkYarnDescriptor = cli.createDescriptor(null, cmd);
+		AbstractYarnClusterDescriptor flinkYarnDescriptor = cli.createDescriptor(null, cmd);
 
 		Assert.assertNotNull(flinkYarnDescriptor);
 

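For orientation, a minimal sketch (not part of the patch) of what a caller can do with the
descriptor created in the test above, once the AbstractYarnClusterDescriptor changes below
are in place; exception handling is omitted and the application id is a placeholder:

	// Start a new Yarn session from the descriptor ...
	YarnClusterClient newSession = flinkYarnDescriptor.deploy();
	System.out.println(flinkYarnDescriptor.getClusterDescription());

	// ... or resume an already running application by its id.
	YarnClusterClient resumed = flinkYarnDescriptor.retrieve("application_1466150000000_0042");
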
http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index c471fa4..aebb14d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
 
@@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
@@ -73,18 +75,8 @@ import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_
 import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties;
 
 /**
-* All classes in this package contain code taken from
-* https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
-* and
-* https://github.com/hortonworks/simple-yarn-app
-* and
-* https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
-*
-* The Flink jar is uploaded to HDFS by this client.
-* The application master and all the TaskManager containers get the jar file downloaded
-* by YARN into their local fs.
-*
-*/
+ * The descriptor with deployment information for spawning or resuming a {@link YarnClusterClient}.
+ */
 public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor<YarnClusterClient> {
 	private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class);
 
@@ -132,7 +124,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 	private boolean detached;
 
-	private String customName = null;
+	private String customName;
+
 
 	public AbstractYarnClusterDescriptor() {
 		// for unit tests only
@@ -321,49 +314,112 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	 * Gets a Hadoop Yarn client
 	 * @return Returns a YarnClient which has to be shutdown manually
 	 */
-	public static YarnClient getYarnClient(Configuration conf) {
+	protected YarnClient getYarnClient() {
 		YarnClient yarnClient = YarnClient.createYarnClient();
 		yarnClient.init(conf);
 		yarnClient.start();
 		return yarnClient;
 	}
 
-	@Override
-	public YarnClusterClient deploy() throws Exception {
+	/**
+	 * Retrieves an already running Yarn cluster using the JobManager address stored in the given configuration.
+	 * @param config The Flink configuration containing the JobManager address and port to look up
+	 * @return YarnClusterClient for the resumed Yarn application
+	 * @deprecated This should be removed in the future
+	 */
+	public YarnClusterClient retrieveFromConfig(org.apache.flink.configuration.Configuration config)
+			throws UnsupportedOperationException {
+		String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+		int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
 
-		UserGroupInformation.setConfiguration(conf);
-		UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+		if (jobManagerHost != null && jobManagerPort != -1) {
 
-		if (UserGroupInformation.isSecurityEnabled()) {
-			if (!ugi.hasKerberosCredentials()) {
-				throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " +
-					"You may use kinit to authenticate and request a TGT from the Kerberos server.");
+			YarnClient yarnClient = getYarnClient();
+			final List<ApplicationReport> applicationReports;
+			try {
+				applicationReports = yarnClient.getApplications();
+			} catch (Exception e) {
+				throw new RuntimeException("Couldn't get Yarn application reports", e);
 			}
-			return ugi.doAs(new PrivilegedExceptionAction<YarnClusterClient>() {
-				@Override
-				public YarnClusterClient run() throws Exception {
-					return deployInternal();
+			for (ApplicationReport report : applicationReports) {
+				if (report.getHost().equals(jobManagerHost) && report.getRpcPort() == jobManagerPort) {
+					LOG.info("Found application '{}' " +
+						"with JobManager host name '{}' and port '{}' from Yarn properties file.",
+						report.getApplicationId(), jobManagerHost, jobManagerPort);
+					return retrieve(report.getApplicationId().toString());
 				}
-			});
-		} else {
-			return deployInternal();
+			}
+
 		}
+
+		LOG.warn("Couldn't retrieve Yarn cluster from Flink configuration using JobManager address '{}:{}'",
+			jobManagerHost, jobManagerPort);
+
+		throw new IllegalConfigurationException("Could not resume Yarn cluster from config.");
 	}
 
 	@Override
-	public AbstractFlinkYarnCluster attach(String appId) throws Exception {
-		// check if required Hadoop environment variables are set. If not, warn user
-		if(System.getenv("HADOOP_CONF_DIR") == null &&
-			System.getenv("YARN_CONF_DIR") == null) {
-			LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set." +
-				"The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
-				"configuration for accessing YARN.");
+	public YarnClusterClient retrieve(String applicationID) {
+
+		try {
+			// check if required Hadoop environment variables are set. If not, warn user
+			if (System.getenv("HADOOP_CONF_DIR") == null &&
+				System.getenv("YARN_CONF_DIR") == null) {
+				LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. " +
+					"The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
+					"configuration for accessing YARN.");
+			}
+
+			final ApplicationId yarnAppId = ConverterUtils.toApplicationId(applicationID);
+			final YarnClient yarnClient = getYarnClient();
+			final ApplicationReport appReport = yarnClient.getApplicationReport(yarnAppId);
+
+			if (appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
+				// Flink cluster is not running anymore
+				LOG.error("The application {} is not running anymore. It previously finished with final status: {}",
+					applicationID, appReport.getFinalApplicationStatus());
+				throw new RuntimeException("The Yarn application " + applicationID + " is not running anymore.");
+			}
+
+			LOG.info("Found JobManager host name '{}' and port '{}' for the supplied application id '{}'",
+				appReport.getHost(), appReport.getRpcPort(), applicationID);
+
+			flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, appReport.getHost());
+			flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, appReport.getRpcPort());
+
+			return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, sessionFilesDir, false);
+		} catch (Exception e) {
+			throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
 		}
+	}
 
-		final ApplicationId yarnAppId = ConverterUtils.toApplicationId(appId);
+	@Override
+	public YarnClusterClient deploy() {
 
-		return new FlinkYarnCluster(yarnClient, yarnAppId, conf, flinkConfiguration, sessionFilesDir, detached);
+		try {
+
+			UserGroupInformation.setConfiguration(conf);
+			UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+			if (UserGroupInformation.isSecurityEnabled()) {
+				if (!ugi.hasKerberosCredentials()) {
+					throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " +
+						"You may use kinit to authenticate and request a TGT from the Kerberos server.");
+				}
+				return ugi.doAs(new PrivilegedExceptionAction<YarnClusterClient>() {
+					@Override
+					public YarnClusterClient run() throws Exception {
+						return deployInternal();
+					}
+				});
+			} else {
+				return deployInternal();
+			}
+		} catch (Exception e) {
+			throw new RuntimeException("Couldn't deploy Yarn cluster", e);
+		}
 	}
+
 	/**
 	 * This method will block until the ApplicationMaster/JobManager have been
 	 * deployed on YARN.
@@ -377,7 +433,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb);
 
 		// Create application via yarnClient
-		final YarnClient yarnClient = getYarnClient(conf);
+		final YarnClient yarnClient = getYarnClient();
 		final YarnClientApplication yarnApplication = yarnClient.createApplication();
 		GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
 
@@ -726,7 +782,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
 
 		// the Flink cluster is deployed in YARN. Represent cluster
-		return new YarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir);
+		return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir, true);
 	}
 
 	/**
@@ -780,40 +836,44 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	}
 
 	@Override
-	public String getClusterDescription() throws Exception {
+	public String getClusterDescription() {
 
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		PrintStream ps = new PrintStream(baos);
+		try {
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			PrintStream ps = new PrintStream(baos);
 
-		YarnClient yarnClient = getYarnClient(conf);
-		YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
+			YarnClient yarnClient = getYarnClient();
+			YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
 
-		ps.append("NodeManagers in the ClusterClient " + metrics.getNumNodeManagers());
-		List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
-		final String format = "|%-16s |%-16s %n";
-		ps.printf("|Property         |Value          %n");
-		ps.println("+---------------------------------------+");
-		int totalMemory = 0;
-		int totalCores = 0;
-		for(NodeReport rep : nodes) {
-			final Resource res = rep.getCapability();
-			totalMemory += res.getMemory();
-			totalCores += res.getVirtualCores();
-			ps.format(format, "NodeID", rep.getNodeId());
-			ps.format(format, "Memory", res.getMemory() + " MB");
-			ps.format(format, "vCores", res.getVirtualCores());
-			ps.format(format, "HealthReport", rep.getHealthReport());
-			ps.format(format, "Containers", rep.getNumContainers());
+			ps.append("NodeManagers in the cluster " + metrics.getNumNodeManagers());
+			List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
+			final String format = "|%-16s |%-16s %n";
+			ps.printf("|Property         |Value          %n");
 			ps.println("+---------------------------------------+");
+			int totalMemory = 0;
+			int totalCores = 0;
+			for (NodeReport rep : nodes) {
+				final Resource res = rep.getCapability();
+				totalMemory += res.getMemory();
+				totalCores += res.getVirtualCores();
+				ps.format(format, "NodeID", rep.getNodeId());
+				ps.format(format, "Memory", res.getMemory() + " MB");
+				ps.format(format, "vCores", res.getVirtualCores());
+				ps.format(format, "HealthReport", rep.getHealthReport());
+				ps.format(format, "Containers", rep.getNumContainers());
+				ps.println("+---------------------------------------+");
+			}
+			ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);
+			List<QueueInfo> qInfo = yarnClient.getAllQueues();
+			for (QueueInfo q : qInfo) {
+				ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " +
+					q.getMaximumCapacity() + " Applications: " + q.getApplications().size());
+			}
+			yarnClient.stop();
+			return baos.toString();
+		} catch (Exception e) {
+			throw new RuntimeException("Couldn't get cluster description", e);
 		}
-		ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);
-		List<QueueInfo> qInfo = yarnClient.getAllQueues();
-		for(QueueInfo q : qInfo) {
-			ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " +
-				q.getMaximumCapacity() + " Applications: " + q.getApplications().size());
-		}
-		yarnClient.stop();
-		return baos.toString();
 	}
 
 	public String getSessionFilesDir() {
@@ -918,9 +978,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	private static class YarnDeploymentException extends RuntimeException {
 		private static final long serialVersionUID = -812040641215388943L;
 
-		public YarnDeploymentException() {
-		}
-
 		public YarnDeploymentException(String message) {
 			super(message);
 		}
@@ -954,5 +1011,24 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 		}
 	}
+
+	/**
+	 * Creates a YarnClusterClient; may be overridden in tests.
+	 */
+	protected YarnClusterClient createYarnClusterClient(
+			AbstractYarnClusterDescriptor descriptor,
+			YarnClient yarnClient,
+			ApplicationReport report,
+			org.apache.flink.configuration.Configuration flinkConfiguration,
+			Path sessionFilesDir,
+			boolean perJobCluster) throws IOException, YarnException {
+		return new YarnClusterClient(
+			descriptor,
+			yarnClient,
+			report,
+			flinkConfiguration,
+			sessionFilesDir,
+			perJobCluster);
+	}
 }
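
For completeness, a minimal self-contained sketch of programmatic resuming with the methods
above. It mirrors the CLI and test code in this commit; the class name, application id,
JobManager host and port are placeholders, and FLINK_CONF_DIR is assumed to point to a valid
Flink configuration, as in the tests:

	import org.apache.commons.cli.CommandLine;
	import org.apache.commons.cli.Options;
	import org.apache.commons.cli.PosixParser;
	import org.apache.flink.configuration.ConfigConstants;
	import org.apache.flink.configuration.Configuration;
	import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
	import org.apache.flink.yarn.YarnClusterClient;
	import org.apache.flink.yarn.cli.FlinkYarnSessionCli;

	public class ResumeYarnClusterExample {

		public static void main(String[] args) throws Exception {
			// Build a cluster descriptor the same way the CLI does (see FlinkYarnSessionCliTest above).
			FlinkYarnSessionCli cli = new FlinkYarnSessionCli("y", "yarn");
			Options options = new Options();
			cli.addGeneralOptions(options);
			cli.addRunOptions(options);
			CommandLine cmd = new PosixParser().parse(options, args);
			AbstractYarnClusterDescriptor descriptor = cli.createDescriptor(null, cmd);

			// Resume from an explicit application id (placeholder value) ...
			YarnClusterClient byId = descriptor.retrieve("application_1466150000000_0042");

			// ... or from the JobManager address stored in a Flink configuration,
			// e.g. the address written to the .yarn-properties file.
			Configuration flinkConfig = new Configuration();
			flinkConfig.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "22.33.44.55");
			flinkConfig.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6655);
			YarnClusterClient byConfig = descriptor.retrieveFromConfig(flinkConfig);
		}
	}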