You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/10/25 14:47:50 UTC

[flink] branch to-merge created (now c5dc8ea)

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a change to branch to-merge
in repository https://gitbox.apache.org/repos/asf/flink.git.


      at c5dc8ea  [FLINK-14377] Parse the ProgramOptions to a Configuration.

This branch includes the following new commits:

     new c16b032  [FLINK-14502] Change CustomCommandLine cluster client methods to get a configuration
     new e0a7123  [FLINK-14501] Change the log config file discovery
     new 8820f6d  [FLINK-14501] Add the DeploymentOptions.TARGET
     new 74e35f0  [FLINK-14501] Add the ClusterClientFactory and make it discoverable
     new 7777994  [FLINK-14501] Add Standalone and Yarn ClusterClientFactories
     new c5753bb  [FLINK-14501] Wired ClusterClientFactories to production code
     new d457bec  [hotfix] Do not expose Yarn dynamic options
     new c5dc8ea  [FLINK-14377] Parse the ProgramOptions to a Configuration.

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[flink] 01/08: [FLINK-14502] Change CustomCommandLine cluster client methods to get a configuration

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch to-merge
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c16b03202d56d9d2de6bc394b6b0171cb3763c56
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sun Oct 13 21:09:15 2019 +0200

    [FLINK-14502] Change CustomCommandLine cluster client methods to get a configuration
---
 .../generated/deployment_configuration.html        |  16 +
 .../generated/yarn_config_configuration.html       |  45 +++
 docs/ops/config.md                                 |   4 +
 docs/ops/config.zh.md                              |   4 +
 .../client/cli/AbstractCustomCommandLine.java      |   9 +-
 .../org/apache/flink/client/cli/CliFrontend.java   |  19 +-
 .../apache/flink/client/cli/CustomCommandLine.java |  36 ++-
 .../org/apache/flink/client/cli/DefaultCLI.java    |  12 +-
 .../apache/flink/client/cli/DefaultCLITest.java    |  14 +-
 .../client/cli/util/DummyCustomCommandLine.java    |  12 +-
 .../client/program/rest/RestClusterClientTest.java |   6 +-
 .../flink/configuration/DeploymentOptions.java     |  35 +++
 .../org/apache/flink/api/scala/FlinkShell.scala    |  10 +-
 .../client/gateway/local/ExecutionContext.java     |  23 +-
 .../flink/yarn/CliFrontendRunWithYarnTest.java     |  12 +-
 .../java/org/apache/flink/yarn/YARNITCase.java     |   2 -
 .../apache/flink/yarn/YarnConfigurationITCase.java |   2 -
 .../java/org/apache/flink/yarn/YarnTestBase.java   |   1 -
 .../util/NonDeployingYarnClusterDescriptor.java    |   3 +-
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  42 +--
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 347 ++++++++++++---------
 .../org/apache/flink/yarn/cli/YarnConfigUtils.java |  87 ++++++
 .../yarn/configuration/YarnConfigOptions.java      |  48 +++
 .../apache/flink/yarn/AbstractYarnClusterTest.java |   1 -
 .../apache/flink/yarn/FlinkYarnSessionCliTest.java |  60 ++--
 .../flink/yarn/YarnClusterDescriptorTest.java      |   2 -
 26 files changed, 572 insertions(+), 280 deletions(-)

diff --git a/docs/_includes/generated/deployment_configuration.html b/docs/_includes/generated/deployment_configuration.html
new file mode 100644
index 0000000..bdac757
--- /dev/null
+++ b/docs/_includes/generated/deployment_configuration.html
@@ -0,0 +1,16 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>execution.attached</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Specifies if the pipeline is submitted in attached or detached mode.</td>
+        </tr>
+    </tbody>
+</table>
diff --git a/docs/_includes/generated/yarn_config_configuration.html b/docs/_includes/generated/yarn_config_configuration.html
index a28f15b..6cf3945 100644
--- a/docs/_includes/generated/yarn_config_configuration.html
+++ b/docs/_includes/generated/yarn_config_configuration.html
@@ -8,6 +8,11 @@
     </thead>
     <tbody>
         <tr>
+            <td><h5>$internal.yarn.dynamic-properties</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>**DO NOT USE** Specify YARN dynamic properties.</td>
+        </tr>
+        <tr>
             <td><h5>yarn.application-attempt-failures-validity-interval</h5></td>
             <td style="word-wrap: break-word;">10000</td>
             <td>Time window in milliseconds which defines the number of application attempt failures when restarting the AM. Failures which fall outside of this window are not being considered. Set this value to -1 in order to count globally. See <a href="https://hortonworks.com/blog/apache-hadoop-yarn-hdp-2-2-fault-tolerance-features-long-running-services/">here</a> for more information.</td>
@@ -23,11 +28,36 @@
             <td>With this configuration option, users can specify a port, a range of ports or a list of ports for the Application Master (and JobManager) RPC port. By default we recommend using the default value (0) to let the operating system choose an appropriate port. In particular when multiple AMs are running on the same physical host, fixed port assignments prevent the AM from starting. For example when running Flink on YARN on an environment with a restrictive firewall, this optio [...]
         </tr>
         <tr>
+            <td><h5>yarn.application.id</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The YARN application id of the running yarn cluster. This is the YARN cluster where the pipeline is going to be executed.</td>
+        </tr>
+        <tr>
+            <td><h5>yarn.application.name</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>A custom name for your YARN application.</td>
+        </tr>
+        <tr>
+            <td><h5>yarn.application.node-label</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Specify YARN node label for the YARN application.</td>
+        </tr>
+        <tr>
             <td><h5>yarn.application.priority</h5></td>
             <td style="word-wrap: break-word;">-1</td>
             <td>A non-negative integer indicating the priority for submitting a Flink YARN application. It will only take effect if YARN priority scheduling setting is enabled. Larger integer corresponds with higher priority. If priority is negative or set to '-1'(default), Flink will unset yarn priority setting and use cluster default priority. Please refer to YARN's official documentation for specific settings required to enable priority scheduling for the targeted YARN version.</td>
         </tr>
         <tr>
+            <td><h5>yarn.application.queue</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The YARN queue on which to put the current pipeline.</td>
+        </tr>
+        <tr>
+            <td><h5>yarn.application.type</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>A custom type for your YARN application..</td>
+        </tr>
+        <tr>
             <td><h5>yarn.appmaster.rpc.address</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>The hostname or address where the application master RPC system is listening.</td>
@@ -48,6 +78,11 @@
             <td>The number of virtual cores (vcores) per YARN container. By default, the number of vcores is set to the number of slots per TaskManager, if set, or to 1, otherwise. In order for this parameter to be used your cluster must have CPU scheduling enabled. You can do this by setting the <span markdown="span">`org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler`</span>.</td>
         </tr>
         <tr>
+            <td><h5>yarn.flink-dist-jar</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The location of the Flink dist jar.</td>
+        </tr>
+        <tr>
             <td><h5>yarn.heartbeat.container-request-interval</h5></td>
             <td style="word-wrap: break-word;">500</td>
             <td>Time between heartbeats with the ResourceManager in milliseconds if Flink requests containers:<ul><li>The lower this value is, the faster Flink will get notified about container allocations since requests and allocations are transmitted via heartbeats.</li><li>The lower this value is, the more excessive containers might get allocated which will eventually be released but put pressure on Yarn.</li></ul>If you observe too many container allocations on the ResourceManager, t [...]
@@ -58,6 +93,11 @@
             <td>Time between heartbeats with the ResourceManager in seconds.</td>
         </tr>
         <tr>
+            <td><h5>yarn.log-config-file</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The location of the log config file, e.g. the path to your log4j.properties for log4j.</td>
+        </tr>
+        <tr>
             <td><h5>yarn.maximum-failed-containers</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Maximum number of containers the system is going to reallocate in case of a failure.</td>
@@ -73,6 +113,11 @@
             <td>When a Flink job is submitted to YARN, the JobManager’s host and the number of available processing slots is written into a properties file, so that the Flink client is able to pick those details up. This configuration parameter allows changing the default location of that file (for example for environments sharing a Flink installation between users).</td>
         </tr>
         <tr>
+            <td><h5>yarn.ship-directories</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>A semicolon-separated list of directories to be shipped to the YARN cluster.</td>
+        </tr>
+        <tr>
             <td><h5>yarn.tags</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>A comma-separated list of tags to apply to the Flink YARN application.</td>
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 756153f..6a8f4ce 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -64,6 +64,10 @@ These parameters configure the default HDFS used by Flink. Setups that do not sp
 
 {% include generated/core_configuration.html %}
 
+### Execution
+
+{% include generated/deployment_configuration.html %}
+
 ### JobManager
 
 {% include generated/job_manager_configuration.html %}
diff --git a/docs/ops/config.zh.md b/docs/ops/config.zh.md
index d965e83..1274f6d 100644
--- a/docs/ops/config.zh.md
+++ b/docs/ops/config.zh.md
@@ -64,6 +64,10 @@ These parameters configure the default HDFS used by Flink. Setups that do not sp
 
 {% include generated/core_configuration.html %}
 
+### Execution
+
+{% include generated/deployment_configuration.html %}
+
 ### JobManager
 
 {% include generated/job_manager_configuration.html %}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
index 0cae6e7..805d58a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
@@ -69,13 +69,8 @@ public abstract class AbstractCustomCommandLine<T> implements CustomCommandLine<
 		baseOptions.addOption(zookeeperNamespaceOption);
 	}
 
-	/**
-	 * Override configuration settings by specified command line options.
-	 *
-	 * @param commandLine containing the overriding values
-	 * @return Effective configuration with the overridden configuration settings
-	 */
-	protected Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
+	@Override
+	public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
 		final Configuration resultingConfiguration = new Configuration(configuration);
 
 		if (commandLine.hasOption(addressOption.getOpt())) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 8589227..c49374a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -195,9 +195,9 @@ public class CliFrontend {
 		}
 
 		final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine);
-
+		final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
 		try {
-			runProgram(customCommandLine, commandLine, runOptions, program);
+			runProgram(customCommandLine, executorConfig, runOptions, program);
 		} finally {
 			program.deleteExtractedLibraries();
 		}
