You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2019/01/10 12:06:28 UTC

[flink] branch master updated: [FLINK-11272][flink-yarn] Support for parsing multiple --yarnship parameters

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f692d37  [FLINK-11272][flink-yarn] Support for parsing multiple --yarnship parameters
f692d37 is described below

commit f692d372f7cb1ecd04ad82d9d546fe68c702ce05
Author: leesf <49...@qq.com>
AuthorDate: Mon Jan 7 20:17:22 2019 +0800

    [FLINK-11272][flink-yarn] Support for parsing multiple --yarnship parameters
---
 docs/ops/cli.md                                         |  3 ++-
 .../org/apache/flink/yarn/cli/FlinkYarnSessionCli.java  | 16 +++++++++-------
 .../org/apache/flink/yarn/FlinkYarnSessionCliTest.java  | 17 +++++++++++++++++
 3 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index 7da2a09..ec3596f 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -295,7 +295,8 @@ Action "run" compiles and runs a program.
      -ys,--yarnslots <arg>                Number of slots per TaskManager
      -yst,--yarnstreaming                 Start Flink in streaming mode
      -yt,--yarnship <arg>                 Ship files in the specified directory
-                                          (t for transfer)
+                                          (t for transfer), multiple options are 
+                                          supported.
      -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container
                                           with optional unit (default: MB)
      -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 931b22c..9a05c16 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -311,14 +311,16 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		}
 
 		List<File> shipFiles = new ArrayList<>();
-		// path to directory to ship
+		// path to directories to ship
 		if (cmd.hasOption(shipPath.getOpt())) {
-			String shipPath = cmd.getOptionValue(this.shipPath.getOpt());
-			File shipDir = new File(shipPath);
-			if (shipDir.isDirectory()) {
-				shipFiles.add(shipDir);
-			} else {
-				LOG.warn("Ship directory is not a directory. Ignoring it.");
+			String[] shipPaths = cmd.getOptionValues(this.shipPath.getOpt());
+			for (String shipPath : shipPaths) {
+				File shipDir = new File(shipPath);
+				if (shipDir.isDirectory()) {
+					shipFiles.add(shipDir);
+				} else {
+					LOG.warn("Ship directory {} is not a directory. Ignoring it.", shipDir.getAbsolutePath());
+				}
 			}
 		}
 
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index aa958e2..3ee3654 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -450,6 +450,23 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(1024));
 	}
 
+	@Test
+	public void testMultipleYarnShipOptions() throws Exception {
+		final String[] args = new String[]{"run", "--yarnship", tmp.newFolder().getAbsolutePath(), "--yarnship", tmp.newFolder().getAbsolutePath()};
+		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
+			new Configuration(),
+			tmp.getRoot().getAbsolutePath(),
+			"y",
+			"yarn");
+
+		final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+
+		AbstractYarnClusterDescriptor flinkYarnDescriptor = flinkYarnSessionCli.createClusterDescriptor(commandLine);
+
+		assertEquals(2, flinkYarnDescriptor.shipFiles.size());
+
+	}
+
 
 	///////////
 	// Utils //