You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/19 11:25:17 UTC

[GitHub] yanghua closed pull request #5617: [FLINK-8799][YARN] Make AbstractYarnClusterDescriptor immutable

yanghua closed pull request #5617: [FLINK-8799][YARN] Make AbstractYarnClusterDescriptor immutable
URL: https://github.com/apache/flink/pull/5617
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
index 4d2aaa02efe..ff74e4d358e 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -21,15 +21,12 @@
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import java.io.File;
 import java.io.FilenameFilter;
-import java.util.ArrayList;
-import java.util.List;
 
 /**
  * Yarn client which starts a {@link TestingApplicationMaster}. Additionally the client adds the
@@ -50,25 +47,6 @@ public TestingYarnClusterDescriptor(
 			configurationDirectory,
 			yarnClient,
 			sharedYarnClient);
-		List<File> filesToShip = new ArrayList<>();
-
-		File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests"));
-		Preconditions.checkNotNull(testingJar, "Could not find the flink-yarn-tests tests jar. " +
-			"Make sure to package the flink-yarn-tests module.");
-
-		File testingRuntimeJar = YarnTestBase.findFile("..", new TestJarFinder("flink-runtime"));
-		Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-runtime tests " +
-			"jar. Make sure to package the flink-runtime module.");
-
-		File testingYarnJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn"));
-		Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-yarn tests " +
-			"jar. Make sure to package the flink-yarn module.");
-
-		filesToShip.add(testingJar);
-		filesToShip.add(testingRuntimeJar);
-		filesToShip.add(testingYarnJar);
-
-		addShipFiles(filesToShip);
 	}
 
 	@Override
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index f9c03f93784..18e8e6b58a5 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -34,12 +34,13 @@
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.testkit.JavaTestKit;
 import org.apache.curator.test.TestingServer;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.AfterClass;
@@ -50,7 +51,6 @@
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
-import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -110,29 +110,50 @@ public void testMultipleAMKill() throws Exception {
 		final int numberKillingAttempts = numberApplicationAttempts - 1;
 		String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
 		final Configuration configuration = GlobalConfiguration.loadConfiguration();
-		TestingYarnClusterDescriptor flinkYarnClient = new TestingYarnClusterDescriptor(
-			configuration,
-			getYarnConfiguration(),
-			confDirPath,
-			getYarnClient(),
-			true);
 
-		Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
-		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
-		flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+		configuration.setString(YarnConfigOptions.FLINK_JAR, flinkUberjar.getAbsolutePath());
 
-		String fsStateHandlePath = temp.getRoot().getPath();
+		StringBuilder sb = new StringBuilder();
+		for (File file : flinkLibFolder.listFiles()) {
+			sb.append(file.getAbsolutePath());
+			sb.append(",");
+		}
 
-		// load the configuration
-		File configDirectory = new File(confDirPath);
-		GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
+		String linkedShipFiles = sb.toString();
+
+		File testingJar = YarnTestBase.findFile("..", new TestingYarnClusterDescriptor.TestJarFinder("flink-yarn-tests"));
+		Preconditions.checkNotNull(testingJar, "Could not find the flink-yarn-tests tests jar. " +
+			"Make sure to package the flink-yarn-tests module.");
+		linkedShipFiles += testingJar.getAbsolutePath();
+
+		File testingRuntimeJar = YarnTestBase.findFile("..", new TestingYarnClusterDescriptor.TestJarFinder("flink-runtime"));
+		Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-runtime tests " +
+			"jar. Make sure to package the flink-runtime module.");
+		linkedShipFiles += ("," + testingRuntimeJar.getAbsolutePath());
+
+		File testingYarnJar = YarnTestBase.findFile("..", new TestingYarnClusterDescriptor.TestJarFinder("flink-yarn"));
+		Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-yarn tests " +
+			"jar. Make sure to package the flink-yarn module.");
+		linkedShipFiles += ("," + testingYarnJar.getAbsolutePath());
 
-		flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
+		configuration.setString(YarnConfigOptions.YARN_SHIP_PATHS, linkedShipFiles);
+
+		String fsStateHandlePath = temp.getRoot().getPath();
+		configuration.setString(YarnConfigOptions.DYNAMIC_PROPERTIES_ENCODED, "recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
 			zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
 			"@@" + CheckpointingOptions.STATE_BACKEND.key() + "=FILESYSTEM" +
 			"@@" + CheckpointingOptions.CHECKPOINTS_DIRECTORY + "=" + fsStateHandlePath + "/checkpoints" +
 			"@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery");
 
+		TestingYarnClusterDescriptor flinkYarnClient = new TestingYarnClusterDescriptor(
+			configuration, getYarnConfiguration(), confDirPath, getYarnClient(), true);
+
+		Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
+
+		// load the configuration
+		File configDirectory = new File(confDirPath);
+		GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
+
 		ClusterClient<ApplicationId> yarnCluster = null;
 
 		final FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index ef6706ad5fe..9c193d91889 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -27,8 +27,8 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.junit.BeforeClass;
@@ -36,7 +36,6 @@
 import org.junit.Test;
 
 import java.io.File;
-import java.util.Arrays;
 import java.util.Random;
 
 /**
@@ -55,7 +54,19 @@ public static void setup() {
 	public void testPerJobMode() throws Exception {
 		Configuration configuration = new Configuration();
 		configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
-		final YarnClient yarnClient = getYarnClient();
+		configuration.setString(YarnConfigOptions.FLINK_JAR, flinkUberjar.getAbsolutePath());
+
+		StringBuilder sb = new StringBuilder();
+		for (File file : flinkLibFolder.listFiles()) {
+			sb.append(file.getAbsolutePath());
+			sb.append(",");
+		}
+
+		String linkedShipFiles = sb.toString();
+		linkedShipFiles = linkedShipFiles.substring(0, linkedShipFiles.length() - 1);
+		configuration.setString(YarnConfigOptions.YARN_SHIP_PATHS, linkedShipFiles);
+
+		final YarnClient yarnClient = YarnClient.createYarnClient();
 
 		try (final Flip6YarnClusterDescriptor flip6YarnClusterDescriptor = new Flip6YarnClusterDescriptor(
 			configuration,
@@ -64,9 +75,6 @@ public void testPerJobMode() throws Exception {
 			yarnClient,
 			true)) {
 
-			flip6YarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
-			flip6YarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
-
 			final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
 				.setMasterMemoryMB(768)
 				.setTaskManagerMemoryMB(1024)
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index b3dcaca1459..5978ad64ce9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -27,7 +27,6 @@
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -46,7 +45,6 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
 
@@ -231,6 +229,17 @@ public void testJavaAPI() throws Exception {
 
 		String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
 		Configuration configuration = GlobalConfiguration.loadConfiguration();
+		configuration.setString(YarnConfigOptions.FLINK_JAR, flinkUberjar.getAbsolutePath());
+
+		StringBuilder sb = new StringBuilder();
+		for (File file : flinkLibFolder.listFiles()) {
+			sb.append(file.getAbsolutePath());
+			sb.append(",");
+		}
+
+		String linkedShipFiles = sb.toString();
+		linkedShipFiles = linkedShipFiles.substring(0, linkedShipFiles.length() - 1);
+		configuration.setString(YarnConfigOptions.YARN_SHIP_PATHS, linkedShipFiles);
 
 		try (final AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
 			configuration,
@@ -239,8 +248,6 @@ public void testJavaAPI() throws Exception {
 			getYarnClient(),
 			true)) {
 			Assert.assertNotNull("unable to get yarn client", clusterDescriptor);
-			clusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
-			clusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
 
 			final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
 				.setMasterMemoryMB(768)
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 2a1b099399a..fe11ab5454e 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -37,8 +37,8 @@
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -51,7 +51,6 @@
 
 import java.io.File;
 import java.net.URI;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -88,6 +87,17 @@ public void testFlinkContainerMemory() throws Exception {
 		configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);
 		configuration.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, (1L << 20));
 		configuration.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, (4L << 20));
+		configuration.setString(YarnConfigOptions.FLINK_JAR, flinkUberjar.getAbsolutePath());
+
+		StringBuilder sb = new StringBuilder();
+		for (File file : flinkLibFolder.listFiles()) {
+			sb.append(file.getAbsolutePath());
+			sb.append(",");
+		}
+
+		String linkedShipFiles = sb.toString();
+		linkedShipFiles = linkedShipFiles.substring(0, linkedShipFiles.length() - 1);
+		configuration.setString(YarnConfigOptions.YARN_SHIP_PATHS, linkedShipFiles);
 
 		final YarnConfiguration yarnConfiguration = getYarnConfiguration();
 		final Flip6YarnClusterDescriptor clusterDescriptor = new Flip6YarnClusterDescriptor(
@@ -97,9 +107,6 @@ public void testFlinkContainerMemory() throws Exception {
 			yarnClient,
 			true);
 
-		clusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
-		clusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
-
 		final File streamingWordCountFile = new File("target/programs/WindowJoin.jar");
 
 		assertThat(streamingWordCountFile.exists(), is(true));
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index bdb471a142f..91cd459ba91 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -79,10 +79,13 @@
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.SimpleFileVisitor;
@@ -99,6 +102,7 @@
 import java.util.Set;
 
 import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
+import static org.apache.flink.configuration.HighAvailabilityOptions.HA_CLUSTER_ID;
 import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
 import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
 import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties;
@@ -122,12 +126,15 @@
 	/** True if the descriptor must not shut down the YarnClient. */
 	private final boolean sharedYarnClient;
 
