You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/04/13 19:17:28 UTC

[3/4] flink git commit: [FLINK-8872][flip6] fix yarn detached mode command parsing

[FLINK-8872][flip6] fix yarn detached mode command parsing

The detached flag if given by "-yd" was not passed correctly into the
CliFrontend and resulted in the CLI waiting for submitted jobs to finish instead
of detaching from the execution.

[FLINK-8872][yarn] add tests for YARN detached mode command line parsing with CliFrontend

- create a test-jar of flink-clients
- create CliFrontendRunWithYarnTest based on CliFrontendRunTest that verifies
  CliFrontend's parsing in conjunction with FlinkYarnSessionCli
-> verify detached mode in this test (can be extended further in the future)

This closes #5672.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdd1c6ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdd1c6ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdd1c6ed

Branch: refs/heads/master
Commit: fdd1c6ed0fed229612ecde1565d90a06dbe6ff55
Parents: ca5573b
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Mar 5 18:24:17 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Apr 13 10:38:26 2018 -0700

----------------------------------------------------------------------
 flink-clients/pom.xml                           |  14 +-
 .../flink/client/cli/CliFrontendParser.java     |   9 ++
 .../org/apache/flink/client/cli/DefaultCLI.java |   1 +
 .../apache/flink/client/cli/ProgramOptions.java |   4 +-
 .../flink/client/program/ClusterClient.java     |   2 +-
 .../client/program/rest/RestClusterClient.java  |   2 +-
 .../flink/client/cli/CliFrontendRunTest.java    |   2 +-
 flink-yarn-tests/pom.xml                        |   8 +
 .../flink/yarn/CliFrontendRunWithYarnTest.java  | 148 +++++++++++++++++++
 .../flink/yarn/util/FakeClusterClient.java      |  79 ++++++++++
 .../util/NonDeployingYarnClusterDescriptor.java |  98 ++++++++++++
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |  22 +--
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  19 +++
 13 files changed, 392 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index d15e3ff..c96f275 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -96,6 +96,18 @@ under the License.
 	<build>
 		<plugins>
 			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
 				<artifactId>maven-assembly-plugin</artifactId>
 				<version>2.4</version><!--$NO-MVN-MAN-VER$-->
 				<executions>
@@ -120,7 +132,7 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
-			<!--Remove the external jar test code from the test-classes directory since it musn't be in the
+			<!--Remove the external jar test code from the test-classes directory since it mustn't be in the
 			classpath when running the tests to actually test whether the user code class loader
 			is properly used.-->
 			<plugin>

http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 5a6c0ff..1588aac 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -62,6 +62,13 @@ public class CliFrontendParser {
 	public static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
 			"the job in detached mode");
 
