You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/05/29 07:50:12 UTC
[flink] branch release-1.10 updated: [FLINK-17819][yarn] Fix error
msg for yarn deployments when hadoop not in classpath
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new 8730b5b [FLINK-17819][yarn] Fix error msg for yarn deployments when hadoop not in classpath
8730b5b is described below
commit 8730b5b73a43a75a70627d1ab60231bb3e06afc2
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu May 28 20:48:26 2020 +0200
[FLINK-17819][yarn] Fix error msg for yarn deployments when hadoop not in classpath
This closes #12395.
---
.../org/apache/flink/client/cli/CliFrontend.java | 45 ++++++++-----
.../DefaultClusterClientServiceLoader.java | 10 ++-
.../deployment/ClusterClientServiceLoaderTest.java | 4 +-
.../execution/DefaultExecutorServiceLoader.java | 6 +-
.../org/apache/flink/api/scala/FlinkShell.scala | 4 +-
.../flink/yarn/YarnClusterClientFactory.java | 5 ++
.../flink/yarn/cli/FallbackYarnSessionCli.java | 78 ++++++++++++++++++++++
.../executors/YarnJobClusterExecutorFactory.java | 7 +-
.../YarnSessionClusterExecutorFactory.java | 7 +-
.../flink/yarn/FallbackYarnSessionCliTest.java | 69 +++++++++++++++++++
.../apache/flink/yarn/FlinkYarnSessionCliTest.java | 2 +-
11 files changed, 212 insertions(+), 25 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 38e4cd3..57b7a30 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -176,14 +176,17 @@ public class CliFrontend {
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final CommandLine commandLine = getCommandLine(commandOptions, args, true);
- final ProgramOptions programOptions = new ProgramOptions(commandLine);
-
// evaluate help flag
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRun(customCommandLines);
return;
}
+ final CustomCommandLine activeCommandLine =
+ validateAndGetActiveCommandLine(checkNotNull(commandLine));
+
+ final ProgramOptions programOptions = new ProgramOptions(commandLine);
+
if (!programOptions.isPython()) {
// Java program should be specified a JAR file
if (programOptions.getJarFilePath() == null) {
@@ -201,8 +204,8 @@ public class CliFrontend {
}
final List<URL> jobJars = program.getJobJarAndDependencies();
- final Configuration effectiveConfiguration =
- getEffectiveConfiguration(commandLine, programOptions, jobJars);
+ final Configuration effectiveConfiguration = getEffectiveConfiguration(
+ activeCommandLine, commandLine, programOptions, jobJars);
LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
@@ -214,16 +217,18 @@ public class CliFrontend {
}
private Configuration getEffectiveConfiguration(
+ final CustomCommandLine activeCustomCommandLine,
final CommandLine commandLine,
final ProgramOptions programOptions,
final List<URL> jobJars) throws FlinkException {
- final CustomCommandLine customCommandLine = getActiveCustomCommandLine(checkNotNull(commandLine));
final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(
checkNotNull(programOptions),
checkNotNull(jobJars));
- final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
+ final Configuration executorConfig = checkNotNull(activeCustomCommandLine)
+ .applyCommandLineOptionsToConfiguration(commandLine);
+
final Configuration effectiveConfiguration = new Configuration(executorConfig);
return executionParameters.applyToConfiguration(effectiveConfiguration);
}
@@ -265,8 +270,11 @@ public class CliFrontend {
LOG.info("Creating program plan dump");
- final Configuration effectiveConfiguration =
- getEffectiveConfiguration(commandLine, programOptions, program.getJobJarAndDependencies());
+ final CustomCommandLine activeCommandLine =
+ validateAndGetActiveCommandLine(checkNotNull(commandLine));
+
+ final Configuration effectiveConfiguration = getEffectiveConfiguration(
+ activeCommandLine, commandLine, programOptions, program.getJobJarAndDependencies());
Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, effectiveConfiguration, parallelism, true);
String jsonPlan = FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline);
@@ -329,7 +337,7 @@ public class CliFrontend {
showAll = listOptions.showAll();
}
- final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
+ final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(commandLine);
runClusterAction(
activeCommandLine,
@@ -446,7 +454,7 @@ public class CliFrontend {
logAndSysout((advanceToEndOfEventTime ? "Draining job " : "Suspending job ") + "\"" + jobId + "\" with a savepoint.");
- final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
+ final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(commandLine);
runClusterAction(
activeCommandLine,
commandLine,
@@ -480,7 +488,7 @@ public class CliFrontend {
return;
}
- final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
+ final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(commandLine);
final String[] cleanedArgs = cancelOptions.getArgs();
@@ -570,7 +578,7 @@ public class CliFrontend {
return;
}
- final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
+ final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(commandLine);
if (savepointOptions.isDispose()) {
runClusterAction(
@@ -1032,7 +1040,14 @@ public class CliFrontend {
"y",
"yarn"));
} catch (NoClassDefFoundError | Exception e) {
- LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
+ final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
+ try {
+ LOG.info("Loading FallbackYarnSessionCli");
+ customCommandLines.add(
+ loadCustomCommandLine(errorYarnSessionCLI, configuration));
+ } catch (Exception exception) {
+ LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
+ }
}
// Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
@@ -1051,7 +1066,7 @@ public class CliFrontend {
* @param commandLine The input to the command-line.
* @return custom command-line which is active (may only be one at a time)
*/
- public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
+ public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
LOG.debug("Custom commandlines: {}", customCommandLines);
for (CustomCommandLine cli : customCommandLines) {
LOG.debug("Checking custom commandline {}, isActive: {}", cli, cli.isActive(commandLine));
@@ -1059,7 +1074,7 @@ public class CliFrontend {
return cli;
}
}
- throw new IllegalStateException("No command-line ran.");
+ throw new IllegalStateException("No valid command-line found.");
}
/**
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
index 574aeaf..b2c43a2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
@@ -70,6 +70,14 @@ public class DefaultClusterClientServiceLoader implements ClusterClientServiceLo
throw new IllegalStateException("Multiple compatible client factories found for:\n" + String.join("\n", configStr) + ".");
}
- return compatibleFactories.isEmpty() ? null : (ClusterClientFactory<ClusterID>) compatibleFactories.get(0);
+ if (compatibleFactories.isEmpty()) {
+ throw new IllegalStateException(
+ "No ClusterClientFactory found. If you were targeting a Yarn cluster, " +
+ "please make sure to export the HADOOP_CLASSPATH environment variable or have hadoop in your " +
+ "classpath. For more information refer to the \"Deployment & Operations\" section of the official " +
+ "Apache Flink documentation.");
+ }
+
+ return (ClusterClientFactory<ClusterID>) compatibleFactories.get(0);
}
}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
index 3450729..b941c12 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
@@ -32,7 +32,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -85,13 +84,12 @@ public class ClusterClientServiceLoaderTest {
fail();
}
- @Test
+ @Test(expected = IllegalStateException.class)
public void testNoFactoriesFound() {
final Configuration config = new Configuration();
config.setString(DeploymentOptions.TARGET, NON_EXISTING_TARGET);
final ClusterClientFactory<Integer> factory = serviceLoaderUnderTest.getClusterClientFactory(config);
- assertNull(factory);
}
/**
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index e146b08..6b07f7f 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -81,7 +81,11 @@ public class DefaultExecutorServiceLoader implements PipelineExecutorServiceLoad
throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
}
- return compatibleFactories.isEmpty() ? null : compatibleFactories.get(0);
+ if (compatibleFactories.isEmpty()) {
+ throw new IllegalStateException("No ExecutorFactory found to execute the application.");
+ }
+
+ return compatibleFactories.get(0);
}
@Override
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index 8f2dc9c..d3f65c2 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -244,7 +244,7 @@ object FlinkShell {
frontend.getCustomCommandLineOptions)
val commandLine = CliFrontendParser.parse(commandLineOptions, args, true)
- val customCLI = frontend.getActiveCustomCommandLine(commandLine)
+ val customCLI = frontend.validateAndGetActiveCommandLine(commandLine)
val executorConfig = customCLI.applyCommandLineOptionsToConfiguration(commandLine)
val serviceLoader = new DefaultClusterClientServiceLoader
@@ -283,7 +283,7 @@ object FlinkShell {
frontend.getCustomCommandLineOptions)
val commandLine = CliFrontendParser.parse(commandLineOptions, args, true)
- val customCLI = frontend.getActiveCustomCommandLine(commandLine)
+ val customCLI = frontend.validateAndGetActiveCommandLine(commandLine)
val executorConfig = customCLI.applyCommandLineOptionsToConfiguration(commandLine);
(executorConfig, None)
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index 0b5beb2..90b1eca 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -42,6 +42,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public class YarnClusterClientFactory extends AbstractContainerizedClusterClientFactory<ApplicationId> {
+ public static final String ERROR_MESSAGE =
+ "No Executor found. Please make sure to export the HADOOP_CLASSPATH environment variable " +
+ "or have hadoop in your classpath. For more information refer to the \"Deployment & Operations\" " +
+ "section of the official Apache Flink documentation.";
+
@Override
public boolean isCompatibleWith(Configuration configuration) {
checkNotNull(configuration);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FallbackYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FallbackYarnSessionCli.java
new file mode 100644
index 0000000..28c9816
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FallbackYarnSessionCli.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.cli;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.cli.AbstractCustomCommandLine;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.yarn.YarnClusterClientFactory;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
+import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+/**
+ * A stub Yarn Command Line to throw an exception with the correct
+ * message when the {@code HADOOP_CLASSPATH} is not set.
+ */
+@Internal
+public class FallbackYarnSessionCli extends AbstractCustomCommandLine {
+
+ public static final String ID = "yarn-cluster";
+
+ private final Option applicationId;
+
+ public FallbackYarnSessionCli(Configuration configuration) {
+ super(configuration);
+ applicationId = new Option("yid", "yarnapplicationId", true, "Attach to running YARN session");
+ }
+
+ @Override
+ public void addGeneralOptions(Options baseOptions) {
+ super.addGeneralOptions(baseOptions);
+ baseOptions.addOption(applicationId);
+ }
+
+ @Override
+ public boolean isActive(CommandLine commandLine) {
+ if (originalIsActive(commandLine)) {
+ throw new IllegalStateException(YarnClusterClientFactory.ERROR_MESSAGE);
+ }
+ return false;
+ }
+
+ private boolean originalIsActive(CommandLine commandLine) {
+ final String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
+ final boolean yarnJobManager = ID.equals(jobManagerOption);
+ final boolean hasYarnAppId = commandLine.hasOption(applicationId.getOpt())
+ || configuration.getOptional(YarnConfigOptions.APPLICATION_ID).isPresent();
+ final boolean hasYarnExecutor = YarnSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET))
+ || YarnJobClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
+ return hasYarnExecutor || yarnJobManager || hasYarnAppId;
+ }
+
+ @Override
+ public String getId() {
+ return ID;
+ }
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
index 48b8473..3054941 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
+import org.apache.flink.yarn.YarnClusterClientFactory;
import javax.annotation.Nonnull;
@@ -44,6 +45,10 @@ public class YarnJobClusterExecutorFactory implements PipelineExecutorFactory {
@Override
public PipelineExecutor getExecutor(@Nonnull final Configuration configuration) {
- return new YarnJobClusterExecutor();
+ try {
+ return new YarnJobClusterExecutor();
+ } catch (NoClassDefFoundError e) {
+ throw new IllegalStateException(YarnClusterClientFactory.ERROR_MESSAGE);
+ }
}
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
index e5bf2ee..da54f1f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
+import org.apache.flink.yarn.YarnClusterClientFactory;
import javax.annotation.Nonnull;
@@ -44,6 +45,10 @@ public class YarnSessionClusterExecutorFactory implements PipelineExecutorFactor
@Override
public PipelineExecutor getExecutor(@Nonnull final Configuration configuration) {
- return new YarnSessionClusterExecutor();
+ try {
+ return new YarnSessionClusterExecutor();
+ } catch (NoClassDefFoundError e) {
+ throw new IllegalStateException(YarnClusterClientFactory.ERROR_MESSAGE);
+ }
}
}
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FallbackYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FallbackYarnSessionCliTest.java
new file mode 100644
index 0000000..94429d6
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FallbackYarnSessionCliTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.yarn.cli.FallbackYarnSessionCli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests for the {@link FallbackYarnSessionCliTest}.
+ */
+public class FallbackYarnSessionCliTest {
+
+ @Test(expected = IllegalStateException.class)
+ public void testExceptionWhenActiveWithYarnApplicationId() throws ParseException {
+ checkIfYarnFallbackCLIisActiveWithCLIArgs(
+ "run",
+ "-yid", ApplicationId.newInstance(0L, 0).toString());
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testExceptionWhenActiveWithExplicitClusterType() throws ParseException {
+ checkIfYarnFallbackCLIisActiveWithCLIArgs(
+ "run",
+ "-m", FallbackYarnSessionCli.ID);
+ }
+
+ @Test
+ public void testFalseWhenNotActive() throws ParseException {
+ final boolean isActive = checkIfYarnFallbackCLIisActiveWithCLIArgs("run");
+ assertFalse(isActive);
+ }
+
+ private boolean checkIfYarnFallbackCLIisActiveWithCLIArgs(final String... args) throws ParseException {
+ final Options options = new Options();
+ final FallbackYarnSessionCli cliUnderTest =
+ new FallbackYarnSessionCli(new Configuration());
+ cliUnderTest.addGeneralOptions(options);
+
+ final CommandLineParser parser = new DefaultParser();
+ final CommandLine cmd = parser.parse(options, args);
+ return cliUnderTest.isActive(cmd);
+ }
+}
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index a3831d6..0f328fa 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -193,7 +193,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
argsUnderTest,
true);
- final CustomCommandLine customCommandLine = cli.getActiveCustomCommandLine(commandLine);
+ final CustomCommandLine customCommandLine = cli.validateAndGetActiveCommandLine(commandLine);
assertTrue(customCommandLine instanceof ExecutorCLI);
}