+	@Nullable
 	private String yarnQueue;
 
 	private String configurationDirectory;
 
+	@Nullable
 	private Path flinkJarPath;
 
+	@Nullable
 	private String dynamicPropertiesEncoded;
 
 	/** Lazily initialized list of files to ship. */
@@ -137,8 +144,10 @@
 
 	private boolean detached;
 
+	@Nullable
 	private String customName;
 
+	@Nullable
 	private String zookeeperNamespace;
 
 	/** Optional Jar file to include in the system class loader of all application nodes
@@ -172,6 +181,88 @@ public AbstractYarnClusterDescriptor(
 		userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
 
 		this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
+
+		String yarnQueueConfigValue = flinkConfiguration.getString(YarnConfigOptions.YARN_QUEUE);
+		if (yarnQueueConfigValue != null) {
+			this.yarnQueue = yarnQueueConfigValue;
+		}
+
+		String zkNamespaceConfigValue = flinkConfiguration.getString(HA_CLUSTER_ID);
+		if (zkNamespaceConfigValue != null) {
+			this.zookeeperNamespace = zkNamespaceConfigValue;
+		}
+
+		this.detached = flinkConfiguration.getBoolean(YarnConfigOptions.DETACHED_MODE);
+
+		String dynamicPropertiesEncodedConfigValue = flinkConfiguration.getString(YarnConfigOptions.DYNAMIC_PROPERTIES_ENCODED);
+		if (dynamicPropertiesEncodedConfigValue != null) {
+			this.dynamicPropertiesEncoded = dynamicPropertiesEncodedConfigValue;
+		}
+
+		String nameConfigValue = flinkConfiguration.getString(YarnConfigOptions.YARN_APPLICATION_NAME);
+		if (nameConfigValue != null) {
+			this.customName = nameConfigValue;
+		}
+
+		buildFlinkJarPath(flinkConfiguration);
+
+		this.addShipFiles();
+	}
+
+	private void buildFlinkJarPath(Configuration flinkConfiguration) {
+		String flinkJarPathStrConfigValue = flinkConfiguration.getString(YarnConfigOptions.FLINK_JAR);
+		final Path localJarPath;
+		if (flinkJarPathStrConfigValue != null) {
+			if (!flinkJarPathStrConfigValue.startsWith("file://")) {
+				flinkJarPathStrConfigValue = "file://" + flinkJarPathStrConfigValue;
+			}
+
+			if (!flinkJarPathStrConfigValue.endsWith("jar")) {
+				LOG.error("The passed jar path ('" + flinkJarPathStrConfigValue + "') does not end with the 'jar' extension");
+				localJarPath = null;
+			} else {
+				localJarPath = new Path(flinkJarPathStrConfigValue);
+			}
+		} else {
+			LOG.info("No path for the flink jar passed. Using the location of "
+				+ this.getClass() + " to locate the jar");
+			String encodedJarPath =
+				this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+
+			final String decodedPath;
+			try {
+				// we have to decode the url encoded parts of the path
+				decodedPath = URLDecoder.decode(encodedJarPath, Charset.defaultCharset().name());
+			} catch (UnsupportedEncodingException e) {
+				throw new RuntimeException("Couldn't decode the encoded Flink dist jar path: " + encodedJarPath +
+					" Please supply a path manually via the jar option.");
+			}
+
+			// check whether it's actually a jar file --> when testing we execute this class without a flink-dist jar
+			if (decodedPath.endsWith(".jar")) {
+				localJarPath = new Path(new File(decodedPath).toURI());
+			} else {
+				localJarPath = null;
+			}
+		}
+
+		if (localJarPath != null) {
+			this.flinkJarPath = localJarPath;
+		}
+	}
+
+	private void addShipFiles() {
+		List<File> shipFiles = new ArrayList<>();
+		// path to directory to ship
+		String shipPathConfigValue = flinkConfiguration.getString(YarnConfigOptions.YARN_SHIP_PATHS);
+		if (shipPathConfigValue != null) {
+			String[] shipPaths = shipPathConfigValue.split(",");
+			for (String shipPath : shipPaths) {
+				shipFiles.add(new File(shipPath));
+			}
+		}
+
+		this.shipFiles.addAll(shipFiles);
 	}
 
 	public YarnClient getYarnClient() {
@@ -194,34 +285,6 @@ public Configuration getFlinkConfiguration() {
 		return flinkConfiguration;
 	}
 
-	public void setQueue(String queue) {
-		this.yarnQueue = queue;
-	}
-
-	public void setLocalJarPath(Path localJarPath) {
-		if (!localJarPath.toString().endsWith("jar")) {
-			throw new IllegalArgumentException("The passed jar path ('" + localJarPath + "') does not end with the 'jar' extension");
-		}
-		this.flinkJarPath = localJarPath;
-	}
-
-	/**
-	 * Adds the given files to the list of files to ship.
-	 *
-	 * <p>Note that any file matching "<tt>flink-dist*.jar</tt>" will be excluded from the upload by
-	 * {@link #uploadAndRegisterFiles(Collection, FileSystem, Path, ApplicationId, List, Map, StringBuilder)}
-	 * since we upload the Flink uber jar ourselves and do not need to deploy it multiple times.
-	 *
-	 * @param shipFiles files to ship
-	 */
-	public void addShipFiles(List<File> shipFiles) {
-		this.shipFiles.addAll(shipFiles);
-	}
-
-	public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
-		this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
-	}
-
 	/**
 	 * Returns true if the descriptor has the job jars to include in the classpath.
 	 */
