You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/06/17 08:45:21 UTC

[07/10] flink git commit: [FLINK-3667] refactor client communication classes

[FLINK-3667] refactor client communication classes

- ClusterDescriptor: base interface for cluster deployment descriptors
- ClusterDescriptor: YarnClusterDescriptor

- ClusterClient: base class for ClusterClients, handles lifecycle of cluster
- ClusterClient: shares configuration with the implementations
- ClusterClient: StandaloneClusterClient, YarnClusterClient
- ClusterClient: remove run methods and enable detached mode via flag

- CliFrontend: remove all Yarn specific logic
- CliFrontend: remove all cluster setup logic

- CustomCommandLine: interface for other cluster implementations
- CustomCommandLine: enables creation of a new cluster or resuming from an existing one

- Yarn: move Yarn classes and functionality to the yarn module (yarn
  properties, yarn interfaces)
- Yarn: improve reliability of cluster startup
- Yarn Tests: only disable parallel execution of ITCases

This closes #1978


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f9b52a31
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f9b52a31
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f9b52a31

Branch: refs/heads/master
Commit: f9b52a3114a2114e6846091acf3abb294a49615b
Parents: efc344a
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Apr 22 19:52:54 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Jun 17 10:37:58 2016 +0200

----------------------------------------------------------------------
 .../api/avro/AvroExternalJarProgramITCase.java  |  15 +-
 .../org/apache/flink/client/CliFrontend.java    | 359 ++-----
 .../flink/client/FlinkYarnSessionCli.java       | 505 ----------
 .../org/apache/flink/client/RemoteExecutor.java |   9 +-
 .../flink/client/cli/CliFrontendParser.java     | 114 ++-
 .../flink/client/cli/CustomCommandLine.java     |  57 ++
 .../client/deployment/ClusterDescriptor.java    |  41 +
 .../org/apache/flink/client/program/Client.java | 624 ------------
 .../flink/client/program/ClusterClient.java     | 695 ++++++++++++++
 .../client/program/ContextEnvironment.java      |  12 +-
 .../program/ContextEnvironmentFactory.java      |  18 +-
 .../client/program/DetachedEnvironment.java     |   6 +-
 .../client/program/StandaloneClusterClient.java |  98 ++
 .../CliFrontendAddressConfigurationTest.java    | 125 +--
 .../client/CliFrontendPackageProgramTest.java   |   5 +-
 .../apache/flink/client/CliFrontendRunTest.java |  26 +-
 .../flink/client/CliFrontendTestUtils.java      |  32 +-
 .../TestingClusterClientWithoutActorSystem.java |  55 ++
 .../client/program/ClientConnectionTest.java    |   2 +-
 .../apache/flink/client/program/ClientTest.java |  33 +-
 .../program/ExecutionPlanCreationTest.java      |   2 +-
 .../org/apache/flink/storm/api/FlinkClient.java |  11 +-
 .../flink/api/common/JobExecutionResult.java    |   3 +
 .../flink/api/common/JobSubmissionResult.java   |  24 +-
 .../main/flink-bin/conf/log4j-cli.properties    |   2 +-
 .../src/main/flink-bin/yarn-bin/yarn-session.sh |   2 +-
 .../operations/DegreesWithExceptionITCase.java  |   2 +-
 .../ReduceOnEdgesWithExceptionITCase.java       |   2 +-
 .../ReduceOnNeighborsWithExceptionITCase.java   |   2 +-
 .../webmonitor/handlers/JarActionHandler.java   |   4 +-
 .../apache/flink/runtime/client/JobClient.java  |  17 +-
 .../clusterframework/ApplicationStatus.java     |   1 +
 .../clusterframework/FlinkResourceManager.java  |   2 +-
 .../messages/GetClusterStatusResponse.java      |   2 +-
 .../runtime/yarn/AbstractFlinkYarnClient.java   | 143 ---
 .../runtime/yarn/AbstractFlinkYarnCluster.java  | 123 ---
 .../org/apache/flink/api/scala/FlinkShell.scala |  82 +-
 .../flink/api/scala/ExecutionEnvironment.scala  |   2 +-
 .../elasticsearch2/ElasticsearchSinkITCase.java |   2 +-
 .../environment/RemoteStreamEnvironment.java    |   9 +-
 .../environment/StreamContextEnvironment.java   |   5 +-
 .../RemoteEnvironmentITCase.java                |   2 +-
 .../flink/test/misc/AutoParallelismITCase.java  |   2 +-
 .../test/recovery/SimpleRecoveryITCase.java     |   2 +-
 flink-yarn-tests/pom.xml                        |  15 +-
 ...CliFrontendYarnAddressConfigurationTest.java | 220 +++++
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  14 +-
 .../flink/yarn/TestingFlinkYarnClient.java      |  71 --
 .../yarn/TestingYarnClusterDescriptor.java      |  71 ++
 .../flink/yarn/YARNHighAvailabilityITCase.java  |   9 +-
 .../YARNSessionCapacitySchedulerITCase.java     |   6 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |  20 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |   4 +-
 .../yarn/AbstractYarnClusterDescriptor.java     | 943 +++++++++++++++++++
 .../org/apache/flink/yarn/FlinkYarnClient.java  |  28 -
 .../apache/flink/yarn/FlinkYarnClientBase.java  | 907 ------------------
 .../org/apache/flink/yarn/FlinkYarnCluster.java | 559 -----------
 .../flink/yarn/YarnApplicationMasterRunner.java |   7 +-
 .../apache/flink/yarn/YarnClusterClient.java    | 577 ++++++++++++
 .../flink/yarn/YarnClusterDescriptor.java       |  28 +
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 606 ++++++++++++
 .../apache/flink/yarn/ApplicationClient.scala   |   8 +-
 .../org/apache/flink/yarn/YarnMessages.scala    |   7 +-
 63 files changed, 3799 insertions(+), 3580 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index ac10074..29a7e58 100644
--- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -19,19 +19,12 @@
 package org.apache.flink.api.avro;
 
 import java.io.File;
-import java.net.InetAddress;
 
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.RemoteExecutor;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.plan.FlinkPlan;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
 import org.junit.Assert;