+	/**
+	 * @deprecated use non-prefixed variant {@link #DETACHED_OPTION} for both YARN and non-YARN deployments
+	 */
+	@Deprecated
+	public static final Option YARN_DETACHED_OPTION = new Option("yd", "yarndetached", false, "If present, runs " +
+		"the job in detached mode (deprecated; use non-YARN specific option instead)");
+
 	static final Option ARGS_OPTION = new Option("a", "arguments", true,
 			"Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
 
@@ -117,6 +124,7 @@ public class CliFrontendParser {
 
 		LOGGING_OPTION.setRequired(false);
 		DETACHED_OPTION.setRequired(false);
+		YARN_DETACHED_OPTION.setRequired(false);
 
 		ARGS_OPTION.setRequired(false);
 		ARGS_OPTION.setArgName("programArgs");
@@ -158,6 +166,7 @@ public class CliFrontendParser {
 		options.addOption(ARGS_OPTION);
 		options.addOption(LOGGING_OPTION);
 		options.addOption(DETACHED_OPTION);
+		options.addOption(YARN_DETACHED_OPTION);
 		return options;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 43efc63..e9ed9af 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -40,6 +40,7 @@ public class DefaultCLI extends AbstractCustomCommandLine<StandaloneClusterId> {
 
 	@Override
 	public boolean isActive(CommandLine commandLine) {
+		// always active because we can try to read a JobManager address from the config
 		return true;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index df25e67..1acda1b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -38,6 +38,7 @@ import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_PATH_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
 
 /**
  * Base class for command line options that refer to a JAR file program.
@@ -112,7 +113,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
 		}
 
 		stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt());
-		detachedMode = line.hasOption(DETACHED_OPTION.getOpt());
+		detachedMode = line.hasOption(DETACHED_OPTION.getOpt()) || line.hasOption(
+			YARN_DETACHED_OPTION.getOpt());
 
 		if (line.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
 			String savepointPath = line.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());

http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index fbaa515..2ef0b2e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -385,7 +385,7 @@ public abstract class ClusterClient<T> {
 			return run(jobWithJars, parallelism, prog.getSavepointSettings());
 		}
 		else if (prog.isUsingInteractiveMode()) {
-			log.info("Starting program in interactive mode");
+			log.info("Starting program in interactive mode (detached: {})", isDetached());
 
 			final List<URL> libraries;
 			if (hasUserJarsInClassPath(prog.getAllLibraries())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 4a4f993..a6f676e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -235,7 +235,7 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
 
 	@Override
 	public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
-		log.info("Submitting job {}.", jobGraph.getJobID());
+		log.info("Submitting job {} (detached: {}).", jobGraph.getJobID(), isDetached());
 
 		final CompletableFuture<JobSubmissionResult> jobSubmissionFuture = submitJob(jobGraph);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index efa6a39..ba51b69 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -144,7 +144,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
 
 	// --------------------------------------------------------------------------------------------
 
-	private static void verifyCliFrontend(
+	public static void verifyCliFrontend(
 			AbstractCustomCommandLine<?> cli,
 			String[] parameters,
 			int expectedParallelism,

http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index 0b05b28..a2d75c9 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -67,6 +67,14 @@ under the License.
 			<type>test-jar</type>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 		<!-- Needed for the streaming wordcount example -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
new file mode 100644
index 0000000..d6a029f
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.cli.CliFrontendTestBase;
+import org.apache.flink.client.cli.CliFrontendTestUtils;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.flink.yarn.util.FakeClusterClient;
+import org.apache.flink.yarn.util.NonDeployingYarnClusterDescriptor;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.apache.flink.client.cli.CliFrontendRunTest.verifyCliFrontend;
+import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath;
+
+/**
+ * Tests for the RUN command using a {@link org.apache.flink.yarn.cli.FlinkYarnSessionCli} inside
+ * the {@link org.apache.flink.client.cli.CliFrontend}.
+ *
+ * @see org.apache.flink.client.cli.CliFrontendRunTest
+ */
+@RunWith(Parameterized.class)
+public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
+
+	@Rule
+	public TemporaryFolder tmp = new TemporaryFolder();
+
+	@BeforeClass
+	public static void init() {
+		CliFrontendTestUtils.pipeSystemOutToNull();
+	}
+
+	@AfterClass
+	public static void shutdown() {
+		CliFrontendTestUtils.restoreSystemOut();
+	}
+
+	@Test
+	public void testRun() throws Exception {
+		String testJarPath = getTestJarPath("BatchWordCount.jar").getAbsolutePath();
+
+		Configuration configuration = new Configuration();
+		configuration.setString(CoreOptions.MODE, mode);
+		configuration.setString(JobManagerOptions.ADDRESS, "localhost");
+		configuration.setInteger(JobManagerOptions.PORT, 8081);
+
+		FlinkYarnSessionCli yarnCLI = new TestingFlinkYarnSessionCli(
+			configuration,
+			tmp.getRoot().getAbsolutePath(),
+			"y",
+			"yarn");
+
+		// test detached mode
+		{
+			String[] parameters = {"-m", "yarn-cluster", "-yn", "1", "-p", "2", "-d", testJarPath};
+			verifyCliFrontend(yarnCLI, parameters, 2, true, true);
+		}
+
+		// test detached mode
+		{
+			String[] parameters = {"-m", "yarn-cluster", "-yn", "1", "-p", "2", "-yd", testJarPath};
+			verifyCliFrontend(yarnCLI, parameters, 2, true, true);
+		}
+	}
+
+	private static class TestingFlinkYarnSessionCli extends FlinkYarnSessionCli {
+		@SuppressWarnings("unchecked")
+		private final ClusterClient<ApplicationId> clusterClient;
+		private final String configurationDirectory;
+
+		private TestingFlinkYarnSessionCli(
+				Configuration configuration,
+				String configurationDirectory,
+				String shortPrefix,
+				String longPrefix) throws Exception {
+			super(configuration, configurationDirectory, shortPrefix, longPrefix);
+
+			this.clusterClient = new FakeClusterClient(configuration);
+			this.configurationDirectory = configurationDirectory;
+		}
+
+		@Override
+		public AbstractYarnClusterDescriptor createClusterDescriptor(CommandLine commandLine)
+			throws FlinkException {
+			AbstractYarnClusterDescriptor parent = super.createClusterDescriptor(commandLine);
+			return new NonDeployingDetachedYarnClusterDescriptor(
+					parent.getFlinkConfiguration(),
+					(YarnConfiguration) parent.getYarnClient().getConfig(),
+					configurationDirectory,
+					parent.getYarnClient(),
+					clusterClient);
+		}
+	}
+
+	private static class NonDeployingDetachedYarnClusterDescriptor extends NonDeployingYarnClusterDescriptor {
+
+		NonDeployingDetachedYarnClusterDescriptor(
+			Configuration flinkConfiguration,
+			YarnConfiguration yarnConfiguration, String configurationDirectory,
+			YarnClient yarnClient,
+			ClusterClient<ApplicationId> clusterClient) {
+			super(flinkConfiguration, yarnConfiguration, configurationDirectory, yarnClient,
+				clusterClient);
+		}
+
+		@Override
+		public ClusterClient<ApplicationId> deployJobCluster(
+				ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) {
+			assertTrue(detached);
+			return super.deployJobCluster(clusterSpecification, jobGraph, true);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/FakeClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/FakeClusterClient.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/FakeClusterClient.java
new file mode 100644
index 0000000..f6f6a71
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/FakeClusterClient.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.util;
+
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+import java.net.URL;
+import java.util.List;
+
+/**
+ * Dummy {@link ClusterClient} for testing purposes (extend as needed).
+ */
+public class FakeClusterClient extends ClusterClient<ApplicationId> {
+
+	public FakeClusterClient(Configuration flinkConfig) throws Exception {
+		super(flinkConfig);
+	}
+
+	@Override
+	public void waitForClusterToBeReady() {
+	}
+
+	@Override
+	public String getWebInterfaceURL() {
+		return "";
+	}
+
+	@Override
+	public GetClusterStatusResponse getClusterStatus() {
+		throw new UnsupportedOperationException("Not needed in test.");
+	}
+
+	@Override
+	public List<String> getNewMessages() {
+		throw new UnsupportedOperationException("Not needed in test.");
+	}
+
+	@Override
+	public ApplicationId getClusterId() {
+		throw new UnsupportedOperationException("Not needed in test.");
+	}
+
+	@Override
+	public int getMaxSlots() {
+		return 10;
+	}
+
+	@Override
+	public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
+		return false;
+	}
+
+	@Override
+	public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) {
+		throw new UnsupportedOperationException("Not needed in test.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
new file mode 100644
index 0000000..4916b73
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.util;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Dummy {@link AbstractYarnClusterDescriptor} without an actual deployment for tests.
+ */
+public class NonDeployingYarnClusterDescriptor extends AbstractYarnClusterDescriptor {
+
+	private final ClusterClient<ApplicationId> clusterClient;
+
+	public NonDeployingYarnClusterDescriptor(
+			Configuration flinkConfiguration,
+			YarnConfiguration yarnConfiguration,
+			String configurationDirectory,
+			YarnClient yarnClient,
+			ClusterClient<ApplicationId> clusterClient) {
+		super(flinkConfiguration, yarnConfiguration, configurationDirectory, yarnClient, true);
+
+		//noinspection unchecked
+		this.clusterClient = Preconditions.checkNotNull(clusterClient);
+	}
+
+	@Override
+	public String getClusterDescription() {
+		// return parent.getClusterDescription();
+		return "NonDeployingYarnClusterDescriptor";
+	}
+
+	@Override
+	protected ClusterClient<ApplicationId> createYarnClusterClient(
+			AbstractYarnClusterDescriptor descriptor,
+			int numberTaskManagers,
+			int slotsPerTaskManager,
+			ApplicationReport report,
+			Configuration flinkConfiguration,
+			boolean perJobCluster) {
+		return clusterClient;
+	}
+
+	@Override
+	public ClusterClient<ApplicationId> retrieve(ApplicationId clusterId) {
+		return clusterClient;
+	}
+
+	@Override
+	public ClusterClient<ApplicationId> deploySessionCluster(ClusterSpecification clusterSpecification) {
+		return clusterClient;
+	}
+
+	@Override
+	public ClusterClient<ApplicationId> deployJobCluster(
+			ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) {
+		return clusterClient;
+	}
+
+	@Override
+	public void terminateCluster(ApplicationId clusterId) {
+	}
+
+	@Override
+	protected String getYarnSessionClusterEntrypoint() {
+		throw new UnsupportedOperationException("Not needed in test.");
+	}
+
+	@Override
+	protected String getYarnJobClusterEntrypoint() {
+		throw new UnsupportedOperationException("Not needed in test.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 16abffa..fe04662 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -21,7 +21,6 @@ package org.apache.flink.yarn.cli;
 import org.apache.flink.client.cli.AbstractCustomCommandLine;
 import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.CliFrontend;
-import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
@@ -84,6 +83,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
+import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
 import static org.apache.flink.configuration.HighAvailabilityOptions.HA_CLUSTER_ID;
 
 /**
@@ -127,7 +128,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 	private final Option tmMemory;
 	private final Option container;
 	private final Option slots;
-	private final Option detached;
 	private final Option zookeeperNamespace;
 	private final Option help;
 
@@ -162,9 +162,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 
 	private final YarnConfiguration yarnConfiguration;
 
-	//------------------------------------ Internal fields -------------------------
-	private boolean detachedMode = false;
-
 	public FlinkYarnSessionCli(
 			Configuration configuration,
 			String configurationDirectory,
@@ -202,7 +199,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 			.valueSeparator()
 			.desc("use value for given property")
 			.build();
-		detached = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached");
 		streaming = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode");
 		name = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
 		zookeeperNamespace = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode");
@@ -218,7 +214,8 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		allOptions.addOption(shipPath);
 		allOptions.addOption(slots);
 		allOptions.addOption(dynamicproperties);
-		allOptions.addOption(detached);
+		allOptions.addOption(DETACHED_OPTION);
+		allOptions.addOption(YARN_DETACHED_OPTION);
 		allOptions.addOption(streaming);
 		allOptions.addOption(name);
 		allOptions.addOption(applicationId);
@@ -348,8 +345,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 
 		yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
 
-		if (cmd.hasOption(detached.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
-			this.detachedMode = true;
+		if (cmd.hasOption(YARN_DETACHED_OPTION.getOpt()) || cmd.hasOption(DETACHED_OPTION.getOpt())) {
 			yarnClusterDescriptor.setDetachedMode(true);
 		}
 
@@ -519,7 +515,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 
 		for (Option option : commandLine.getOptions()) {
 			if (allOptions.hasOption(option.getOpt())) {
-				if (!option.getOpt().equals(detached.getOpt())) {
+				if (!isDetachedOption(option)) {
 					// don't resume from properties file if yarn options have been specified
 					canApplyYarnProperties = false;
 					break;
@@ -530,6 +526,10 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		return canApplyYarnProperties;
 	}
 
+	private boolean isDetachedOption(Option option) {
+		return option.getOpt().equals(YARN_DETACHED_OPTION.getOpt()) || option.getOpt().equals(DETACHED_OPTION.getOpt());
+	}
+
 	private Configuration applyYarnProperties(Configuration configuration) throws FlinkException {
 		final Configuration effectiveConfiguration = new Configuration(configuration);
 
@@ -621,7 +621,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 					}
 				}
 
-				if (detachedMode) {
+				if (yarnClusterDescriptor.isDetachedMode()) {
 					LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
 						"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
 						"yarn application -kill " + yarnApplicationId);

http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 62110ed..12c0354 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -119,6 +119,25 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 	}
 
 	@Test
+	public void testCorrectSettingOfDetachedMode() throws Exception {
+		String[] params =
+			new String[] {"-yd"};
+
+		FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
+			new Configuration(),
+			tmp.getRoot().getAbsolutePath(),
+			"y",
+			"yarn");
+
+		final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
+
+		AbstractYarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(commandLine);
+
+		// each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased.
+		assertTrue(descriptor.isDetachedMode());
+	}
+
+	@Test
 	public void testZookeeperNamespaceProperty() throws Exception {
 		String zkNamespaceCliInput = "flink_test_namespace";