@@ -247,7 +310,7 @@ public boolean hasUserJarFiles(List<URL> requiredJarFiles) {
 	/**
 	 * Sets the user jar which is included in the system classloader of all nodes.
 	 */
-	public void setProvidedUserJarFiles(List<URL> userJarFiles) {
+	private void setProvidedUserJarFiles(List<URL> userJarFiles) {
 		for (URL jarFile : userJarFiles) {
 			try {
 				this.userJarFiles.add(new File(jarFile.toURI()));
@@ -258,10 +321,21 @@ public void setProvidedUserJarFiles(List<URL> userJarFiles) {
 		}
 	}
 
+	@Nullable
 	public String getDynamicPropertiesEncoded() {
 		return this.dynamicPropertiesEncoded;
 	}
 
+	@Nullable
+	public Path getFlinkJarPath() {
+		return flinkJarPath;
+	}
+
+	@Nullable
+	public String getCustomName() {
+		return customName;
+	}
+
 	private void isReadyForDeployment(ClusterSpecification clusterSpecification) throws YarnDeploymentException {
 
 		if (clusterSpecification.getNumberTaskManagers() <= 0) {
@@ -311,10 +385,6 @@ private static boolean allocateResource(int[] nodeManagers, int toAllocate) {
 		return false;
 	}
 
-	public void setDetachedMode(boolean detachedMode) {
-		this.detached = detachedMode;
-	}
-
 	public boolean isDetachedMode() {
 		return detached;
 	}
@@ -323,10 +393,6 @@ public String getZookeeperNamespace() {
 		return zookeeperNamespace;
 	}
 
-	public void setZookeeperNamespace(String zookeeperNamespace) {
-		this.zookeeperNamespace = zookeeperNamespace;
-	}
-
 	// -------------------------------------------------------------
 	// Lifecycle management
 	// -------------------------------------------------------------
@@ -708,7 +774,7 @@ public ApplicationReport startAppMaster(
 		if (zkNamespace == null || zkNamespace.isEmpty()) {
 			// namespace defined in config? else use applicationId as default.
 			zkNamespace = configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId));
-			setZookeeperNamespace(zkNamespace);
+			this.zookeeperNamespace = zkNamespace;
 		}
 
 		configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
@@ -1267,13 +1333,6 @@ public String getClusterDescription() {
 		}
 	}
 
-	public void setName(String name) {
-		if (name == null) {
-			throw new IllegalArgumentException("The passed name is null");
-		}
-		customName = name;
-	}
-
 	private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws
 		InvocationTargetException, IllegalAccessException {
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 2cdc19d3c93..5b769dd1f62 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -49,7 +49,6 @@
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -70,10 +69,6 @@
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -190,7 +185,7 @@ public FlinkYarnSessionCli(
 		query = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
 		applicationId = new Option(shortPrefix + "id", longPrefix + "applicationId", true, "Attach to running YARN session");
 		queue = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
-		shipPath = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
+		shipPath = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory, multi directory split them with comma (t for transfer)");
 		flinkJar = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
 		jmMemory = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
 		tmMemory = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
@@ -276,98 +271,6 @@ private AbstractYarnClusterDescriptor createDescriptor(
 			yarnConfiguration,
 			configurationDirectory);
 
-		// Jar Path
-		final Path localJarPath;
-		if (cmd.hasOption(flinkJar.getOpt())) {
-			String userPath = cmd.getOptionValue(flinkJar.getOpt());
-			if (!userPath.startsWith("file://")) {
-				userPath = "file://" + userPath;
-			}
-			localJarPath = new Path(userPath);
-		} else {
-			LOG.info("No path for the flink jar passed. Using the location of "
-				+ yarnClusterDescriptor.getClass() + " to locate the jar");
-			String encodedJarPath =
-				yarnClusterDescriptor.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
-
-			final String decodedPath;
-			try {
-				// we have to decode the url encoded parts of the path
-				decodedPath = URLDecoder.decode(encodedJarPath, Charset.defaultCharset().name());
-			} catch (UnsupportedEncodingException e) {
-				throw new RuntimeException("Couldn't decode the encoded Flink dist jar path: " + encodedJarPath +
-					" Please supply a path manually via the -" + flinkJar.getOpt() + " option.");
-			}
-
-			// check whether it's actually a jar file --> when testing we execute this class without a flink-dist jar
-			if (decodedPath.endsWith(".jar")) {
-				localJarPath = new Path(new File(decodedPath).toURI());
-			} else {
-				localJarPath = null;
-			}
-		}
-
-		if (localJarPath != null) {
-			yarnClusterDescriptor.setLocalJarPath(localJarPath);
-		}
-
-		List<File> shipFiles = new ArrayList<>();
-		// path to directory to ship
-		if (cmd.hasOption(shipPath.getOpt())) {
-			String shipPath = cmd.getOptionValue(this.shipPath.getOpt());
-			File shipDir = new File(shipPath);
-			if (shipDir.isDirectory()) {
-				shipFiles.add(shipDir);
-			} else {
-				LOG.warn("Ship directory is not a directory. Ignoring it.");
-			}
-		}
-
-		yarnClusterDescriptor.addShipFiles(shipFiles);
-
-		// queue
-		if (cmd.hasOption(queue.getOpt())) {
-			yarnClusterDescriptor.setQueue(cmd.getOptionValue(queue.getOpt()));
-		}
-
-		final Properties properties = cmd.getOptionProperties(dynamicproperties.getOpt());
-
-		String[] dynamicProperties = properties.stringPropertyNames().stream()
-			.flatMap(
-				(String key) -> {
-					final String value = properties.getProperty(key);
-
-					if (value != null) {
-						return Stream.of(key + dynamicproperties.getValueSeparator() + value);
-					} else {
-						return Stream.empty();
-					}
-				})
-			.toArray(String[]::new);
-
-		String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR);
-
-		yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
-
-		if (cmd.hasOption(detached.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
-			this.detachedMode = true;
-			yarnClusterDescriptor.setDetachedMode(true);
-		}
-
-		if (cmd.hasOption(name.getOpt())) {
-			yarnClusterDescriptor.setName(cmd.getOptionValue(name.getOpt()));
-		} else {
-			// set the default application name, if none is specified
-			if (defaultApplicationName != null) {
-				yarnClusterDescriptor.setName(defaultApplicationName);
-			}
-		}
-
-		if (cmd.hasOption(zookeeperNamespace.getOpt())) {
-			String zookeeperNamespaceValue = cmd.getOptionValue(this.zookeeperNamespace.getOpt());
-			yarnClusterDescriptor.setZookeeperNamespace(zookeeperNamespaceValue);
-		}
-
 		return yarnClusterDescriptor;
 	}
 
@@ -491,14 +394,12 @@ protected Configuration applyCommandLineOptionsToConfiguration(CommandLine comma
 
 		final ApplicationId applicationId = getClusterId(commandLine);
 
-		if (applicationId != null) {
-			final String zooKeeperNamespace;
-			if (commandLine.hasOption(zookeeperNamespace.getOpt())){
-				zooKeeperNamespace = commandLine.getOptionValue(zookeeperNamespace.getOpt());
-			} else {
-				zooKeeperNamespace = effectiveConfiguration.getString(HA_CLUSTER_ID, applicationId.toString());
-			}
-
+		final String zooKeeperNamespace;
+		if (commandLine.hasOption(zookeeperNamespace.getOpt())) {
+			zooKeeperNamespace = commandLine.getOptionValue(zookeeperNamespace.getOpt());
+			effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace);
+		} else if (applicationId != null) {
+			zooKeeperNamespace = effectiveConfiguration.getString(HA_CLUSTER_ID, applicationId.toString());
 			effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace);
 		}
 
@@ -514,6 +415,49 @@ protected Configuration applyCommandLineOptionsToConfiguration(CommandLine comma
 			effectiveConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Integer.parseInt(commandLine.getOptionValue(slots.getOpt())));
 		}
 
+		// queue
+		if (commandLine.hasOption(queue.getOpt())) {
+			effectiveConfiguration.setString(YarnConfigOptions.YARN_QUEUE, commandLine.getOptionValue(queue.getOpt()));
+		}
+
+		if (commandLine.hasOption(detached.getOpt()) || commandLine.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
+			this.detachedMode = true;
+			effectiveConfiguration.setBoolean(YarnConfigOptions.DETACHED_MODE, true);
+		}
+
+		final Properties properties = commandLine.getOptionProperties(dynamicproperties.getOpt());
+
+		if (properties != null) {
+			String[] dynamicProperties = properties.stringPropertyNames().stream()
+				.flatMap(
+					(String key) -> {
+						final String value = properties.getProperty(key);
+
+						if (value != null) {
+							return Stream.of(key + dynamicproperties.getValueSeparator() + value);
+						} else {
+							return Stream.empty();
+						}
+					})
+				.toArray(String[]::new);
+
+			String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR);
+
+			effectiveConfiguration.setString(YarnConfigOptions.DYNAMIC_PROPERTIES_ENCODED, dynamicPropertiesEncoded);
+		}
+
+		if (commandLine.hasOption(flinkJar.getOpt())) {
+			effectiveConfiguration.setString(YarnConfigOptions.FLINK_JAR, commandLine.getOptionValue(flinkJar.getOpt()));
+		}
+
+		if (commandLine.hasOption(name.getOpt())) {
+			effectiveConfiguration.setString(YarnConfigOptions.YARN_APPLICATION_NAME, commandLine.getOptionValue(name.getOpt()));
+		}
+
+		if (commandLine.hasOption(shipPath.getOpt())) {
+			effectiveConfiguration.setString(YarnConfigOptions.YARN_SHIP_PATHS, commandLine.getOptionValue(this.shipPath.getOpt()));
+		}
+
 		if (isYarnPropertiesFileMode(commandLine)) {
 			return applyYarnProperties(effectiveConfiguration);
 		} else {
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 255a6c71f8a..a92cbd615e4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -142,6 +142,54 @@
 		.defaultValue("")
 		.withDescription("A comma-separated list of tags to apply to the Flink YARN application.");
 
+	/**
+	 * A command option to specify the application will submit to which YARN queue.
+	 */
+	public static final ConfigOption<String> YARN_QUEUE =
+		key("yarn.queue")
+		.noDefaultValue()
+		.withDescription("A command option to specify the application will submit to which YARN queue.");
+
+	/**
+	 * A command option to specify whether the application submission uses detached mode or not.
+	 */
+	public static final ConfigOption<Boolean> DETACHED_MODE =
+		key("yarn.detached-mode")
+		.defaultValue(false)
+		.withDescription("A command option to specify whether the application submission uses detached mode or not.");
+
+	/**
+	 * A command option to specify the encoded dynamic properties.
+	 */
+	public static final ConfigOption<String> DYNAMIC_PROPERTIES_ENCODED =
+		key("yarn.dynamic-properties-encoded")
+		.noDefaultValue()
+		.withDescription("A command option to specify the encoded dynamic properties.");
+
+	/**
+	 * A command option to specify the flink jar path.
+	 */
+	public static final ConfigOption<String> FLINK_JAR =
+		key("yarn.flink-jar")
+		.noDefaultValue()
+		.withDescription("A command option to specify the flink jar path.");
+
+	/**
+	 * A command option to specify the YARN flink application name.
+	 */
+	public static final ConfigOption<String> YARN_APPLICATION_NAME =
+		key("yarn.application-name")
+		.noDefaultValue()
+		.withDescription("A command option to specify the YARN flink application name.");
+
+	/**
+	 * A command option which contains comma-separated list of ship paths.
+	 */
+	public static final ConfigOption<String> YARN_SHIP_PATHS =
+		key("yarn.ship-paths")
+		.noDefaultValue()
+		.withDescription("A comma-separated list of ship paths.");
+
 	// ------------------------------------------------------------------------
 
 	/** This class is not meant to be instantiated. */
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 20ce314399f..c66e2180dd4 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -137,6 +137,42 @@ public void testZookeeperNamespaceProperty() throws Exception {
 		assertEquals(zkNamespaceCliInput, descriptor.getZookeeperNamespace());
 	}
 
+	@Test
+	public void testNameProperty() throws Exception {
+		String testName = "testFlinkApplication";
+		String[] params = new String[] {"-yn", "2", "-ynm", testName};
+
+		FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
+			new Configuration(),
+			tmp.getRoot().getAbsolutePath(),
+			"y",
+			"yarn");
+
+		CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
+
+		AbstractYarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(commandLine);
+
+		assertEquals(testName, descriptor.getCustomName());
+	}
+
+	@Test
+	public void testFlinkJarPathProperty() throws Exception {
+		String testFlinkJarPathCliInput = "/path/to/flink.jar";
+		String[] params = new String[] {"-yn", "2", "-yj", testFlinkJarPathCliInput};
+
+		FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
+			new Configuration(),
+			tmp.getRoot().getAbsolutePath(),
+			"y",
+			"yarn");
+
+		CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
+
+		AbstractYarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(commandLine);
+
+		assertEquals(testFlinkJarPathCliInput, new File(descriptor.getFlinkJarPath().toUri()).getAbsolutePath());
+	}
+
 	/**
 	 * Test that the CliFrontend is able to pick up the .yarn-properties file from a specified location.
 	 */
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index dd8b62536c2..9abdb040f1c 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -29,7 +29,6 @@
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -44,10 +43,8 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -90,16 +87,18 @@ public static void tearDownClass() {
 
 	@Test
 	public void testFailIfTaskSlotsHigherThanMaxVcores() throws ClusterDeploymentException {
+		final YarnClient yarnClient = YarnClient.createYarnClient();
+
+		Configuration flinkConfig = new Configuration();
+		flinkConfig.setString(YarnConfigOptions.FLINK_JAR, flinkJar.getPath());
 
 		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
-			new Configuration(),
+			flinkConfig,
 			yarnConfiguration,
 			temporaryFolder.getRoot().getAbsolutePath(),
 			yarnClient,
 			true);
 
-		clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
-
 		ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
 			.setMasterMemoryMB(-1)
 			.setTaskManagerMemoryMB(-1)
@@ -126,6 +125,7 @@ public void testConfigOverwrite() throws ClusterDeploymentException {
 		Configuration configuration = new Configuration();
 		// overwrite vcores in config
 		configuration.setInteger(YarnConfigOptions.VCORES, Integer.MAX_VALUE);
+		configuration.setString(YarnConfigOptions.FLINK_JAR, flinkJar.getPath());
 
 		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
 			configuration,
@@ -134,8 +134,6 @@ public void testConfigOverwrite() throws ClusterDeploymentException {
 			yarnClient,
 			true);
 
-		clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
-
 		// configure slots
 		ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
 			.setMasterMemoryMB(-1)
@@ -412,28 +410,38 @@ public void testSetupApplicationMasterContainer() {
 	 */
 	@Test
 	public void testExplicitLibShipping() throws Exception {
+		Configuration flinkConfiguration = new Configuration();
+
+		File libFile = temporaryFolder.newFile("libFile.jar");
+		File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
+
 		AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(
-			new Configuration(),
+			flinkConfiguration,
 			yarnConfiguration,
 			temporaryFolder.getRoot().getAbsolutePath(),
 			yarnClient,
 			true);
 
 		try {
-			descriptor.setLocalJarPath(new Path("/path/to/flink.jar"));
-
-			File libFile = temporaryFolder.newFile("libFile.jar");
-			File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
-
 			Assert.assertFalse(descriptor.shipFiles.contains(libFile));
 			Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
+		} finally {
+			descriptor.close();
+		}
 
-			List<File> shipFiles = new ArrayList<>();
-			shipFiles.add(libFile);
-			shipFiles.add(libFolder);
+		flinkConfiguration = new Configuration();
+		flinkConfiguration.setString(YarnConfigOptions.FLINK_JAR, "/path/to/flink.jar");
+		String shipFilePaths = libFile.getAbsolutePath() + "," + libFolder.getAbsolutePath();
+		flinkConfiguration.setString(YarnConfigOptions.YARN_SHIP_PATHS, shipFilePaths);
 
-			descriptor.addShipFiles(shipFiles);
+		descriptor = new YarnClusterDescriptor(
+			flinkConfiguration,
+			yarnConfiguration,
+			temporaryFolder.getRoot().getAbsolutePath(),
+			yarnClient,
+			true);
 
+		try {
 			Assert.assertTrue(descriptor.shipFiles.contains(libFile));
 			Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services