Posted to commits@flink.apache.org by al...@apache.org on 2020/05/25 12:18:54 UTC

[flink] branch release-1.11 updated (bc8d22e -> 6215e11)

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from bc8d22e  [FLINK-16210] Extend the Flink Architecture section with more information about Flink Master components and application execution
     new 6de5cb3  [hotfix][filesystems][test] Fix improper usage of System.nanoTime().
     new 6215e11  [FLINK-17721][filesystems][test] Use independent timeout for each file status checking.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/core/fs/FileSystemTestUtils.java  |  7 +++---
 .../fs/hdfs/AbstractHadoopFileSystemITTest.java    | 26 ++++++++++++----------
 .../fs/osshadoop/HadoopOSSFileSystemITCase.java    |  2 +-
 .../fs/s3hadoop/HadoopS3FileSystemITCase.java      |  2 +-
 .../fs/s3presto/PrestoS3FileSystemITCase.java      |  2 +-
 5 files changed, 21 insertions(+), 18 deletions(-)


[flink] 02/02: [FLINK-17721][filesystems][test] Use independent timeout for each file status checking.

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6215e1166aa44ce178ae8c068159eb7efeb53041
Author: Xintong Song <to...@gmail.com>
AuthorDate: Thu May 21 16:44:18 2020 +0800

    [FLINK-17721][filesystems][test] Use independent timeout for each file status checking.
---
 .../apache/flink/core/fs/FileSystemTestUtils.java  |  5 +++--
 .../fs/hdfs/AbstractHadoopFileSystemITTest.java    | 24 ++++++++++++----------
 .../fs/osshadoop/HadoopOSSFileSystemITCase.java    |  2 +-
 .../fs/s3hadoop/HadoopS3FileSystemITCase.java      |  2 +-
 .../fs/s3presto/PrestoS3FileSystemITCase.java      |  2 +-
 5 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java
index 23bde19..f1e58d5 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java
@@ -29,14 +29,15 @@ public class FileSystemTestUtils {
 
 	/**
 	 * Verifies that the given path eventually appears on / disappears from <tt>fs</tt> within
-	 * <tt>deadline</tt> nanoseconds.
+	 * <tt>consistencyToleranceNS</tt> nanoseconds.
 	 */
 	public static void checkPathEventualExistence(
 			FileSystem fs,
 			Path path,
 			boolean expectedExists,
-			long deadline) throws IOException, InterruptedException {
+			long consistencyToleranceNS) throws IOException, InterruptedException {
 		boolean dirExists;
+		long deadline = System.nanoTime() + consistencyToleranceNS;
 		while ((dirExists = fs.exists(path)) != expectedExists &&
 				System.nanoTime() - deadline < 0) {
 			Thread.sleep(10);
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java
index a7b29a7..f59a91b 100644
--- a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java
@@ -49,22 +49,22 @@ public abstract class AbstractHadoopFileSystemITTest extends TestLogger {
 
 	protected static FileSystem fs;
 	protected static Path basePath;
-	protected static long deadline;
+	protected static long consistencyToleranceNS;
 
 	public static void checkPathExistence(Path path,
 			boolean expectedExists,
-			long deadline) throws IOException, InterruptedException {
-		if (deadline == 0) {
+			long consistencyToleranceNS) throws IOException, InterruptedException {
+		if (consistencyToleranceNS == 0) {
 			//strongly consistency
 			assertEquals(expectedExists, fs.exists(path));
 		} else {
 			//eventually consistency
-			checkPathEventualExistence(fs, path, expectedExists, deadline);
+			checkPathEventualExistence(fs, path, expectedExists, consistencyToleranceNS);
 		}
 	}
 
 	protected void checkEmptyDirectory(Path path) throws IOException, InterruptedException {
-		checkPathExistence(path, true, deadline);
+		checkPathExistence(path, true, consistencyToleranceNS);
 	}
 
 	@Test
@@ -80,7 +80,7 @@ public abstract class AbstractHadoopFileSystemITTest extends TestLogger {
 			}
 
 			// just in case, wait for the path to exist
-			checkPathExistence(path, true, deadline);
+			checkPathExistence(path, true, consistencyToleranceNS);
 
 			try (FSDataInputStream in = fs.open(path);
 				InputStreamReader ir = new InputStreamReader(in, StandardCharsets.UTF_8);
@@ -93,7 +93,7 @@ public abstract class AbstractHadoopFileSystemITTest extends TestLogger {
 			fs.delete(path, false);
 		}
 
-		checkPathExistence(path, false, deadline);
+		checkPathExistence(path, false, consistencyToleranceNS);
 	}
 
 	@Test
@@ -122,7 +122,7 @@ public abstract class AbstractHadoopFileSystemITTest extends TestLogger {
 				}
 				// just in case, wait for the file to exist (should then also be reflected in the
 				// directory's file list below)
-				checkPathExistence(file, true, deadline);
+				checkPathExistence(file, true, consistencyToleranceNS);
 			}
 
 			FileStatus[] files = fs.listStatus(directory);
@@ -138,7 +138,7 @@ public abstract class AbstractHadoopFileSystemITTest extends TestLogger {
 		}
 		finally {
 			// clean up
-			cleanupDirectoryWithRetry(fs, directory, deadline);
+			cleanupDirectoryWithRetry(fs, directory, consistencyToleranceNS);
 		}
 	}
 
@@ -146,7 +146,7 @@ public abstract class AbstractHadoopFileSystemITTest extends TestLogger {
 	public static void teardown() throws IOException, InterruptedException {
 		try {
 			if (fs != null) {
-				cleanupDirectoryWithRetry(fs, basePath, deadline);
+				cleanupDirectoryWithRetry(fs, basePath, consistencyToleranceNS);
 			}
 		}
 		finally {
@@ -154,7 +154,9 @@ public abstract class AbstractHadoopFileSystemITTest extends TestLogger {
 		}
 	}
 
-	public static void cleanupDirectoryWithRetry(FileSystem fs, Path path, long deadline) throws IOException, InterruptedException {
+	public static void cleanupDirectoryWithRetry(FileSystem fs, Path path, long consistencyToleranceNS) throws IOException, InterruptedException {
+		fs.delete(path, true);
+		long deadline = System.nanoTime() + consistencyToleranceNS;
 		while (fs.exists(path) && System.nanoTime() - deadline < 0) {
 			fs.delete(path, true);
 			Thread.sleep(50L);
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSFileSystemITCase.java b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSFileSystemITCase.java
index eaf722a..2f6178b 100644
--- a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSFileSystemITCase.java
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSFileSystemITCase.java
@@ -51,7 +51,7 @@ public class HadoopOSSFileSystemITCase extends AbstractHadoopFileSystemITTest {
 		FileSystem.initialize(conf);
 		basePath = new Path(OSSTestCredentials.getTestBucketUri() + TEST_DATA_DIR);
 		fs = basePath.getFileSystem();
-		deadline = 0;
+		consistencyToleranceNS = 0;
 	}
 
 	@Test
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
index fdfed24..23d7123 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
@@ -53,7 +53,7 @@ public class HadoopS3FileSystemITCase extends AbstractHadoopFileSystemITTest {
 
 		basePath = new Path(S3TestCredentials.getTestBucketUri() + "tests-" + UUID.randomUUID());
 		fs = basePath.getFileSystem();
-		deadline = System.nanoTime() + 90_000_000_000L;
+		consistencyToleranceNS = 30_000_000_000L; // 30 seconds
 
 		// check for uniqueness of the test directory
 		// directory must not yet exist
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java
index 5929af1..5ab4a33 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java
@@ -69,7 +69,7 @@ public class PrestoS3FileSystemITCase extends AbstractHadoopFileSystemITTest {
 
 		basePath = new Path(S3TestCredentials.getTestBucketUri() + TEST_DATA_DIR);
 		fs = basePath.getFileSystem();
-		deadline = System.nanoTime() + 90_000_000_000L;
+		consistencyToleranceNS = 30_000_000_000L; // 30 seconds
 
 		// check for uniqueness of the test directory
 		// directory must not yet exist
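
For context, the change in 02/02 replaces the single absolute deadline that used to be computed once at test setup with a per-call consistency tolerance: each status check now derives its own deadline from System.nanoTime() at the moment it runs, so one slow wait can no longer consume the budget of the checks that follow. A minimal sketch of the pattern (hypothetical helper name and constant, not the actual test classes):

    // Sketch only: per-check timeout instead of one shared absolute deadline.
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import java.io.IOException;
    import static org.junit.Assert.assertEquals;

    static final long CONSISTENCY_TOLERANCE_NS = 30_000_000_000L; // 30 seconds per check

    static void awaitExistence(FileSystem fs, Path path, boolean expectedExists)
            throws IOException, InterruptedException {
        // Each call computes its own deadline, independent of any earlier waits.
        long deadline = System.nanoTime() + CONSISTENCY_TOLERANCE_NS;
        boolean exists;
        while ((exists = fs.exists(path)) != expectedExists
                && System.nanoTime() - deadline < 0) {
            Thread.sleep(10);
        }
        assertEquals(expectedExists, exists);
    }

With a tolerance of 0, the strongly consistent case degenerates to a plain assertEquals(expectedExists, fs.exists(path)), which is what the OSS test above configures.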


[flink] 01/02: [hotfix][filesystems][test] Fix improper usage of System.nanoTime().

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6de5cb310e63b97a6efea13f4940953655120ffd
Author: Xintong Song <to...@gmail.com>
AuthorDate: Thu May 21 16:36:15 2020 +0800

    [hotfix][filesystems][test] Fix improper usage of System.nanoTime().
    
    Per the JavaDoc of System.nanoTime(), we should use `t1 - t0 < 0` rather than `t1 < t0` because of the possibility of numerical overflow.
---
 .../src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java     | 2 +-
 .../apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java
index 564ef3a..23bde19 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java
@@ -38,7 +38,7 @@ public class FileSystemTestUtils {
 			long deadline) throws IOException, InterruptedException {
 		boolean dirExists;
 		while ((dirExists = fs.exists(path)) != expectedExists &&
-				System.nanoTime() < deadline) {
+				System.nanoTime() - deadline < 0) {
 			Thread.sleep(10);
 		}
 		assertEquals(expectedExists, dirExists);
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java
index 083a880..a7b29a7 100644
--- a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java
@@ -155,7 +155,7 @@ public abstract class AbstractHadoopFileSystemITTest extends TestLogger {
 	}
 
 	public static void cleanupDirectoryWithRetry(FileSystem fs, Path path, long deadline) throws IOException, InterruptedException {
-		while (fs.exists(path) && System.nanoTime() < deadline) {
+		while (fs.exists(path) && System.nanoTime() - deadline < 0) {
 			fs.delete(path, true);
 			Thread.sleep(50L);
 		}
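
As the hotfix message notes, System.nanoTime() values are only meaningful as differences between two readings; comparing raw values directly breaks if the computed deadline overflows past Long.MAX_VALUE. A minimal illustration of the idiom (hypothetical helper, not part of the commit):

    // Sketch only: overflow-safe deadline check, per the JavaDoc of System.nanoTime().
    static boolean deadlineReached(long deadlineNanos) {
        // Correct: the sign of the difference stays meaningful even if
        // "now + timeout" wrapped around past Long.MAX_VALUE.
        return System.nanoTime() - deadlineNanos >= 0;
        // Incorrect alternative: System.nanoTime() >= deadlineNanos,
        // which gives the wrong answer once the deadline value overflows.
    }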