You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/08/18 07:46:20 UTC
[flink] branch master updated: [FLINK-18752][yarn] Allow shipping single files for yarn execution
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8e663c4 [FLINK-18752][yarn] Allow shipping single files for yarn execution
8e663c4 is described below
commit 8e663c4a1f08e8d0223441a897932ce5a1acdcc9
Author: Gyula Fora <gy...@cloudera.com>
AuthorDate: Wed Jul 29 10:06:42 2020 +0200
[FLINK-18752][yarn] Allow shipping single files for yarn execution
This closes #13103.
---
.../generated/yarn_config_configuration.html | 4 +--
.../apache/flink/yarn/YarnClusterDescriptor.java | 2 +-
.../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 29 ++++++++++--------
.../yarn/configuration/YarnConfigOptions.java | 7 +++--
.../apache/flink/yarn/FlinkYarnSessionCliTest.java | 35 ++++++++++++++++++++++
5 files changed, 58 insertions(+), 19 deletions(-)
diff --git a/docs/_includes/generated/yarn_config_configuration.html b/docs/_includes/generated/yarn_config_configuration.html
index 7173a0e..daea939 100644
--- a/docs/_includes/generated/yarn_config_configuration.html
+++ b/docs/_includes/generated/yarn_config_configuration.html
@@ -141,10 +141,10 @@
<td>A semicolon-separated list of archives to be shipped to the YARN cluster. These archives will be un-packed when localizing and they can be any of the following types: ".tar.gz", ".tar", ".tgz", ".dst", ".jar", ".zip".</td>
</tr>
<tr>
- <td><h5>yarn.ship-directories</h5></td>
+ <td><h5>yarn.ship-files</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>List&lt;String&gt;</td>
- <td>A semicolon-separated list of directories to be shipped to the YARN cluster.</td>
+ <td>A semicolon-separated list of files and/or directories to be shipped to the YARN cluster.</td>
</tr>
<tr>
<td><h5>yarn.staging-directory</h5></td>
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 5d6a2bc..5a1d0a2 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -173,7 +173,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
this.userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
getLocalFlinkDistPath(flinkConfiguration).ifPresent(this::setLocalJarPath);
- decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_DIRECTORIES).ifPresent(this::addShipFiles);
+ decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_FILES).ifPresent(this::addShipFiles);
decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_ARCHIVES).ifPresent(this::addShipArchives);
this.yarnQueue = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_QUEUE);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 235c127..d4e43d0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
@@ -272,23 +273,25 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
return new Path(userPath);
}
- private void encodeDirsToShipToCluster(final Configuration configuration, final CommandLine cmd) {
+ private void encodeFilesToShipToCluster(final Configuration configuration, final CommandLine cmd) throws ConfigurationException {
checkNotNull(cmd);
checkNotNull(configuration);
if (cmd.hasOption(shipPath.getOpt())) {
+ String[] shipFiles = cmd.getOptionValues(this.shipPath.getOpt());
+
+ for (String path : shipFiles) {
+ final File shipFile = new File(path);
+ if (!shipFile.exists()) {
+ throw new ConfigurationException("Ship file " + path + " does not exist");
+ }
+ }
+
ConfigUtils.encodeArrayToConfig(
configuration,
- YarnConfigOptions.SHIP_DIRECTORIES,
- cmd.getOptionValues(this.shipPath.getOpt()),
- (String path) -> {
- final File shipDir = new File(path);
- if (shipDir.isDirectory()) {
- return path;
- }
- LOG.warn("Ship directory {} is not a directory. Ignoring it.", shipDir.getAbsolutePath());
- return null;
- });
+ YarnConfigOptions.SHIP_FILES,
+ shipFiles,
+ f -> f);
}
}
@@ -392,7 +395,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
return null;
}
- private void applyDescriptorOptionToConfig(final CommandLine commandLine, final Configuration configuration) {
+ private void applyDescriptorOptionToConfig(final CommandLine commandLine, final Configuration configuration) throws ConfigurationException {
checkNotNull(commandLine);
checkNotNull(configuration);
@@ -401,7 +404,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
configuration.setString(YarnConfigOptions.FLINK_DIST_JAR, localJarPath.toString());
}
- encodeDirsToShipToCluster(configuration, commandLine);
+ encodeFilesToShipToCluster(configuration, commandLine);
if (commandLine.hasOption(queue.getOpt())) {
final String queueName = commandLine.getOptionValue(queue.getOpt());
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 6d92996..cbb699b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -202,12 +202,13 @@ public class YarnConfigOptions {
.noDefaultValue()
.withDescription("Staging directory used to store YARN files while submitting applications. Per default, it uses the home directory of the configured file system.");
- public static final ConfigOption<List<String>> SHIP_DIRECTORIES =
- key("yarn.ship-directories")
+ public static final ConfigOption<List<String>> SHIP_FILES =
+ key("yarn.ship-files")
.stringType()
.asList()
.noDefaultValue()
- .withDescription("A semicolon-separated list of directories to be shipped to the YARN cluster.");
+ .withDeprecatedKeys("yarn.ship-directories")
+ .withDescription("A semicolon-separated list of files and/or directories to be shipped to the YARN cluster.");
public static final ConfigOption<List<String>> SHIP_ARCHIVES =
key("yarn.ship-archives")
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 5818023..a0ab312 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -36,12 +36,15 @@ import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
@@ -61,6 +64,7 @@ import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Tests for the {@link FlinkYarnSessionCli}.
@@ -466,6 +470,37 @@ public class FlinkYarnSessionCliTest extends TestLogger {
}
+ @Test
+ public void testShipFiles() throws Exception {
+ File tmpFile = tmp.newFile();
+ final String[] args = new String[]{"run", "--yarnship", tmpFile.toString()};
+ final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
+
+ final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+
+ final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
+ final ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
+ YarnClusterDescriptor flinkYarnDescriptor = (YarnClusterDescriptor) clientFactory.createClusterDescriptor(executorConfig);
+
+ assertEquals(Lists.newArrayList(tmpFile), flinkYarnDescriptor.getShipFiles());
+ }
+
+ @Test
+ public void testMissingShipFiles() throws Exception {
+ File tmpFile = tmp.newFile();
+ final String[] args = new String[]{"run", "--yarnship", tmpFile.toString(), "--yarnship", "missing.file"};
+ final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
+
+ final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+
+ try {
+ flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
+ fail("Expected error for missing file");
+ } catch (ConfigurationException ce) {
+ assertEquals("Ship file missing.file does not exist", ce.getMessage());
+ }
+ }
+
///////////
// Utils //
///////////