You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2019/07/16 09:32:00 UTC

[flink] 01/02: Revert "[FLINK-8801][yarn/s3] fix Utils#setupLocalResource() relying on consistent read-after-write"

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 623d2540c8a4142ef016eeccd4b120c7a595117a
Author: Alice Yan <yy...@uber.com>
AuthorDate: Thu Apr 18 12:02:09 2019 -0700

    Revert "[FLINK-8801][yarn/s3] fix Utils#setupLocalResource() relying on consistent read-after-write"
    
    This reverts commit c90a757b29f168144b1bae99df532911ae682e63.
---
 .../src/main/java/org/apache/flink/yarn/Utils.java | 34 ++--------------------
 1 file changed, 2 insertions(+), 32 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 6fc685a..4fb0fbf 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -143,8 +143,7 @@ public final class Utils {
 		Path homedir,
 		String relativeTargetPath) throws IOException {
 
-		File localFile = new File(localSrcPath.toUri().getPath());
-		if (localFile.isDirectory()) {
+		if (new File(localSrcPath.toUri().getPath()).isDirectory()) {
 			throw new IllegalArgumentException("File to copy must not be a directory: " +
 				localSrcPath);
 		}
@@ -162,15 +161,8 @@ public final class Utils {
 
 		fs.copyFromLocalFile(false, true, localSrcPath, dst);
 
-		// Note: If we used registerLocalResource(FileSystem, Path) here, we would access the remote
-		//       file once again which has problems with eventually consistent read-after-write file
-		//       systems. Instead, we decide to preserve the modification time at the remote
-		//       location because this and the size of the resource will be checked by YARN based on
-		//       the values we provide to #registerLocalResource() below.
-		fs.setTimes(dst, localFile.lastModified(), -1);
 		// now create the resource instance
-		LocalResource resource = registerLocalResource(dst, localFile.length(), localFile.lastModified());
-
+		LocalResource resource = registerLocalResource(fs, dst);
 		return Tuple2.of(dst, resource);
 	}
 
@@ -197,28 +189,6 @@ public final class Utils {
 		}
 	}
 
-	/**
-	 * Creates a YARN resource for the remote object at the given location.
-	 *
-	 * @param remoteRsrcPath	remote location of the resource
-	 * @param resourceSize		size of the resource
-	 * @param resourceModificationTime last modification time of the resource
-	 *
-	 * @return YARN resource
-	 */
-	private static LocalResource registerLocalResource(
-			Path remoteRsrcPath,
-			long resourceSize,
-			long resourceModificationTime) {
-		LocalResource localResource = Records.newRecord(LocalResource.class);
-		localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri()));
-		localResource.setSize(resourceSize);
-		localResource.setTimestamp(resourceModificationTime);
-		localResource.setType(LocalResourceType.FILE);
-		localResource.setVisibility(LocalResourceVisibility.APPLICATION);
-		return localResource;
-	}
-
 	private static LocalResource registerLocalResource(FileSystem fs, Path remoteRsrcPath) throws IOException {
 		LocalResource localResource = Records.newRecord(LocalResource.class);
 		FileStatus jarStat = fs.getFileStatus(remoteRsrcPath);