You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/06/04 09:40:42 UTC

[flink] branch release-1.11 updated: [FLINK-18087][yarn] Fix uploading user artifact for Yarn job cluster

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new f394995  [FLINK-18087][yarn] Fix uploading user artifact for Yarn job cluster
f394995 is described below

commit f394995281f888813c888303025735adb56cc952
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Wed Jun 3 19:28:58 2020 +0800

    [FLINK-18087][yarn] Fix uploading user artifact for Yarn job cluster
    
    This closes #12463.
---
 .../src/test/java/org/apache/flink/yarn/YARNITCase.java            | 7 +++++++
 .../src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java | 2 +-
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 1e5a64e..88d91ca 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
@@ -126,6 +127,12 @@ public class YARNITCase extends YarnTestBase {
 							false)
 					.getClusterClient()) {
 
+				for (DistributedCache.DistributedCacheEntry entry : jobGraph.getUserArtifacts().values()) {
+					assertTrue(
+						String.format("The user artifacts(%s) should be remote or uploaded to remote filesystem.", entry.filePath),
+						Utils.isRemotePath(entry.filePath));
+				}
+
 				ApplicationId applicationId = clusterClient.getClusterId();
 
 				final CompletableFuture<JobResult> jobResultCompletableFuture = clusterClient.requestJobResult(jobGraph.getJobID());
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 2d2103c..8e15e56 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -744,7 +744,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 		if (jobGraph != null) {
 			for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : jobGraph.getUserArtifacts().entrySet()) {
 				// only upload local files
-				if (Utils.isRemotePath(entry.getValue().filePath)) {
+				if (!Utils.isRemotePath(entry.getValue().filePath)) {
 					Path localPath = new Path(entry.getValue().filePath);
 					Tuple2<Path, Long> remoteFileInfo =
 							fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());