@@ -205,13 +205,13 @@ public class CliFrontend {
 
 	private <T> void runProgram(
 			CustomCommandLine<T> customCommandLine,
-			CommandLine commandLine,
+			Configuration executorConfig,
 			RunOptions runOptions,
 			PackagedProgram program) throws ProgramInvocationException, FlinkException {
-		final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
+		final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(executorConfig);
 
 		try {
-			final T clusterId = customCommandLine.getClusterId(commandLine);
+			final T clusterId = customCommandLine.getClusterId(executorConfig);
 
 			final ClusterClient<T> client;
 
@@ -221,7 +221,7 @@ public class CliFrontend {
 
 				final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
 
-				final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
+				final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(executorConfig);
 				client = clusterDescriptor.deployJobCluster(
 					clusterSpecification,
 					jobGraph,
@@ -242,7 +242,7 @@ public class CliFrontend {
 				} else {
 					// also in job mode we have to deploy a session cluster because the job
 					// might consist of multiple parts (e.g. when using collect)
-					final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
+					final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(executorConfig);
 					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
 					// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
 					// there's a race-condition here if cli is killed before shutdown hook is installed
@@ -912,9 +912,10 @@ public class CliFrontend {
 	 * @throws FlinkException if something goes wrong
 	 */
 	private <T> void runClusterAction(CustomCommandLine<T> activeCommandLine, CommandLine commandLine, ClusterAction<T> clusterAction) throws FlinkException {
-		final ClusterDescriptor<T> clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine);
+		final Configuration executorConfig = activeCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
+		final ClusterDescriptor<T> clusterDescriptor = activeCommandLine.createClusterDescriptor(executorConfig);
 
-		final T clusterId = activeCommandLine.getClusterId(commandLine);
+		final T clusterId = activeCommandLine.getClusterId(executorConfig);
 
 		if (clusterId == null) {
 			throw new FlinkException("No cluster id was specified. Please specify a cluster to which " +
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
index e939974..9458de2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -20,6 +20,7 @@ package org.apache.flink.client.cli;
 
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.FlinkException;
 
 import org.apache.commons.cli.CommandLine;
@@ -59,37 +60,40 @@ public interface CustomCommandLine<T> {
 	void addGeneralOptions(Options baseOptions);
 
 	/**
-	 * Create a {@link ClusterDescriptor} from the given configuration, configuration directory
-	 * and the command line.
+	 * Override configuration settings by specified command line options.
 	 *
-	 * @param commandLine containing command line options relevant for the ClusterDescriptor
-	 * @return ClusterDescriptor
-	 * @throws FlinkException if the ClusterDescriptor could not be created
+	 * @param commandLine containing the overriding values
+	 * @return the effective configuration with the overridden configuration settings
 	 */
-	ClusterDescriptor<T> createClusterDescriptor(CommandLine commandLine) throws FlinkException;
+	Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException;
 
 	/**
-	 * Returns the cluster id if a cluster id was specified on the command line, otherwise it
-	 * returns null.
+	 * Create a {@link ClusterDescriptor} from the given configuration.
 	 *
-	 * <p>A cluster id identifies a running cluster, e.g. the Yarn application id for a Flink
-	 * cluster running on Yarn.
+	 * @param configuration containing the configuration options relevant for the {@link ClusterDescriptor}
+	 * @return the corresponding {@link ClusterDescriptor}.
+	 */
+	ClusterDescriptor<T> createClusterDescriptor(Configuration configuration);
+
+	/**
+	 * Returns the cluster id if a cluster id is specified in the provided configuration, otherwise it returns {@code null}.
+	 *
+	 * <p>A cluster id identifies a running cluster, e.g. the Yarn application id for a Flink cluster running on Yarn.
 	 *
-	 * @param commandLine containing command line options relevant for the cluster id retrieval
+	 * @param configuration containing the configuration options relevant for the cluster id retrieval
 	 * @return Cluster id identifying the cluster to deploy jobs to or null
 	 */
 	@Nullable
-	T getClusterId(CommandLine commandLine);
+	T getClusterId(Configuration configuration);
 
 	/**
 	 * Returns the {@link ClusterSpecification} specified by the configuration and the command
 	 * line options. This specification can be used to deploy a new Flink cluster.
 	 *
-	 * @param commandLine containing command line options relevant for the ClusterSpecification
-	 * @return ClusterSpecification for a new Flink cluster
-	 * @throws FlinkException if the ClusterSpecification could not be created
+	 * @param configuration containing the configuration options relevant for the {@link ClusterSpecification}
+	 * @return the corresponding {@link ClusterSpecification} for a new Flink cluster
 	 */
-	ClusterSpecification getClusterSpecification(CommandLine commandLine) throws FlinkException;
+	ClusterSpecification getClusterSpecification(Configuration configuration);
 
 	default CommandLine parseCommandLineOptions(String[] args, boolean stopAtNonOptions) throws CliArgsException {
 		final Options options = new Options();
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index e9ed9af..f21f755 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -22,7 +22,6 @@ import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
 import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.FlinkException;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
@@ -55,21 +54,18 @@ public class DefaultCLI extends AbstractCustomCommandLine<StandaloneClusterId> {
 	}
 
 	@Override
-	public StandaloneClusterDescriptor createClusterDescriptor(
-			CommandLine commandLine) throws FlinkException {
-		final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(commandLine);
-
-		return new StandaloneClusterDescriptor(effectiveConfiguration);
+	public StandaloneClusterDescriptor createClusterDescriptor(Configuration configuration) {
+		return new StandaloneClusterDescriptor(configuration);
 	}
 
 	@Override
 	@Nullable
-	public StandaloneClusterId getClusterId(CommandLine commandLine) {
+	public StandaloneClusterId getClusterId(Configuration configuration) {
 		return StandaloneClusterId.getInstance();
 	}
 
 	@Override
-	public ClusterSpecification getClusterSpecification(CommandLine commandLine) {
+	public ClusterSpecification getClusterSpecification(Configuration configuration) {
 		return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
 	}
 }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
index 23fe69c..73f967d 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
@@ -64,11 +64,10 @@ public class DefaultCLITest extends CliFrontendTestBase {
 		final String[] args = {};
 
 		CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
+		final Configuration executorConfig = defaultCLI.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final ClusterDescriptor<StandaloneClusterId> clusterDescriptor =
-			defaultCLI.createClusterDescriptor(commandLine);
-
-		final ClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));
+		final ClusterDescriptor<StandaloneClusterId> clusterDescriptor = defaultCLI.createClusterDescriptor(executorConfig);
+		final ClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(executorConfig));
 
 		final URL webInterfaceUrl = new URL(clusterClient.getWebInterfaceURL());
 
@@ -97,11 +96,10 @@ public class DefaultCLITest extends CliFrontendTestBase {
 		final String[] args = {"-m", manualHostname + ':' + manualPort};
 
 		CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
+		final Configuration executorConfig = defaultCLI.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final ClusterDescriptor<StandaloneClusterId> clusterDescriptor =
-			defaultCLI.createClusterDescriptor(commandLine);
-
-		final ClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));
+		final ClusterDescriptor<StandaloneClusterId> clusterDescriptor = defaultCLI.createClusterDescriptor(executorConfig);
+		final ClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(executorConfig));
 
 		final URL webInterfaceUrl = new URL(clusterClient.getWebInterfaceURL());
 
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
index 12bea74..0bd7dc1 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
@@ -22,6 +22,7 @@ import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.cli.CommandLine;
@@ -60,18 +61,23 @@ public class DummyCustomCommandLine<T> implements CustomCommandLine {
 	}
 
 	@Override
-	public ClusterDescriptor<T> createClusterDescriptor(CommandLine commandLine) {
+	public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) {
+		return new Configuration();
+	}
+
+	@Override
+	public ClusterDescriptor<T> createClusterDescriptor(Configuration configuration) {
 		return new DummyClusterDescriptor<>(clusterClient);
 	}
 
 	@Override
 	@Nullable
-	public String getClusterId(CommandLine commandLine) {
+	public String getClusterId(Configuration configuration) {
 		return "dummy";
 	}
 
 	@Override
-	public ClusterSpecification getClusterSpecification(CommandLine commandLine) {
+	public ClusterSpecification getClusterSpecification(Configuration configuration) {
 		return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
 	}
 }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index f234e07..2ffa0b6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -554,10 +554,10 @@ public class RestClusterClientTest extends TestLogger {
 		final String[] args = {"-m", manualHostname + ':' + manualPort};
 
 		CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
+		final Configuration executorConfig = defaultCLI.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final StandaloneClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine);
-
-		final RestClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));
+		final StandaloneClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(executorConfig);
+		final RestClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(executorConfig));
 
 		URL webMonitorBaseUrl = clusterClient.getWebMonitorBaseUrl().get();
 		assertThat(webMonitorBaseUrl.getHost(), equalTo(manualHostname));
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
new file mode 100644
index 0000000..1addcc4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The {@link ConfigOption configuration options} relevant for all Executors.
+ */
+@PublicEvolving
+public class DeploymentOptions {
+
+	public static final ConfigOption<Boolean> ATTACHED =
+			key("execution.attached")
+					.defaultValue(false)
+					.withDescription("Specifies if the pipeline is submitted in attached or detached mode.");
+}
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index ab388c7..04f68ff 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -258,10 +258,11 @@ object FlinkShell {
     val commandLine = CliFrontendParser.parse(commandLineOptions, args.toArray, true)
 
     val customCLI = frontend.getActiveCustomCommandLine(commandLine)
+    val executorConfig = customCLI.applyCommandLineOptionsToConfiguration(commandLine);
 
-    val clusterDescriptor = customCLI.createClusterDescriptor(commandLine)
+    val clusterDescriptor = customCLI.createClusterDescriptor(executorConfig)
 
-    val clusterSpecification = customCLI.getClusterSpecification(commandLine)
+    val clusterSpecification = customCLI.getClusterSpecification(executorConfig)
 
     val cluster = clusterDescriptor.deploySessionCluster(clusterSpecification)
 
@@ -288,12 +289,13 @@ object FlinkShell {
       configuration,
       CliFrontend.loadCustomCommandLines(configuration, configurationDirectory))
     val customCLI = frontend.getActiveCustomCommandLine(commandLine)
+    val executorConfig = customCLI.applyCommandLineOptionsToConfiguration(commandLine);
 
     val clusterDescriptor = customCLI
-      .createClusterDescriptor(commandLine)
+      .createClusterDescriptor(executorConfig)
       .asInstanceOf[ClusterDescriptor[Any]]
 
-    val clusterId = customCLI.getClusterId(commandLine)
+    val clusterId = customCLI.getClusterId(executorConfig)
 
     val cluster = clusterDescriptor.retrieve(clusterId)
 
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 73c9cce..ff54d35 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -110,14 +110,14 @@ public class ExecutionContext<T> {
 	private final Map<String, TableSink<?>> tableSinks;
 	private final Map<String, UserDefinedFunction> functions;
 	private final Configuration flinkConfig;
-	private final CommandLine commandLine;
+	private final Configuration executorConfig;
 	private final CustomCommandLine<T> activeCommandLine;
 	private final RunOptions runOptions;
 	private final T clusterId;
 	private final ClusterSpecification clusterSpec;
 
 	public ExecutionContext(Environment defaultEnvironment, SessionContext sessionContext, List<URL> dependencies,
-			Configuration flinkConfig, Options commandLineOptions, List<CustomCommandLine<?>> availableCommandLines) {
+			Configuration flinkConfig, Options commandLineOptions, List<CustomCommandLine<?>> availableCommandLines) throws FlinkException {
 		this.sessionContext = sessionContext.copy(); // create internal copy because session context is mutable
 		this.mergedEnv = Environment.merge(defaultEnvironment, sessionContext.getEnvironment());
 		this.dependencies = dependencies;
@@ -154,11 +154,12 @@ public class ExecutionContext<T> {
 		});
 
 		// convert deployment options into command line options that describe a cluster
-		commandLine = createCommandLine(mergedEnv.getDeployment(), commandLineOptions);
+		final CommandLine commandLine = createCommandLine(mergedEnv.getDeployment(), commandLineOptions);
 		activeCommandLine = findActiveCommandLine(availableCommandLines, commandLine);
+		executorConfig = activeCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
 		runOptions = createRunOptions(commandLine);
-		clusterId = activeCommandLine.getClusterId(commandLine);
-		clusterSpec = createClusterSpecification(activeCommandLine, commandLine);
+		clusterId = activeCommandLine.getClusterId(executorConfig);
+		clusterSpec = activeCommandLine.getClusterSpecification(executorConfig);
 	}
 
 	public SessionContext getSessionContext() {
@@ -181,8 +182,8 @@ public class ExecutionContext<T> {
 		return clusterId;
 	}
 
-	public ClusterDescriptor<T> createClusterDescriptor() throws Exception {
-		return activeCommandLine.createClusterDescriptor(commandLine);
+	public ClusterDescriptor<T> createClusterDescriptor() {
+		return activeCommandLine.createClusterDescriptor(executorConfig);
 	}
 
 	public EnvironmentInstance createEnvironmentInstance() {
@@ -243,14 +244,6 @@ public class ExecutionContext<T> {
 		}
 	}
 
-	private static ClusterSpecification createClusterSpecification(CustomCommandLine<?> activeCommandLine, CommandLine commandLine) {
-		try {
-			return activeCommandLine.getClusterSpecification(commandLine);
-		} catch (FlinkException e) {
-			throw new SqlExecutionException("Could not create cluster specification for the given deployment.", e);
-		}
-	}
-
 	private Catalog createCatalog(String name, Map<String, String> catalogProperties, ClassLoader classLoader) {
 		final CatalogFactory factory =
 			TableFactoryService.find(CatalogFactory.class, catalogProperties, classLoader);
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
index 28a8d5a..e16abc0 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
@@ -23,12 +23,10 @@ import org.apache.flink.client.cli.CliFrontendTestUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.yarn.util.FakeClusterClient;
 import org.apache.flink.yarn.util.NonDeployingYarnClusterDescriptor;
 
-import org.apache.commons.cli.CommandLine;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.AfterClass;
@@ -90,7 +88,6 @@ public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
 
 	private static class TestingFlinkYarnSessionCli extends FlinkYarnSessionCli {
 		private final ClusterClient<ApplicationId> clusterClient;
-		private final String configurationDirectory;
 
 		private TestingFlinkYarnSessionCli(
 				Configuration configuration,
@@ -99,18 +96,15 @@ public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
 				String longPrefix) throws Exception {
 			super(configuration, configurationDirectory, shortPrefix, longPrefix);
 
-			this.clusterClient = new FakeClusterClient();
-			this.configurationDirectory = configurationDirectory;
+			this.clusterClient = new FakeClusterClient(configuration);
 		}
 
 		@Override
-		public YarnClusterDescriptor createClusterDescriptor(CommandLine commandLine)
-			throws FlinkException {
-			YarnClusterDescriptor parent = super.createClusterDescriptor(commandLine);
+		public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
+			YarnClusterDescriptor parent = super.createClusterDescriptor(configuration);
 			return new NonDeployingYarnClusterDescriptor(
 					parent.getFlinkConfiguration(),
 					(YarnConfiguration) parent.getYarnClient().getConfig(),
-					configurationDirectory,
 					parent.getYarnClient(),
 					clusterClient);
 		}
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index e894f65..6cf6fb6 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobResult;
@@ -72,7 +71,6 @@ public class YARNITCase extends YarnTestBase {
 			try (final YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
 				configuration,
 				getYarnConfiguration(),
-				System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR),
 				yarnClient,
 				true)) {
 
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 7459bac..2aa476a 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.yarn;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
@@ -98,7 +97,6 @@ public class YarnConfigurationITCase extends YarnTestBase {
 			final YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
 				configuration,
 				yarnConfiguration,
-				CliFrontend.getConfigurationDirectoryFromEnv(),
 				yarnClient,
 				true);
 
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 6481420..80f6efc 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -307,7 +307,6 @@ public abstract class YarnTestBase extends TestLogger {
 		final YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
 			flinkConfiguration,
 			YARN_CONFIGURATION,
-			CliFrontend.getConfigurationDirectoryFromEnv(),
 			yarnClient,
 			true);
 		yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.toURI()));
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
index 2671936..48e05f8 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
@@ -40,10 +40,9 @@ public class NonDeployingYarnClusterDescriptor extends YarnClusterDescriptor {
 	public NonDeployingYarnClusterDescriptor(
 			Configuration flinkConfiguration,
 			YarnConfiguration yarnConfiguration,
-			String configurationDirectory,
 			YarnClient yarnClient,
 			ClusterClient<ApplicationId> clusterClient) {
-		super(flinkConfiguration, yarnConfiguration, configurationDirectory, yarnClient, true);
+		super(flinkConfiguration, yarnConfiguration, yarnClient, true);
 
 		this.clusterClient = Preconditions.checkNotNull(clusterClient);
 	}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 85d420f..68eea73 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
+import org.apache.flink.yarn.cli.YarnConfigUtils;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
 import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
@@ -129,8 +130,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 
 	private String yarnQueue;
 
-	private String configurationDirectory;
-
 	private Path flinkJarPath;
 
 	private String dynamicPropertiesEncoded;
@@ -152,7 +151,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 	public YarnClusterDescriptor(
 			Configuration flinkConfiguration,
 			YarnConfiguration yarnConfiguration,
-			String configurationDirectory,
 			YarnClient yarnClient,
 			boolean sharedYarnClient) {
 
@@ -162,8 +160,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 
 		this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration);
 		this.userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
-
-		this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
 	}
 
 	@VisibleForTesting
@@ -235,9 +231,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 		if (this.flinkJarPath == null) {
 			throw new YarnDeploymentException("The Flink jar path is null");
 		}
-		if (this.configurationDirectory == null) {
-			throw new YarnDeploymentException("Configuration directory not set");
-		}
 		if (this.flinkConfiguration == null) {
 			throw new YarnDeploymentException("Flink configuration object has not been set");
 		}
@@ -674,6 +667,15 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 		}
 	}
 
+	private boolean containsFileWithEnding(final Set<File> files, final String suffix) {
+		for (File file: files) {
+			if (file.getPath().endsWith(suffix)) {
+				return true;
+			}
+		}
+		return false;
+	}
+
 	private ApplicationReport startAppMaster(
 			Configuration configuration,
 			String applicationName,
@@ -709,22 +711,10 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 			systemShipFiles.add(file.getAbsoluteFile());
 		}
 
-		//check if there is a logback or log4j file
-		File logbackFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME);
-		final boolean hasLogback = logbackFile.exists();
-		if (hasLogback) {
-			systemShipFiles.add(logbackFile);
-		}
-
-		File log4jFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME);
-		final boolean hasLog4j = log4jFile.exists();
-		if (hasLog4j) {
-			systemShipFiles.add(log4jFile);
-			if (hasLogback) {
-				// this means there is already a logback configuration file --> fail
-				LOG.warn("The configuration directory ('" + configurationDirectory + "') contains both LOG4J and " +
-						"Logback configuration files. Please delete or rename one of them.");
-			}
+		final List<File> logConfigFiles = YarnConfigUtils
+				.decodeListFromConfig(configuration, YarnConfigOptions.APPLICATION_LOG_CONFIG_FILES, File::new);
+		if (logConfigFiles != null) {
+			systemShipFiles.addAll(logConfigFiles);
 		}
 
 		addEnvironmentFoldersToShipFiles(systemShipFiles);
@@ -944,8 +934,8 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 
 		final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
 				yarnClusterEntrypoint,
-				hasLogback,
-				hasLog4j,
+				containsFileWithEnding(systemShipFiles, CONFIG_FILE_LOGBACK_NAME),
+				containsFileWithEnding(systemShipFiles, CONFIG_FILE_LOG4J_NAME),
 				hasKrb5,
 				clusterSpecification.getMasterMemoryMB());
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 8d6916a..61e3d58 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -26,6 +26,7 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
@@ -37,7 +38,6 @@ import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.slf4j.Logger;
@@ -72,12 +71,13 @@ import java.io.UnsupportedEncodingException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.net.URLDecoder;
 import java.nio.charset.Charset;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -87,6 +87,7 @@ import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
 import static org.apache.flink.configuration.HighAvailabilityOptions.HA_CLUSTER_ID;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Class handling the command line interface to the YARN session.
@@ -158,8 +159,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 
 	private final String yarnPropertiesFileLocation;
 
-	private final YarnConfiguration yarnConfiguration;
-
 	public FlinkYarnSessionCli(
 			Configuration configuration,
 			String configurationDirectory,
@@ -175,7 +174,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 			String longPrefix,
 			boolean acceptInteractiveInput) throws FlinkException {
 		super(configuration);
-		this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
+		this.configurationDirectory = checkNotNull(configurationDirectory);
 		this.acceptInteractiveInput = acceptInteractiveInput;
 
 		// Create the command line options
@@ -256,126 +255,118 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		} else {
 			yarnApplicationIdFromYarnProperties = null;
 		}
-
-		this.yarnConfiguration = new YarnConfiguration();
 	}
 
-	private YarnClusterDescriptor createDescriptor(
-			Configuration configuration,
-			YarnConfiguration yarnConfiguration,
-			String configurationDirectory,
-			CommandLine cmd) {
-
-		YarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor(
-			configuration,
-			yarnConfiguration,
-			configurationDirectory);
-
-		// Jar Path
-		final Path localJarPath;
-		if (cmd.hasOption(flinkJar.getOpt())) {
-			String userPath = cmd.getOptionValue(flinkJar.getOpt());
-			if (!userPath.startsWith("file://")) {
-				userPath = "file://" + userPath;
-			}
-			localJarPath = new Path(userPath);
-		} else {
-			LOG.info("No path for the flink jar passed. Using the location of "
-				+ yarnClusterDescriptor.getClass() + " to locate the jar");
-			String encodedJarPath =
-				yarnClusterDescriptor.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
-
-			final String decodedPath;
-			try {
-				// we have to decode the url encoded parts of the path
-				decodedPath = URLDecoder.decode(encodedJarPath, Charset.defaultCharset().name());
-			} catch (UnsupportedEncodingException e) {
-				throw new RuntimeException("Couldn't decode the encoded Flink dist jar path: " + encodedJarPath +
-					" Please supply a path manually via the -" + flinkJar.getOpt() + " option.");
-			}
-
-			// check whether it's actually a jar file --> when testing we execute this class without a flink-dist jar
-			if (decodedPath.endsWith(".jar")) {
-				localJarPath = new Path(new File(decodedPath).toURI());
-			} else {
-				localJarPath = null;
-			}
-		}
+	private YarnClusterDescriptor createDescriptor(Configuration configuration) {
+		YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(configuration);
 
+		final Path localJarPath = getLocalFlinkDistPath(configuration, yarnClusterDescriptor);
 		if (localJarPath != null) {
 			yarnClusterDescriptor.setLocalJarPath(localJarPath);
 		}
 
-		List<File> shipFiles = new ArrayList<>();
-		// path to directories to ship
-		if (cmd.hasOption(shipPath.getOpt())) {
-			String[] shipPaths = cmd.getOptionValues(this.shipPath.getOpt());
-			for (String shipPath : shipPaths) {
-				File shipDir = new File(shipPath);
-				if (shipDir.isDirectory()) {
-					shipFiles.add(shipDir);
-				} else {
-					LOG.warn("Ship directory {} is not a directory. Ignoring it.", shipDir.getAbsolutePath());
-				}
-			}
-		}
-
+		final List<File> shipFiles = decodeDirsToShipToCluster(configuration);
 		yarnClusterDescriptor.addShipFiles(shipFiles);
 
-		// queue
-		if (cmd.hasOption(queue.getOpt())) {
-			yarnClusterDescriptor.setQueue(cmd.getOptionValue(queue.getOpt()));
+		final String queueName = configuration.getString(YarnConfigOptions.APPLICATION_QUEUE);
+		if (queueName != null) {
+			yarnClusterDescriptor.setQueue(queueName);
 		}
 
-		final Properties properties = cmd.getOptionProperties(dynamicproperties.getOpt());
+		final String dynamicPropertiesEncoded = configuration.getString(YarnConfigOptions.DYNAMIC_PROPERTIES);
+		yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
 
-		for (String key : properties.stringPropertyNames()) {
-			LOG.info("Dynamic Property set: {}={}", key, GlobalConfiguration.isSensitive(key) ? GlobalConfiguration.HIDDEN_CONTENT : properties.getProperty(key));
+		final boolean detached = !configuration.getBoolean(DeploymentOptions.ATTACHED);
+		yarnClusterDescriptor.setDetachedMode(detached);
+
+		final String appName = configuration.getString(YarnConfigOptions.APPLICATION_NAME);
+		if (appName != null) {
+			yarnClusterDescriptor.setName(appName);
 		}
 
-		String[] dynamicProperties = properties.stringPropertyNames().stream()
-			.flatMap(
-				(String key) -> {
-					final String value = properties.getProperty(key);
+		final String appType = configuration.getString(YarnConfigOptions.APPLICATION_TYPE);
+		if (appType != null) {
+			yarnClusterDescriptor.setApplicationType(appType);
+		}
 
-					if (value != null) {
-						return Stream.of(key + dynamicproperties.getValueSeparator() + value);
-					} else {
-						return Stream.empty();
-					}
-				})
-			.toArray(String[]::new);
+		final String zkNamespace = configuration.getString(HA_CLUSTER_ID);
+		if (zkNamespace != null) {
+			yarnClusterDescriptor.setZookeeperNamespace(zkNamespace);
+		}
 
-		String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR);
+		final String nodeLabel = configuration.getString(YarnConfigOptions.NODE_LABEL);
+		if (nodeLabel != null) {
+			yarnClusterDescriptor.setNodeLabel(nodeLabel);
+		}
 
-		yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
+		return yarnClusterDescriptor;
+	}
 
-		if (cmd.hasOption(YARN_DETACHED_OPTION.getOpt()) || cmd.hasOption(DETACHED_OPTION.getOpt())) {
-			yarnClusterDescriptor.setDetachedMode(true);
+	private Path getLocalFlinkDistPath(final Configuration configuration, final YarnClusterDescriptor yarnClusterDescriptor) {
+		final String localJarPath = configuration.getString(YarnConfigOptions.FLINK_DIST_JAR);
+		if (localJarPath != null) {
+			return new Path(localJarPath);
 		}
 
-		if (cmd.hasOption(name.getOpt())) {
-			yarnClusterDescriptor.setName(cmd.getOptionValue(name.getOpt()));
-		}
+		LOG.info("No path for the flink jar passed. Using the location of " + yarnClusterDescriptor.getClass() + " to locate the jar");
+
+		// check whether it's actually a jar file --> when testing we execute this class without a flink-dist jar
+		final String decodedPath = getDecodedJarPath(yarnClusterDescriptor);
+		return decodedPath.endsWith(".jar")
+				? new Path(new File(decodedPath).toURI())
+				: null;
+	}
 
-		if (cmd.hasOption(applicationType.getOpt())) {
-			yarnClusterDescriptor.setApplicationType(cmd.getOptionValue(applicationType.getOpt()));
+	private String getDecodedJarPath(final YarnClusterDescriptor yarnClusterDescriptor) {
+		final String encodedJarPath = yarnClusterDescriptor
+				.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+		try {
+			return URLDecoder.decode(encodedJarPath, Charset.defaultCharset().name());
+		} catch (UnsupportedEncodingException e) {
+			throw new RuntimeException("Couldn't decode the encoded Flink dist jar path: " + encodedJarPath +
+					" Please supply a path manually via the -" + flinkJar.getOpt() + " option.");
 		}
+	}
 
-		if (cmd.hasOption(zookeeperNamespace.getOpt())) {
-			String zookeeperNamespaceValue = cmd.getOptionValue(this.zookeeperNamespace.getOpt());
-			yarnClusterDescriptor.setZookeeperNamespace(zookeeperNamespaceValue);
+	private Path getLocalFlinkDistPathFromCmd(final CommandLine cmd) {
+		final String flinkJarOptionName = flinkJar.getOpt();
+		if (!cmd.hasOption(flinkJarOptionName)) {
+			return null;
 		}
 
-		if (cmd.hasOption(nodeLabel.getOpt())) {
-			String nodeLabelValue = cmd.getOptionValue(this.nodeLabel.getOpt());
-			yarnClusterDescriptor.setNodeLabel(nodeLabelValue);
+		String userPath = cmd.getOptionValue(flinkJarOptionName);
+		if (!userPath.startsWith("file://")) {
+			userPath = "file://" + userPath;
 		}
+		return new Path(userPath);
+	}
 
-		return yarnClusterDescriptor;
+	private List<File> decodeDirsToShipToCluster(final Configuration configuration) {
+		checkNotNull(configuration);
+		return YarnConfigUtils.decodeListFromConfig(configuration, YarnConfigOptions.SHIP_DIRECTORIES, File::new);
+	}
+
+	private void encodeDirsToShipToCluster(final Configuration configuration, final CommandLine cmd) {
+		checkNotNull(cmd);
+		checkNotNull(configuration);
+
+		if (cmd.hasOption(shipPath.getOpt())) {
+			YarnConfigUtils.encodeListToConfig(
+					configuration,
+					YarnConfigOptions.SHIP_DIRECTORIES,
+					cmd.getOptionValues(this.shipPath.getOpt()),
+					(String path) -> {
+						final File shipDir = new File(path);
+						if (shipDir.isDirectory()) {
+							return path;
+						}
+						LOG.warn("Ship directory {} is not a directory. Ignoring it.", shipDir.getAbsolutePath());
+						return null;
+					});
+		}
 	}
 
-	private ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) {
+	private ClusterSpecification createClusterSpecification(Configuration configuration) {
 		// JobManager Memory
 		final int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
 
@@ -433,47 +424,30 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 	}
 
 	@Override
-	public YarnClusterDescriptor createClusterDescriptor(CommandLine commandLine) throws FlinkException {
-		final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(commandLine);
-
-		return createDescriptor(
-			effectiveConfiguration,
-			yarnConfiguration,
-			configurationDirectory,
-			commandLine);
+	public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
+		return createDescriptor(configuration);
 	}
 
 	@Override
 	@Nullable
-	public ApplicationId getClusterId(CommandLine commandLine) {
-		if (commandLine.hasOption(applicationId.getOpt())) {
-			return ConverterUtils.toApplicationId(commandLine.getOptionValue(applicationId.getOpt()));
-		} else if (isYarnPropertiesFileMode(commandLine)) {
-			return yarnApplicationIdFromYarnProperties;
-		} else {
-			return null;
-		}
+	public ApplicationId getClusterId(Configuration configuration) {
+		final String clusterId = configuration.getString(YarnConfigOptions.APPLICATION_ID);
+		return clusterId != null ? ConverterUtils.toApplicationId(clusterId) : null;
 	}
 
 	@Override
-	public ClusterSpecification getClusterSpecification(CommandLine commandLine) throws FlinkException {
-		final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(commandLine);
-
-		return createClusterSpecification(effectiveConfiguration, commandLine);
+	public ClusterSpecification getClusterSpecification(Configuration configuration) {
+		return createClusterSpecification(configuration);
 	}
 
 	@Override
-	protected Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
+	public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
 		// we ignore the addressOption because it can only contain "yarn-cluster"
 		final Configuration effectiveConfiguration = new Configuration(configuration);
 
-		if (commandLine.hasOption(zookeeperNamespaceOption.getOpt())) {
-			String zkNamespace = commandLine.getOptionValue(zookeeperNamespaceOption.getOpt());
-			effectiveConfiguration.setString(HA_CLUSTER_ID, zkNamespace);
-		}
-
-		final ApplicationId applicationId = getClusterId(commandLine);
+		applyDescriptorOptionToConfig(commandLine, effectiveConfiguration);
 
+		final ApplicationId applicationId = getApplicationId(commandLine);
 		if (applicationId != null) {
 			final String zooKeeperNamespace;
 			if (commandLine.hasOption(zookeeperNamespace.getOpt())){
@@ -483,6 +457,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 			}
 
 			effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace);
+			effectiveConfiguration.setString(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(applicationId));
 		}
 
 		if (commandLine.hasOption(jmMemory.getOpt())) {
@@ -512,6 +487,89 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		}
 	}
 
+	private ApplicationId getApplicationId(CommandLine commandLine) {
+		if (commandLine.hasOption(applicationId.getOpt())) {
+			return ConverterUtils.toApplicationId(commandLine.getOptionValue(applicationId.getOpt()));
+		} else if (isYarnPropertiesFileMode(commandLine)) {
+			return yarnApplicationIdFromYarnProperties;
+		}
+		return null;
+	}
+
+	private void applyDescriptorOptionToConfig(final CommandLine commandLine, final Configuration configuration) {
+		checkNotNull(commandLine);
+		checkNotNull(configuration);
+
+		final Path localJarPath = getLocalFlinkDistPathFromCmd(commandLine);
+		if (localJarPath != null) {
+			configuration.setString(YarnConfigOptions.FLINK_DIST_JAR, localJarPath.toString());
+		}
+
+		encodeDirsToShipToCluster(configuration, commandLine);
+
+		if (commandLine.hasOption(queue.getOpt())) {
+			final String queueName = commandLine.getOptionValue(queue.getOpt());
+			configuration.setString(YarnConfigOptions.APPLICATION_QUEUE, queueName);
+		}
+
+		final String dynamicPropertiesEncoded = encodeDynamicProperties(commandLine);
+		configuration.setString(YarnConfigOptions.DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
+
+		final boolean detached = commandLine.hasOption(YARN_DETACHED_OPTION.getOpt()) || commandLine.hasOption(DETACHED_OPTION.getOpt());
+		configuration.setBoolean(DeploymentOptions.ATTACHED, !detached);
+
+		if (commandLine.hasOption(name.getOpt())) {
+			final String appName = commandLine.getOptionValue(name.getOpt());
+			configuration.setString(YarnConfigOptions.APPLICATION_NAME, appName);
+		}
+
+		if (commandLine.hasOption(applicationType.getOpt())) {
+			final String appType = commandLine.getOptionValue(applicationType.getOpt());
+			configuration.setString(YarnConfigOptions.APPLICATION_TYPE, appType);
+		}
+
+		if (commandLine.hasOption(zookeeperNamespace.getOpt())) {
+			String zookeeperNamespaceValue = commandLine.getOptionValue(zookeeperNamespace.getOpt());
+			configuration.setString(HA_CLUSTER_ID, zookeeperNamespaceValue);
+		} else if (commandLine.hasOption(zookeeperNamespaceOption.getOpt())) {
+			String zookeeperNamespaceValue = commandLine.getOptionValue(zookeeperNamespaceOption.getOpt());
+			configuration.setString(HA_CLUSTER_ID, zookeeperNamespaceValue);
+		}
+
+		if (commandLine.hasOption(nodeLabel.getOpt())) {
+			final String nodeLabelValue = commandLine.getOptionValue(this.nodeLabel.getOpt());
+			configuration.setString(YarnConfigOptions.NODE_LABEL, nodeLabelValue);
+		}
+
+		discoverAndEncodeLogConfigFiles(configuration);
+	}
+
+	private void discoverAndEncodeLogConfigFiles(final Configuration configuration) {
+		final Set<File> logFiles = discoverLogConfigFiles();
+		YarnConfigUtils.encodeListToConfig(configuration, YarnConfigOptions.APPLICATION_LOG_CONFIG_FILES, logFiles, File::getPath);
+	}
+
+	private Set<File> discoverLogConfigFiles() {
+		final Set<File> logConfigFiles = new HashSet<>();
+
+		File logbackFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME);
+		final boolean hasLogback = logbackFile.exists();
+		if (hasLogback) {
+			logConfigFiles.add(logbackFile);
+		}
+
+		File log4jFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME);
+		final boolean hasLog4j = log4jFile.exists();
+		if (hasLog4j) {
+			logConfigFiles.add(log4jFile);
+			if (hasLogback) {
+				LOG.warn("The configuration directory ('" + configurationDirectory + "') contains both LOG4J and " +
+						"Logback configuration files. Please delete or rename one of them.");
+			}
+		}
+		return logConfigFiles;
+	}
+
 	private boolean isYarnPropertiesFileMode(CommandLine commandLine) {
 		boolean canApplyYarnProperties = !commandLine.hasOption(addressOption.getOpt());
 
@@ -573,7 +631,8 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 			return 0;
 		}
 
-		final YarnClusterDescriptor yarnClusterDescriptor = createClusterDescriptor(cmd);
+		final Configuration configuration = applyCommandLineOptionsToConfiguration(cmd);
+		final YarnClusterDescriptor yarnClusterDescriptor = createClusterDescriptor(configuration);
 
 		try {
 			// Query cluster for metrics
@@ -590,7 +649,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 
 					clusterClient = yarnClusterDescriptor.retrieve(yarnApplicationId);
 				} else {
-					final ClusterSpecification clusterSpecification = getClusterSpecification(cmd);
+					final ClusterSpecification clusterSpecification = getClusterSpecification(configuration);
 
 					clusterClient = yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
 
@@ -772,6 +831,26 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		System.out.println(message);
 	}
 
+	private String encodeDynamicProperties(final CommandLine cmd) {
+		final Properties properties = cmd.getOptionProperties(dynamicproperties.getOpt());
+		final String[] dynamicProperties = properties.stringPropertyNames().stream()
+				.flatMap(
+						(String key) -> {
+							final String value = properties.getProperty(key);
+
+							LOG.info("Dynamic Property set: {}={}", key, GlobalConfiguration.isSensitive(key) ? GlobalConfiguration.HIDDEN_CONTENT : value);
+
+							if (value != null) {
+								return Stream.of(key + dynamicproperties.getValueSeparator() + value);
+							} else {
+								return Stream.empty();
+							}
+						})
+				.toArray(String[]::new);
+
+		return StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR);
+	}
+
 	public static Map<String, String> getDynamicProperties(String dynamicPropertiesEncoded) {
 		if (dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) {
 			Map<String, String> properties = new HashMap<>();
@@ -958,20 +1037,4 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 
 		return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
 	}
-
-	private YarnClusterDescriptor getClusterDescriptor(
-			Configuration configuration,
-			YarnConfiguration yarnConfiguration,
-			String configurationDirectory) {
-		final YarnClient yarnClient = YarnClient.createYarnClient();
-		yarnClient.init(yarnConfiguration);
-		yarnClient.start();
-
-		return new YarnClusterDescriptor(
-			configuration,
-			yarnConfiguration,
-			configurationDirectory,
-			yarnClient,
-			false);
-	}
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnConfigUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnConfigUtils.java
new file mode 100644
index 0000000..edb0209
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnConfigUtils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.cli;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utilities for parsing {@link org.apache.flink.configuration.ConfigOption configuration options}.
+ */
+public class YarnConfigUtils {
+	// TODO: 16.10.19 test this and test LOG FILES discovery.
+	private static final String COLLECTION_DELIMITER = ";";
+
+	public static <T> void encodeListToConfig(
+			final Configuration configuration,
+			final ConfigOption<String> key,
+			final Collection<T> value,
+			final Function<T, String> mapper) {
+		encodeListToConfig(configuration, key, value.stream(), mapper);
+	}
+
+	public static <T> void encodeListToConfig(
+			final Configuration configuration,
+			final ConfigOption<String> key,
+			final T[] value,
+			final Function<T, String> mapper) {
+		encodeListToConfig(configuration, key, Arrays.stream(value), mapper);
+	}
+
+	private static <T> void encodeListToConfig(
+			final Configuration configuration,
+			final ConfigOption<String> key,
+			final Stream<T> values,
+			final Function<T, String> mapper) {
+
+		checkNotNull(values);
+		checkNotNull(key);
+		checkNotNull(configuration);
+
+		final String encodedString = values.map(mapper).filter(Objects::nonNull).collect(Collectors.joining(COLLECTION_DELIMITER));
+		if (encodedString != null && !encodedString.isEmpty()) {
+			configuration.setString(key, encodedString);
+		}
+	}
+
+	public static <R> List<R> decodeListFromConfig(
+			final Configuration configuration,
+			final ConfigOption<String> key,
+			final Function<String, R> mapper) {
+
+		checkNotNull(configuration);
+		checkNotNull(key);
+
+		final String encodedString = configuration.getString(key);
+		return encodedString != null
+				? Arrays.stream(encodedString.split(COLLECTION_DELIMITER)).map(mapper).collect(Collectors.toList())
+				: Collections.emptyList();
+	}
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 035bb04..d8611cc 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -204,6 +204,54 @@ public class YarnConfigOptions {
 		.defaultValue("")
 		.withDescription("A comma-separated list of tags to apply to the Flink YARN application.");
 
+	// ----------------------- YARN CLI OPTIONS ------------------------------------
+
+	public static final ConfigOption<String> APPLICATION_LOG_CONFIG_FILES =
+			key("yarn.log-config-file")
+				.noDefaultValue()
+				.withDescription("The location of the log config file, e.g. the path to your log4j.properties for log4j.");
+
+	public static final ConfigOption<String> DYNAMIC_PROPERTIES =
+			key("$internal.yarn.dynamic-properties")
+				.noDefaultValue()
+				.withDescription("**DO NOT USE** Specify YARN dynamic properties.");
+
+	public static final ConfigOption<String> SHIP_DIRECTORIES =
+			key("yarn.ship-directories")
+				.noDefaultValue()
+				.withDescription("A semicolon-separated list of directories to be shipped to the YARN cluster.");
+
+	public static final ConfigOption<String> FLINK_DIST_JAR =
+			key("yarn.flink-dist-jar")
+				.noDefaultValue()
+				.withDescription("The location of the Flink dist jar.");
+
+	public static final ConfigOption<String> APPLICATION_ID =
+			key("yarn.application.id")
+				.noDefaultValue()
+				.withDescription("The YARN application id of the running yarn cluster." +
+						" This is the YARN cluster where the pipeline is going to be executed.");
+
+	public static final ConfigOption<String> APPLICATION_QUEUE =
+			key("yarn.application.queue")
+				.noDefaultValue()
+				.withDescription("The YARN queue on which to put the current pipeline.");
+
+	public static final ConfigOption<String> APPLICATION_NAME =
+			key("yarn.application.name")
+				.noDefaultValue()
+				.withDescription("A custom name for your YARN application.");
+
+	public static final ConfigOption<String> APPLICATION_TYPE =
+			key("yarn.application.type")
+				.noDefaultValue()
+				.withDescription("A custom type for your YARN application..");
+
+	public static final ConfigOption<String> NODE_LABEL =
+			key("yarn.application.node-label")
+				.noDefaultValue()
+				.withDescription("Specify YARN node label for the YARN application.");
+
 	// ------------------------------------------------------------------------
 
 	/** This class is not meant to be instantiated. */
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java
index 9009a53..1ba2008 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java
@@ -67,7 +67,6 @@ public class AbstractYarnClusterTest extends TestLogger {
 		final YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
 			new Configuration(),
 			yarnConfiguration,
-			temporaryFolder.newFolder().getAbsolutePath(),
 			yarnClient,
 			false);
 
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 2338875..ae566b4 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -85,7 +85,8 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		CommandLine cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar",
 				"-D", "akka.ask.timeout=5 min", "-D", "env.java.opts=-DappName=foobar", "-D", "security.ssl.internal.key-password=changeit"});
 
-		YarnClusterDescriptor flinkYarnDescriptor = cli.createClusterDescriptor(cmd);
+		Configuration executorConfig = cli.applyCommandLineOptionsToConfiguration(cmd);
+		YarnClusterDescriptor flinkYarnDescriptor = cli.createClusterDescriptor(executorConfig);
 
 		Assert.assertNotNull(flinkYarnDescriptor);
 
@@ -109,10 +110,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
+		final Configuration executorConfig = yarnCLI.applyCommandLineOptionsToConfiguration(commandLine);
 
-		YarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(commandLine);
-
-		final ClusterSpecification clusterSpecification = yarnCLI.getClusterSpecification(commandLine);
+		final YarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(executorConfig);
+		final ClusterSpecification clusterSpecification = yarnCLI.getClusterSpecification(executorConfig);
 
 		// each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased.
 		assertEquals(3, clusterSpecification.getSlotsPerTaskManager());
@@ -131,8 +132,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
+		final Configuration executorConfig = yarnCLI.applyCommandLineOptionsToConfiguration(commandLine);
 
-		YarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(commandLine);
+		YarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(executorConfig);
 
 		// each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased.
 		assertTrue(descriptor.isDetachedMode());
@@ -151,8 +153,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
+		Configuration executorConfig = yarnCLI.applyCommandLineOptionsToConfiguration(commandLine);
 
-		YarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(commandLine);
+		YarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(executorConfig);
 
 		assertEquals(zkNamespaceCliInput, descriptor.getZookeeperNamespace());
 	}
@@ -170,8 +173,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
+		Configuration executorConfig = yarnCLI.applyCommandLineOptionsToConfiguration(commandLine);
 
-		YarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(commandLine);
+		YarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(executorConfig);
 
 		assertEquals(nodeLabelCliInput, descriptor.getNodeLabel());
 	}
@@ -194,8 +198,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {}, true);
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final ApplicationId clusterId = flinkYarnSessionCli.getClusterId(commandLine);
+		final ApplicationId clusterId = flinkYarnSessionCli.getClusterId(executorConfig);
 
 		assertEquals(TEST_YARN_APPLICATION_ID, clusterId);
 	}
@@ -229,8 +234,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}, true);
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final ApplicationId clusterId = flinkYarnSessionCli.getClusterId(commandLine);
+		final ApplicationId clusterId = flinkYarnSessionCli.getClusterId(executorConfig);
 
 		assertEquals(TEST_YARN_APPLICATION_ID, clusterId);
 	}
@@ -245,8 +251,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}, true);
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final YarnClusterDescriptor clusterDescriptor = flinkYarnSessionCli.createClusterDescriptor(commandLine);
+		final YarnClusterDescriptor clusterDescriptor = flinkYarnSessionCli.createClusterDescriptor(executorConfig);
 
 		final Configuration clusterDescriptorConfiguration = clusterDescriptor.getFlinkConfiguration();
 
@@ -266,8 +273,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		final String overrideZkNamespace = "my_cluster";
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString(), "-yz", overrideZkNamespace}, true);
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final YarnClusterDescriptor clusterDescriptor = flinkYarnSessionCli.createClusterDescriptor(commandLine);
+		final YarnClusterDescriptor clusterDescriptor = flinkYarnSessionCli.createClusterDescriptor(executorConfig);
 
 		final Configuration clusterDescriptorConfiguration = clusterDescriptor.getFlinkConfiguration();
 
