You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/08/18 07:46:20 UTC

[flink] branch master updated: [FLINK-18752][yarn] Allow shipping single files for yarn execution

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e663c4  [FLINK-18752][yarn] Allow shipping single files for yarn execution
8e663c4 is described below

commit 8e663c4a1f08e8d0223441a897932ce5a1acdcc9
Author: Gyula Fora <gy...@cloudera.com>
AuthorDate: Wed Jul 29 10:06:42 2020 +0200

    [FLINK-18752][yarn] Allow shipping single files for yarn execution
    
    This closes #13103.
---
 .../generated/yarn_config_configuration.html       |  4 +--
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  2 +-
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 29 ++++++++++--------
 .../yarn/configuration/YarnConfigOptions.java      |  7 +++--
 .../apache/flink/yarn/FlinkYarnSessionCliTest.java | 35 ++++++++++++++++++++++
 5 files changed, 58 insertions(+), 19 deletions(-)

diff --git a/docs/_includes/generated/yarn_config_configuration.html b/docs/_includes/generated/yarn_config_configuration.html
index 7173a0e..daea939 100644
--- a/docs/_includes/generated/yarn_config_configuration.html
+++ b/docs/_includes/generated/yarn_config_configuration.html
@@ -141,10 +141,10 @@
             <td>A semicolon-separated list of archives to be shipped to the YARN cluster. These archives will be un-packed when localizing and they can be any of the following types: ".tar.gz", ".tar", ".tgz", ".dst", ".jar", ".zip".</td>
         </tr>
         <tr>
-            <td><h5>yarn.ship-directories</h5></td>
+            <td><h5>yarn.ship-files</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>List&lt;String&gt;</td>
-            <td>A semicolon-separated list of directories to be shipped to the YARN cluster.</td>
+            <td>A semicolon-separated list of files and/or directories to be shipped to the YARN cluster.</td>
         </tr>
         <tr>
             <td><h5>yarn.staging-directory</h5></td>
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 5d6a2bc..5a1d0a2 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -173,7 +173,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 		this.userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
 
 		getLocalFlinkDistPath(flinkConfiguration).ifPresent(this::setLocalJarPath);
-		decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_DIRECTORIES).ifPresent(this::addShipFiles);
+		decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_FILES).ifPresent(this::addShipFiles);
 		decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_ARCHIVES).ifPresent(this::addShipArchives);
 
 		this.yarnQueue = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_QUEUE);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 235c127..d4e43d0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
@@ -272,23 +273,25 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 		return new Path(userPath);
 	}
 
-	private void encodeDirsToShipToCluster(final Configuration configuration, final CommandLine cmd) {
+	private void encodeFilesToShipToCluster(final Configuration configuration, final CommandLine cmd) throws ConfigurationException {
 		checkNotNull(cmd);
 		checkNotNull(configuration);
 
 		if (cmd.hasOption(shipPath.getOpt())) {
+			String[] shipFiles = cmd.getOptionValues(this.shipPath.getOpt());
+
+			for (String path : shipFiles) {
+				final File shipFile = new File(path);
+				if (!shipFile.exists()) {
+					throw new ConfigurationException("Ship file " + path + " does not exist");
+				}
+			}
+
 			ConfigUtils.encodeArrayToConfig(
 					configuration,
-					YarnConfigOptions.SHIP_DIRECTORIES,
-					cmd.getOptionValues(this.shipPath.getOpt()),
-					(String path) -> {
-						final File shipDir = new File(path);
-						if (shipDir.isDirectory()) {
-							return path;
-						}
-						LOG.warn("Ship directory {} is not a directory. Ignoring it.", shipDir.getAbsolutePath());
-						return null;
-					});
+					YarnConfigOptions.SHIP_FILES,
+					shipFiles,
+					f -> f);
 		}
 	}
 
@@ -392,7 +395,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 		return null;
 	}
 
-	private void applyDescriptorOptionToConfig(final CommandLine commandLine, final Configuration configuration) {
+	private void applyDescriptorOptionToConfig(final CommandLine commandLine, final Configuration configuration) throws ConfigurationException {
 		checkNotNull(commandLine);
 		checkNotNull(configuration);
 
@@ -401,7 +404,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 			configuration.setString(YarnConfigOptions.FLINK_DIST_JAR, localJarPath.toString());
 		}
 
-		encodeDirsToShipToCluster(configuration, commandLine);
+		encodeFilesToShipToCluster(configuration, commandLine);
 
 		if (commandLine.hasOption(queue.getOpt())) {
 			final String queueName = commandLine.getOptionValue(queue.getOpt());
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 6d92996..cbb699b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -202,12 +202,13 @@ public class YarnConfigOptions {
 			.noDefaultValue()
 			.withDescription("Staging directory used to store YARN files while submitting applications. Per default, it uses the home directory of the configured file system.");
 
-	public static final ConfigOption<List<String>> SHIP_DIRECTORIES =
-			key("yarn.ship-directories")
+	public static final ConfigOption<List<String>> SHIP_FILES =
+			key("yarn.ship-files")
 				.stringType()
 				.asList()
 				.noDefaultValue()
-				.withDescription("A semicolon-separated list of directories to be shipped to the YARN cluster.");
+				.withDeprecatedKeys("yarn.ship-directories")
+				.withDescription("A semicolon-separated list of files and/or directories to be shipped to the YARN cluster.");
 
 	public static final ConfigOption<List<String>> SHIP_ARCHIVES =
 			key("yarn.ship-archives")
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 5818023..a0ab312 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -36,12 +36,15 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.DefaultParser;
@@ -61,6 +64,7 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link FlinkYarnSessionCli}.
@@ -466,6 +470,37 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
 	}
 
+	@Test
+	public void testShipFiles() throws Exception {
+		File tmpFile = tmp.newFile();
+		final String[] args = new String[]{"run", "--yarnship", tmpFile.toString()};
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
+
+		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+
+		final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
+		final ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
+		YarnClusterDescriptor flinkYarnDescriptor = (YarnClusterDescriptor) clientFactory.createClusterDescriptor(executorConfig);
+
+		assertEquals(Lists.newArrayList(tmpFile), flinkYarnDescriptor.getShipFiles());
+	}
+
+	@Test
+	public void testMissingShipFiles() throws Exception {
+		File tmpFile = tmp.newFile();
+		final String[] args = new String[]{"run", "--yarnship", tmpFile.toString(), "--yarnship", "missing.file"};
+		final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli();
+
+		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+
+		try {
+			flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
+			fail("Expected error for missing file");
+		} catch (ConfigurationException ce) {
+			assertEquals("Ship file missing.file does not exist", ce.getMessage());
+		}
+	}
+
 	///////////
 	// Utils //
 	///////////