You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/05/29 07:50:12 UTC

[flink] branch release-1.10 updated: [FLINK-17819][yarn] Fix error msg for yarn deployments when hadoop not in classpath

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 8730b5b  [FLINK-17819][yarn] Fix error msg for yarn deployments when hadoop not in classpath
8730b5b is described below

commit 8730b5b73a43a75a70627d1ab60231bb3e06afc2
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu May 28 20:48:26 2020 +0200

    [FLINK-17819][yarn] Fix error msg for yarn deployments when hadoop not in classpath
    
    This closes #12395.
---
 .../org/apache/flink/client/cli/CliFrontend.java   | 45 ++++++++-----
 .../DefaultClusterClientServiceLoader.java         | 10 ++-
 .../deployment/ClusterClientServiceLoaderTest.java |  4 +-
 .../execution/DefaultExecutorServiceLoader.java    |  6 +-
 .../org/apache/flink/api/scala/FlinkShell.scala    |  4 +-
 .../flink/yarn/YarnClusterClientFactory.java       |  5 ++
 .../flink/yarn/cli/FallbackYarnSessionCli.java     | 78 ++++++++++++++++++++++
 .../executors/YarnJobClusterExecutorFactory.java   |  7 +-
 .../YarnSessionClusterExecutorFactory.java         |  7 +-
 .../flink/yarn/FallbackYarnSessionCliTest.java     | 69 +++++++++++++++++++
 .../apache/flink/yarn/FlinkYarnSessionCliTest.java |  2 +-
 11 files changed, 212 insertions(+), 25 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 38e4cd3..57b7a30 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -176,14 +176,17 @@ public class CliFrontend {
 		final Options commandOptions = CliFrontendParser.getRunCommandOptions();
 		final CommandLine commandLine = getCommandLine(commandOptions, args, true);
 
-		final ProgramOptions programOptions = new ProgramOptions(commandLine);
-
 		// evaluate help flag
 		if (commandLine.hasOption(HELP_OPTION.getOpt())) {
 			CliFrontendParser.printHelpForRun(customCommandLines);
 			return;
 		}
 
+		final CustomCommandLine activeCommandLine =
+				validateAndGetActiveCommandLine(checkNotNull(commandLine));
+
+		final ProgramOptions programOptions = new ProgramOptions(commandLine);
+
 		if (!programOptions.isPython()) {
 			// Java program should be specified a JAR file
 			if (programOptions.getJarFilePath() == null) {
@@ -201,8 +204,8 @@ public class CliFrontend {
 		}
 
 		final List<URL> jobJars = program.getJobJarAndDependencies();
-		final Configuration effectiveConfiguration =
-				getEffectiveConfiguration(commandLine, programOptions, jobJars);
+		final Configuration effectiveConfiguration = getEffectiveConfiguration(
+				activeCommandLine, commandLine, programOptions, jobJars);
 
 		LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
 
@@ -214,16 +217,18 @@ public class CliFrontend {
 	}
 
 	private Configuration getEffectiveConfiguration(
+			final CustomCommandLine activeCustomCommandLine,
 			final CommandLine commandLine,
 			final ProgramOptions programOptions,
 			final List<URL> jobJars) throws FlinkException {
 
-		final CustomCommandLine customCommandLine = getActiveCustomCommandLine(checkNotNull(commandLine));
 		final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(
 				checkNotNull(programOptions),
 				checkNotNull(jobJars));
 
-		final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
+		final Configuration executorConfig = checkNotNull(activeCustomCommandLine)
+				.applyCommandLineOptionsToConfiguration(commandLine);
+
 		final Configuration effectiveConfiguration = new Configuration(executorConfig);
 		return executionParameters.applyToConfiguration(effectiveConfiguration);
 	}
@@ -265,8 +270,11 @@ public class CliFrontend {
 
 			LOG.info("Creating program plan dump");
 
-			final Configuration effectiveConfiguration =
-					getEffectiveConfiguration(commandLine, programOptions, program.getJobJarAndDependencies());
+			final CustomCommandLine activeCommandLine =
+					validateAndGetActiveCommandLine(checkNotNull(commandLine));
+
+			final Configuration effectiveConfiguration = getEffectiveConfiguration(
+					activeCommandLine, commandLine, programOptions, program.getJobJarAndDependencies());
 
 			Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, effectiveConfiguration, parallelism, true);
 			String jsonPlan = FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline);
@@ -329,7 +337,7 @@ public class CliFrontend {
 			showAll = listOptions.showAll();
 		}
 
-		final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
+		final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(commandLine);
 
 		runClusterAction(
 			activeCommandLine,
@@ -446,7 +454,7 @@ public class CliFrontend {
 
 		logAndSysout((advanceToEndOfEventTime ? "Draining job " : "Suspending job ") + "\"" + jobId + "\" with a savepoint.");
 
-		final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
+		final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(commandLine);
 		runClusterAction(
 			activeCommandLine,
 			commandLine,
@@ -480,7 +488,7 @@ public class CliFrontend {
 			return;
 		}
 
-		final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
+		final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(commandLine);
 
 		final String[] cleanedArgs = cancelOptions.getArgs();
 
@@ -570,7 +578,7 @@ public class CliFrontend {
 			return;
 		}
 
-		final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
+		final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(commandLine);
 
 		if (savepointOptions.isDispose()) {
 			runClusterAction(
@@ -1032,7 +1040,14 @@ public class CliFrontend {
 					"y",
 					"yarn"));
 		} catch (NoClassDefFoundError | Exception e) {
-			LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
+			final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
+			try {
+				LOG.info("Loading FallbackYarnSessionCli");
+				customCommandLines.add(
+						loadCustomCommandLine(errorYarnSessionCLI, configuration));
+			} catch (Exception exception) {
+				LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
+			}
 		}
 
 		//	Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
@@ -1051,7 +1066,7 @@ public class CliFrontend {
 	 * @param commandLine The input to the command-line.
 	 * @return custom command-line which is active (may only be one at a time)
 	 */
-	public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
+	public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
 		LOG.debug("Custom commandlines: {}", customCommandLines);
 		for (CustomCommandLine cli : customCommandLines) {
 			LOG.debug("Checking custom commandline {}, isActive: {}", cli, cli.isActive(commandLine));
@@ -1059,7 +1074,7 @@ public class CliFrontend {
 				return cli;
 			}
 		}
-		throw new IllegalStateException("No command-line ran.");
+		throw new IllegalStateException("No valid command-line found.");
 	}
 
 	/**
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
index 574aeaf..b2c43a2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
@@ -70,6 +70,14 @@ public class DefaultClusterClientServiceLoader implements ClusterClientServiceLo
 			throw new IllegalStateException("Multiple compatible client factories found for:\n" + String.join("\n", configStr) + ".");
 		}
 
-		return compatibleFactories.isEmpty() ? null : (ClusterClientFactory<ClusterID>) compatibleFactories.get(0);
+		if (compatibleFactories.isEmpty()) {
+			throw new IllegalStateException(
+					"No ClusterClientFactory found. If you were targeting a Yarn cluster, " +
+					"please make sure to export the HADOOP_CLASSPATH environment variable or have hadoop in your " +
+					"classpath. For more information refer to the \"Deployment & Operations\" section of the official " +
+					"Apache Flink documentation.");
+		}
+
+		return (ClusterClientFactory<ClusterID>) compatibleFactories.get(0);
 	}
 }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
index 3450729..b941c12 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
@@ -32,7 +32,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -85,13 +84,12 @@ public class ClusterClientServiceLoaderTest {
 		fail();
 	}
 
-	@Test
+	@Test(expected = IllegalStateException.class)
 	public void testNoFactoriesFound() {
 		final Configuration config = new Configuration();
 		config.setString(DeploymentOptions.TARGET, NON_EXISTING_TARGET);
 
 		final ClusterClientFactory<Integer> factory = serviceLoaderUnderTest.getClusterClientFactory(config);
-		assertNull(factory);
 	}
 
 	/**
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index e146b08..6b07f7f 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -81,7 +81,11 @@ public class DefaultExecutorServiceLoader implements PipelineExecutorServiceLoad
 			throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
 		}
 
-		return compatibleFactories.isEmpty() ? null : compatibleFactories.get(0);
+		if (compatibleFactories.isEmpty()) {
+			throw new IllegalStateException("No ExecutorFactory found to execute the application.");
+		}
+
+		return compatibleFactories.get(0);
 	}
 
 	@Override
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index 8f2dc9c..d3f65c2 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -244,7 +244,7 @@ object FlinkShell {
       frontend.getCustomCommandLineOptions)
     val commandLine = CliFrontendParser.parse(commandLineOptions, args, true)
 
-    val customCLI = frontend.getActiveCustomCommandLine(commandLine)
+    val customCLI = frontend.validateAndGetActiveCommandLine(commandLine)
     val executorConfig = customCLI.applyCommandLineOptionsToConfiguration(commandLine)
 
     val serviceLoader = new DefaultClusterClientServiceLoader
@@ -283,7 +283,7 @@ object FlinkShell {
       frontend.getCustomCommandLineOptions)
     val commandLine = CliFrontendParser.parse(commandLineOptions, args, true)
 
-    val customCLI = frontend.getActiveCustomCommandLine(commandLine)
+    val customCLI = frontend.validateAndGetActiveCommandLine(commandLine)
     val executorConfig = customCLI.applyCommandLineOptionsToConfiguration(commandLine);
 
     (executorConfig, None)
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index 0b5beb2..90b1eca 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -42,6 +42,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class YarnClusterClientFactory extends AbstractContainerizedClusterClientFactory<ApplicationId> {
 
+	public static final String ERROR_MESSAGE =
+			"No Executor found. Please make sure to export the HADOOP_CLASSPATH environment variable " +
+					"or have hadoop in your classpath. For more information refer to the \"Deployment & Operations\" " +
+					"section of the official Apache Flink documentation.";
+
 	@Override
 	public boolean isCompatibleWith(Configuration configuration) {
 		checkNotNull(configuration);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FallbackYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FallbackYarnSessionCli.java
new file mode 100644
index 0000000..28c9816
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FallbackYarnSessionCli.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.cli;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.cli.AbstractCustomCommandLine;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.yarn.YarnClusterClientFactory;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
+import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+/**
+ * A stub Yarn Command Line to throw an exception with the correct
+ * message when the {@code HADOOP_CLASSPATH} is not set.
+ */
+@Internal
+public class FallbackYarnSessionCli extends AbstractCustomCommandLine {
+
+	public static final String ID = "yarn-cluster";
+
+	private final Option applicationId;
+
+	public FallbackYarnSessionCli(Configuration configuration) {
+		super(configuration);
+		applicationId = new Option("yid", "yarnapplicationId", true, "Attach to running YARN session");
+	}
+
+	@Override
+	public void addGeneralOptions(Options baseOptions) {
+		super.addGeneralOptions(baseOptions);
+		baseOptions.addOption(applicationId);
+	}
+
+	@Override
+	public boolean isActive(CommandLine commandLine) {
+		if (originalIsActive(commandLine)) {
+			throw new IllegalStateException(YarnClusterClientFactory.ERROR_MESSAGE);
+		}
+		return false;
+	}
+
+	private boolean originalIsActive(CommandLine commandLine) {
+		final String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
+		final boolean yarnJobManager = ID.equals(jobManagerOption);
+		final boolean hasYarnAppId = commandLine.hasOption(applicationId.getOpt())
+				|| configuration.getOptional(YarnConfigOptions.APPLICATION_ID).isPresent();
+		final boolean hasYarnExecutor = YarnSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET))
+				|| YarnJobClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
+		return hasYarnExecutor || yarnJobManager || hasYarnAppId;
+	}
+
+	@Override
+	public String getId() {
+		return ID;
+	}
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
index 48b8473..3054941 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.PipelineExecutor;
 import org.apache.flink.core.execution.PipelineExecutorFactory;
+import org.apache.flink.yarn.YarnClusterClientFactory;
 
 import javax.annotation.Nonnull;
 
@@ -44,6 +45,10 @@ public class YarnJobClusterExecutorFactory implements PipelineExecutorFactory {
 
 	@Override
 	public PipelineExecutor getExecutor(@Nonnull final Configuration configuration) {
-		return new YarnJobClusterExecutor();
+		try {
+			return new YarnJobClusterExecutor();
+		} catch (NoClassDefFoundError e) {
+			throw new IllegalStateException(YarnClusterClientFactory.ERROR_MESSAGE);
+		}
 	}
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
index e5bf2ee..da54f1f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.PipelineExecutor;
 import org.apache.flink.core.execution.PipelineExecutorFactory;
+import org.apache.flink.yarn.YarnClusterClientFactory;
 
 import javax.annotation.Nonnull;
 
@@ -44,6 +45,10 @@ public class YarnSessionClusterExecutorFactory implements PipelineExecutorFactor
 
 	@Override
 	public PipelineExecutor getExecutor(@Nonnull final Configuration configuration) {
-		return new YarnSessionClusterExecutor();
+		try {
+			return new YarnSessionClusterExecutor();
+		} catch (NoClassDefFoundError e) {
+			throw new IllegalStateException(YarnClusterClientFactory.ERROR_MESSAGE);
+		}
 	}
 }
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FallbackYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FallbackYarnSessionCliTest.java
new file mode 100644
index 0000000..94429d6
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FallbackYarnSessionCliTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.yarn.cli.FallbackYarnSessionCli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests for the {@link FallbackYarnSessionCli}.
+ */
+public class FallbackYarnSessionCliTest {
+
+	@Test(expected = IllegalStateException.class)
+	public void testExceptionWhenActiveWithYarnApplicationId() throws ParseException {
+		checkIfYarnFallbackCLIisActiveWithCLIArgs(
+				"run",
+				"-yid", ApplicationId.newInstance(0L, 0).toString());
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void testExceptionWhenActiveWithExplicitClusterType() throws ParseException {
+		checkIfYarnFallbackCLIisActiveWithCLIArgs(
+				"run",
+				"-m", FallbackYarnSessionCli.ID);
+	}
+
+	@Test
+	public void testFalseWhenNotActive() throws ParseException {
+		final boolean isActive = checkIfYarnFallbackCLIisActiveWithCLIArgs("run");
+		assertFalse(isActive);
+	}
+
+	private boolean checkIfYarnFallbackCLIisActiveWithCLIArgs(final String... args) throws ParseException {
+		final Options options = new Options();
+		final FallbackYarnSessionCli cliUnderTest =
+				new FallbackYarnSessionCli(new Configuration());
+		cliUnderTest.addGeneralOptions(options);
+
+		final CommandLineParser parser = new DefaultParser();
+		final CommandLine cmd = parser.parse(options, args);
+		return cliUnderTest.isActive(cmd);
+	}
+}
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index a3831d6..0f328fa 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -193,7 +193,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 				argsUnderTest,
 				true);
 
-		final CustomCommandLine customCommandLine = cli.getActiveCustomCommandLine(commandLine);
+		final CustomCommandLine customCommandLine = cli.validateAndGetActiveCommandLine(commandLine);
 		assertTrue(customCommandLine instanceof ExecutorCLI);
 	}