@@ -288,7 +296,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"y",
 			"yarn");
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID_2.toString() }, true);
-		final ApplicationId clusterId = flinkYarnSessionCli.getClusterId(commandLine);
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
+
+		final ApplicationId clusterId = flinkYarnSessionCli.getClusterId(executorConfig);
 		assertEquals(TEST_YARN_APPLICATION_ID_2, clusterId);
 	}
 
@@ -315,8 +325,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+		Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(executorConfig);
 
 		assertThat(clusterSpecification.getMasterMemoryMB(), is(jobManagerMemory));
 		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(taskManagerMemory));
@@ -345,8 +356,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+		Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(executorConfig);
 
 		assertThat(clusterSpecification.getMasterMemoryMB(), is(jobManagerMemory));
 		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(taskManagerMemory));
@@ -366,8 +378,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(executorConfig);
 
 		assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
 		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
@@ -385,7 +398,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"y",
 			"yarn");
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
-		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
+
+		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(executorConfig);
 
 		assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
 		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
@@ -403,7 +418,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"y",
 			"yarn");
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
-		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
+
+		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(executorConfig);
 
 		assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
 		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
@@ -425,8 +442,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false);
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(executorConfig);
 
 		assertThat(clusterSpecification.getMasterMemoryMB(), is(2048));
 		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(4096));
@@ -444,8 +462,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false);
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(executorConfig);
 
 		assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
 		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(1024));
@@ -461,8 +480,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		YarnClusterDescriptor flinkYarnDescriptor = flinkYarnSessionCli.createClusterDescriptor(commandLine);
+		YarnClusterDescriptor flinkYarnDescriptor = flinkYarnSessionCli.createClusterDescriptor(executorConfig);
 
 		assertEquals(2, flinkYarnDescriptor.getShipFiles().size());
 
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 143749b04..69af7c3 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -525,7 +525,6 @@ public class YarnClusterDescriptorTest extends TestLogger {
 		yarnClusterDescriptor = new YarnClusterDescriptor(
 			new Configuration(),
 			yarnConfiguration,
-			temporaryFolder.getRoot().getAbsolutePath(),
 			closableYarnClient,
 			false);
 
@@ -542,7 +541,6 @@ public class YarnClusterDescriptorTest extends TestLogger {
 		return new YarnClusterDescriptor(
 			configuration,
 			yarnConfiguration,
-			temporaryFolder.getRoot().getAbsolutePath(),
 			yarnClient,
 			true);
 	}


[flink] 05/08: [FLINK-14501] Add Standalone and Yarn ClusterClientFactories

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch to-merge
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7777994df82c6fbae7fe101551a742f13822de00
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Oct 22 20:38:28 2019 +0200

    [FLINK-14501] Add Standalone and Yarn ClusterClientFactories
---
 .../client/deployment/StandaloneClientFactory.java |  59 +++++++
 ...he.flink.client.deployment.ClusterClientFactory |  16 ++
 .../deployment/ClusterClientServiceLoaderTest.java | 167 ++++++++++++++++++++
 ...he.flink.client.deployment.ClusterClientFactory |  18 +++
 .../flink/yarn/YarnClusterClientFactory.java       | 173 +++++++++++++++++++++
 ...he.flink.client.deployment.ClusterClientFactory |  16 ++
 .../flink/yarn/YarnClusterClientFactoryTest.java   |  47 ++++++
 7 files changed, 496 insertions(+)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
new file mode 100644
index 0000000..b441a63
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ClusterClientFactory} for a standalone cluster, i.e. Flink on bare-metal.
+ */
+public class StandaloneClientFactory implements ClusterClientFactory<StandaloneClusterId> {
+
+	public static final String ID = "default";
+
+	@Override
+	public boolean isCompatibleWith(Configuration configuration) {
+		checkNotNull(configuration);
+		return ID.equals(configuration.getString(DeploymentOptions.TARGET));
+	}
+
+	@Override
+	public StandaloneClusterDescriptor createClusterDescriptor(Configuration configuration) {
+		checkNotNull(configuration);
+		return new StandaloneClusterDescriptor(configuration);
+	}
+
+	@Override
+	@Nullable
+	public StandaloneClusterId getClusterId(Configuration configuration) {
+		checkNotNull(configuration);
+		return StandaloneClusterId.getInstance();
+	}
+
+	@Override
+	public ClusterSpecification getClusterSpecification(Configuration configuration) {
+		checkNotNull(configuration);
+		return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
+	}
+}
diff --git a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
new file mode 100644
index 0000000..fd9e4fa
--- /dev/null
+++ b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.client.deployment.StandaloneClientFactory
\ No newline at end of file
diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
new file mode 100644
index 0000000..45157a2
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link DefaultClusterClientServiceLoader}.
+ */
+public class ClusterClientServiceLoaderTest {
+
+	private static final String VALID_TARGET = "existing";
+	private static final String AMBIGUOUS_TARGET = "duplicate";
+	private static final String NON_EXISTING_TARGET = "non-existing";
+
+	private static final int VALID_ID = 42;
+
+	private ClusterClientServiceLoader serviceLoaderUnderTest;
+
+	@Before
+	public void init() {
+		serviceLoaderUnderTest = new DefaultClusterClientServiceLoader();
+	}
+
+	@Test
+	public void testStandaloneClusterClientFactoryDiscovery() {
+		final Configuration config = new Configuration();
+		config.setString(DeploymentOptions.TARGET, StandaloneClientFactory.ID);
+
+		ClusterClientFactory<StandaloneClusterId> factory = serviceLoaderUnderTest.getClusterClientFactory(config);
+		assertTrue(factory instanceof StandaloneClientFactory);
+	}
+
+	@Test
+	public void testFactoryDiscovery() {
+		final Configuration config = new Configuration();
+		config.setString(DeploymentOptions.TARGET, VALID_TARGET);
+
+		final ClusterClientFactory<Integer> factory = serviceLoaderUnderTest.getClusterClientFactory(config);
+		assertNotNull(factory);
+
+		final Integer id = factory.getClusterId(config);
+		assertThat(id, allOf(is(notNullValue()), equalTo(VALID_ID)));
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void testMoreThanOneCompatibleFactoriesException() {
+		final Configuration config = new Configuration();
+		config.setString(DeploymentOptions.TARGET, AMBIGUOUS_TARGET);
+
+		serviceLoaderUnderTest.getClusterClientFactory(config);
+		fail();
+	}
+
+	@Test
+	public void testNoFactoriesFound() {
+		final Configuration config = new Configuration();
+		config.setString(DeploymentOptions.TARGET, NON_EXISTING_TARGET);
+
+		final ClusterClientFactory<Integer> factory = serviceLoaderUnderTest.getClusterClientFactory(config);
+		assertNull(factory);
+	}
+
+	/**
+	 * Test {@link ClusterClientFactory} that is successfully discovered.
+	 */
+	public static class ValidClusterClientFactory extends DummyClusterClientFactory {
+
+		public static final String ID = VALID_TARGET;
+
+		@Override
+		public boolean isCompatibleWith(Configuration configuration) {
+			return configuration.getString(DeploymentOptions.TARGET).equals(VALID_TARGET);
+		}
+
+		@Nullable
+		@Override
+		public Integer getClusterId(Configuration configuration) {
+			return VALID_ID;
+		}
+	}
+
+	/**
+	 * Test {@link ClusterClientFactory} that has a duplicate.
+	 */
+	public static class FirstCollidingClusterClientFactory extends DummyClusterClientFactory {
+
+		public static final String ID = AMBIGUOUS_TARGET;
+
+		@Override
+		public boolean isCompatibleWith(Configuration configuration) {
+			return configuration.getString(DeploymentOptions.TARGET).equals(AMBIGUOUS_TARGET);
+		}
+	}
+
+	/**
+	 * Test {@link ClusterClientFactory} that has a duplicate.
+	 */
+	public static class SecondCollidingClusterClientFactory extends DummyClusterClientFactory {
+
+		public static final String ID = AMBIGUOUS_TARGET;
+
+		@Override
+		public boolean isCompatibleWith(Configuration configuration) {
+			return configuration.getString(DeploymentOptions.TARGET).equals(AMBIGUOUS_TARGET);
+		}
+	}
+
+	/**
+	 * A base test {@link ClusterClientFactory} that supports no operation and is meant to be extended.
+	 */
+	public static class DummyClusterClientFactory implements ClusterClientFactory<Integer> {
+
+		@Override
+		public boolean isCompatibleWith(Configuration configuration) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public ClusterDescriptor<Integer> createClusterDescriptor(Configuration configuration) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Nullable
+		@Override
+		public Integer getClusterId(Configuration configuration) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public ClusterSpecification getClusterSpecification(Configuration configuration) {
+			throw new UnsupportedOperationException();
+		}
+	}
+}
diff --git a/flink-clients/src/test/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory b/flink-clients/src/test/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
new file mode 100644
index 0000000..930cea9
--- /dev/null
+++ b/flink-clients/src/test/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.client.deployment.ClusterClientServiceLoaderTest$ValidClusterClientFactory
+org.apache.flink.client.deployment.ClusterClientServiceLoaderTest$FirstCollidingClusterClientFactory
+org.apache.flink.client.deployment.ClusterClientServiceLoaderTest$SecondCollidingClusterClientFactory
\ No newline at end of file
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
new file mode 100644
index 0000000..a0d9ab2
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.yarn.cli.YarnConfigUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ClusterClientFactory} for a YARN cluster.
+ */
+public class YarnClusterClientFactory implements ClusterClientFactory<ApplicationId> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(YarnClusterClientFactory.class);
+
+	public static final String ID = "yarn-cluster";
+
+	@Override
+	public boolean isCompatibleWith(Configuration configuration) {
+		checkNotNull(configuration);
+		return ID.equals(configuration.getString(DeploymentOptions.TARGET));
+	}
+
+	@Override
+	public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
+		checkNotNull(configuration);
+
+		final YarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor(configuration);
+		yarnClusterDescriptor.setDetachedMode(!configuration.getBoolean(DeploymentOptions.ATTACHED));
+
+		getLocalFlinkDistPath(configuration, yarnClusterDescriptor)
+				.ifPresent(yarnClusterDescriptor::setLocalJarPath);
+
+		decodeDirsToShipToCluster(configuration)
+				.ifPresent(yarnClusterDescriptor::addShipFiles);
+
+		handleConfigOption(configuration, YarnConfigOptions.APPLICATION_QUEUE, yarnClusterDescriptor::setQueue);
+		handleConfigOption(configuration, YarnConfigOptions.DYNAMIC_PROPERTIES, yarnClusterDescriptor::setDynamicPropertiesEncoded);
+		handleConfigOption(configuration, YarnConfigOptions.APPLICATION_NAME, yarnClusterDescriptor::setName);
+		handleConfigOption(configuration, YarnConfigOptions.APPLICATION_TYPE, yarnClusterDescriptor::setApplicationType);
+		handleConfigOption(configuration, YarnConfigOptions.NODE_LABEL, yarnClusterDescriptor::setNodeLabel);
+		handleConfigOption(configuration, HighAvailabilityOptions.HA_CLUSTER_ID, yarnClusterDescriptor::setZookeeperNamespace);
+		return yarnClusterDescriptor;
+	}
+
+	@Nullable
+	@Override
+	public ApplicationId getClusterId(Configuration configuration) {
+		checkNotNull(configuration);
+		final String clusterId = configuration.getString(YarnConfigOptions.APPLICATION_ID);
+		return clusterId != null ? ConverterUtils.toApplicationId(clusterId) : null;
+	}
+
+	@Override
+	public ClusterSpecification getClusterSpecification(Configuration configuration) {
+		checkNotNull(configuration);
+
+		// JobManager Memory
+		final int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
+
+		// Task Managers memory
+		final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
+
+		int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
+
+		return new ClusterSpecification.ClusterSpecificationBuilder()
+				.setMasterMemoryMB(jobManagerMemoryMB)
+				.setTaskManagerMemoryMB(taskManagerMemoryMB)
+				.setSlotsPerTaskManager(slotsPerTaskManager)
+				.createClusterSpecification();
+	}
+
+	private Optional<List<File>> decodeDirsToShipToCluster(final Configuration configuration) {
+		checkNotNull(configuration);
+
+		final List<File> files = YarnConfigUtils.decodeListFromConfig(configuration, YarnConfigOptions.SHIP_DIRECTORIES, File::new);
+		return files.isEmpty() ? Optional.empty() : Optional.of(files);
+	}
+
+	private Optional<Path> getLocalFlinkDistPath(final Configuration configuration, final YarnClusterDescriptor yarnClusterDescriptor) {
+		final String localJarPath = configuration.getString(YarnConfigOptions.FLINK_DIST_JAR);
+		if (localJarPath != null) {
+			return Optional.of(new Path(localJarPath));
+		}
+
+		LOG.info("No path for the flink jar passed. Using the location of " + yarnClusterDescriptor.getClass() + " to locate the jar");
+
+		// check whether it's actually a jar file --> when testing we execute this class without a flink-dist jar
+		final String decodedPath = getDecodedJarPath(yarnClusterDescriptor);
+		return decodedPath.endsWith(".jar")
+				? Optional.of(new Path(new File(decodedPath).toURI()))
+				: Optional.empty();
+	}
+
+	private String getDecodedJarPath(final YarnClusterDescriptor yarnClusterDescriptor) {
+		final String encodedJarPath = yarnClusterDescriptor
+				.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+		try {
+			return URLDecoder.decode(encodedJarPath, Charset.defaultCharset().name());
+		} catch (UnsupportedEncodingException e) {
+			throw new RuntimeException("Couldn't decode the encoded Flink dist jar path: " + encodedJarPath +
+					" You can supply a path manually via the command line.");
+		}
+	}
+
+	private void handleConfigOption(final Configuration configuration, final ConfigOption<String> option, final Consumer<String> consumer) {
+		checkNotNull(configuration);
+		checkNotNull(option);
+		checkNotNull(consumer);
+
+		final String value = configuration.getString(option);
+		if (value != null) {
+			consumer.accept(value);
+		}
+	}
+
+	private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
+		final YarnClient yarnClient = YarnClient.createYarnClient();
+		final YarnConfiguration yarnConfiguration = new YarnConfiguration();
+
+		yarnClient.init(yarnConfiguration);
+		yarnClient.start();
+
+		return new YarnClusterDescriptor(
+				configuration,
+				yarnConfiguration,
+				yarnClient,
+				false);
+	}
+}
diff --git a/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
new file mode 100644
index 0000000..ea1c4de
--- /dev/null
+++ b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.yarn.YarnClusterClientFactory
\ No newline at end of file
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
new file mode 100644
index 0000000..931313a
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for the {@link YarnClusterClientFactory} discovery.
+ */
+public class YarnClusterClientFactoryTest {
+
+	@Test
+	public void testYarnClusterClientFactoryDiscovery() {
+		final Configuration configuration = new Configuration();
+		configuration.setString(DeploymentOptions.TARGET, YarnClusterClientFactory.ID);
+
+		final ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
+		final ClusterClientFactory<ApplicationId> factory = serviceLoader.getClusterClientFactory(configuration);
+
+		assertTrue(factory instanceof YarnClusterClientFactory);
+	}
+}


