You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2019/07/16 09:31:59 UTC

[flink] branch master updated (97a8524 -> 770a404)

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 97a8524  [FLINK-13193][tests] Enable custom flink cluster config per test
     new 623d254  Revert "[FLINK-8801][yarn/s3] fix Utils#setupLocalResource() relying on consistent read-after-write"
     new 770a404  [FLINK-8801][yarn/s3] Retry if remote file not found and use remote modification time

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/main/java/org/apache/flink/yarn/Utils.java | 47 ++++++++++++++++++----
 1 file changed, 40 insertions(+), 7 deletions(-)


[flink] 01/02: Revert "[FLINK-8801][yarn/s3] fix Utils#setupLocalResource() relying on consistent read-after-write"

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 623d2540c8a4142ef016eeccd4b120c7a595117a
Author: Alice Yan <yy...@uber.com>
AuthorDate: Thu Apr 18 12:02:09 2019 -0700

    Revert "[FLINK-8801][yarn/s3] fix Utils#setupLocalResource() relying on consistent read-after-write"
    
    This reverts commit c90a757b29f168144b1bae99df532911ae682e63.
---
 .../src/main/java/org/apache/flink/yarn/Utils.java | 34 ++--------------------
 1 file changed, 2 insertions(+), 32 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 6fc685a..4fb0fbf 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -143,8 +143,7 @@ public final class Utils {
 		Path homedir,
 		String relativeTargetPath) throws IOException {
 
-		File localFile = new File(localSrcPath.toUri().getPath());
-		if (localFile.isDirectory()) {
+		if (new File(localSrcPath.toUri().getPath()).isDirectory()) {
 			throw new IllegalArgumentException("File to copy must not be a directory: " +
 				localSrcPath);
 		}
@@ -162,15 +161,8 @@ public final class Utils {
 
 		fs.copyFromLocalFile(false, true, localSrcPath, dst);
 
-		// Note: If we used registerLocalResource(FileSystem, Path) here, we would access the remote
-		//       file once again which has problems with eventually consistent read-after-write file
-		//       systems. Instead, we decide to preserve the modification time at the remote
-		//       location because this and the size of the resource will be checked by YARN based on
-		//       the values we provide to #registerLocalResource() below.
-		fs.setTimes(dst, localFile.lastModified(), -1);
 		// now create the resource instance
-		LocalResource resource = registerLocalResource(dst, localFile.length(), localFile.lastModified());
-
+		LocalResource resource = registerLocalResource(fs, dst);
 		return Tuple2.of(dst, resource);
 	}
 
@@ -197,28 +189,6 @@ public final class Utils {
 		}
 	}
 
-	/**
-	 * Creates a YARN resource for the remote object at the given location.
-	 *
-	 * @param remoteRsrcPath	remote location of the resource
-	 * @param resourceSize		size of the resource
-	 * @param resourceModificationTime last modification time of the resource
-	 *
-	 * @return YARN resource
-	 */
-	private static LocalResource registerLocalResource(
-			Path remoteRsrcPath,
-			long resourceSize,
-			long resourceModificationTime) {
-		LocalResource localResource = Records.newRecord(LocalResource.class);
-		localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri()));
-		localResource.setSize(resourceSize);
-		localResource.setTimestamp(resourceModificationTime);
-		localResource.setType(LocalResourceType.FILE);
-		localResource.setVisibility(LocalResourceVisibility.APPLICATION);
-		return localResource;
-	}
-
 	private static LocalResource registerLocalResource(FileSystem fs, Path remoteRsrcPath) throws IOException {
 		LocalResource localResource = Records.newRecord(LocalResource.class);
 		FileStatus jarStat = fs.getFileStatus(remoteRsrcPath);


[flink] 02/02: [FLINK-8801][yarn/s3] Retry if remote file not found and use remote modification time

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 770a40476e7aea4c53babc39e3edc922a3d6a4e7
Author: Alice Yan <yy...@uber.com>
AuthorDate: Thu Apr 18 12:16:47 2019 -0700

    [FLINK-8801][yarn/s3] Retry if remote file not found and use remote modification time
---
 .../src/main/java/org/apache/flink/yarn/Utils.java | 67 +++++++++++++++++++++-
 1 file changed, 65 insertions(+), 2 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 4fb0fbf..3f0bcdc 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -52,6 +52,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -62,6 +63,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
 
@@ -81,6 +83,12 @@ public final class Utils {
 	/** Yarn site xml file name populated in YARN container for secure IT run. */
 	public static final String YARN_SITE_FILE_NAME = "yarn-site.xml";
 
+	/** Number of total retries to fetch the remote resources after uploaded in case of FileNotFoundException. */
+	public static final int REMOTE_RESOURCES_FETCH_NUM_RETRY = 3;
+
+	/** Time to wait in milliseconds between each remote resources fetch in case of FileNotFoundException. */
+	public static final int REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI = 100;
+
 	/**
 	 * See documentation.
 	 */
@@ -143,7 +151,8 @@ public final class Utils {
 		Path homedir,
 		String relativeTargetPath) throws IOException {
 
-		if (new File(localSrcPath.toUri().getPath()).isDirectory()) {
+		File localFile = new File(localSrcPath.toUri().getPath());
+		if (localFile.isDirectory()) {
 			throw new IllegalArgumentException("File to copy must not be a directory: " +
 				localSrcPath);
 		}
@@ -161,8 +170,40 @@ public final class Utils {
 
 		fs.copyFromLocalFile(false, true, localSrcPath, dst);
 
+		// Note: If we directly used registerLocalResource(FileSystem, Path) here, we would access the remote
+		//       file once again, which has problems with eventually consistent read-after-write file
+		//       systems. Instead, we wait (with bounded retries) until the remote file is available.
+
+		FileStatus[] fss = null;
+		int iter = 1;
+		while (iter <= REMOTE_RESOURCES_FETCH_NUM_RETRY + 1) {
+			try {
+				fss = fs.listStatus(dst);
+				break;
+			} catch (FileNotFoundException e) {
+				LOG.debug("Got FileNotFoundException while fetching uploaded remote resources at retry num {}", iter);
+				try {
+					LOG.debug("Sleeping for {}ms", REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI);
+					TimeUnit.MILLISECONDS.sleep(REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI);
+				} catch (InterruptedException ie) {
+					LOG.warn("Failed to sleep for {}ms at retry num {} while fetching uploaded remote resources",
+						REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI, iter, ie);
+				}
+				iter++;
+			}
+		}
+
+		final long dstModificationTime;
+		if (fss != null && fss.length >  0) {
+			dstModificationTime = fss[0].getModificationTime();
+			LOG.debug("Got modification time {} from remote path {}", dstModificationTime, dst);
+		} else {
+			dstModificationTime = localFile.lastModified();
+			LOG.debug("Failed to fetch remote modification time from {}, using local timestamp {}", dst, dstModificationTime);
+		}
+
 		// now create the resource instance
-		LocalResource resource = registerLocalResource(fs, dst);
+		LocalResource resource = registerLocalResource(dst, localFile.length(), dstModificationTime);
 		return Tuple2.of(dst, resource);
 	}
 
@@ -189,6 +230,28 @@ public final class Utils {
 		}
 	}
 
+	/**
+	 * Creates a YARN resource for the remote object at the given location.
+	 *
+	 * @param remoteRsrcPath	remote location of the resource
+	 * @param resourceSize		size of the resource
+	 * @param resourceModificationTime last modification time of the resource
+	 *
+	 * @return YARN resource
+	 */
+	private static LocalResource registerLocalResource(
+			Path remoteRsrcPath,
+			long resourceSize,
+			long resourceModificationTime) {
+		LocalResource localResource = Records.newRecord(LocalResource.class);
+		localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri()));
+		localResource.setSize(resourceSize);
+		localResource.setTimestamp(resourceModificationTime);
+		localResource.setType(LocalResourceType.FILE);
+		localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+		return localResource;
+	}
+
 	private static LocalResource registerLocalResource(FileSystem fs, Path remoteRsrcPath) throws IOException {
 		LocalResource localResource = Records.newRecord(LocalResource.class);
 		FileStatus jarStat = fs.getFileStatus(remoteRsrcPath);