You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/06/04 09:40:42 UTC
[flink] branch release-1.11 updated: [FLINK-18087][yarn] Fix
uploading user artifact for Yarn job cluster
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new f394995 [FLINK-18087][yarn] Fix uploading user artifact for Yarn job cluster
f394995 is described below
commit f394995281f888813c888303025735adb56cc952
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Wed Jun 3 19:28:58 2020 +0800
[FLINK-18087][yarn] Fix uploading user artifact for Yarn job cluster
This closes #12463.
---
.../src/test/java/org/apache/flink/yarn/YARNITCase.java | 7 +++++++
.../src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java | 2 +-
2 files changed, 8 insertions(+), 1 deletion(-)
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 1e5a64e..88d91ca 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.yarn;
+import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
@@ -126,6 +127,12 @@ public class YARNITCase extends YarnTestBase {
false)
.getClusterClient()) {
+ for (DistributedCache.DistributedCacheEntry entry : jobGraph.getUserArtifacts().values()) {
+ assertTrue(
+ String.format("The user artifacts(%s) should be remote or uploaded to remote filesystem.", entry.filePath),
+ Utils.isRemotePath(entry.filePath));
+ }
+
ApplicationId applicationId = clusterClient.getClusterId();
final CompletableFuture<JobResult> jobResultCompletableFuture = clusterClient.requestJobResult(jobGraph.getJobID());
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 2d2103c..8e15e56 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -744,7 +744,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
if (jobGraph != null) {
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : jobGraph.getUserArtifacts().entrySet()) {
// only upload local files
- if (Utils.isRemotePath(entry.getValue().filePath)) {
+ if (!Utils.isRemotePath(entry.getValue().filePath)) {
Path localPath = new Path(entry.getValue().filePath);
Tuple2<Path, Long> remoteFileInfo =
fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());