[flink] 06/08: [FLINK-14501] Wired ClusterClientFactories to production code

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch to-merge
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c5753bbe9c0ae75754028b4a8bf3c24869a63935
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Oct 22 22:37:11 2019 +0200

    [FLINK-14501] Wired ClusterClientFactories to production code
---
 .../client/cli/AbstractCustomCommandLine.java      |   2 +-
 .../org/apache/flink/client/cli/CliFrontend.java   |  74 +++++++----
 .../apache/flink/client/cli/CliFrontendParser.java |  14 +-
 .../apache/flink/client/cli/CustomCommandLine.java |  34 +----
 .../org/apache/flink/client/cli/DefaultCLI.java    |  26 +---
 .../flink/client/cli/CliFrontendRunTest.java       |  21 ++-
 .../flink/client/cli/CliFrontendTestBase.java      |   6 +-
 .../apache/flink/client/cli/DefaultCLITest.java    |  36 ++---
 ...andLine.java => DummyClusterClientFactory.java} |  47 +++----
 ...d.java => DummyClusterClientServiceLoader.java} |  25 ++--
 .../client/cli/util/DummyClusterDescriptor.java    |   3 +-
 .../client/cli/util/DummyCustomCommandLine.java    |  36 +----
 .../flink/client/cli/util/MockedCliFrontend.java   |   5 +-
 .../client/program/rest/RestClusterClientTest.java |  14 +-
 .../org/apache/flink/api/scala/FlinkShell.scala    |  18 +--
 .../client/gateway/local/ExecutionContext.java     |  35 +++--
 .../table/client/gateway/local/LocalExecutor.java  |  20 ++-
 .../table/client/gateway/local/DependencyTest.java |   4 +-
 .../client/gateway/local/LocalExecutorITCase.java  |  10 +-
 .../flink/yarn/CliFrontendRunWithYarnTest.java     |  42 ++++--
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 148 ++++-----------------
 .../apache/flink/yarn/FlinkYarnSessionCliTest.java |  96 ++++++++-----
 22 files changed, 323 insertions(+), 393 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
index cba1036..f32d4f8 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
@@ -39,7 +39,7 @@ import static org.apache.flink.client.cli.CliFrontend.setJobManagerAddressInConf
  * a ZooKeeper namespace.
  *
  */