@@ -64,10 +57,10 @@ public class AvroExternalJarProgramITCase {
 			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
 			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, testMiniCluster.getLeaderRPCPort());
 
-			Client client = new Client(config);
+			ClusterClient client = new StandaloneClusterClient(config);
 
 			client.setPrintStatusDuringExecution(false);
-			client.runBlocking(program, 4);
+			client.run(program, 4);
 
 		}
 		catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 6d972bc..cf7a8c2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -20,8 +20,6 @@ package org.apache.flink.client;
 
 import akka.actor.ActorSystem;
 
-import org.apache.commons.cli.CommandLine;
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
@@ -31,18 +29,21 @@ import org.apache.flink.client.cli.CancelOptions;
 import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.client.cli.InfoOptions;
 import org.apache.flink.client.cli.ListOptions;
 import org.apache.flink.client.cli.ProgramOptions;
 import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.cli.SavepointOptions;
 import org.apache.flink.client.cli.StopOptions;
-import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
@@ -53,7 +54,6 @@ import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -68,8 +68,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSucc
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 import org.apache.flink.util.StringUtils;
 
 import org.slf4j.Logger;
@@ -81,10 +79,8 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.text.SimpleDateFormat;
@@ -93,10 +89,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
@@ -121,20 +115,6 @@ public class CliFrontend {
 	private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
 	private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
 
-	// YARN-session related constants
-	public static final String YARN_PROPERTIES_FILE = ".yarn-properties-";
-	public static final String YARN_PROPERTIES_JOBMANAGER_KEY = "jobManager";
-	public static final String YARN_PROPERTIES_PARALLELISM = "parallelism";
-	public static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString";
-
-	public static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; // this has to be a regex for String.split()
-
-	/**
-	 * A special host name used to run a job by deploying Flink into a YARN cluster,
-	 * if this string is specified as the JobManager address
-	 */
-	public static final String YARN_DEPLOY_JOBMANAGER = "yarn-cluster";
-
 	// --------------------------------------------------------------------------------------------
 	// --------------------------------------------------------------------------------------------
 
@@ -149,12 +129,9 @@ public class CliFrontend {
 
 	private ActorSystem actorSystem;
 
-	private AbstractFlinkYarnCluster yarnCluster;
-
 	/**
 	 *
-	 * @throws Exception Thrown if the configuration directory was not found, the configuration could not
-	 *                   be loaded, or the YARN properties could not be parsed.
+	 * @throws Exception Thrown if the configuration directory was not found, the configuration could not be loaded
 	 */
 	public CliFrontend() throws Exception {
 		this(getConfigurationDirectoryFromEnv());
@@ -171,61 +148,6 @@ public class CliFrontend {
 		GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
 		this.config = GlobalConfiguration.getConfiguration();
 
-		// load the YARN properties
-		File propertiesFile = new File(getYarnPropertiesLocation(config));
-		if (propertiesFile.exists()) {
-
-			logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath());
-
-			Properties yarnProperties = new Properties();
-			try {
-				try (InputStream is = new FileInputStream(propertiesFile)) {
-					yarnProperties.load(is);
-				}
-			}
-			catch (IOException e) {
-				throw new Exception("Cannot read the YARN properties file", e);
-			}
-
-			// configure the default parallelism from YARN
-			String propParallelism = yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM);
-			if (propParallelism != null) { // maybe the property is not set
-				try {
-					int parallelism = Integer.parseInt(propParallelism);
-					this.config.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, parallelism);
-
-					logAndSysout("YARN properties set default parallelism to " + parallelism);
-				}
-				catch (NumberFormatException e) {
-					throw new Exception("Error while parsing the YARN properties: " +
-							"Property " + YARN_PROPERTIES_PARALLELISM + " is not an integer.");
-				}
-			}
-
-			// get the JobManager address from the YARN properties
-			String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
-			InetSocketAddress jobManagerAddress;
-			if (address != null) {
-				try {
-					jobManagerAddress = ClientUtils.parseHostPortAddress(address);
-					// store address in config from where it is retrieved by the retrieval service
-					writeJobManagerAddressToConfig(jobManagerAddress);
-				}
-				catch (Exception e) {
-					throw new Exception("YARN properties contain an invalid entry for JobManager address.", e);
-				}
-
-				logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress);
-			}
-
-			// handle the YARN client's dynamic properties
-			String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
-			Map<String, String> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded);
-			for (Map.Entry<String, String> dynamicProperty : dynamicProperties.entrySet()) {
-				this.config.setString(dynamicProperty.getKey(), dynamicProperty.getValue());
-			}
-		}
-
 		try {
 			FileSystem.setDefaultScheme(config);
 		} catch (IOException e) {
@@ -301,61 +223,33 @@ public class CliFrontend {
 			return handleError(t);
 		}
 
-		int exitCode = 1;
+		ClusterClient client = null;
 		try {
-			int userParallelism = options.getParallelism();
-			LOG.debug("User parallelism is set to {}", userParallelism);
 
-			Client client = getClient(options, program.getMainClassName(), userParallelism, options.getDetachedMode());
+			client = getClient(options, program.getMainClassName());
 			client.setPrintStatusDuringExecution(options.getStdoutLogging());
+			client.setDetached(options.getDetachedMode());
 			LOG.debug("Client slots is set to {}", client.getMaxSlots());
 
 			LOG.debug("Savepoint path is set to {}", options.getSavepointPath());
 
-			try {
-				if (client.getMaxSlots() != -1 && userParallelism == -1) {
-					logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
-							"To use another parallelism, set it at the ./bin/flink client.");
-					userParallelism = client.getMaxSlots();
-				}
-
-				// detached mode
-				if (options.getDetachedMode() || (yarnCluster != null && yarnCluster.isDetached())) {
-					exitCode = executeProgramDetached(program, client, userParallelism);
-				}
-				else {
-					exitCode = executeProgramBlocking(program, client, userParallelism);
-				}
-
-				// show YARN cluster status if its not a detached YARN cluster.
-				if (yarnCluster != null && !yarnCluster.isDetached()) {
-					List<String> msgs = yarnCluster.getNewMessages();
-					if (msgs != null && msgs.size() > 1) {
-
-						logAndSysout("The following messages were created by the YARN cluster while running the Job:");
-						for (String msg : msgs) {
-							logAndSysout(msg);
-						}
-					}
-					if (yarnCluster.hasFailed()) {
-						logAndSysout("YARN cluster is in failed state!");
-						logAndSysout("YARN Diagnostics: " + yarnCluster.getDiagnostics());
-					}
-				}
-
-				return exitCode;
-			}
-			finally {
-				client.shutdown();
+			int userParallelism = options.getParallelism();
+			LOG.debug("User parallelism is set to {}", userParallelism);
+			if (client.getMaxSlots() != -1 && userParallelism == -1) {
+				logAndSysout("Using the parallelism provided by the remote cluster ("
+					+ client.getMaxSlots()+"). "
+					+ "To use another parallelism, set it at the ./bin/flink client.");
+				userParallelism = client.getMaxSlots();
 			}
+
+			return executeProgram(program, client, userParallelism);
 		}
 		catch (Throwable t) {
 			return handleError(t);
 		}
 		finally {
-			if (yarnCluster != null && !yarnCluster.isDetached()) {
-				logAndSysout("Shutting down YARN cluster");
-				yarnCluster.shutdown(exitCode != 0);
+			if (client != null) {
+				client.shutdown();
 			}
 			if (program != null) {
 				program.deleteExtractedLibraries();
@@ -410,7 +304,7 @@ public class CliFrontend {
 			LOG.info("Creating program plan dump");
 
 			Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
-			FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, parallelism);
+			FlinkPlan flinkPlan = ClusterClient.getOptimizedPlan(compiler, program, parallelism);
 			
 			String jsonPlan = null;
 			if (flinkPlan instanceof OptimizedPlan) {
@@ -830,53 +724,30 @@ public class CliFrontend {
 	//  Interaction with programs and JobManager
 	// --------------------------------------------------------------------------------------------
 
-	protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
-		LOG.info("Starting execution of program");
+	protected int executeProgram(PackagedProgram program, ClusterClient client, int parallelism) {
+		logAndSysout("Starting execution of program");
 
 		JobSubmissionResult result;
 		try {
-			result = client.runDetached(program, parallelism);
+			result = client.run(program, parallelism);
 		} catch (ProgramInvocationException e) {
 			return handleError(e);
 		} finally {
 			program.deleteExtractedLibraries();
 		}
 
-		if (yarnCluster != null) {
-			yarnCluster.stopAfterJob(result.getJobID());
-			yarnCluster.disconnect();
-		}
-		
-		System.out.println("Job has been submitted with JobID " + result.getJobID());
-
-		return 0;
-	}
-
-	protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) {
-		LOG.info("Starting execution of program");
-
-		JobSubmissionResult result;
-		try {
-			result = client.runBlocking(program, parallelism);
-		}
-		catch (ProgramInvocationException e) {
-			return handleError(e);
-		}
-		finally {
-			program.deleteExtractedLibraries();
-		}
-
-		LOG.info("Program execution finished");
-
-		if (result instanceof JobExecutionResult) {
-			JobExecutionResult execResult = (JobExecutionResult) result;
+		if(result.isJobExecutionResults()) {
+			logAndSysout("Program execution finished");
+			JobExecutionResult execResult = result.getJobExecutionResult();
 			System.out.println("Job with JobID " + execResult.getJobID() + " has finished.");
 			System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms");
 			Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
 			if (accumulatorsResult.size() > 0) {
-					System.out.println("Accumulator Results: ");
-					System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+				System.out.println("Accumulator Results: ");
+				System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
 			}
+		} else {
+			logAndSysout("Job has been submitted with JobID " + result.getJobID());
 		}
 
 		return 0;
@@ -923,16 +794,6 @@ public class CliFrontend {
 	}
 
 	/**
-	 * Writes the given job manager address to the associated configuration object
-	 *
-	 * @param address Address to write to the configuration
-	 */
-	protected void writeJobManagerAddressToConfig(InetSocketAddress address) {
-		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostName());
-		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());
-	}
-
-	/**
 	 * Updates the associated configuration with the given command line options
 	 *
 	 * @param options Command line options
@@ -940,7 +801,7 @@ public class CliFrontend {
 	protected void updateConfig(CommandLineOptions options) {
 		if(options.getJobManagerAddress() != null){
 			InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
-			writeJobManagerAddressToConfig(jobManagerAddress);
+			writeJobManagerAddressToConfig(config, jobManagerAddress);
 		}
 	}
 
@@ -980,110 +841,65 @@ public class CliFrontend {
 	}
 
 	/**
-	 * Retrieves a {@link Client} object from the given command line options and other parameters.
+	 * Retrieves a {@link ClusterClient} object from the given command line options and other parameters.
 	 *
 	 * @param options Command line options which contain JobManager address
 	 * @param programName Program name
-	 * @param userParallelism Given user parallelism
 	 * @throws Exception
 	 */
-	protected Client getClient(
+	protected ClusterClient getClient(
 			CommandLineOptions options,
-			String programName,
-			int userParallelism,
-			boolean detachedMode)
+			String programName)
 		throws Exception {
 		InetSocketAddress jobManagerAddress;
-		int maxSlots = -1;
 
-		if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
-			logAndSysout("YARN cluster mode detected. Switching Log4j output to console");
+		// try to get the JobManager address via command-line args
+		if (options.getJobManagerAddress() != null) {
 
-			// Default yarn application name to use, if nothing is specified on the command line
-			String applicationName = "Flink Application: " + programName;
+			// Get the custom command-lines (e.g. Yarn/Mesos)
+			CustomCommandLine<?> activeCommandLine =
+				CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress());
 
-			// user wants to run Flink in YARN cluster.
-			CommandLine commandLine = options.getCommandLine();
-			AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser
-														.getFlinkYarnSessionCli()
-														.withDefaultApplicationName(applicationName)
-														.createFlinkYarnClient(commandLine);
+			if (activeCommandLine != null) {
+				logAndSysout(activeCommandLine.getIdentifier() + " mode detected. Switching Log4j output to console");
 
-			if (flinkYarnClient == null) {
-				throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
-			}
+				// Default yarn application name to use, if nothing is specified on the command line
+				String applicationName = "Flink Application: " + programName;
 
-			// in case the main detached mode wasn't set, we don't wanna overwrite the one loaded
-			// from yarn options.
-			if (detachedMode) {
-				flinkYarnClient.setDetachedMode(true);
-			}
+				ClusterClient client = activeCommandLine.createClient(applicationName, options.getCommandLine());
 
-			// the number of slots available from YARN:
-			int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
-			if (yarnTmSlots == -1) {
-				yarnTmSlots = 1;
-			}
-			maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount();
-			if (userParallelism != -1) {
-				int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount();
-				logAndSysout("The YARN cluster has " + maxSlots + " slots available, " +
-						"but the user requested a parallelism of " + userParallelism + " on YARN. " +
-						"Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
-						"will get "+slotsPerTM+" slots.");
-				flinkYarnClient.setTaskManagerSlots(slotsPerTM);
-			}
+				logAndSysout("Cluster started");
+				logAndSysout("JobManager web interface address " + client.getWebInterfaceURL());
 
-			try {
-				yarnCluster = flinkYarnClient.deploy();
-				yarnCluster.connectToCluster();
-			}
-			catch (Exception e) {
-				throw new RuntimeException("Error deploying the YARN cluster", e);
+				return client;
+			} else {
+				// job manager address supplied on the command-line
+				LOG.info("Using address {} to connect to JobManager.", options.getJobManagerAddress());
+				jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
+				writeJobManagerAddressToConfig(config, jobManagerAddress);
+				return new StandaloneClusterClient(config);
 			}
 
-			jobManagerAddress = yarnCluster.getJobManagerAddress();
-			writeJobManagerAddressToConfig(jobManagerAddress);
-			
-			// overwrite the yarn client config (because the client parses the dynamic properties)
-			this.config.addAll(flinkYarnClient.getFlinkConfiguration());
-
-			logAndSysout("YARN cluster started");
-			logAndSysout("JobManager web interface address " + yarnCluster.getWebInterfaceURL());
-			logAndSysout("Waiting until all TaskManagers have connected");
-
-			while(true) {
-				GetClusterStatusResponse status = yarnCluster.getClusterStatus();
-				if (status != null) {
-					if (status.numRegisteredTaskManagers() < flinkYarnClient.getTaskManagerCount()) {
-						logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/"
-							+ flinkYarnClient.getTaskManagerCount() + ")");
-					} else {
-						logAndSysout("All TaskManagers are connected");
-						break;
-					}
-				} else {
-					logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
-				}
-
-				try {
-					Thread.sleep(500);
-				}
-				catch (InterruptedException e) {
-					LOG.error("Interrupted while waiting for TaskManagers");
-					System.err.println("Thread is interrupted");
-					Thread.currentThread().interrupt();
+		// try to get the JobManager address via resuming of a cluster
+		} else {
+			for (CustomCommandLine cli : CliFrontendParser.getAllCustomCommandLine().values()) {
+				ClusterClient client = cli.retrieveCluster(config);
+				if (client != null) {
+					LOG.info("Using address {} to connect to JobManager.", client.getJobManagerAddressFromConfig());
+					return client;
 				}
 			}
 		}
-		else {
-			if(options.getJobManagerAddress() != null) {
-				jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
-				writeJobManagerAddressToConfig(jobManagerAddress);
-			}
-		}
 
-		return new Client(config, maxSlots);
+		// read JobManager address from the config
+		if (config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) != null) {
+			return new StandaloneClusterClient(config);
+		// We tried hard but couldn't find a JobManager address
+		} else {
+			throw new IllegalConfigurationException(
+				"The JobManager address is neither provided at the command-line, " +
+					"nor configured in flink-conf.yaml.");
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -1275,33 +1091,16 @@ public class CliFrontend {
 		return location;
 	}
 
-	public static Map<String, String> getDynamicProperties(String dynamicPropertiesEncoded) {
-		if (dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) {
-			Map<String, String> properties = new HashMap<>();
-			
-			String[] propertyLines = dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
-			for (String propLine : propertyLines) {
-				if (propLine == null) {
-					continue;
-				}
-				
-				String[] kv = propLine.split("=");
-				if (kv.length >= 2 && kv[0] != null && kv[1] != null && kv[0].length() > 0) {
-					properties.put(kv[0], kv[1]);
-				}
-			}
-			return properties;
-		}
-		else {
-			return Collections.emptyMap();
-		}
-	}
-
-	public static String getYarnPropertiesLocation(Configuration conf) {
-		String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
-		String currentUser = System.getProperty("user.name");
-		String propertiesFileLocation = conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
 
-		return propertiesFileLocation + File.separator + CliFrontend.YARN_PROPERTIES_FILE + currentUser;
+	/**
+	 * Writes the given job manager address to the associated configuration object
+	 *
+	 * @param address Address to write to the configuration
+	 * @param config The config to write to
+	 */
+	public static void writeJobManagerAddressToConfig(Configuration config, InetSocketAddress address) {
+		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostName());
+		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
deleted file mode 100644
index bb61ffb..0000000
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
+++ /dev/null
@@ -1,505 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.client;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Class handling the command line interface to the YARN session.
- */
-public class FlinkYarnSessionCli {
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnSessionCli.class);
-
-	//------------------------------------ Constants   -------------------------
-
-	private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
-	public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
-	public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";
-
-	private static final int CLIENT_POLLING_INTERVALL = 3;
-
-
-	//------------------------------------ Command Line argument options -------------------------
-	// the prefix transformation is used by the CliFrontend static constructor.
-	private final Option QUERY;
-	// --- or ---
-	private final Option QUEUE;
-	private final Option SHIP_PATH;
-	private final Option FLINK_JAR;
-	private final Option JM_MEMORY;
-	private final Option TM_MEMORY;
-	private final Option CONTAINER;
-	private final Option SLOTS;
-	private final Option DETACHED;
-	private final Option STREAMING;
-	private final Option NAME;
-
-	/**
-	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
-	 *  -Dfs.overwrite-files=true  -Dtaskmanager.network.numberOfBuffers=16368
-	 */
-	private final Option DYNAMIC_PROPERTIES;
-
-	private final boolean acceptInteractiveInput;
-	
-	//------------------------------------ Internal fields -------------------------
-	private AbstractFlinkYarnCluster yarnCluster = null;
-	private boolean detachedMode = false;
-
-	/** Default yarn application name. */
-	private String defaultApplicationName = null;
-
-	public FlinkYarnSessionCli(String shortPrefix, String longPrefix, boolean acceptInteractiveInput) {
-		this.acceptInteractiveInput = acceptInteractiveInput;
-		
-		QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
-		QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
-		SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
-		FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
-		JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
-		TM_MEMORY = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
-		CONTAINER = new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN container to allocate (=Number of Task Managers)");
-		SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager");
-		DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties");
-		DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached");
-		STREAMING = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode");
-		NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
-	}
-
-	/**
-	 * Creates a new Yarn Client.
-	 * @param cmd the command line to parse options from
-	 * @return an instance of the client or null if there was an error
-	 */
-	public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
-
-		AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
-		if (flinkYarnClient == null) {
-			return null;
-		}
-
-		if (!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option!
-			LOG.error("Missing required argument " + CONTAINER.getOpt());
-			printUsage();
-			return null;
-		}
-		flinkYarnClient.setTaskManagerCount(Integer.valueOf(cmd.getOptionValue(CONTAINER.getOpt())));
-
-		// Jar Path
-		Path localJarPath;
-		if (cmd.hasOption(FLINK_JAR.getOpt())) {
-			String userPath = cmd.getOptionValue(FLINK_JAR.getOpt());
-			if(!userPath.startsWith("file://")) {
-				userPath = "file://" + userPath;
-			}
-			localJarPath = new Path(userPath);
-		} else {
-			LOG.info("No path for the flink jar passed. Using the location of "+flinkYarnClient.getClass()+" to locate the jar");
-			localJarPath = new Path("file://"+flinkYarnClient.getClass().getProtectionDomain().getCodeSource().getLocation().getPath());
-		}
-
-		flinkYarnClient.setLocalJarPath(localJarPath);
-
-		// Conf Path
-		String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
-		GlobalConfiguration.loadConfiguration(confDirPath);
-		Configuration flinkConfiguration = GlobalConfiguration.getConfiguration();
-		flinkYarnClient.setFlinkConfiguration(flinkConfiguration);
-		flinkYarnClient.setConfigurationDirectory(confDirPath);
-		File confFile = new File(confDirPath + File.separator + CONFIG_FILE_NAME);
-		if (!confFile.exists()) {
-			LOG.error("Unable to locate configuration file in "+confFile);
-			return null;
-		}
-		Path confPath = new Path(confFile.getAbsolutePath());
-
-		flinkYarnClient.setConfigurationFilePath(confPath);
-
-		List<File> shipFiles = new ArrayList<>();
-		// path to directory to ship
-		if (cmd.hasOption(SHIP_PATH.getOpt())) {
-			String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt());
-			File shipDir = new File(shipPath);
-			if (shipDir.isDirectory()) {
-				shipFiles = new ArrayList<>(Arrays.asList(shipDir.listFiles(new FilenameFilter() {
-					@Override
-					public boolean accept(File dir, String name) {
-						return !(name.equals(".") || name.equals(".."));
-					}
-				})));
-			} else {
-				LOG.warn("Ship directory is not a directory. Ignoring it.");
-			}
-		}
-
-		//check if there is a logback or log4j file
-		if (confDirPath.length() > 0) {
-			File logback = new File(confDirPath + File.pathSeparator + CONFIG_FILE_LOGBACK_NAME);
-			if (logback.exists()) {
-				shipFiles.add(logback);
-				flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(logback.toURI()));
-			}
-			File log4j = new File(confDirPath + File.pathSeparator + CONFIG_FILE_LOG4J_NAME);
-			if (log4j.exists()) {
-				shipFiles.add(log4j);
-				if (flinkYarnClient.getFlinkLoggingConfigurationPath() != null) {
-					// this means there is already a logback configuration file --> fail
-					LOG.warn("The configuration directory ('" + confDirPath + "') contains both LOG4J and " +
-							"Logback configuration files. Please delete or rename one of them.");
-				} // else
-				flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(log4j.toURI()));
-			}
-		}
-
-		flinkYarnClient.setShipFiles(shipFiles);
-
-		// queue
-		if (cmd.hasOption(QUEUE.getOpt())) {
-			flinkYarnClient.setQueue(cmd.getOptionValue(QUEUE.getOpt()));
-		}
-
-		// JobManager Memory
-		if (cmd.hasOption(JM_MEMORY.getOpt())) {
-			int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt()));
-			flinkYarnClient.setJobManagerMemory(jmMemory);
-		}
-
-		// Task Managers memory
-		if (cmd.hasOption(TM_MEMORY.getOpt())) {
-			int tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt()));
-			flinkYarnClient.setTaskManagerMemory(tmMemory);
-		}
-
-		if (cmd.hasOption(SLOTS.getOpt())) {
-			int slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
-			flinkYarnClient.setTaskManagerSlots(slots);
-		}
-
-		String[] dynamicProperties = null;
-		if (cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) {
-			dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt());
-		}
-		String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties,
-				CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
-
-		flinkYarnClient.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
-
-		if (cmd.hasOption(DETACHED.getOpt())) {
-			this.detachedMode = true;
-			flinkYarnClient.setDetachedMode(detachedMode);
-		}
-
-		if(cmd.hasOption(NAME.getOpt())) {
-			flinkYarnClient.setName(cmd.getOptionValue(NAME.getOpt()));
-		} else {
-			// set the default application name, if none is specified
-			if(defaultApplicationName != null) {
-				flinkYarnClient.setName(defaultApplicationName);
-			}
-		}
-
-		return flinkYarnClient;
-	}
-
-
-	private void printUsage() {
-		System.out.println("Usage:");
-		HelpFormatter formatter = new HelpFormatter();
-		formatter.setWidth(200);
-		formatter.setLeftPadding(5);
-		formatter.setSyntaxPrefix("   Required");
-		Options req = new Options();
-		req.addOption(CONTAINER);
-		formatter.printHelp(" ", req);
-
-		formatter.setSyntaxPrefix("   Optional");
-		Options opt = new Options();
-		opt.addOption(JM_MEMORY);
-		opt.addOption(TM_MEMORY);
-		opt.addOption(QUERY);
-		opt.addOption(QUEUE);
-		opt.addOption(SLOTS);
-		opt.addOption(DYNAMIC_PROPERTIES);
-		opt.addOption(DETACHED);
-		opt.addOption(STREAMING);
-		opt.addOption(NAME);
-		formatter.printHelp(" ", opt);
-	}
-
-	public static AbstractFlinkYarnClient getFlinkYarnClient() {
-		AbstractFlinkYarnClient yarnClient;
-		try {
-			Class<? extends AbstractFlinkYarnClient> yarnClientClass =
-					Class.forName("org.apache.flink.yarn.FlinkYarnClient").asSubclass(AbstractFlinkYarnClient.class);
-			yarnClient = InstantiationUtil.instantiate(yarnClientClass, AbstractFlinkYarnClient.class);
-		}
-		catch (ClassNotFoundException e) {
-			System.err.println("Unable to locate the Flink YARN Client. " +
-					"Please ensure that you are using a Flink build with Hadoop2/YARN support. Message: " +
-					e.getMessage());
-			e.printStackTrace(System.err);
-			return null; // make it obvious
-		}
-		return yarnClient;
-	}
-
-	private static void writeYarnProperties(Properties properties, File propertiesFile) {
-		try {
-			OutputStream out = new FileOutputStream(propertiesFile);
-			properties.store(out, "Generated YARN properties file");
-			out.close();
-		} catch (IOException e) {
-			throw new RuntimeException("Error writing the properties file", e);
-		}
-		propertiesFile.setReadable(true, false); // readable for all.
-	}
-
-	public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster, boolean readConsoleInput) {
-		final String HELP = "Available commands:\n" +
-				"help - show these commands\n" +
-				"stop - stop the YARN session";
-		int numTaskmanagers = 0;
-		try {
-			BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
-			label:
-			while (true) {
-				// ------------------ check if there are updates by the cluster -----------
-
-				GetClusterStatusResponse status = yarnCluster.getClusterStatus();
-				LOG.debug("Received status message: {}", status);
-
-				if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) {
-					System.err.println("Number of connected TaskManagers changed to " +
-							status.numRegisteredTaskManagers() + ". " +
-						"Slots available: " + status.totalNumberOfSlots());
-					numTaskmanagers = status.numRegisteredTaskManagers();
-				}
-
-				List<String> messages = yarnCluster.getNewMessages();
-				if (messages != null && messages.size() > 0) {
-					System.err.println("New messages from the YARN cluster: ");
-					for (String msg : messages) {
-						System.err.println(msg);
-					}
-				}
-
-				if (yarnCluster.hasFailed()) {
-					System.err.println("The YARN cluster has failed");
-					yarnCluster.shutdown(true);
-				}
-
-				// wait until CLIENT_POLLING_INTERVAL is over or the user entered something.
-				long startTime = System.currentTimeMillis();
-				while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVALL * 1000
-						&& (!readConsoleInput || !in.ready()))
-				{
-					Thread.sleep(200);
-				}
-				//------------- handle interactive command by user. ----------------------
-				
-				if (readConsoleInput && in.ready()) {
-					String command = in.readLine();
-					switch (command) {
-						case "quit":
-						case "stop":
-							break label;
-
-						case "help":
-							System.err.println(HELP);
-							break;
-						default:
-							System.err.println("Unknown command '" + command + "'. Showing help: \n" + HELP);
-							break;
-					}
-				}
-				
-				if (yarnCluster.hasBeenStopped()) {
-					LOG.info("Stopping interactive command line interface, YARN cluster has been stopped.");
-					break;
-				}
-			}
-		} catch(Exception e) {
-			LOG.warn("Exception while running the interactive command line interface", e);
-		}
-	}
-
-	public static void main(String[] args) {
-		FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", true); // no prefix for the YARN session
-		System.exit(cli.run(args));
-	}
-
-	public void getYARNSessionCLIOptions(Options options) {
-		options.addOption(FLINK_JAR);
-		options.addOption(JM_MEMORY);
-		options.addOption(TM_MEMORY);
-		options.addOption(CONTAINER);
-		options.addOption(QUEUE);
-		options.addOption(QUERY);
-		options.addOption(SHIP_PATH);
-		options.addOption(SLOTS);
-		options.addOption(DYNAMIC_PROPERTIES);
-		options.addOption(DETACHED);
-		options.addOption(STREAMING);
-		options.addOption(NAME);
-	}
-
-	public int run(String[] args) {
-		//
-		//	Command Line Options
-		//
-		Options options = new Options();
-		getYARNSessionCLIOptions(options);
-
-		CommandLineParser parser = new PosixParser();
-		CommandLine cmd;
-		try {
-			cmd = parser.parse(options, args);
-		} catch(Exception e) {
-			System.out.println(e.getMessage());
-			printUsage();
-			return 1;
-		}
-		
-		// Query cluster for metrics
-		if (cmd.hasOption(QUERY.getOpt())) {
-			AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
-			String description;
-			try {
-				description = flinkYarnClient.getClusterDescription();
-			} catch (Exception e) {
-				System.err.println("Error while querying the YARN cluster for available resources: "+e.getMessage());
-				e.printStackTrace(System.err);
-				return 1;
-			}
-			System.out.println(description);
-			return 0;
-		} else {
-			AbstractFlinkYarnClient flinkYarnClient = createFlinkYarnClient(cmd);
-
-			if (flinkYarnClient == null) {
-				System.err.println("Error while starting the YARN Client. Please check log output!");
-				return 1;
-			}
-
-			try {
-				yarnCluster = flinkYarnClient.deploy();
-				// only connect to cluster if its not a detached session.
-				if(!flinkYarnClient.isDetached()) {
-					yarnCluster.connectToCluster();
-				}
-			} catch (Exception e) {
-				System.err.println("Error while deploying YARN cluster: "+e.getMessage());
-				e.printStackTrace(System.err);
-				return 1;
-			}
-			//------------------ Cluster deployed, handle connection details
-			String jobManagerAddress = yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + ":" + yarnCluster.getJobManagerAddress().getPort();
-			System.out.println("Flink JobManager is now running on " + jobManagerAddress);
-			System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
-
-			// file that we write into the conf/ dir containing the jobManager address and the dop.
-			File yarnPropertiesFile = new File(CliFrontend.getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration()));
-
-			Properties yarnProps = new Properties();
-			yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress);
-			if (flinkYarnClient.getTaskManagerSlots() != -1) {
-				String parallelism =
-						Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount());
-				yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_PARALLELISM, parallelism);
-			}
-			// add dynamic properties
-			if (flinkYarnClient.getDynamicPropertiesEncoded() != null) {
-				yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING,
-						flinkYarnClient.getDynamicPropertiesEncoded());
-			}
-			writeYarnProperties(yarnProps, yarnPropertiesFile);
-
-			//------------------ Cluster running, let user control it ------------
-
-			if (detachedMode) {
-				// print info and quit:
-				LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
-						"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
-						"yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
-						"Please also note that the temporary files of the YARN session in {} will not be removed.",
-						flinkYarnClient.getSessionFilesDir());
-			} else {
-				runInteractiveCli(yarnCluster, acceptInteractiveInput);
-
-				if (!yarnCluster.hasBeenStopped()) {
-					LOG.info("Command Line Interface requested session shutdown");
-					yarnCluster.shutdown(false);
-				}
-
-				try {
-					yarnPropertiesFile.delete();
-				} catch (Exception e) {
-					LOG.warn("Exception while deleting the JobManager address file", e);
-				}
-			}
-		}
-		return 0;
-	}
-
-	/**
-	 * Sets the default Yarn Application Name.
-	 * @param defaultApplicationName the name of the yarn application to use
-	 * @return FlinkYarnSessionCli instance, for chaining
-     */
-	public FlinkYarnSessionCli withDefaultApplicationName(String defaultApplicationName) {
-		this.defaultApplicationName = defaultApplicationName;
-		return this;
-	}
-
-	/**
-	 * Utility method for tests.
-	 */
-	public void stop() {
-		if (yarnCluster != null) {
-			LOG.info("Command line interface is shutting down the yarnCluster");
-			yarnCluster.shutdown(false);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index ab70453..86b36b3 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -27,8 +27,9 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
-import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.configuration.ConfigConstants;
@@ -57,7 +58,7 @@ public class RemoteExecutor extends PlanExecutor {
 
 	private final Configuration clientConfiguration;
 
-	private Client client;
+	private ClusterClient client;
 
 	private int defaultParallelism = 1;
 
@@ -149,7 +150,7 @@ public class RemoteExecutor extends PlanExecutor {
 	public void start() throws Exception {
 		synchronized (lock) {
 			if (client == null) {
-				client = new Client(clientConfiguration);
+				client = new StandaloneClusterClient(clientConfiguration);
 				client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
 			}
 			else {
@@ -207,7 +208,7 @@ public class RemoteExecutor extends PlanExecutor {
 			}
 
 			try {
-				return client.runBlocking(program, defaultParallelism);
+				return client.run(program, defaultParallelism).getJobExecutionResult();
 			}
 			finally {
 				if (shutDownAtEnd) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index b75952e..f28d1b6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -24,8 +24,16 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
 
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 
 /**
  * A simple command line parser (based on Apache Commons CLI) that extracts command
@@ -33,9 +41,17 @@ import org.apache.flink.client.FlinkYarnSessionCli;
  */
 public class CliFrontendParser {
 
+	private static final Logger LOG = LoggerFactory.getLogger(CliFrontendParser.class);
+
+
 	/** command line interface of the YARN session, with a special initialization here
 	 *  to prefix all options with y/yarn. */
-	private static final FlinkYarnSessionCli yarnSessionCLi = new FlinkYarnSessionCli("y", "yarn", true);
+	private static final Map<String, CustomCommandLine> customCommandLine = new HashMap<>(1);
+
+	static {
+		// we could easily add more here in the future
+		loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn");
+	}
 
 
 	static final Option HELP_OPTION = new Option("h", "help", false,
@@ -43,7 +59,7 @@ public class CliFrontendParser {
 
 	static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");
 
-	public static final Option CLASS_OPTION = new Option("c", "class", true,
+	static final Option CLASS_OPTION = new Option("c", "class", true,
 			"Class with the program entry point (\"main\" method or \"getPlan()\" method. Only needed if the " +
 			"JAR file does not specify the class in its manifest.");
 
@@ -53,23 +69,23 @@ public class CliFrontendParser {
 					"times for specifying more than one URL. The protocol must be supported by the " +
 					"{@link java.net.URLClassLoader}.");
 
-	static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true,
+	public static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true,
 			"The parallelism with which to run the program. Optional flag to override the default value " +
 			"specified in the configuration.");
 
 	static final Option LOGGING_OPTION = new Option("q", "sysoutLogging", false, "If present, " +
 			"supress logging output to standard out.");
 
-	static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
+	public static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
 			"the job in detached mode");
 
 	static final Option ARGS_OPTION = new Option("a", "arguments", true,
 			"Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
 
 	static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true,
-			"Address of the JobManager (master) to which to connect. Specify '" + CliFrontend.YARN_DEPLOY_JOBMANAGER +
-			"' as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a " +
-			"different JobManager than the one specified in the configuration.");
+			"Address of the JobManager (master) to which to connect. " +
+			"Specify " + getCliIdentifierString() +" as the JobManager to deploy a cluster for the job. " +
+			"Use this flag to connect to a different JobManager than the one specified in the configuration.");
 
 	static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true,
 			"Path to a savepoint to reset the job back to (for example file:///flink/savepoint-1537).");
@@ -143,8 +159,10 @@ public class CliFrontendParser {
 		options.addOption(DETACHED_OPTION);
 		options.addOption(SAVEPOINT_PATH_OPTION);
 
-		// also add the YARN options so that the parser can parse them
-		yarnSessionCLi.getYARNSessionCLIOptions(options);
+		for (CustomCommandLine customCLI : customCommandLine.values()) {
+			customCLI.addOptions(options);
+		}
+
 		return options;
 	}
 
@@ -240,10 +258,16 @@ public class CliFrontendParser {
 		System.out.println("\n  Syntax: run [OPTIONS] <jar-file> <arguments>");
 		formatter.setSyntaxPrefix("  \"run\" action options:");
 		formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options()));
-		formatter.setSyntaxPrefix("  Additional arguments if -m " + CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:");
-		Options yarnOpts = new Options();
-		yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts);
-		formatter.printHelp(" ", yarnOpts);
+
+		// prints options from all available command-line classes
+		for (Map.Entry<String, CustomCommandLine> entry: customCommandLine.entrySet()) {
+			formatter.setSyntaxPrefix("  Additional arguments if -m " + entry.getKey() + " is set:");
+			Options customOpts = new Options();
+			entry.getValue().addOptions(customOpts);
+			formatter.printHelp(" ", customOpts);
+			System.out.println();
+		}
+
 		System.out.println();
 	}
 
@@ -376,7 +400,63 @@ public class CliFrontendParser {
 		}
 	}
 
-	public static FlinkYarnSessionCli getFlinkYarnSessionCli() {
-		return yarnSessionCLi;
+	public static Map<String, CustomCommandLine> getAllCustomCommandLine() { // returns all registered command-lines, keyed by identifier
+		if (customCommandLine.isEmpty()) {
+			LOG.warn("No custom command-line classes were loaded."); // logged on every call while the registry is empty
+		}
+		return Collections.unmodifiableMap(customCommandLine); // read-only view; callers cannot add or remove entries
+	}
+
+	private static String getCliIdentifierString() { // renders the registered identifiers as "'id1', 'id2'"; empty string if none
+		StringBuilder builder = new StringBuilder();
+		boolean first = true; // suppresses the separator in front of the first identifier
+		for (String identifier : customCommandLine.keySet()) {
+			if (!first) {
+				builder.append(", ");
+			}
+			first = false;
+			builder.append("'").append(identifier).append("'"); // each identifier is wrapped in single quotes
+		}
+		return builder.toString();
+	}
+
+	/**
+	 * Looks up the custom command-line registered under the given identifier.
+	 * @param identifier The unique identifier of the command-line implementation to look up.
+	 * @return The matching CustomCommandLine, or null if none is registered under that identifier
+	 */
+	public static CustomCommandLine getActiveCustomCommandLine(String identifier) {
+		return CliFrontendParser.getAllCustomCommandLine().get(identifier);
 	}
+
+	private static void loadCustomCommandLine(String className, Object... params) { // reflectively instantiates className(params) and registers it under its identifier
+
+		try {
+			Class<? extends CustomCommandLine> customCliClass =
+				Class.forName(className).asSubclass(CustomCommandLine.class); // NOTE(review): asSubclass throws ClassCastException, which is not caught below
+
+			// derive the constructor signature from the runtime classes of the arguments; getConstructor needs an exact match
+			Class<?>[] types = new Class<?>[params.length];
+			for (int i = 0; i < params.length; i++) {
+				Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
+				types[i] = params[i].getClass();
+			}
+
+			Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);
+			final CustomCommandLine cli = constructor.newInstance(params);
+
+			String cliIdentifier = Preconditions.checkNotNull(cli.getIdentifier());
+			CustomCommandLine existing = customCommandLine.put(cliIdentifier, cli); // put() has already replaced the old entry when the check below fires
+
+			if (existing != null) {
+				throw new IllegalStateException("Attempted to register " + cliIdentifier +
+					" but there is already a command-line with this identifier.");
+			}
+		} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException
+			| InvocationTargetException e) { // failure is only logged, so the class is simply left unregistered
+			LOG.warn("Unable to locate custom CLI class {}. " +
+				"Flink is not compiled with support for this class.", className, e); // NOTE(review): same text is used for constructor lookup/invocation failures
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
new file mode 100644
index 0000000..cd5e0e6
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+
+
+/**
+ * Hook through which a cluster implementation contributes its own options and cluster clients to the command-line interface.
+ */
+public interface CustomCommandLine<ClusterType extends ClusterClient> { // ClusterType: client type produced by createClient
+
+	/**
+	 * Returns a unique identifier for this custom command-line.
+	 * @return A unique identifier string
+	 */
+	String getIdentifier();
+
+	/**
+	 * Adds this command-line's custom options to the existing options.
+	 * @param baseOptions The existing options to add to.
+	 */
+	void addOptions(Options baseOptions);
+
+	/**
+	 * Retrieves a client for a running cluster.
+	 * @param config The Flink config
+	 * @return Client if a cluster could be retrieved, null otherwise
+	 */
+	ClusterClient retrieveCluster(Configuration config) throws Exception; // NOTE(review): returns the raw ClusterClient rather than ClusterType — confirm this is intended
+
+	/**
+	 * Creates the client for the cluster.
+	 * @param applicationName The application name to use
+	 * @param commandLine The command-line options parsed by the CliFrontend
+	 * @return The client to communicate with the cluster which the CustomCommandLine brought up.
+	 */
+	ClusterType createClient(String applicationName, CommandLine commandLine) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
new file mode 100644
index 0000000..cf0595b
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+
+import org.apache.flink.client.program.ClusterClient;
+
+/**
+ * A descriptor to deploy a cluster (e.g. Yarn or Mesos) and return a client for communicating with that cluster.
+ */
+public interface ClusterDescriptor<ClientType extends ClusterClient> { // ClientType: client type returned by deploy()
+
+	/**
+	 * Returns a String containing details about the cluster (NodeManagers, available memory, ...).
+	 * @return Description of the cluster
+	 */
+	String getClusterDescription() throws Exception;
+
+	/**
+	 * Triggers deployment of a cluster.
+	 * @return Client for the cluster
+	 * @throws Exception presumably when the deployment fails; exact conditions are implementation-specific (TODO confirm)
+	 */
+	ClientType deploy() throws Exception;
+}