-public abstract class AbstractCustomCommandLine<T> implements CustomCommandLine<T> {
+public abstract class AbstractCustomCommandLine implements CustomCommandLine {
 
 	protected final Option zookeeperNamespaceOption = new Option("z", "zookeeperNamespace", true,
 		"Namespace to create the Zookeeper sub-paths for high availability mode");
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index c49374a..e1a838f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -27,8 +27,11 @@ import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.PackagedProgramUtils;
@@ -81,6 +84,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * Implementation of a simple command line frontend for executing programs.
  */
@@ -104,7 +110,7 @@ public class CliFrontend {
 
 	private final Configuration configuration;
 
-	private final List<CustomCommandLine<?>> customCommandLines;
+	private final List<CustomCommandLine> customCommandLines;
 
 	private final Options customCommandLineOptions;
 
@@ -112,17 +118,27 @@ public class CliFrontend {
 
 	private final int defaultParallelism;
 
+	private final ClusterClientServiceLoader clusterClientServiceLoader;
+
 	public CliFrontend(
 			Configuration configuration,
-			List<CustomCommandLine<?>> customCommandLines) {
-		this.configuration = Preconditions.checkNotNull(configuration);
-		this.customCommandLines = Preconditions.checkNotNull(customCommandLines);
+			List<CustomCommandLine> customCommandLines) {
+		this(configuration, new DefaultClusterClientServiceLoader(), customCommandLines);
+	}
+
+	public CliFrontend(
+			Configuration configuration,
+			ClusterClientServiceLoader clusterClientServiceLoader,
+			List<CustomCommandLine> customCommandLines) {
+		this.configuration = checkNotNull(configuration);
+		this.customCommandLines = checkNotNull(customCommandLines);
+		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
 
 		FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
 
 		this.customCommandLineOptions = new Options();
 
-		for (CustomCommandLine<?> customCommandLine : customCommandLines) {
+		for (CustomCommandLine customCommandLine : customCommandLines) {
 			customCommandLine.addGeneralOptions(customCommandLineOptions);
 			customCommandLine.addRunOptions(customCommandLineOptions);
 		}
@@ -194,24 +210,27 @@ public class CliFrontend {
 			throw new CliArgsException("Could not build the program from JAR file.", e);
 		}
 
-		final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine);
+		final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine);
 		final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
 		try {
-			runProgram(customCommandLine, executorConfig, runOptions, program);
+			runProgram(executorConfig, runOptions, program);
 		} finally {
 			program.deleteExtractedLibraries();
 		}
 	}
 
 	private <T> void runProgram(
-			CustomCommandLine<T> customCommandLine,
 			Configuration executorConfig,
 			RunOptions runOptions,
 			PackagedProgram program) throws ProgramInvocationException, FlinkException {
-		final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(executorConfig);
+
+		final ClusterClientFactory<T> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executorConfig);
+		checkState(clusterClientFactory != null);
+
+		final ClusterDescriptor<T> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executorConfig);
 
 		try {
-			final T clusterId = customCommandLine.getClusterId(executorConfig);
+			final T clusterId = clusterClientFactory.getClusterId(executorConfig);
 
 			final ClusterClient<T> client;
 
@@ -221,7 +240,7 @@ public class CliFrontend {
 
 				final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
 
-				final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(executorConfig);
+				final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executorConfig);
 				client = clusterDescriptor.deployJobCluster(
 					clusterSpecification,
 					jobGraph,
@@ -242,7 +261,7 @@ public class CliFrontend {
 				} else {
 					// also in job mode we have to deploy a session cluster because the job
 					// might consist of multiple parts (e.g. when using collect)
-					final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(executorConfig);
+					final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executorConfig);
 					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
 					// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
 					// there's a race-condition here if cli is killed before shutdown hook is installed
@@ -395,7 +414,7 @@ public class CliFrontend {
 			showAll = listOptions.showAll();
 		}
 
-		final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
+		final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
 
 		runClusterAction(
 			activeCommandLine,
@@ -513,7 +532,7 @@ public class CliFrontend {
 
 		logAndSysout((advanceToEndOfEventTime ? "Draining job " : "Suspending job ") + "\"" + jobId + "\" with a savepoint.");
 
-		final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
+		final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
 		runClusterAction(
 			activeCommandLine,
 			commandLine,
@@ -550,7 +569,7 @@ public class CliFrontend {
 			return;
 		}
 
-		final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
+		final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
 
 		final String[] cleanedArgs = cancelOptions.getArgs();
 
@@ -635,7 +654,7 @@ public class CliFrontend {
 			return;
 		}
 
-		final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
+		final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
 
 		if (savepointOptions.isDispose()) {
 			runClusterAction(
@@ -908,21 +927,22 @@ public class CliFrontend {
 	 * @param activeCommandLine to create the {@link ClusterDescriptor} from
 	 * @param commandLine containing the parsed command line options
 	 * @param clusterAction the cluster action to run against the retrieved {@link ClusterClient}.
-	 * @param <T> type of the cluster id
+	 * @param <ClusterID> type of the cluster id
 	 * @throws FlinkException if something goes wrong
 	 */
-	private <T> void runClusterAction(CustomCommandLine<T> activeCommandLine, CommandLine commandLine, ClusterAction<T> clusterAction) throws FlinkException {
+	private <ClusterID> void runClusterAction(CustomCommandLine activeCommandLine, CommandLine commandLine, ClusterAction<ClusterID> clusterAction) throws FlinkException {
 		final Configuration executorConfig = activeCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
-		final ClusterDescriptor<T> clusterDescriptor = activeCommandLine.createClusterDescriptor(executorConfig);
 
-		final T clusterId = activeCommandLine.getClusterId(executorConfig);
+		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executorConfig);
+		final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executorConfig);
+		final ClusterID clusterId = clusterClientFactory.getClusterId(executorConfig);
 
 		if (clusterId == null) {
 			throw new FlinkException("No cluster id was specified. Please specify a cluster to which " +
 				"you would like to connect.");
 		} else {
 			try {
-				final ClusterClient<T> clusterClient = clusterDescriptor.retrieve(clusterId);
+				final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterId);
 
 				try {
 					clusterAction.runAction(clusterClient);
@@ -1052,7 +1072,7 @@ public class CliFrontend {
 		final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
 
 		// 3. load the custom command lines
-		final List<CustomCommandLine<?>> customCommandLines = loadCustomCommandLines(
+		final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
 			configuration,
 			configurationDirectory);
 
@@ -1117,8 +1137,8 @@ public class CliFrontend {
 		config.setInteger(RestOptions.PORT, address.getPort());
 	}
 
-	public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
-		List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(2);
+	public static List<CustomCommandLine> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
+		List<CustomCommandLine> customCommandLines = new ArrayList<>(2);
 
 		//	Command line interface of the YARN session, with a special initialization here
 		//	to prefix all options with y/yarn.
@@ -1150,8 +1170,8 @@ public class CliFrontend {
 	 * @param commandLine The input to the command-line.
 	 * @return custom command-line which is active (may only be one at a time)
 	 */
-	public CustomCommandLine<?> getActiveCustomCommandLine(CommandLine commandLine) {
-		for (CustomCommandLine<?> cli : customCommandLines) {
+	public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
+		for (CustomCommandLine cli : customCommandLines) {
 			if (cli.isActive(commandLine)) {
 				return cli;
 			}
@@ -1164,7 +1184,7 @@ public class CliFrontend {
 	 * @param className The fully-qualified class name to load.
 	 * @param params The constructor parameters
 	 */
-	private static CustomCommandLine<?> loadCustomCommandLine(String className, Object... params) throws IllegalAccessException, InvocationTargetException, InstantiationException, ClassNotFoundException, NoSuchMethodException {
+	private static CustomCommandLine loadCustomCommandLine(String className, Object... params) throws IllegalAccessException, InvocationTargetException, InstantiationException, ClassNotFoundException, NoSuchMethodException {
 
 		Class<? extends CustomCommandLine> customCliClass =
 			Class.forName(className).asSubclass(CustomCommandLine.class);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 5639cb5..97ba33e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -306,7 +306,7 @@ public class CliFrontendParser {
 	/**
 	 * Prints the help for the client.
 	 */
-	public static void printHelp(Collection<CustomCommandLine<?>> customCommandLines) {
+	public static void printHelp(Collection<CustomCommandLine> customCommandLines) {
 		System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
 		System.out.println();
 		System.out.println("The following actions are available:");
@@ -321,7 +321,7 @@ public class CliFrontendParser {
 		System.out.println();
 	}
 
-	public static void printHelpForRun(Collection<CustomCommandLine<?>> customCommandLines) {
+	public static void printHelpForRun(Collection<CustomCommandLine> customCommandLines) {
 		HelpFormatter formatter = new HelpFormatter();
 		formatter.setLeftPadding(5);
 		formatter.setWidth(80);
@@ -349,7 +349,7 @@ public class CliFrontendParser {
 		System.out.println();
 	}
 
-	public static void printHelpForList(Collection<CustomCommandLine<?>> customCommandLines) {
+	public static void printHelpForList(Collection<CustomCommandLine> customCommandLines) {
 		HelpFormatter formatter = new HelpFormatter();
 		formatter.setLeftPadding(5);
 		formatter.setWidth(80);
@@ -364,7 +364,7 @@ public class CliFrontendParser {
 		System.out.println();
 	}
 
-	public static void printHelpForStop(Collection<CustomCommandLine<?>> customCommandLines) {
+	public static void printHelpForStop(Collection<CustomCommandLine> customCommandLines) {
 		HelpFormatter formatter = new HelpFormatter();
 		formatter.setLeftPadding(5);
 		formatter.setWidth(80);
@@ -379,7 +379,7 @@ public class CliFrontendParser {
 		System.out.println();
 	}
 
-	public static void printHelpForCancel(Collection<CustomCommandLine<?>> customCommandLines) {
+	public static void printHelpForCancel(Collection<CustomCommandLine> customCommandLines) {
 		HelpFormatter formatter = new HelpFormatter();
 		formatter.setLeftPadding(5);
 		formatter.setWidth(80);
@@ -394,7 +394,7 @@ public class CliFrontendParser {
 		System.out.println();
 	}
 
-	public static void printHelpForSavepoint(Collection<CustomCommandLine<?>> customCommandLines) {
+	public static void printHelpForSavepoint(Collection<CustomCommandLine> customCommandLines) {
 		HelpFormatter formatter = new HelpFormatter();
 		formatter.setLeftPadding(5);
 		formatter.setWidth(80);
@@ -415,7 +415,7 @@ public class CliFrontendParser {
 	 * @param runOptions True if the run options should be printed, False to print only general options
 	 */
 	private static void printCustomCliOptions(
-			Collection<CustomCommandLine<?>> customCommandLines,
+			Collection<CustomCommandLine> customCommandLines,
 			HelpFormatter formatter,
 			boolean runOptions) {
 		// prints options from all available command-line classes
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
index 9458de2..4241b1c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -18,20 +18,16 @@
 
 package org.apache.flink.client.cli;
 
-import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.FlinkException;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 
-import javax.annotation.Nullable;
-
 /**
  * Custom command-line interface to load hooks for the command-line interface.
  */
-public interface CustomCommandLine<T> {
+public interface CustomCommandLine {
 
 	/**
 	 * Signals whether the custom command-line wants to execute or not.
@@ -67,34 +63,6 @@ public interface CustomCommandLine<T> {
 	 */
 	Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException;
 
-	/**
-	 * Create a {@link ClusterDescriptor} from the given configuration.
-	 *
-	 * @param configuration containing the configuration options relevant for the {@link ClusterDescriptor}
-	 * @return the corresponding {@link ClusterDescriptor}.
-	 */
-	ClusterDescriptor<T> createClusterDescriptor(Configuration configuration);
-
-	/**
-	 * Returns the cluster id if a cluster id is specified in the provided configuration, otherwise it returns {@code null}.
-	 *
-	 * <p>A cluster id identifies a running cluster, e.g. the Yarn application id for a Flink cluster running on Yarn.
-	 *
-	 * @param configuration containing the configuration options relevant for the cluster id retrieval
-	 * @return Cluster id identifying the cluster to deploy jobs to or null
-	 */
-	@Nullable
-	T getClusterId(Configuration configuration);
-
-	/**
-	 * Returns the {@link ClusterSpecification} specified by the configuration and the command
-	 * line options. This specification can be used to deploy a new Flink cluster.
-	 *
-	 * @param configuration containing the configuration options relevant for the {@link ClusterSpecification}
-	 * @return the corresponding {@link ClusterSpecification} for a new Flink cluster
-	 */
-	ClusterSpecification getClusterSpecification(Configuration configuration);
-
 	default CommandLine parseCommandLineOptions(String[] args, boolean stopAtNonOptions) throws CliArgsException {
 		final Options options = new Options();
 		addGeneralOptions(options);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index f21f755..397d5dd 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -18,20 +18,16 @@
 
 package org.apache.flink.client.cli;
 
-import org.apache.flink.client.deployment.ClusterSpecification;
-import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
-import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.deployment.StandaloneClientFactory;
 import org.apache.flink.configuration.Configuration;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 
-import javax.annotation.Nullable;
-
 /**
  * The default CLI which is used for interaction with standalone clusters.
  */
-public class DefaultCLI extends AbstractCustomCommandLine<StandaloneClusterId> {
+public class DefaultCLI extends AbstractCustomCommandLine {
 
 	public DefaultCLI(Configuration configuration) {
 		super(configuration);
@@ -45,27 +41,11 @@ public class DefaultCLI extends AbstractCustomCommandLine<StandaloneClusterId> {
 
 	@Override
 	public String getId() {
-		return "default";
+		return StandaloneClientFactory.ID;
 	}
 
 	@Override
 	public void addGeneralOptions(Options baseOptions) {
 		super.addGeneralOptions(baseOptions);
 	}
-
-	@Override
-	public StandaloneClusterDescriptor createClusterDescriptor(Configuration configuration) {
-		return new StandaloneClusterDescriptor(configuration);
-	}
-
-	@Override
-	@Nullable
-	public StandaloneClusterId getClusterId(Configuration configuration) {
-		return StandaloneClusterId.getInstance();
-	}
-
-	@Override
-	public ClusterSpecification getClusterSpecification(Configuration configuration) {
-		return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
-	}
 }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index 7bff564..2230917 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.client.cli;
 
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.Configuration;
@@ -145,12 +147,23 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
 	// --------------------------------------------------------------------------------------------
 
 	public static void verifyCliFrontend(
-		AbstractCustomCommandLine<?> cli,
+		AbstractCustomCommandLine cli,
 		String[] parameters,
 		int expectedParallelism,
 		boolean isDetached) throws Exception {
 		RunTestingCliFrontend testFrontend =
-			new RunTestingCliFrontend(cli, expectedParallelism, isDetached);
+			new RunTestingCliFrontend(new DefaultClusterClientServiceLoader(), cli, expectedParallelism, isDetached);
+		testFrontend.run(parameters); // verifies the expected values (see below)
+	}
+
+	public static void verifyCliFrontend(
+		ClusterClientServiceLoader clusterClientServiceLoader,
+		AbstractCustomCommandLine cli,
+		String[] parameters,
+		int expectedParallelism,
+		boolean isDetached) throws Exception {
+		RunTestingCliFrontend testFrontend =
+			new RunTestingCliFrontend(clusterClientServiceLoader, cli, expectedParallelism, isDetached);
 		testFrontend.run(parameters); // verifies the expected values (see below)
 	}
 
@@ -160,11 +173,13 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
 		private final boolean isDetached;
 
 		private RunTestingCliFrontend(
-				AbstractCustomCommandLine<?> cli,
+				ClusterClientServiceLoader clusterClientServiceLoader,
+				AbstractCustomCommandLine cli,
 				int expectedParallelism,
 				boolean isDetached) {
 			super(
 				cli.getConfiguration(),
+				clusterClientServiceLoader,
 				Collections.singletonList(cli));
 			this.expectedParallelism = expectedParallelism;
 			this.isDetached = isDetached;
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
index 8ff426c..b384d49 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java
@@ -28,12 +28,10 @@ import org.apache.flink.util.TestLogger;
 public abstract class CliFrontendTestBase extends TestLogger {
 
 	protected Configuration getConfiguration() {
-		final Configuration configuration = GlobalConfiguration
-			.loadConfiguration(CliFrontendTestUtils.getConfigDir());
-		return configuration;
+		return GlobalConfiguration.loadConfiguration(CliFrontendTestUtils.getConfigDir());
 	}
 
-	static AbstractCustomCommandLine<?> getCli(Configuration configuration) {
+	static AbstractCustomCommandLine getCli(Configuration configuration) {
 		return new DefaultCLI(configuration);
 	}
 }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
index 73f967d..ba15088 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
@@ -18,12 +18,16 @@
 
 package org.apache.flink.client.cli;
 
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.util.FlinkException;
 
 import org.apache.commons.cli.CommandLine;
 import org.hamcrest.Matchers;
@@ -33,6 +37,7 @@ import org.junit.rules.TemporaryFolder;
 
 import java.net.URL;
 
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -57,17 +62,12 @@ public class DefaultCLITest extends CliFrontendTestBase {
 		configuration.setString(RestOptions.ADDRESS, localhost);
 		configuration.setInteger(RestOptions.PORT, port);
 
-		@SuppressWarnings("unchecked")
-		final AbstractCustomCommandLine<StandaloneClusterId> defaultCLI =
-			(AbstractCustomCommandLine<StandaloneClusterId>) getCli(configuration);
+		final AbstractCustomCommandLine defaultCLI = getCli(configuration);
 
 		final String[] args = {};
 
-		CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
-		final Configuration executorConfig = defaultCLI.applyCommandLineOptionsToConfiguration(commandLine);
-
-		final ClusterDescriptor<StandaloneClusterId> clusterDescriptor = defaultCLI.createClusterDescriptor(executorConfig);
-		final ClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(executorConfig));
+		final CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
+		final ClusterClient<?> clusterClient = getClusterClient(defaultCLI, commandLine);
 
 		final URL webInterfaceUrl = new URL(clusterClient.getWebInterfaceURL());
 
@@ -87,19 +87,14 @@ public class DefaultCLITest extends CliFrontendTestBase {
 		configuration.setString(JobManagerOptions.ADDRESS, localhost);
 		configuration.setInteger(JobManagerOptions.PORT, port);
 
-		@SuppressWarnings("unchecked")
-		final AbstractCustomCommandLine<StandaloneClusterId> defaultCLI =
-			(AbstractCustomCommandLine<StandaloneClusterId>) getCli(configuration);
+		final AbstractCustomCommandLine defaultCLI = getCli(configuration);
 
 		final String manualHostname = "123.123.123.123";
 		final int manualPort = 4321;
 		final String[] args = {"-m", manualHostname + ':' + manualPort};
 
-		CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
-		final Configuration executorConfig = defaultCLI.applyCommandLineOptionsToConfiguration(commandLine);
-
-		final ClusterDescriptor<StandaloneClusterId> clusterDescriptor = defaultCLI.createClusterDescriptor(executorConfig);
-		final ClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(executorConfig));
+		final CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
+		final ClusterClient<?> clusterClient = getClusterClient(defaultCLI, commandLine);
 
 		final URL webInterfaceUrl = new URL(clusterClient.getWebInterfaceURL());
 
@@ -107,4 +102,13 @@ public class DefaultCLITest extends CliFrontendTestBase {
 		assertThat(webInterfaceUrl.getPort(), Matchers.equalTo(manualPort));
 	}
 
+	private ClusterClient<?> getClusterClient(AbstractCustomCommandLine defaultCLI, CommandLine commandLine) throws FlinkException {
+		final ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
+		final Configuration executorConfig = defaultCLI.applyCommandLineOptionsToConfiguration(commandLine);
+		final ClusterClientFactory<StandaloneClusterId> clusterFactory = serviceLoader.getClusterClientFactory(executorConfig);
+		checkState(clusterFactory != null);
+
+		final ClusterDescriptor<StandaloneClusterId> clusterDescriptor = clusterFactory.createClusterDescriptor(executorConfig);
+		return clusterDescriptor.retrieve(clusterFactory.getClusterId(executorConfig));
+	}
 }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterClientFactory.java
similarity index 56%
copy from flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
copy to flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterClientFactory.java
index 0bd7dc1..d067576 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterClientFactory.java
@@ -18,56 +18,39 @@
 
 package org.apache.flink.client.cli.util;
 
-import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
+import org.apache.flink.configuration.DeploymentOptions;
 
 import javax.annotation.Nullable;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * Dummy implementation of the {@link CustomCommandLine} for testing purposes.
+ * A {@link ClusterClientFactory} used for testing.
+ * @param <ClusterID> The type of the id of the cluster.
  */
-public class DummyCustomCommandLine<T> implements CustomCommandLine {
-	private final ClusterClient<T> clusterClient;
-
-	public DummyCustomCommandLine(ClusterClient<T> clusterClient) {
-		this.clusterClient = Preconditions.checkNotNull(clusterClient);
-	}
-
-	@Override
-	public boolean isActive(CommandLine commandLine) {
-		return true;
-	}
+public class DummyClusterClientFactory<ClusterID> implements ClusterClientFactory {
 
-	@Override
-	public String getId() {
-		return DummyCustomCommandLine.class.getSimpleName();
-	}
+	public static final String ID = "dummy-client-factory";
 
-	@Override
-	public void addRunOptions(Options baseOptions) {
-		// nothing to add
-	}
+	private final ClusterClient<ClusterID> clusterClient;
 
-	@Override
-	public void addGeneralOptions(Options baseOptions) {
-		// nothing to add
+	public DummyClusterClientFactory(ClusterClient<ClusterID> clusterClient) {
+		this.clusterClient = checkNotNull(clusterClient);
 	}
 
 	@Override
-	public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) {
-		return new Configuration();
+	public boolean isCompatibleWith(Configuration configuration) {
+		return ID.equals(configuration.getString(DeploymentOptions.TARGET));
 	}
 
 	@Override
-	public ClusterDescriptor<T> createClusterDescriptor(Configuration configuration) {
-		return new DummyClusterDescriptor<>(clusterClient);
+	public ClusterDescriptor<ClusterID> createClusterDescriptor(Configuration configuration) {
+		return new DummyClusterDescriptor<>(checkNotNull(clusterClient));
 	}
 
 	@Override
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterClientServiceLoader.java
similarity index 55%
copy from flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java
copy to flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterClientServiceLoader.java
index 477293d..f58383c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterClientServiceLoader.java
@@ -18,22 +18,27 @@
 
 package org.apache.flink.client.cli.util;
 
-import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 
-import java.util.Collections;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Utility class for mocking the {@link ClusterClient} within a {@link CliFrontend}.
- *
- * <p>The mocking behavior can be defined in the constructor of the sub-class.
+ * A test {@link ClusterClientServiceLoader} that returns always a {@link DummyClusterClientFactory}.
  */
-public class MockedCliFrontend extends CliFrontend {
+public class DummyClusterClientServiceLoader<ClusterID> implements ClusterClientServiceLoader {
+
+	private final ClusterClient<ClusterID> clusterClient;
+
+	public DummyClusterClientServiceLoader(final ClusterClient<ClusterID> clusterClient) {
+		this.clusterClient = checkNotNull(clusterClient);
+	}
 
-	public MockedCliFrontend(ClusterClient clusterClient) throws Exception {
-		super(
-			new Configuration(),
-			Collections.singletonList(new DummyCustomCommandLine(clusterClient)));
+	@Override
+	public <C> ClusterClientFactory<C> getClusterClientFactory(final Configuration configuration) {
+		checkNotNull(configuration);
+		return new DummyClusterClientFactory<>(clusterClient);
 	}
 }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
index 7620ae2..a7af09b 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
@@ -22,7 +22,6 @@ import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -60,7 +59,7 @@ public class DummyClusterDescriptor<T> implements ClusterDescriptor<T> {
 	}
 
 	@Override
-	public void killCluster(T clusterId) throws FlinkException {
+	public void killCluster(T clusterId) {
 		throw new UnsupportedOperationException("Cannot terminate a dummy cluster.");
 	}
 
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
index 0bd7dc1..4a94656 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
@@ -19,26 +19,16 @@
 package org.apache.flink.client.cli.util;
 
 import org.apache.flink.client.cli.CustomCommandLine;
-import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.deployment.ClusterSpecification;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.configuration.DeploymentOptions;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 
-import javax.annotation.Nullable;
-
 /**
  * Dummy implementation of the {@link CustomCommandLine} for testing purposes.
  */
-public class DummyCustomCommandLine<T> implements CustomCommandLine {
-	private final ClusterClient<T> clusterClient;
-
-	public DummyCustomCommandLine(ClusterClient<T> clusterClient) {
-		this.clusterClient = Preconditions.checkNotNull(clusterClient);
-	}
+public class DummyCustomCommandLine implements CustomCommandLine {
 
 	@Override
 	public boolean isActive(CommandLine commandLine) {
@@ -47,7 +37,7 @@ public class DummyCustomCommandLine<T> implements CustomCommandLine {
 
 	@Override
 	public String getId() {
-		return DummyCustomCommandLine.class.getSimpleName();
+		return DummyClusterClientFactory.ID;
 	}
 
 	@Override
@@ -62,22 +52,8 @@ public class DummyCustomCommandLine<T> implements CustomCommandLine {
 
 	@Override
 	public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) {
-		return new Configuration();
-	}
-
-	@Override
-	public ClusterDescriptor<T> createClusterDescriptor(Configuration configuration) {
-		return new DummyClusterDescriptor<>(clusterClient);
-	}
-
-	@Override
-	@Nullable
-	public String getClusterId(Configuration configuration) {
-		return "dummy";
-	}
-
-	@Override
-	public ClusterSpecification getClusterSpecification(Configuration configuration) {
-		return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
+		final Configuration configuration = new Configuration();
+		configuration.setString(DeploymentOptions.TARGET, DummyClusterClientFactory.ID);
+		return configuration;
 	}
 }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java
index 477293d..e76175e 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java
@@ -31,9 +31,10 @@ import java.util.Collections;
  */
 public class MockedCliFrontend extends CliFrontend {
 
-	public MockedCliFrontend(ClusterClient clusterClient) throws Exception {
+	public MockedCliFrontend(ClusterClient clusterClient) {
 		super(
 			new Configuration(),
-			Collections.singletonList(new DummyCustomCommandLine(clusterClient)));
+			new DummyClusterClientServiceLoader<>(clusterClient),
+			Collections.singletonList(new DummyCustomCommandLine()));
 	}
 }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 2ffa0b6..d11b046 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -22,7 +22,10 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.client.cli.DefaultCLI;
-import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.client.program.DetachedJobExecutionResult;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -554,10 +557,15 @@ public class RestClusterClientTest extends TestLogger {
 		final String[] args = {"-m", manualHostname + ':' + manualPort};
 
 		CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
+
+		final ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
 		final Configuration executorConfig = defaultCLI.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final StandaloneClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(executorConfig);
-		final RestClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(executorConfig));
+		final ClusterClientFactory<StandaloneClusterId> clusterFactory = serviceLoader.getClusterClientFactory(executorConfig);
+		checkState(clusterFactory != null);
+
+		final ClusterDescriptor<StandaloneClusterId> clusterDescriptor = clusterFactory.createClusterDescriptor(executorConfig);
+		final RestClusterClient<?> clusterClient = (RestClusterClient<?>) clusterDescriptor.retrieve(clusterFactory.getClusterId(executorConfig));
 
 		URL webMonitorBaseUrl = clusterClient.getWebMonitorBaseUrl().get();
 		assertThat(webMonitorBaseUrl.getHost(), equalTo(manualHostname));
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index 04f68ff..84d657e 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -22,7 +22,7 @@ import java.io._
 import java.net.URL
 
 import org.apache.flink.client.cli.{CliFrontend, CliFrontendParser}
-import org.apache.flink.client.deployment.ClusterDescriptor
+import org.apache.flink.client.deployment.{ClusterDescriptor, DefaultClusterClientServiceLoader}
 import org.apache.flink.client.program.ClusterClient
 import org.apache.flink.configuration.{Configuration, GlobalConfiguration, JobManagerOptions}
 import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}
@@ -258,11 +258,12 @@ object FlinkShell {
     val commandLine = CliFrontendParser.parse(commandLineOptions, args.toArray, true)
 
     val customCLI = frontend.getActiveCustomCommandLine(commandLine)
-    val executorConfig = customCLI.applyCommandLineOptionsToConfiguration(commandLine);
-
-    val clusterDescriptor = customCLI.createClusterDescriptor(executorConfig)
+    val executorConfig = customCLI.applyCommandLineOptionsToConfiguration(commandLine)
 
-    val clusterSpecification = customCLI.getClusterSpecification(executorConfig)
+    val serviceLoader = new DefaultClusterClientServiceLoader
+    val clientFactory = serviceLoader.getClusterClientFactory(executorConfig)
+    val clusterDescriptor = clientFactory.createClusterDescriptor(executorConfig)
+    val clusterSpecification = clientFactory.getClusterSpecification(executorConfig)
 
     val cluster = clusterDescriptor.deploySessionCluster(clusterSpecification)
 
@@ -291,11 +292,12 @@ object FlinkShell {
     val customCLI = frontend.getActiveCustomCommandLine(commandLine)
     val executorConfig = customCLI.applyCommandLineOptionsToConfiguration(commandLine);
 
-    val clusterDescriptor = customCLI
+    val serviceLoader = new DefaultClusterClientServiceLoader
+    val clientFactory = serviceLoader.getClusterClientFactory(executorConfig)
+    val clusterDescriptor = clientFactory
       .createClusterDescriptor(executorConfig)
       .asInstanceOf[ClusterDescriptor[Any]]
-
-    val clusterId = customCLI.getClusterId(executorConfig)
+    val clusterId = clientFactory.getClusterId(executorConfig)
 
     val cluster = clusterDescriptor.retrieve(clusterId)
 
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index ff54d35..580b0be 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -28,8 +28,11 @@ import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.client.cli.RunOptions;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.plugin.TemporaryClassLoaderContext;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
@@ -92,6 +95,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * Context for executing table programs. This class caches everything that can be cached across
  * multiple queries as long as the session context does not change. This must be thread-safe as
@@ -111,13 +117,18 @@ public class ExecutionContext<T> {
 	private final Map<String, UserDefinedFunction> functions;
 	private final Configuration flinkConfig;
 	private final Configuration executorConfig;
-	private final CustomCommandLine<T> activeCommandLine;
+	private final ClusterClientFactory<T> clusterClientFactory;
 	private final RunOptions runOptions;
 	private final T clusterId;
 	private final ClusterSpecification clusterSpec;
 
 	public ExecutionContext(Environment defaultEnvironment, SessionContext sessionContext, List<URL> dependencies,
-			Configuration flinkConfig, Options commandLineOptions, List<CustomCommandLine<?>> availableCommandLines) throws FlinkException {
+				Configuration flinkConfig, Options commandLineOptions, List<CustomCommandLine> availableCommandLines) throws FlinkException {
+		this(defaultEnvironment, sessionContext, dependencies, flinkConfig, new DefaultClusterClientServiceLoader(), commandLineOptions, availableCommandLines);
+	}
+
+	public ExecutionContext(Environment defaultEnvironment, SessionContext sessionContext, List<URL> dependencies,
+			Configuration flinkConfig, ClusterClientServiceLoader clusterClientServiceLoader, Options commandLineOptions, List<CustomCommandLine> availableCommandLines) throws FlinkException {
 		this.sessionContext = sessionContext.copy(); // create internal copy because session context is mutable
 		this.mergedEnv = Environment.merge(defaultEnvironment, sessionContext.getEnvironment());
 		this.dependencies = dependencies;
@@ -154,12 +165,17 @@ public class ExecutionContext<T> {
 		});
 
 		// convert deployment options into command line options that describe a cluster
+		final ClusterClientServiceLoader serviceLoader = checkNotNull(clusterClientServiceLoader);
 		final CommandLine commandLine = createCommandLine(mergedEnv.getDeployment(), commandLineOptions);
-		activeCommandLine = findActiveCommandLine(availableCommandLines, commandLine);
+		final CustomCommandLine activeCommandLine = findActiveCommandLine(availableCommandLines, commandLine);
+
 		executorConfig = activeCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
+		clusterClientFactory = serviceLoader.getClusterClientFactory(executorConfig);
+		checkState(clusterClientFactory != null);
+
 		runOptions = createRunOptions(commandLine);
-		clusterId = activeCommandLine.getClusterId(executorConfig);
-		clusterSpec = activeCommandLine.getClusterSpecification(executorConfig);
+		clusterId = clusterClientFactory.getClusterId(executorConfig);
+		clusterSpec = clusterClientFactory.getClusterSpecification(executorConfig);
 	}
 
 	public SessionContext getSessionContext() {
@@ -183,7 +199,7 @@ public class ExecutionContext<T> {
 	}
 
 	public ClusterDescriptor<T> createClusterDescriptor() {
-		return activeCommandLine.createClusterDescriptor(executorConfig);
+		return clusterClientFactory.createClusterDescriptor(executorConfig);
 	}
 
 	public EnvironmentInstance createEnvironmentInstance() {
@@ -226,11 +242,10 @@ public class ExecutionContext<T> {
 		}
 	}
 
-	@SuppressWarnings("unchecked")
-	private static <T> CustomCommandLine<T> findActiveCommandLine(List<CustomCommandLine<?>> availableCommandLines, CommandLine commandLine) {
-		for (CustomCommandLine<?> cli : availableCommandLines) {
+	private static CustomCommandLine findActiveCommandLine(List<CustomCommandLine> availableCommandLines, CommandLine commandLine) {
+		for (CustomCommandLine cli : availableCommandLines) {
 			if (cli.isActive(commandLine)) {
-				return (CustomCommandLine<T>) cli;
+				return cli;
 			}
 		}
 		throw new SqlExecutionException("Could not find a matching deployment.");
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index aff7e7d..c21df6e 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -24,7 +24,9 @@ import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -73,6 +75,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Executor that performs the Flink communication locally. The calls are blocking depending on the
  * response time to the Flink cluster. Flink jobs are not blocking.
@@ -85,10 +89,11 @@ public class LocalExecutor implements Executor {
 
 	// deployment
 
+	private final ClusterClientServiceLoader clusterClientServiceLoader;
 	private final Environment defaultEnvironment;
 	private final List<URL> dependencies;
 	private final Configuration flinkConfig;
-	private final List<CustomCommandLine<?>> commandLines;
+	private final List<CustomCommandLine> commandLines;
 	private final Options commandLineOptions;
 
 	// result maintenance
@@ -160,12 +165,14 @@ public class LocalExecutor implements Executor {
 
 		// prepare result store
 		resultStore = new ResultStore(flinkConfig);
+
+		clusterClientServiceLoader = new DefaultClusterClientServiceLoader();
 	}
 
 	/**
 	 * Constructor for testing purposes.
 	 */
-	public LocalExecutor(Environment defaultEnvironment, List<URL> dependencies, Configuration flinkConfig, CustomCommandLine<?> commandLine) {
+	public LocalExecutor(Environment defaultEnvironment, List<URL> dependencies, Configuration flinkConfig, CustomCommandLine commandLine, ClusterClientServiceLoader clusterClientServiceLoader) {
 		this.defaultEnvironment = defaultEnvironment;
 		this.dependencies = dependencies;
 		this.flinkConfig = flinkConfig;
@@ -173,7 +180,8 @@ public class LocalExecutor implements Executor {
 		this.commandLineOptions = collectCommandLineOptions(commandLines);
 
 		// prepare result store
-		resultStore = new ResultStore(flinkConfig);
+		this.resultStore = new ResultStore(flinkConfig);
+		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
 	}
 
 	@Override
@@ -565,7 +573,7 @@ public class LocalExecutor implements Executor {
 		if (executionContext == null || !executionContext.getSessionContext().equals(session)) {
 			try {
 				executionContext = new ExecutionContext<>(defaultEnvironment, session, dependencies,
-					flinkConfig, commandLineOptions, commandLines);
+					flinkConfig, clusterClientServiceLoader, commandLineOptions, commandLines);
 			} catch (Throwable t) {
 				// catch everything such that a configuration does not crash the executor
 				throw new SqlExecutionException("Could not create execution context.", t);
@@ -617,9 +625,9 @@ public class LocalExecutor implements Executor {
 		return dependencies;
 	}
 
-	private static Options collectCommandLineOptions(List<CustomCommandLine<?>> commandLines) {
+	private static Options collectCommandLineOptions(List<CustomCommandLine> commandLines) {
 		final Options customOptions = new Options();
-		for (CustomCommandLine<?> customCommandLine : commandLines) {
+		for (CustomCommandLine customCommandLine : commandLines) {
 			customCommandLine.addRunOptions(customOptions);
 		}
 		return CliFrontendParser.mergeOptions(
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
index aad2da1..64f5233 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.client.gateway.local;
 
 import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
@@ -91,7 +92,8 @@ public class DependencyTest {
 			env,
 			Collections.singletonList(dependency),
 			new Configuration(),
-			new DefaultCLI(new Configuration()));
+			new DefaultCLI(new Configuration()),
+			new DefaultClusterClientServiceLoader());
 
 		final SessionContext session = new SessionContext("test-session", new Environment());
 
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 24799e0..294f876 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.util.DummyClusterClientServiceLoader;
 import org.apache.flink.client.cli.util.DummyCustomCommandLine;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
@@ -609,7 +610,8 @@ public class LocalExecutorITCase extends TestLogger {
 			EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, replaceVars),
 			Collections.emptyList(),
 			clusterClient.getFlinkConfiguration(),
-			new DummyCustomCommandLine<T>(clusterClient));
+			new DummyCustomCommandLine(),
+			new DummyClusterClientServiceLoader(clusterClient));
 	}
 
 	private <T> LocalExecutor createModifiedExecutor(ClusterClient<T> clusterClient, Map<String, String> replaceVars) throws Exception {
@@ -617,7 +619,8 @@ public class LocalExecutorITCase extends TestLogger {
 			EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, replaceVars),
 			Collections.emptyList(),
 			clusterClient.getFlinkConfiguration(),
-			new DummyCustomCommandLine<T>(clusterClient));
+			new DummyCustomCommandLine(),
+			new DummyClusterClientServiceLoader(clusterClient));
 	}
 
 	private <T> LocalExecutor createModifiedExecutor(
@@ -626,7 +629,8 @@ public class LocalExecutorITCase extends TestLogger {
 			EnvironmentFileUtil.parseModified(yamlFile, replaceVars),
 			Collections.emptyList(),
 			clusterClient.getFlinkConfiguration(),
-			new DummyCustomCommandLine<T>(clusterClient));
+			new DummyCustomCommandLine(),
+			new DummyClusterClientServiceLoader(clusterClient));
 	}
 
 	private List<String> retrieveTableResult(
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
index e16abc0..4e7ece3 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.client.cli.CliFrontendTestBase;
 import org.apache.flink.client.cli.CliFrontendTestUtils;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -36,6 +38,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import static org.apache.flink.client.cli.CliFrontendRunTest.verifyCliFrontend;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath;
 
 /**
@@ -67,36 +70,50 @@ public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
 		configuration.setString(JobManagerOptions.ADDRESS, "localhost");
 		configuration.setInteger(JobManagerOptions.PORT, 8081);
 
-		FlinkYarnSessionCli yarnCLI = new TestingFlinkYarnSessionCli(
+		final ClusterClientServiceLoader testServiceLoader =
+			new TestingYarnClusterClientServiceLoader(new FakeClusterClient());
+
+		final FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
 			configuration,
+			testServiceLoader,
 			tmp.getRoot().getAbsolutePath(),
 			"y",
-			"yarn");
+			"yarn",
+			true);
 
 		// test detached mode
 		{
 			String[] parameters = {"-m", "yarn-cluster", "-p", "2", "-d", testJarPath};
-			verifyCliFrontend(yarnCLI, parameters, 2, true);
+			verifyCliFrontend(testServiceLoader, yarnCLI, parameters, 2, true);
 		}
 
 		// test detached mode
 		{
 			String[] parameters = {"-m", "yarn-cluster", "-p", "2", "-yd", testJarPath};
-			verifyCliFrontend(yarnCLI, parameters, 2, true);
+			verifyCliFrontend(testServiceLoader, yarnCLI, parameters, 2, true);
 		}
 	}
 
-	private static class TestingFlinkYarnSessionCli extends FlinkYarnSessionCli {
+	private static class TestingYarnClusterClientServiceLoader implements ClusterClientServiceLoader {
+
 		private final ClusterClient<ApplicationId> clusterClient;
 
-		private TestingFlinkYarnSessionCli(
-				Configuration configuration,
-				String configurationDirectory,
-				String shortPrefix,
-				String longPrefix) throws Exception {
-			super(configuration, configurationDirectory, shortPrefix, longPrefix);
+		TestingYarnClusterClientServiceLoader(ClusterClient<ApplicationId> clusterClient) {
+			this.clusterClient = checkNotNull(clusterClient);
+		}
 
-			this.clusterClient = new FakeClusterClient(configuration);
+		@Override
+		public ClusterClientFactory<ApplicationId> getClusterClientFactory(Configuration configuration) {
+			return new TestingYarnClusterClientFactory(clusterClient);
+		}
+	}
+
+	private static class TestingYarnClusterClientFactory extends YarnClusterClientFactory {
+
+		private final ClusterClient<ApplicationId> clusterClient;
+
+		TestingYarnClusterClientFactory(ClusterClient<ApplicationId> clusterClient) {
+			this.clusterClient = checkNotNull(clusterClient);
 		}
 
 		@Override
@@ -109,5 +126,4 @@ public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
 					clusterClient);
 		}
 	}
-
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 2aaa6fb..e301a74 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -21,10 +21,12 @@ package org.apache.flink.yarn.cli;
 import org.apache.flink.client.cli.AbstractCustomCommandLine;
 import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -39,6 +41,7 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.ShutdownHookUtil;
+import org.apache.flink.yarn.YarnClusterClientFactory;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
@@ -67,13 +70,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
 import java.lang.reflect.UndeclaredThrowableException;
-import java.net.URLDecoder;
-import java.nio.charset.Charset;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -91,7 +90,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Class handling the command line interface to the YARN session.
  */
-public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId> {
+public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 	private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnSessionCli.class);
 
 	//------------------------------------ Constants   -------------------------
@@ -101,9 +100,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 
 	private static final long CLIENT_POLLING_INTERVAL_MS = 3000L;
 
-	/** The id for the CommandLine interface. */
-	private static final String ID = "yarn-cluster";
-
 	// YARN-session related constants
 	private static final String YARN_PROPERTIES_FILE = ".yarn-properties-";
 	private static final String YARN_APPLICATION_ID_KEY = "applicationID";
@@ -158,12 +154,14 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 
 	private final String yarnPropertiesFileLocation;
 
+	private final ClusterClientServiceLoader clusterClientServiceLoader;
+
 	public FlinkYarnSessionCli(
 			Configuration configuration,
 			String configurationDirectory,
 			String shortPrefix,
 			String longPrefix) throws FlinkException {
-		this(configuration, configurationDirectory, shortPrefix, longPrefix, true);
+		this(configuration, new DefaultClusterClientServiceLoader(), configurationDirectory, shortPrefix, longPrefix, true);
 	}
 
 	public FlinkYarnSessionCli(
@@ -172,7 +170,18 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 			String shortPrefix,
 			String longPrefix,
 			boolean acceptInteractiveInput) throws FlinkException {
+		this(configuration, new DefaultClusterClientServiceLoader(), configurationDirectory, shortPrefix, longPrefix, acceptInteractiveInput);
+	}
+
+	public FlinkYarnSessionCli(
+			Configuration configuration,
+			ClusterClientServiceLoader clusterClientServiceLoader,
+			String configurationDirectory,
+			String shortPrefix,
+			String longPrefix,
+			boolean acceptInteractiveInput) throws FlinkException {
 		super(configuration);
+		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
 		this.configurationDirectory = checkNotNull(configurationDirectory);
 		this.acceptInteractiveInput = acceptInteractiveInput;
 
@@ -256,77 +265,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		}
 	}
 
-	private YarnClusterDescriptor createDescriptor(Configuration configuration) {
-		YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(configuration);
-
-		final Path localJarPath = getLocalFlinkDistPath(configuration, yarnClusterDescriptor);
-		if (localJarPath != null) {
-			yarnClusterDescriptor.setLocalJarPath(localJarPath);
-		}
-
-		final List<File> shipFiles = decodeDirsToShipToCluster(configuration);
-		yarnClusterDescriptor.addShipFiles(shipFiles);
-
-		final String queueName = configuration.getString(YarnConfigOptions.APPLICATION_QUEUE);
-		if (queueName != null) {
-			yarnClusterDescriptor.setQueue(queueName);
-		}
-
-		final String dynamicPropertiesEncoded = configuration.getString(YarnConfigOptions.DYNAMIC_PROPERTIES);
-		yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
-
-		final boolean detached = !configuration.getBoolean(DeploymentOptions.ATTACHED);
-		yarnClusterDescriptor.setDetachedMode(detached);
-
-		final String appName = configuration.getString(YarnConfigOptions.APPLICATION_NAME);
-		if (appName != null) {
-			yarnClusterDescriptor.setName(appName);
-		}
-
-		final String appType = configuration.getString(YarnConfigOptions.APPLICATION_TYPE);
-		if (appType != null) {
-			yarnClusterDescriptor.setApplicationType(appType);
-		}
-
-		final String zkNamespace = configuration.getString(HA_CLUSTER_ID);
-		if (zkNamespace != null) {
-			yarnClusterDescriptor.setZookeeperNamespace(zkNamespace);
-		}
-
-		final String nodeLabel = configuration.getString(YarnConfigOptions.NODE_LABEL);
-		if (nodeLabel != null) {
-			yarnClusterDescriptor.setNodeLabel(nodeLabel);
-		}
-
-		return yarnClusterDescriptor;
-	}
-
-	private Path getLocalFlinkDistPath(final Configuration configuration, final YarnClusterDescriptor yarnClusterDescriptor) {
-		final String localJarPath = configuration.getString(YarnConfigOptions.FLINK_DIST_JAR);
-		if (localJarPath != null) {
-			return new Path(localJarPath);
-		}
-
-		LOG.info("No path for the flink jar passed. Using the location of " + yarnClusterDescriptor.getClass() + " to locate the jar");
-
-		// check whether it's actually a jar file --> when testing we execute this class without a flink-dist jar
-		final String decodedPath = getDecodedJarPath(yarnClusterDescriptor);
-		return decodedPath.endsWith(".jar")
-				? new Path(new File(decodedPath).toURI())
-				: null;
-	}
-
-	private String getDecodedJarPath(final YarnClusterDescriptor yarnClusterDescriptor) {
-		final String encodedJarPath = yarnClusterDescriptor
-				.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
-		try {
-			return URLDecoder.decode(encodedJarPath, Charset.defaultCharset().name());
-		} catch (UnsupportedEncodingException e) {
-			throw new RuntimeException("Couldn't decode the encoded Flink dist jar path: " + encodedJarPath +
-					" Please supply a path manually via the -" + flinkJar.getOpt() + " option.");
-		}
-	}
-
 	private Path getLocalFlinkDistPathFromCmd(final CommandLine cmd) {
 		final String flinkJarOptionName = flinkJar.getOpt();
 		if (!cmd.hasOption(flinkJarOptionName)) {
@@ -340,11 +278,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		return new Path(userPath);
 	}
 
-	private List<File> decodeDirsToShipToCluster(final Configuration configuration) {
-		checkNotNull(configuration);
-		return YarnConfigUtils.decodeListFromConfig(configuration, YarnConfigOptions.SHIP_DIRECTORIES, File::new);
-	}
-
 	private void encodeDirsToShipToCluster(final Configuration configuration, final CommandLine cmd) {
 		checkNotNull(cmd);
 		checkNotNull(configuration);
@@ -365,22 +298,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		}
 	}
 
-	private ClusterSpecification createClusterSpecification(Configuration configuration) {
-		// JobManager Memory
-		final int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
-
-		// Task Managers memory
-		final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
-
-		int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
-
-		return new ClusterSpecification.ClusterSpecificationBuilder()
-			.setMasterMemoryMB(jobManagerMemoryMB)
-			.setTaskManagerMemoryMB(taskManagerMemoryMB)
-			.setSlotsPerTaskManager(slotsPerTaskManager)
-			.createClusterSpecification();
-	}
-
 	private void printUsage() {
 		System.out.println("Usage:");
 		HelpFormatter formatter = new HelpFormatter();
@@ -397,14 +314,14 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 	@Override
 	public boolean isActive(CommandLine commandLine) {
 		String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
-		boolean yarnJobManager = ID.equals(jobManagerOption);
+		boolean yarnJobManager = YarnClusterClientFactory.ID.equals(jobManagerOption);
 		boolean yarnAppId = commandLine.hasOption(applicationId.getOpt());
 		return yarnJobManager || yarnAppId || (isYarnPropertiesFileMode(commandLine) && yarnApplicationIdFromYarnProperties != null);
 	}
 
 	@Override
 	public String getId() {
-		return ID;
+		return YarnClusterClientFactory.ID;
 	}
 
 	@Override
@@ -423,23 +340,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 	}
 
 	@Override
-	public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
-		return createDescriptor(configuration);
-	}
-
-	@Override
-	@Nullable
-	public ApplicationId getClusterId(Configuration configuration) {
-		final String clusterId = configuration.getString(YarnConfigOptions.APPLICATION_ID);
-		return clusterId != null ? ConverterUtils.toApplicationId(clusterId) : null;
-	}
-
-	@Override
-	public ClusterSpecification getClusterSpecification(Configuration configuration) {
-		return createClusterSpecification(configuration);
-	}
-
-	@Override
 	public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
 		// we ignore the addressOption because it can only contain "yarn-cluster"
 		final Configuration effectiveConfiguration = new Configuration(configuration);
@@ -628,7 +528,9 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		}
 
 		final Configuration configuration = applyCommandLineOptionsToConfiguration(cmd);
-		final YarnClusterDescriptor yarnClusterDescriptor = createClusterDescriptor(configuration);
+		final ClusterClientFactory<ApplicationId> yarnClusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
+
+		final YarnClusterDescriptor yarnClusterDescriptor = (YarnClusterDescriptor) yarnClusterClientFactory.createClusterDescriptor(configuration);
 
 		try {
 			// Query cluster for metrics
@@ -645,7 +547,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 
 					clusterClient = yarnClusterDescriptor.retrieve(yarnApplicationId);
 				} else {
-					final ClusterSpecification clusterSpecification = getClusterSpecification(configuration);
+					final ClusterSpecification clusterSpecification = yarnClusterClientFactory.getClusterSpecification(configuration);
 
 					clusterClient = yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
 
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index ae566b4..a357126 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -86,7 +89,8 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 				"-D", "akka.ask.timeout=5 min", "-D", "env.java.opts=-DappName=foobar", "-D", "security.ssl.internal.key-password=changeit"});
 
 		Configuration executorConfig = cli.applyCommandLineOptionsToConfiguration(cmd);
-		YarnClusterDescriptor flinkYarnDescriptor = cli.createClusterDescriptor(executorConfig);
+		ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
+		YarnClusterDescriptor flinkYarnDescriptor = (YarnClusterDescriptor) clientFactory.createClusterDescriptor(executorConfig);
 
 		Assert.assertNotNull(flinkYarnDescriptor);
 
@@ -110,10 +114,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
-		final Configuration executorConfig = yarnCLI.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final YarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(executorConfig);
-		final ClusterSpecification clusterSpecification = yarnCLI.getClusterSpecification(executorConfig);
+		final Configuration executorConfig = yarnCLI.applyCommandLineOptionsToConfiguration(commandLine);
+		final ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
+		final ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(executorConfig);
 
 		// each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased.
 		assertEquals(3, clusterSpecification.getSlotsPerTaskManager());
@@ -132,9 +136,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
-		final Configuration executorConfig = yarnCLI.applyCommandLineOptionsToConfiguration(commandLine);
 
-		YarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(executorConfig);
+		final Configuration executorConfig = yarnCLI.applyCommandLineOptionsToConfiguration(commandLine);
+		final ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
+		final YarnClusterDescriptor descriptor = (YarnClusterDescriptor) clientFactory.createClusterDescriptor(executorConfig);
 
 		// each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased.
 		assertTrue(descriptor.isDetachedMode());
@@ -153,9 +158,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
-		Configuration executorConfig = yarnCLI.applyCommandLineOptionsToConfiguration(commandLine);
 
-		YarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(executorConfig);
+		Configuration executorConfig = yarnCLI.applyCommandLineOptionsToConfiguration(commandLine);
+		ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
+		YarnClusterDescriptor descriptor = (YarnClusterDescriptor) clientFactory.createClusterDescriptor(executorConfig);
 
 		assertEquals(zkNamespaceCliInput, descriptor.getZookeeperNamespace());
 	}
@@ -173,9 +179,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
-		Configuration executorConfig = yarnCLI.applyCommandLineOptionsToConfiguration(commandLine);
 
-		YarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(executorConfig);
+		Configuration executorConfig = yarnCLI.applyCommandLineOptionsToConfiguration(commandLine);
+		ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
+		YarnClusterDescriptor descriptor = (YarnClusterDescriptor) clientFactory.createClusterDescriptor(executorConfig);
 
 		assertEquals(nodeLabelCliInput, descriptor.getNodeLabel());
 	}
@@ -198,9 +205,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {}, true);
-		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final ApplicationId clusterId = flinkYarnSessionCli.getClusterId(executorConfig);
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
+		final ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
+		final ApplicationId clusterId = clientFactory.getClusterId(executorConfig);
 
 		assertEquals(TEST_YARN_APPLICATION_ID, clusterId);
 	}
@@ -234,9 +242,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}, true);
-		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final ApplicationId clusterId = flinkYarnSessionCli.getClusterId(executorConfig);
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
+		final ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
+		final ApplicationId clusterId = clientFactory.getClusterId(executorConfig);
 
 		assertEquals(TEST_YARN_APPLICATION_ID, clusterId);
 	}
@@ -251,9 +260,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}, true);
-		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final YarnClusterDescriptor clusterDescriptor = flinkYarnSessionCli.createClusterDescriptor(executorConfig);
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
+		final ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
+		final YarnClusterDescriptor clusterDescriptor = (YarnClusterDescriptor) clientFactory.createClusterDescriptor(executorConfig);
 
 		final Configuration clusterDescriptorConfiguration = clusterDescriptor.getFlinkConfiguration();
 
@@ -274,11 +284,11 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString(), "-yz", overrideZkNamespace}, true);
 		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
+		final ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
 
-		final YarnClusterDescriptor clusterDescriptor = flinkYarnSessionCli.createClusterDescriptor(executorConfig);
+		final YarnClusterDescriptor clusterDescriptor = (YarnClusterDescriptor) clientFactory.createClusterDescriptor(executorConfig);
 
 		final Configuration clusterDescriptorConfiguration = clusterDescriptor.getFlinkConfiguration();
-
 		final String clusterId = clusterDescriptorConfiguration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
 		assertEquals(overrideZkNamespace, clusterId);
 	}
@@ -296,9 +306,11 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"y",
 			"yarn");
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID_2.toString() }, true);
+
 		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
+		final ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
+		final ApplicationId clusterId = clientFactory.getClusterId(executorConfig);
 
-		final ApplicationId clusterId = flinkYarnSessionCli.getClusterId(executorConfig);
 		assertEquals(TEST_YARN_APPLICATION_ID_2, clusterId);
 	}
 
@@ -325,9 +337,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
-		Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(executorConfig);
+		Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
+		ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
+		ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(executorConfig);
 
 		assertThat(clusterSpecification.getMasterMemoryMB(), is(jobManagerMemory));
 		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(taskManagerMemory));
@@ -356,9 +369,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
-		Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(executorConfig);
+		Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
+		ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
+		ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(executorConfig);
 
 		assertThat(clusterSpecification.getMasterMemoryMB(), is(jobManagerMemory));
 		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(taskManagerMemory));
@@ -378,9 +392,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
-		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(executorConfig);
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
+		final ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
+		final ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(executorConfig);
 
 		assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
 		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
@@ -398,9 +413,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"y",
 			"yarn");
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
-		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(executorConfig);
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
+		final ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
+		final ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(executorConfig);
 
 		assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
 		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
@@ -418,9 +434,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"y",
 			"yarn");
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
-		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(executorConfig);
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
+		final ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
+		final ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(executorConfig);
 
 		assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
 		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
@@ -442,9 +459,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false);
-		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(executorConfig);
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
+		final ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
+		final ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(executorConfig);
 
 		assertThat(clusterSpecification.getMasterMemoryMB(), is(2048));
 		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(4096));
@@ -462,9 +480,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false);
-		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		final ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(executorConfig);
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
+		final ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
+		final ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(executorConfig);
 
 		assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
 		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(1024));
@@ -480,19 +499,24 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			"yarn");
 
 		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
-		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
 
-		YarnClusterDescriptor flinkYarnDescriptor = flinkYarnSessionCli.createClusterDescriptor(executorConfig);
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
+		final ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
+		YarnClusterDescriptor flinkYarnDescriptor = (YarnClusterDescriptor) clientFactory.createClusterDescriptor(executorConfig);
 
 		assertEquals(2, flinkYarnDescriptor.getShipFiles().size());
 
 	}
 
-
 	///////////
 	// Utils //
 	///////////
 
+	private ClusterClientFactory<ApplicationId> getClusterClientFactory(final Configuration executorConfig) {
+		final ClusterClientServiceLoader clusterClientServiceLoader = new DefaultClusterClientServiceLoader();
+		return clusterClientServiceLoader.getClusterClientFactory(executorConfig);
+	}
+
 	private File writeYarnPropertiesFile(String contents) throws IOException {
 		File tmpFolder = tmp.newFolder();
 		String currentUser = System.getProperty("user.name");


[flink] 02/08: [FLINK-14501] Change the log config file discovery

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch to-merge
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e0a7123569efbae6767a31d23a5622deb037cf6b
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Oct 22 22:24:27 2019 +0200

    [FLINK-14501] Change the log config file discovery
---
 .../apache/flink/yarn/YarnClusterDescriptor.java   | 12 +++----
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 39 ++++++++++------------
 .../yarn/configuration/YarnConfigOptions.java      |  2 +-
 3 files changed, 23 insertions(+), 30 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 68eea73..c2045f9 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -46,7 +46,6 @@ import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
-import org.apache.flink.yarn.cli.YarnConfigUtils;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
 import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
@@ -711,10 +710,9 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 			systemShipFiles.add(file.getAbsoluteFile());
 		}
 
-		final List<File> logConfigFiles = YarnConfigUtils
-				.decodeListFromConfig(configuration, YarnConfigOptions.APPLICATION_LOG_CONFIG_FILES, File::new);
-		if (logConfigFiles != null) {
-			systemShipFiles.addAll(logConfigFiles);
+		final String logConfigFilePath = configuration.getString(YarnConfigOptions.APPLICATION_LOG_CONFIG_FILE);
+		if (logConfigFilePath != null) {
+			systemShipFiles.add(new File(logConfigFilePath));
 		}
 
 		addEnvironmentFoldersToShipFiles(systemShipFiles);
@@ -934,8 +932,8 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 
 		final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
 				yarnClusterEntrypoint,
-				containsFileWithEnding(systemShipFiles, CONFIG_FILE_LOGBACK_NAME),
-				containsFileWithEnding(systemShipFiles, CONFIG_FILE_LOG4J_NAME),
+				logConfigFilePath != null && logConfigFilePath.endsWith(CONFIG_FILE_LOGBACK_NAME),
+				logConfigFilePath != null && logConfigFilePath.endsWith(CONFIG_FILE_LOG4J_NAME),
 				hasKrb5,
 				clusterSpecification.getMasterMemoryMB());
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 61e3d58..a831ab2 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -73,11 +73,10 @@ import java.net.URLDecoder;
 import java.nio.charset.Charset;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -541,33 +540,29 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 			configuration.setString(YarnConfigOptions.NODE_LABEL, nodeLabelValue);
 		}
 
-		discoverAndEncodeLogConfigFiles(configuration);
+		discoverLogConfigFile().ifPresent(
+				file -> configuration.setString(YarnConfigOptions.APPLICATION_LOG_CONFIG_FILE, file.getPath())
+		);
 	}
 
-	private void discoverAndEncodeLogConfigFiles(final Configuration configuration) {
-		final Set<File> logFiles = discoverLogConfigFiles();
-		YarnConfigUtils.encodeListToConfig(configuration, YarnConfigOptions.APPLICATION_LOG_CONFIG_FILES, logFiles, File::getPath);
-	}
-
-	private Set<File> discoverLogConfigFiles() {
-		final Set<File> logConfigFiles = new HashSet<>();
+	private Optional<File> discoverLogConfigFile() {
+		Optional<File> logConfigFile = Optional.empty();
 
-		File logbackFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME);
-		final boolean hasLogback = logbackFile.exists();
-		if (hasLogback) {
-			logConfigFiles.add(logbackFile);
+		final File log4jFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME);
+		if (log4jFile.exists()) {
+			logConfigFile = Optional.of(log4jFile);
 		}
 
-		File log4jFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME);
-		final boolean hasLog4j = log4jFile.exists();
-		if (hasLog4j) {
-			logConfigFiles.add(log4jFile);
-			if (hasLogback) {
-				LOG.warn("The configuration directory ('" + configurationDirectory + "') contains both LOG4J and " +
-						"Logback configuration files. Please delete or rename one of them.");
+		final File logbackFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME);
+		if (logbackFile.exists()) {
+			if (logConfigFile.isPresent()) {
+				LOG.warn("The configuration directory ('" + configurationDirectory + "') already contains a LOG4J config file." +
+						"If you want to use logback, then please delete or rename the log configuration file.");
+			} else {
+				logConfigFile = Optional.of(logbackFile);
 			}
 		}
-		return logConfigFiles;
+		return logConfigFile;
 	}
 
 	private boolean isYarnPropertiesFileMode(CommandLine commandLine) {
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index d8611cc..8ef7293 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -206,7 +206,7 @@ public class YarnConfigOptions {
 
 	// ----------------------- YARN CLI OPTIONS ------------------------------------
 
-	public static final ConfigOption<String> APPLICATION_LOG_CONFIG_FILES =
+	public static final ConfigOption<String> APPLICATION_LOG_CONFIG_FILE =
 			key("yarn.log-config-file")
 				.noDefaultValue()
 				.withDescription("The location of the log config file, e.g. the path to your log4j.properties for log4j.");


[flink] 07/08: [hotfix] Do not expose Yarn dynamic options

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch to-merge
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d457becca501e1bc70bc0e569754ea87bd8c240a
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Fri Oct 25 11:46:26 2019 +0200

    [hotfix] Do not expose Yarn dynamic options
---
 .../generated/yarn_config_configuration.html       |  5 ---
 .../flink/yarn/YarnClusterClientFactory.java       |  3 +-
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java |  3 +-
 .../yarn/configuration/YarnConfigOptions.java      |  5 ---
 .../configuration/YarnConfigOptionsInternal.java   | 37 ++++++++++++++++++++++
 5 files changed, 41 insertions(+), 12 deletions(-)

diff --git a/docs/_includes/generated/yarn_config_configuration.html b/docs/_includes/generated/yarn_config_configuration.html
index 6cf3945..98655bf 100644
--- a/docs/_includes/generated/yarn_config_configuration.html
+++ b/docs/_includes/generated/yarn_config_configuration.html
@@ -8,11 +8,6 @@
     </thead>
     <tbody>
         <tr>
-            <td><h5>$internal.yarn.dynamic-properties</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>**DO NOT USE** Specify YARN dynamic properties.</td>
-        </tr>
-        <tr>
             <td><h5>yarn.application-attempt-failures-validity-interval</h5></td>
             <td style="word-wrap: break-word;">10000</td>
             <td>Time window in milliseconds which defines the number of application attempt failures when restarting the AM. Failures which fall outside of this window are not being considered. Set this value to -1 in order to count globally. See <a href="https://hortonworks.com/blog/apache-hadoop-yarn-hdp-2-2-fault-tolerance-features-long-running-services/">here</a> for more information.</td>
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index a0d9ab2..a954be8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.yarn.cli.YarnConfigUtils;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -78,7 +79,7 @@ public class YarnClusterClientFactory implements ClusterClientFactory<Applicatio
 				.ifPresent(yarnClusterDescriptor::addShipFiles);
 
 		handleConfigOption(configuration, YarnConfigOptions.APPLICATION_QUEUE, yarnClusterDescriptor::setQueue);
-		handleConfigOption(configuration, YarnConfigOptions.DYNAMIC_PROPERTIES, yarnClusterDescriptor::setDynamicPropertiesEncoded);
+		handleConfigOption(configuration, YarnConfigOptionsInternal.DYNAMIC_PROPERTIES, yarnClusterDescriptor::setDynamicPropertiesEncoded);
 		handleConfigOption(configuration, YarnConfigOptions.APPLICATION_NAME, yarnClusterDescriptor::setName);
 		handleConfigOption(configuration, YarnConfigOptions.APPLICATION_TYPE, yarnClusterDescriptor::setApplicationType);
 		handleConfigOption(configuration, YarnConfigOptions.NODE_LABEL, yarnClusterDescriptor::setNodeLabel);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index e301a74..0efa610 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -44,6 +44,7 @@ import org.apache.flink.util.ShutdownHookUtil;
 import org.apache.flink.yarn.YarnClusterClientFactory;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
@@ -413,7 +414,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 		}
 
 		final String dynamicPropertiesEncoded = encodeDynamicProperties(commandLine);
-		configuration.setString(YarnConfigOptions.DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
+		configuration.setString(YarnConfigOptionsInternal.DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
 
 		final boolean detached = commandLine.hasOption(YARN_DETACHED_OPTION.getOpt()) || commandLine.hasOption(DETACHED_OPTION.getOpt());
 		configuration.setBoolean(DeploymentOptions.ATTACHED, !detached);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 8ef7293..021552d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -211,11 +211,6 @@ public class YarnConfigOptions {
 				.noDefaultValue()
 				.withDescription("The location of the log config file, e.g. the path to your log4j.properties for log4j.");
 
-	public static final ConfigOption<String> DYNAMIC_PROPERTIES =
-			key("$internal.yarn.dynamic-properties")
-				.noDefaultValue()
-				.withDescription("**DO NOT USE** Specify YARN dynamic properties.");
-
 	public static final ConfigOption<String> SHIP_DIRECTORIES =
 			key("yarn.ship-directories")
 				.noDefaultValue()
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptionsInternal.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptionsInternal.java
new file mode 100644
index 0000000..4910809
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptionsInternal.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Yarn configuration options that are not meant to be set by the user.
+ */
+@Internal
+public class YarnConfigOptionsInternal {
+
+	public static final ConfigOption<String> DYNAMIC_PROPERTIES =
+			key("$internal.yarn.dynamic-properties")
+					.noDefaultValue()
+					.withDescription("**DO NOT USE** Specify YARN dynamic properties.");
+
+}


[flink] 08/08: [FLINK-14377] Parse the ProgramOptions to a Configuration.

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch to-merge
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c5dc8eae5c2b21d57bec0aef9e6b79d7af9b8eff
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Oct 10 11:39:51 2019 +0200

    [FLINK-14377] Parse the ProgramOptions to a Configuration.
---
 .../generated/deployment_configuration.html        |   5 +
 .../generated/pipeline_configuration.html          |  21 ++++
 ...on.html => savepoint_config_configuration.html} |   8 +-
 docs/ops/config.md                                 |   5 +
 docs/ops/config.zh.md                              |   5 +
 .../org/apache/flink/client/cli/CliFrontend.java   |  65 ++++++-----
 .../apache/flink/client/cli/CliFrontendParser.java |  13 +--
 .../flink/client/cli/ExecutionConfigAccessor.java  | 127 +++++++++++++++++++++
 .../client/cli/ExecutionConfigurationUtils.java    |  96 ++++++++++++++++
 .../org/apache/flink/client/cli/InfoOptions.java   |  31 -----
 .../apache/flink/client/cli/ProgramOptions.java    |   7 +-
 .../org/apache/flink/client/cli/RunOptions.java    |  31 -----
 .../client/cli/CliFrontendPackageProgramTest.java  |  93 +++++++++------
 .../flink/client/cli/CliFrontendRunTest.java       |  32 ++++--
 .../flink/configuration/DeploymentOptions.java     |   6 +
 ...DeploymentOptions.java => PipelineOptions.java} |  27 +++--
 .../configuration/ConfigOptionsDocGenerator.java   |   1 +
 .../runtime/jobgraph/SavepointConfigOptions.java   |  27 +++--
 .../runtime/jobgraph/SavepointRestoreSettings.java |  17 +++
 .../client/gateway/local/ExecutionContext.java     |  16 +--
 20 files changed, 449 insertions(+), 184 deletions(-)

diff --git a/docs/_includes/generated/deployment_configuration.html b/docs/_includes/generated/deployment_configuration.html
index 2c652e4..7f7bdcc 100644
--- a/docs/_includes/generated/deployment_configuration.html
+++ b/docs/_includes/generated/deployment_configuration.html
@@ -13,6 +13,11 @@
             <td>Specifies if the pipeline is submitted in attached or detached mode.</td>
         </tr>
         <tr>
+            <td><h5>execution.shutdown-on-attached-exit</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.</td>
+        </tr>
+        <tr>
             <td><h5>execution.target</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>The deployment target for the execution, e.g. "local" for local execution.</td>
diff --git a/docs/_includes/generated/pipeline_configuration.html b/docs/_includes/generated/pipeline_configuration.html
new file mode 100644
index 0000000..36cf711
--- /dev/null
+++ b/docs/_includes/generated/pipeline_configuration.html
@@ -0,0 +1,21 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>pipeline.classpaths</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>A semicolon-separated list of the classpaths to package with the job jars to be sent to the cluster. These have to be valid URLs.</td>
+        </tr>
+        <tr>
+            <td><h5>pipeline.jars</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>A semicolon-separated list of the jars to package with the job jars to be sent to the cluster. These have to be valid paths.</td>
+        </tr>
+    </tbody>
+</table>
diff --git a/docs/_includes/generated/deployment_configuration.html b/docs/_includes/generated/savepoint_config_configuration.html
similarity index 54%
copy from docs/_includes/generated/deployment_configuration.html
copy to docs/_includes/generated/savepoint_config_configuration.html
index 2c652e4..10318f4 100644
--- a/docs/_includes/generated/deployment_configuration.html
+++ b/docs/_includes/generated/savepoint_config_configuration.html
@@ -8,14 +8,14 @@
     </thead>
     <tbody>
         <tr>
-            <td><h5>execution.attached</h5></td>
+            <td><h5>execution.savepoint.ignore-unclaimed-state</h5></td>
             <td style="word-wrap: break-word;">false</td>
-            <td>Specifies if the pipeline is submitted in attached or detached mode.</td>
+            <td>Allow to skip savepoint state that cannot be restored. Allow this if you removed an operator from your pipeline after the savepoint was triggered.</td>
         </tr>
         <tr>
-            <td><h5>execution.target</h5></td>
+            <td><h5>execution.savepoint.path</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
-            <td>The deployment target for the execution, e.g. "local" for local execution.</td>
+            <td>Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).</td>
         </tr>
     </tbody>
 </table>
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 6a8f4ce..9390dbd 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -67,6 +67,7 @@ These parameters configure the default HDFS used by Flink. Setups that do not sp
 ### Execution
 
 {% include generated/deployment_configuration.html %}
+{% include generated/savepoint_config_configuration.html %}
 
 ### JobManager
 
@@ -188,6 +189,10 @@ The configuration keys in this section are independent of the used resource mana
 
 {% include generated/environment_configuration.html %}
 
+### Pipeline
+
+{% include generated/pipeline_configuration.html %}
+
 ### Checkpointing
 
 {% include generated/checkpointing_configuration.html %}
diff --git a/docs/ops/config.zh.md b/docs/ops/config.zh.md
index 1274f6d..d3faea0 100644
--- a/docs/ops/config.zh.md
+++ b/docs/ops/config.zh.md
@@ -67,6 +67,7 @@ These parameters configure the default HDFS used by Flink. Setups that do not sp
 ### Execution
 
 {% include generated/deployment_configuration.html %}
+{% include generated/savepoint_config_configuration.html %}
 
 ### JobManager
 
@@ -188,6 +189,10 @@ The configuration keys in this section are independent of the used resource mana
 
 {% include generated/environment_configuration.html %}
 
+### Pipeline
+
+{% include generated/pipeline_configuration.html %}
+
 ### Checkpointing
 
 {% include generated/checkpointing_configuration.html %}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index e1a838f..b3a7860 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -56,7 +56,6 @@ import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.commons.cli.CommandLine;
@@ -84,6 +83,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -186,17 +186,18 @@ public class CliFrontend {
 
 		final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true);
 
-		final RunOptions runOptions = new RunOptions(commandLine);
+		final ProgramOptions programOptions = new ProgramOptions(commandLine);
+		final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions);
 
 		// evaluate help flag
-		if (runOptions.isPrintHelp()) {
+		if (commandLine.hasOption(HELP_OPTION.getOpt())) {
 			CliFrontendParser.printHelpForRun(customCommandLines);
 			return;
 		}
 
-		if (!runOptions.isPython()) {
+		if (!programOptions.isPython()) {
 			// Java program should be specified a JAR file
-			if (runOptions.getJarFilePath() == null) {
+			if (executionParameters.getJarFilePath() == null) {
 				throw new CliArgsException("Java program should be specified a JAR file.");
 			}
 		}
@@ -204,7 +205,7 @@ public class CliFrontend {
 		final PackagedProgram program;
 		try {
 			LOG.info("Building program from JAR file");
-			program = buildProgram(runOptions);
+			program = buildProgram(programOptions, executionParameters);
 		}
 		catch (FileNotFoundException e) {
 			throw new CliArgsException("Could not build the program from JAR file.", e);
@@ -212,8 +213,9 @@ public class CliFrontend {
 
 		final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine);
 		final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
+		final Configuration executionConfig = executionParameters.getConfiguration();
 		try {
-			runProgram(executorConfig, runOptions, program);
+			runProgram(executorConfig, executionConfig, program);
 		} finally {
 			program.deleteExtractedLibraries();
 		}
@@ -221,7 +223,7 @@ public class CliFrontend {
 
 	private <T> void runProgram(
 			Configuration executorConfig,
-			RunOptions runOptions,
+			Configuration executionConfig,
 			PackagedProgram program) throws ProgramInvocationException, FlinkException {
 
 		final ClusterClientFactory<T> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executorConfig);
@@ -231,12 +233,12 @@ public class CliFrontend {
 
 		try {
 			final T clusterId = clusterClientFactory.getClusterId(executorConfig);
-
+			final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(executionConfig);
 			final ClusterClient<T> client;
 
 			// directly deploy the job if the cluster is started in job mode and detached
-			if (clusterId == null && runOptions.getDetachedMode()) {
-				int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();
+			if (clusterId == null && executionParameters.getDetachedMode()) {
+				int parallelism = executionParameters.getParallelism() == -1 ? defaultParallelism : executionParameters.getParallelism();
 
 				final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
 
@@ -244,7 +246,7 @@ public class CliFrontend {
 				client = clusterDescriptor.deployJobCluster(
 					clusterSpecification,
 					jobGraph,
-					runOptions.getDetachedMode());
+					executionParameters.getDetachedMode());
 
 				logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());
 
@@ -265,7 +267,7 @@ public class CliFrontend {
 					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
 					// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
 					// there's a race-condition here if cli is killed before shutdown hook is installed
-					if (!runOptions.getDetachedMode() && runOptions.isShutdownOnAttachedExit()) {
+					if (!executionParameters.getDetachedMode() && executionParameters.isShutdownOnAttachedExit()) {
 						shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
 					} else {
 						shutdownHook = null;
@@ -273,11 +275,9 @@ public class CliFrontend {
 				}
 
 				try {
-					client.setDetached(runOptions.getDetachedMode());
-
-					LOG.debug("{}", runOptions.getSavepointRestoreSettings());
+					client.setDetached(executionParameters.getDetachedMode());
 
-					int userParallelism = runOptions.getParallelism();
+					int userParallelism = executionParameters.getParallelism();
 					LOG.debug("User parallelism is set to {}", userParallelism);
 					if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
 						userParallelism = defaultParallelism;
@@ -325,25 +325,26 @@ public class CliFrontend {
 
 		final CommandLine commandLine = CliFrontendParser.parse(commandOptions, args, true);
 
-		InfoOptions infoOptions = new InfoOptions(commandLine);
+		final ProgramOptions programOptions = new ProgramOptions(commandLine);
+		final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions);
 
 		// evaluate help flag
-		if (infoOptions.isPrintHelp()) {
+		if (commandLine.hasOption(HELP_OPTION.getOpt())) {
 			CliFrontendParser.printHelpForInfo();
 			return;
 		}
 
-		if (infoOptions.getJarFilePath() == null) {
+		if (programOptions.getJarFilePath() == null) {
 			throw new CliArgsException("The program JAR file was not specified.");
 		}
 
 		// -------- build the packaged program -------------
 
 		LOG.info("Building program from JAR file");
-		final PackagedProgram program = buildProgram(infoOptions);
+		final PackagedProgram program = buildProgram(programOptions, executionParameters);
 
 		try {
-			int parallelism = infoOptions.getParallelism();
+			int parallelism = programOptions.getParallelism();
 			if (ExecutionConfig.PARALLELISM_DEFAULT == parallelism) {
 				parallelism = defaultParallelism;
 			}
@@ -724,7 +725,7 @@ public class CliFrontend {
 	 * Sends a SavepointDisposalRequest to the job manager.
 	 */
 	private void disposeSavepoint(ClusterClient<?> clusterClient, String savepointPath) throws FlinkException {
-		Preconditions.checkNotNull(savepointPath, "Missing required argument: savepoint path. " +
+		checkNotNull(savepointPath, "Missing required argument: savepoint path. " +
 			"Usage: bin/flink savepoint -d <savepoint-path>");
 
 		logAndSysout("Disposing savepoint '" + savepointPath + "'.");
@@ -771,15 +772,17 @@ public class CliFrontend {
 	 *
 	 * @return A PackagedProgram (upon success)
 	 */
-	PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundException, ProgramInvocationException {
-		String[] programArgs = options.getProgramArgs();
-		String jarFilePath = options.getJarFilePath();
-		List<URL> classpaths = options.getClasspaths();
+	PackagedProgram buildProgram(
+			final ProgramOptions runOptions,
+			final ExecutionConfigAccessor executionParameters) throws FileNotFoundException, ProgramInvocationException {
+		String[] programArgs = runOptions.getProgramArgs();
+		String jarFilePath = executionParameters.getJarFilePath();
+		List<URL> classpaths = executionParameters.getClasspaths();
 
 		// Get assembler class
-		String entryPointClass = options.getEntryPointClassName();
+		String entryPointClass = runOptions.getEntryPointClassName();
 		File jarFile = null;
-		if (options.isPython()) {
+		if (runOptions.isPython()) {
 			// If the job is specified a jar file
 			if (jarFilePath != null) {
 				jarFile = getJarFile(jarFilePath);
@@ -801,7 +804,7 @@ public class CliFrontend {
 				new PackagedProgram(jarFile, classpaths, programArgs) :
 				new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
 
-		program.setSavepointRestoreSettings(options.getSavepointRestoreSettings());
+		program.setSavepointRestoreSettings(executionParameters.getSavepointRestoreSettings());
 
 		return program;
 	}
@@ -1192,7 +1195,7 @@ public class CliFrontend {
 		// construct class types from the parameters
 		Class<?>[] types = new Class<?>[params.length];
 		for (int i = 0; i < params.length; i++) {
-			Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
+			checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
 			types[i] = params[i].getClass();
 		}
 
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 97ba33e..c0295c7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -192,7 +192,7 @@ public class CliFrontendParser {
 		PYMODULE_OPTION.setArgName("pyModule");
 	}
 
-	private static final Options RUN_OPTIONS = getRunCommandOptions();
+	static final Options RUN_OPTIONS = getRunCommandOptions();
 
 	private static Options buildGeneralOptions(Options options) {
 		options.addOption(HELP_OPTION);
@@ -445,17 +445,6 @@ public class CliFrontendParser {
 	//  Line Parsing
 	// --------------------------------------------------------------------------------------------
 
-	public static RunOptions parseRunCommand(String[] args) throws CliArgsException {
-		try {
-			DefaultParser parser = new DefaultParser();
-			CommandLine line = parser.parse(RUN_OPTIONS, args, true);
-			return new RunOptions(line);
-		}
-		catch (ParseException e) {
-			throw new CliArgsException(e.getMessage());
-		}
-	}
-
 	public static CommandLine parse(Options options, String[] args, boolean stopAtNonOptions) throws CliArgsException {
 		final DefaultParser parser = new DefaultParser();
 
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
new file mode 100644
index 0000000..194a352
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Accessor that exposes config settings that are relevant for execution from an underlying {@link Configuration}.
+ */
+@Internal
+public class ExecutionConfigAccessor {
+
+	private final Configuration configuration;
+
+	private ExecutionConfigAccessor(final Configuration configuration) {
+		this.configuration = checkNotNull(configuration);
+	}
+
+	/**
+	 * Creates an {@link ExecutionConfigAccessor} based on the provided {@link Configuration}.
+	 */
+	public static ExecutionConfigAccessor fromConfiguration(final Configuration configuration) {
+		return new ExecutionConfigAccessor(checkNotNull(configuration));
+	}
+
+	/**
+	 * Creates an {@link ExecutionConfigAccessor} based on the provided {@link ProgramOptions} as provided by the user through the CLI.
+	 */
+	public static ExecutionConfigAccessor fromProgramOptions(final ProgramOptions options) {
+		checkNotNull(options);
+
+		final Configuration configuration = new Configuration();
+		configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, options.getParallelism());
+		configuration.setBoolean(DeploymentOptions.ATTACHED, !options.getDetachedMode());
+		configuration.setBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED, options.isShutdownOnAttachedExit());
+
+		parseClasspathURLsToConfig(options.getClasspaths(), configuration);
+		parseJarURLToConfig(options.getJarFilePath(), configuration);
+
+		SavepointRestoreSettings.toConfiguration(options.getSavepointRestoreSettings(), configuration);
+
+		return new ExecutionConfigAccessor(configuration);
+	}
+
+	private static void parseClasspathURLsToConfig(final List<URL> classpathURLs, final Configuration configuration) {
+		ExecutionConfigurationUtils.urlListToConfig(
+				classpathURLs,
+				PipelineOptions.LIST_SEPARATOR,
+				configuration,
+				PipelineOptions.CLASSPATHS);
+	}
+
+	private static void parseJarURLToConfig(final String jarFile, final Configuration configuration) {
+		if (jarFile == null) {
+			return;
+		}
+
+		try {
+			final URL jarUrl = new File(jarFile).getAbsoluteFile().toURI().toURL();
+			final List<URL> jarUrlSingleton = Collections.singletonList(jarUrl);
+			ExecutionConfigurationUtils.urlListToConfig(jarUrlSingleton, PipelineOptions.LIST_SEPARATOR, configuration, PipelineOptions.JARS);
+		} catch (MalformedURLException e) {
+			throw new IllegalArgumentException("JAR file path invalid", e);
+		}
+	}
+
+	public Configuration getConfiguration() {
+		return configuration;
+	}
+
+	public String getJarFilePath() {
+		final List<URL> jarURL = ExecutionConfigurationUtils.urlListFromConfig(configuration, PipelineOptions.JARS, PipelineOptions.LIST_SEPARATOR);
+		if (jarURL != null && !jarURL.isEmpty()) {
+			return jarURL.get(0).getPath();
+		}
+		return null;
+	}
+
+	public List<URL> getClasspaths() {
+		return ExecutionConfigurationUtils.urlListFromConfig(configuration, PipelineOptions.CLASSPATHS, PipelineOptions.LIST_SEPARATOR);
+	}
+
+	public int getParallelism() {
+		return  configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
+	}
+
+	public boolean getDetachedMode() {
+		return !configuration.getBoolean(DeploymentOptions.ATTACHED);
+	}
+
+	public SavepointRestoreSettings getSavepointRestoreSettings() {
+		return SavepointRestoreSettings.fromConfiguration(configuration);
+	}
+
+	public boolean isShutdownOnAttachedExit() {
+		return configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED);
+	}
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigurationUtils.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigurationUtils.java
new file mode 100644
index 0000000..099984a
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigurationUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utilities for parsing parameters in the {@link ExecutionConfigAccessor}.
+ */
+@Internal
+class ExecutionConfigurationUtils {
+
+	/**
+	 * Parses a list of {@link URL URLs} to a string and puts it in the provided {@code configuration} as the value of the provided {@code option}.
+	 * @param urls the list of URLs to parse
+	 * @param delimiter the delimiter to be used to separate the members of the list in the string
+	 * @param configuration the configuration object to put the list
+	 * @param option the {@link ConfigOption option} to serve as the key for the list in the configuration
+	 * @return the produced string to be put in the configuration.
+	 */
+	static String urlListToConfig(
+			final List<URL> urls,
+			final String delimiter,
+			final Configuration configuration,
+			final ConfigOption<String> option) {
+
+		checkNotNull(urls);
+		checkNotNull(delimiter);
+		checkNotNull(configuration);
+		checkNotNull(option);
+
+		final String str = urls.stream().map(URL::toString).collect(Collectors.joining(delimiter));
+		configuration.setString(option, str);
+		return str;
+	}
+
+	/**
+	 * Parses a string into a list of {@link URL URLs} from a given {@link Configuration}.
+	 * @param configuration the configuration containing the string-ified list of URLs
+	 * @param option the {@link ConfigOption option} whose value is the list of URLs
+	 * @param delimiter the delimiter used to separate the members of the list in the string
+	 * @return the produced list of URLs.
+	 */
+	static List<URL> urlListFromConfig(
+			final Configuration configuration,
+			final ConfigOption<String> option,
+			final String delimiter) {
+
+		checkNotNull(configuration);
+		checkNotNull(option);
+		checkNotNull(delimiter);
+
+		final String urls = configuration.getString(option);
+		if (urls == null || urls.length() == 0) {
+			return Collections.emptyList();
+		}
+
+		try (final Stream<String> urlTokens = Arrays.stream(urls.split(delimiter))) {
+			return urlTokens.map(str -> {
+				try {
+					return new URL(str);
+				} catch (MalformedURLException e) {
+					throw new IllegalArgumentException("Invalid URL", e);
+				}
+			}).collect(Collectors.toList());
+		}
+	}
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java
deleted file mode 100644
index 559ce94..0000000
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client.cli;
-
-import org.apache.commons.cli.CommandLine;
-
-/**
- * Command line options for the INFO command.
- */
-public class InfoOptions extends ProgramOptions {
-
-	public InfoOptions(CommandLine line) throws CliArgsException {
-		super(line);
-	}
-}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index e3a9907..be3b811 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -44,7 +44,7 @@ import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION
 /**
  * Base class for command line options that refer to a JAR file program.
  */
-public abstract class ProgramOptions extends CommandLineOptions {
+public class ProgramOptions extends CommandLineOptions {
 
 	private final String jarFilePath;
 
@@ -67,7 +67,7 @@ public abstract class ProgramOptions extends CommandLineOptions {
 	 */
 	private final boolean isPython;
 
-	protected ProgramOptions(CommandLine line) throws CliArgsException {
+	public ProgramOptions(CommandLine line) throws CliArgsException {
 		super(line);
 
 		String[] args = line.hasOption(ARGS_OPTION.getOpt()) ?
@@ -168,8 +168,7 @@ public abstract class ProgramOptions extends CommandLineOptions {
 			parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 		}
 
-		detachedMode = line.hasOption(DETACHED_OPTION.getOpt()) || line.hasOption(
-			YARN_DETACHED_OPTION.getOpt());
+		detachedMode = line.hasOption(DETACHED_OPTION.getOpt()) || line.hasOption(YARN_DETACHED_OPTION.getOpt());
 		shutdownOnAttachedExit = line.hasOption(SHUTDOWN_IF_ATTACHED_OPTION.getOpt());
 
 		this.savepointSettings = CliFrontendParser.createSavepointRestoreSettings(line);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java
deleted file mode 100644
index 08a15d3..0000000
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client.cli;
-
-import org.apache.commons.cli.CommandLine;
-
-/**
- * Command line options for the RUN command.
- */
-public class RunOptions extends ProgramOptions {
-
-	public RunOptions(CommandLine line) throws CliArgsException {
-		super(line);
-	}
-}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java
index 873ba00..2efb8ca 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.commons.cli.CommandLine;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -78,11 +79,12 @@ public class CliFrontendPackageProgramTest extends TestLogger {
 
 	@Test
 	public void testNonExistingJarFile() throws Exception {
-		ProgramOptions options = mock(ProgramOptions.class);
-		when(options.getJarFilePath()).thenReturn("/some/none/existing/path");
+		ProgramOptions programOptions = mock(ProgramOptions.class);
+		ExecutionConfigAccessor executionOptions = mock(ExecutionConfigAccessor.class);
+		when(executionOptions.getJarFilePath()).thenReturn("/some/none/existing/path");
 
 		try {
-			frontend.buildProgram(options);
+			frontend.buildProgram(programOptions, executionOptions);
 			fail("should throw an exception");
 		}
 		catch (FileNotFoundException e) {
@@ -92,11 +94,12 @@ public class CliFrontendPackageProgramTest extends TestLogger {
 
 	@Test
 	public void testFileNotJarFile() throws Exception {
-		ProgramOptions options = mock(ProgramOptions.class);
-		when(options.getJarFilePath()).thenReturn(getNonJarFilePath());
+		ProgramOptions programOptions = mock(ProgramOptions.class);
+		ExecutionConfigAccessor executionOptions = mock(ExecutionConfigAccessor.class);
+		when(executionOptions.getJarFilePath()).thenReturn(getNonJarFilePath());
 
 		try {
-			frontend.buildProgram(options);
+			frontend.buildProgram(programOptions, executionOptions);
 			fail("should throw an exception");
 		}
 		catch (ProgramInvocationException e) {
@@ -114,12 +117,15 @@ public class CliFrontendPackageProgramTest extends TestLogger {
 		URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") };
 		String[] reducedArguments = new String[] {"--debug", "true", "arg1", "arg2"};
 
-		RunOptions options = CliFrontendParser.parseRunCommand(arguments);
-		assertEquals(getTestJarPath(), options.getJarFilePath());
-		assertArrayEquals(classpath, options.getClasspaths().toArray());
-		assertArrayEquals(reducedArguments, options.getProgramArgs());
+		CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, arguments, true);
+		ProgramOptions programOptions = new ProgramOptions(commandLine);
+		ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions);
 
-		PackagedProgram prog = frontend.buildProgram(options);
+		assertEquals(getTestJarPath(), executionOptions.getJarFilePath());
+		assertArrayEquals(classpath, executionOptions.getClasspaths().toArray());
+		assertArrayEquals(reducedArguments, programOptions.getProgramArgs());
+
+		PackagedProgram prog = frontend.buildProgram(programOptions, executionOptions);
 
 		Assert.assertArrayEquals(reducedArguments, prog.getArguments());
 		Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName());
@@ -135,12 +141,15 @@ public class CliFrontendPackageProgramTest extends TestLogger {
 		URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") };
 		String[] reducedArguments = new String[] {"--debug", "true", "arg1", "arg2"};
 
-		RunOptions options = CliFrontendParser.parseRunCommand(arguments);
-		assertEquals(getTestJarPath(), options.getJarFilePath());
-		assertArrayEquals(classpath, options.getClasspaths().toArray());
-		assertArrayEquals(reducedArguments, options.getProgramArgs());
+		CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, arguments, true);
+		ProgramOptions programOptions = new ProgramOptions(commandLine);
+		ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions);
+
+		assertEquals(getTestJarPath(), executionOptions.getJarFilePath());
+		assertArrayEquals(classpath, executionOptions.getClasspaths().toArray());
+		assertArrayEquals(reducedArguments, programOptions.getProgramArgs());
 
-		PackagedProgram prog = frontend.buildProgram(options);
+		PackagedProgram prog = frontend.buildProgram(programOptions, executionOptions);
 
 		Assert.assertArrayEquals(reducedArguments, prog.getArguments());
 		Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName());
@@ -156,12 +165,15 @@ public class CliFrontendPackageProgramTest extends TestLogger {
 		URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") };
 		String[] reducedArguments = {"--debug", "true", "arg1", "arg2"};
 
-		RunOptions options = CliFrontendParser.parseRunCommand(arguments);
-		assertEquals(getTestJarPath(), options.getJarFilePath());
-		assertArrayEquals(classpath, options.getClasspaths().toArray());
-		assertArrayEquals(reducedArguments, options.getProgramArgs());
+		CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, arguments, true);
+		ProgramOptions programOptions = new ProgramOptions(commandLine);
+		ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions);
+
+		assertEquals(getTestJarPath(), executionOptions.getJarFilePath());
+		assertArrayEquals(classpath, executionOptions.getClasspaths().toArray());
+		assertArrayEquals(reducedArguments, programOptions.getProgramArgs());
 
-		PackagedProgram prog = frontend.buildProgram(options);
+		PackagedProgram prog = frontend.buildProgram(programOptions, executionOptions);
 
 		Assert.assertArrayEquals(reducedArguments, prog.getArguments());
 		Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName());
@@ -182,13 +194,16 @@ public class CliFrontendPackageProgramTest extends TestLogger {
 		URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") };
 		String[] reducedArguments = {"--debug", "true", "arg1", "arg2"};
 
-		RunOptions options = CliFrontendParser.parseRunCommand(arguments);
-		assertEquals(arguments[4], options.getJarFilePath());
-		assertArrayEquals(classpath, options.getClasspaths().toArray());
-		assertArrayEquals(reducedArguments, options.getProgramArgs());
+		CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, arguments, true);
+		ProgramOptions programOptions = new ProgramOptions(commandLine);
+		ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions);
+
+		assertEquals(arguments[4], executionOptions.getJarFilePath());
+		assertArrayEquals(classpath, executionOptions.getClasspaths().toArray());
+		assertArrayEquals(reducedArguments, programOptions.getProgramArgs());
 
 		try {
-			frontend.buildProgram(options);
+			frontend.buildProgram(programOptions, executionOptions);
 			fail("Should fail with an exception");
 		}
 		catch (FileNotFoundException e) {
@@ -200,12 +215,15 @@ public class CliFrontendPackageProgramTest extends TestLogger {
 	public void testNonExistingFileWithoutArguments() throws Exception {
 		String[] arguments = {"/some/none/existing/path"};
 
-		RunOptions options = CliFrontendParser.parseRunCommand(arguments);
-		assertEquals(arguments[0], options.getJarFilePath());
-		assertArrayEquals(new String[0], options.getProgramArgs());
+		CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, arguments, true);
+		ProgramOptions programOptions = new ProgramOptions(commandLine);
+		ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions);
+
+		assertEquals(arguments[0], executionOptions.getJarFilePath());
+		assertArrayEquals(new String[0], programOptions.getProgramArgs());
 
 		try {
-			frontend.buildProgram(options);
+			frontend.buildProgram(programOptions, executionOptions);
 		}
 		catch (FileNotFoundException e) {
 			// that's what we want
@@ -258,13 +276,16 @@ public class CliFrontendPackageProgramTest extends TestLogger {
 			URL[] classpath = new URL[] { new URL("file:///tmp/foo"), new URL("file:///tmp/bar") };
 			String[] reducedArguments = { "true", "arg1", "arg2" };
 
-			RunOptions options = CliFrontendParser.parseRunCommand(arguments);
-			assertEquals(getTestJarPath(), options.getJarFilePath());
-			assertArrayEquals(classpath, options.getClasspaths().toArray());
-			assertEquals(TEST_JAR_CLASSLOADERTEST_CLASS, options.getEntryPointClassName());
-			assertArrayEquals(reducedArguments, options.getProgramArgs());
+			CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, arguments, true);
+			ProgramOptions programOptions = new ProgramOptions(commandLine);
+			ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions);
+
+			assertEquals(getTestJarPath(), executionOptions.getJarFilePath());
+			assertArrayEquals(classpath, executionOptions.getClasspaths().toArray());
+			assertEquals(TEST_JAR_CLASSLOADERTEST_CLASS, programOptions.getEntryPointClassName());
+			assertArrayEquals(reducedArguments, programOptions.getProgramArgs());
 
-			PackagedProgram prog = spy(frontend.buildProgram(options));
+			PackagedProgram prog = spy(frontend.buildProgram(programOptions, executionOptions));
 
 			ClassLoader testClassLoader = new ClassLoader(prog.getUserCodeClassLoader()) {
 				@Override
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index 2230917..918b6db 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
+import org.apache.commons.cli.CommandLine;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -81,8 +82,12 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
 		// test configure savepoint path (no ignore flag)
 		{
 			String[] parameters = {"-s", "expectedSavepointPath", getTestJarPath()};
-			RunOptions options = CliFrontendParser.parseRunCommand(parameters);
-			SavepointRestoreSettings savepointSettings = options.getSavepointRestoreSettings();
+
+			CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, parameters, true);
+			ProgramOptions programOptions = new ProgramOptions(commandLine);
+			ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions);
+
+			SavepointRestoreSettings savepointSettings = executionOptions.getSavepointRestoreSettings();
 			assertTrue(savepointSettings.restoreSavepoint());
 			assertEquals("expectedSavepointPath", savepointSettings.getRestorePath());
 			assertFalse(savepointSettings.allowNonRestoredState());
@@ -91,8 +96,12 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
 		// test configure savepoint path (with ignore flag)
 		{
 			String[] parameters = {"-s", "expectedSavepointPath", "-n", getTestJarPath()};
-			RunOptions options = CliFrontendParser.parseRunCommand(parameters);
-			SavepointRestoreSettings savepointSettings = options.getSavepointRestoreSettings();
+
+			CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, parameters, true);
+			ProgramOptions programOptions = new ProgramOptions(commandLine);
+			ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions);
+
+			SavepointRestoreSettings savepointSettings = executionOptions.getSavepointRestoreSettings();
 			assertTrue(savepointSettings.restoreSavepoint());
 			assertEquals("expectedSavepointPath", savepointSettings.getRestorePath());
 			assertTrue(savepointSettings.allowNonRestoredState());
@@ -102,12 +111,15 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
 		{
 			String[] parameters =
 				{ getTestJarPath(), "-arg1", "value1", "justavalue", "--arg2", "value2"};
-			RunOptions options = CliFrontendParser.parseRunCommand(parameters);
-			assertEquals("-arg1", options.getProgramArgs()[0]);
-			assertEquals("value1", options.getProgramArgs()[1]);
-			assertEquals("justavalue", options.getProgramArgs()[2]);
-			assertEquals("--arg2", options.getProgramArgs()[3]);
-			assertEquals("value2", options.getProgramArgs()[4]);
+
+			CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, parameters, true);
+			ProgramOptions programOptions = new ProgramOptions(commandLine);
+
+			assertEquals("-arg1", programOptions.getProgramArgs()[0]);
+			assertEquals("value1", programOptions.getProgramArgs()[1]);
+			assertEquals("justavalue", programOptions.getProgramArgs()[2]);
+			assertEquals("--arg2", programOptions.getProgramArgs()[3]);
+			assertEquals("value2", programOptions.getProgramArgs()[4]);
 		}
 	}
 
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
index a032205..a09fd68 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
@@ -37,4 +37,10 @@ public class DeploymentOptions {
 			key("execution.attached")
 					.defaultValue(false)
 					.withDescription("Specifies if the pipeline is submitted in attached or detached mode.");
+
+	public static final ConfigOption<Boolean> SHUTDOWN_IF_ATTACHED =
+			key("execution.shutdown-on-attached-exit")
+					.defaultValue(false)
+					.withDescription("If the job is submitted in attached mode, perform a best-effort cluster shutdown " +
+							"when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.");
 }
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
similarity index 51%
copy from flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
copy to flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
index a032205..6ae8a31 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
@@ -23,18 +23,27 @@ import org.apache.flink.annotation.PublicEvolving;
 import static org.apache.flink.configuration.ConfigOptions.key;
 
 /**
- * The {@link ConfigOption configuration options} relevant for all Executors.
+ * The {@link ConfigOption configuration options} for job execution.
  */
 @PublicEvolving
-public class DeploymentOptions {
+public class PipelineOptions {
 
-	public static final ConfigOption<String> TARGET =
-			key("execution.target")
+	public static final String LIST_SEPARATOR = ";";
+
+	/**
+	 * A list of jar files that contain the user-defined function (UDF) classes and all classes used from within the UDFs.
+	 */
+	public static final ConfigOption<String> JARS =
+			key("pipeline.jars")
 					.noDefaultValue()
-					.withDescription("The deployment target for the execution, e.g. \"local\" for local execution.");
+					.withDescription("A semicolon-separated list of the jars to package with the job jars to be sent to the cluster. These have to be valid paths.");
 
-	public static final ConfigOption<Boolean> ATTACHED =
-			key("execution.attached")
-					.defaultValue(false)
-					.withDescription("Specifies if the pipeline is submitted in attached or detached mode.");
+	/**
+	 * A list of URLs that are added to the classpath of each user code classloader of the program.
+	 * Paths must specify a protocol (e.g. file://) and be accessible on all nodes
+	 */
+	public static final ConfigOption<String> CLASSPATHS =
+			key("pipeline.classpaths")
+					.noDefaultValue()
+					.withDescription("A semicolon-separated list of the classpaths to package with the job jars to be sent to the cluster. These have to be valid URLs.");
 }
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index 999f85a..deb4e03 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -58,6 +58,7 @@ public class ConfigOptionsDocGenerator {
 	static final OptionsClassLocation[] LOCATIONS = new OptionsClassLocation[]{
 		new OptionsClassLocation("flink-core", "org.apache.flink.configuration"),
 		new OptionsClassLocation("flink-runtime", "org.apache.flink.runtime.shuffle"),
+		new OptionsClassLocation("flink-runtime", "org.apache.flink.runtime.jobgraph"),
 		new OptionsClassLocation("flink-yarn", "org.apache.flink.yarn.configuration"),
 		new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.configuration"),
 		new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.runtime.clusterframework"),
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
similarity index 50%
copy from flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
index a032205..86fd389 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
@@ -16,25 +16,34 @@
  * limitations under the License.
  */
 
-package org.apache.flink.configuration;
+package org.apache.flink.runtime.jobgraph;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
 /**
- * The {@link ConfigOption configuration options} relevant for all Executors.
+ * The {@link ConfigOption configuration options} used when restoring from a savepoint.
  */
 @PublicEvolving
-public class DeploymentOptions {
+public class SavepointConfigOptions {
 
-	public static final ConfigOption<String> TARGET =
-			key("execution.target")
+	/**
+	 * The path to a savepoint that will be used to bootstrap the pipeline's state.
+	 */
+	public static final ConfigOption<String> SAVEPOINT_PATH =
+			key("execution.savepoint.path")
 					.noDefaultValue()
-					.withDescription("The deployment target for the execution, e.g. \"local\" for local execution.");
+					.withDescription("Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).");
 
-	public static final ConfigOption<Boolean> ATTACHED =
-			key("execution.attached")
+	/**
+	 * A flag indicating if we allow Flink to skip savepoint state that cannot be restored,
+	 * e.g. because the corresponding operator has been removed.
+	 */
+	public static final ConfigOption<Boolean> SAVEPOINT_IGNORE_UNCLAIMED_STATE =
+			key("execution.savepoint.ignore-unclaimed-state")
 					.defaultValue(false)
-					.withDescription("Specifies if the pipeline is submitted in attached or detached mode.");
+					.withDescription("Allow to skip savepoint state that cannot be restored. " +
+							"Allow this if you removed an operator from your pipeline after the savepoint was triggered.");
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
index 953d11c..b7fee4e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.jobgraph;
 
+import org.apache.flink.configuration.Configuration;
+
 import java.io.Serializable;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -131,4 +133,19 @@ public class SavepointRestoreSettings implements Serializable {
 		return new SavepointRestoreSettings(savepointPath, allowNonRestoredState);
 	}
 
+	// -------------------------- Parsing to and from a configuration object ------------------------------------
+
+	public static void toConfiguration(final SavepointRestoreSettings savepointRestoreSettings, final Configuration configuration) {
+		configuration.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, savepointRestoreSettings.allowNonRestoredState());
+		final String savepointPath = savepointRestoreSettings.getRestorePath();
+		if (savepointPath != null) {
+			configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
+		}
+	}
+
+	public static SavepointRestoreSettings fromConfiguration(final Configuration configuration) {
+		final String savepointPath = configuration.getString(SavepointConfigOptions.SAVEPOINT_PATH);
+		final boolean allowNonRestored = configuration.getBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE);
+		return savepointPath == null ? SavepointRestoreSettings.none() : SavepointRestoreSettings.forPath(savepointPath, allowNonRestored);
+	}
 }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 580b0be..287c9da 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -27,7 +27,8 @@ import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.CustomCommandLine;
-import org.apache.flink.client.cli.RunOptions;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.cli.ProgramOptions;
 import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.ClusterDescriptor;
@@ -118,7 +119,7 @@ public class ExecutionContext<T> {
 	private final Configuration flinkConfig;
 	private final Configuration executorConfig;
 	private final ClusterClientFactory<T> clusterClientFactory;
-	private final RunOptions runOptions;
+	private final ExecutionConfigAccessor executionParameters;
 	private final T clusterId;
 	private final ClusterSpecification clusterSpec;
 
@@ -173,7 +174,7 @@ public class ExecutionContext<T> {
 		clusterClientFactory = serviceLoader.getClusterClientFactory(executorConfig);
 		checkState(clusterClientFactory != null);
 
-		runOptions = createRunOptions(commandLine);
+		executionParameters = createExecutionParameterProvider(commandLine);
 		clusterId = clusterClientFactory.getClusterId(executorConfig);
 		clusterSpec = clusterClientFactory.getClusterSpecification(executorConfig);
 	}
@@ -251,9 +252,10 @@ public class ExecutionContext<T> {
 		throw new SqlExecutionException("Could not find a matching deployment.");
 	}
 
-	private static RunOptions createRunOptions(CommandLine commandLine) {
+	private static ExecutionConfigAccessor createExecutionParameterProvider(CommandLine commandLine) {
 		try {
-			return new RunOptions(commandLine);
+			final ProgramOptions programOptions = new ProgramOptions(commandLine);
+			return ExecutionConfigAccessor.fromProgramOptions(programOptions);
 		} catch (CliArgsException e) {
 			throw new SqlExecutionException("Invalid deployment run options.", e);
 		}
@@ -462,8 +464,8 @@ public class ExecutionContext<T> {
 					parallelism);
 
 			ClientUtils.addJarFiles(jobGraph, dependencies);
-			jobGraph.setClasspaths(runOptions.getClasspaths());
-			jobGraph.setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings());
+			jobGraph.setClasspaths(executionParameters.getClasspaths());
+			jobGraph.setSavepointRestoreSettings(executionParameters.getSavepointRestoreSettings());
 
 			return jobGraph;
 		}


[flink] 03/08: [FLINK-14501] Add the DeploymentOptions.TARGET

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch to-merge
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8820f6dd7f3fb55e7f1b10aad0beebd7caf615f6
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Oct 22 20:12:51 2019 +0200

    [FLINK-14501] Add the DeploymentOptions.TARGET
---
 docs/_includes/generated/deployment_configuration.html               | 5 +++++
 .../java/org/apache/flink/client/cli/AbstractCustomCommandLine.java  | 2 ++
 .../main/java/org/apache/flink/configuration/DeploymentOptions.java  | 5 +++++
 .../src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java | 1 +
 4 files changed, 13 insertions(+)

diff --git a/docs/_includes/generated/deployment_configuration.html b/docs/_includes/generated/deployment_configuration.html
index bdac757..2c652e4 100644
--- a/docs/_includes/generated/deployment_configuration.html
+++ b/docs/_includes/generated/deployment_configuration.html
@@ -12,5 +12,10 @@
             <td style="word-wrap: break-word;">false</td>
             <td>Specifies if the pipeline is submitted in attached or detached mode.</td>
         </tr>
+        <tr>
+            <td><h5>execution.target</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The deployment target for the execution, e.g. "local" for local execution.</td>
+        </tr>
     </tbody>
 </table>
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
index 805d58a..cba1036 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
@@ -19,6 +19,7 @@
 package org.apache.flink.client.cli;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.util.FlinkException;
@@ -72,6 +73,7 @@ public abstract class AbstractCustomCommandLine<T> implements CustomCommandLine<
 	@Override
 	public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
 		final Configuration resultingConfiguration = new Configuration(configuration);
+		resultingConfiguration.setString(DeploymentOptions.TARGET, getId());
 
 		if (commandLine.hasOption(addressOption.getOpt())) {
 			String addressWithPort = commandLine.getOptionValue(addressOption.getOpt());
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
index 1addcc4..a032205 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
@@ -28,6 +28,11 @@ import static org.apache.flink.configuration.ConfigOptions.key;
 @PublicEvolving
 public class DeploymentOptions {
 
+	public static final ConfigOption<String> TARGET =
+			key("execution.target")
+					.noDefaultValue()
+					.withDescription("The deployment target for the execution, e.g. \"local\" for local execution.");
+
 	public static final ConfigOption<Boolean> ATTACHED =
 			key("execution.attached")
 					.defaultValue(false)
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index a831ab2..2aaa6fb 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -443,6 +443,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 	public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
 		// we ignore the addressOption because it can only contain "yarn-cluster"
 		final Configuration effectiveConfiguration = new Configuration(configuration);
+		effectiveConfiguration.setString(DeploymentOptions.TARGET, getId());
 
 		applyDescriptorOptionToConfig(commandLine, effectiveConfiguration);
 


[flink] 04/08: [FLINK-14501] Add the ClusterClientFactory and make it discoverable

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch to-merge
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 74e35f00b2daa7afd73675b2a53ce5cd09979b70
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Oct 22 20:22:36 2019 +0200

    [FLINK-14501] Add the ClusterClientFactory and make it discoverable
---
 .../client/deployment/ClusterClientFactory.java    | 63 ++++++++++++++++++
 .../deployment/ClusterClientServiceLoader.java     | 36 +++++++++++
 .../DefaultClusterClientServiceLoader.java         | 75 ++++++++++++++++++++++
 3 files changed, 174 insertions(+)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java
new file mode 100644
index 0000000..36647b6
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.Configuration;
+
+import javax.annotation.Nullable;
+
+/**
+ * A factory containing all the necessary information for creating clients to Flink clusters.
+ */
+public interface ClusterClientFactory<ClusterID> {
+
+	/**
+	 * Returns {@code true} if the current {@link ClusterClientFactory} is compatible with the provided configuration,
+	 * {@code false} otherwise.
+	 */
+	boolean isCompatibleWith(Configuration configuration);
+
+	/**
+	 * Create a {@link ClusterDescriptor} from the given configuration.
+	 *
+	 * @param configuration containing the configuration options relevant for the {@link ClusterDescriptor}
+	 * @return the corresponding {@link ClusterDescriptor}.
+	 */
+	ClusterDescriptor<ClusterID> createClusterDescriptor(Configuration configuration);
+
+	/**
+	 * Returns the cluster id if a cluster id is specified in the provided configuration, otherwise it returns {@code null}.
+	 *
+	 * <p>A cluster id identifies a running cluster, e.g. the Yarn application id for a Flink cluster running on Yarn.
+	 *
+	 * @param configuration containing the configuration options relevant for the cluster id retrieval
+	 * @return Cluster id identifying the cluster to deploy jobs to or null
+	 */
+	@Nullable
+	ClusterID getClusterId(Configuration configuration);
+
+	/**
+	 * Returns the {@link ClusterSpecification} specified by the configuration and the command
+	 * line options. This specification can be used to deploy a new Flink cluster.
+	 *
+	 * @param configuration containing the configuration options relevant for the {@link ClusterSpecification}
+	 * @return the corresponding {@link ClusterSpecification} for a new Flink cluster
+	 */
+	ClusterSpecification getClusterSpecification(Configuration configuration);
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientServiceLoader.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientServiceLoader.java
new file mode 100644
index 0000000..51eef13
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientServiceLoader.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * An interface used to discover the appropriate {@link ClusterClientFactory cluster client factory} based on the
+ * provided {@link Configuration}.
+ */
+public interface ClusterClientServiceLoader {
+
+	/**
+	 * Discovers the appropriate {@link ClusterClientFactory} based on the provided configuration.
+	 *
+	 * @param configuration the configuration based on which the appropriate factory is going to be used.
+	 * @return the appropriate {@link ClusterClientFactory}.
+	 */
+	<ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(final Configuration configuration);
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
new file mode 100644
index 0000000..574aeaf
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A service provider for {@link ClusterClientFactory cluster client factories}.
+ */
+public class DefaultClusterClientServiceLoader implements ClusterClientServiceLoader {
+
+	private static final Logger LOG = LoggerFactory.getLogger(DefaultClusterClientServiceLoader.class);
+
+	private static final ServiceLoader<ClusterClientFactory> defaultLoader = ServiceLoader.load(ClusterClientFactory.class);
+
+	@Override
+	public <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(final Configuration configuration) {
+		checkNotNull(configuration);
+
+		final List<ClusterClientFactory> compatibleFactories = new ArrayList<>();
+		final Iterator<ClusterClientFactory> factories = defaultLoader.iterator();
+		while (factories.hasNext()) {
+			try {
+				final ClusterClientFactory factory = factories.next();
+				if (factory != null && factory.isCompatibleWith(configuration)) {
+					compatibleFactories.add(factory);
+				}
+			} catch (Throwable e) {
+				if (e.getCause() instanceof NoClassDefFoundError) {
+					LOG.info("Could not load factory due to missing dependencies.");
+				} else {
+					throw e;
+				}
+			}
+		}
+
+		if (compatibleFactories.size() > 1) {
+			final List<String> configStr =
+					configuration.toMap().entrySet().stream()
+							.map(e -> e.getKey() + "=" + e.getValue())
+							.collect(Collectors.toList());
+
+			throw new IllegalStateException("Multiple compatible client factories found for:\n" + String.join("\n", configStr) + ".");
+		}
+
+		return compatibleFactories.isEmpty() ? null : (ClusterClientFactory<ClusterID>) compatibleFactories.get(0);
+	}
+}