You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/05/25 12:18:22 UTC

[flink] branch master updated (d88c195 -> e92b2bf)

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from d88c195  [FLINK-16210] Extend the Flink Architecture section with more information about Flink Master components and application execution
     new 656e8dc  [hotfix][filesystems][test] Fix improper usage of System.nanoTime().
     new e92b2bf  [FLINK-17721][filesystems][test] Use independent timeout for each file status checking.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/core/fs/FileSystemTestUtils.java  |  7 +++---
 .../fs/hdfs/AbstractHadoopFileSystemITTest.java    | 26 ++++++++++++----------
 .../fs/osshadoop/HadoopOSSFileSystemITCase.java    |  2 +-
 .../fs/s3hadoop/HadoopS3FileSystemITCase.java      |  2 +-
 .../fs/s3presto/PrestoS3FileSystemITCase.java      |  2 +-
 5 files changed, 21 insertions(+), 18 deletions(-)


[flink] 01/02: [hotfix][filesystems][test] Fix improper usage of System.nanoTime().

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 656e8dc8e6eadaadaa63c2d184ac3fbd4293f908
Author: Xintong Song <to...@gmail.com>
AuthorDate: Thu May 21 16:36:15 2020 +0800

    [hotfix][filesystems][test] Fix improper usage of System.nanoTime().
    
    Per the JavaDoc of System.nanoTime(), we should use `t1 - t0 < 0` rather than `t1 < t0` because of the possibility of numerical overflow.
---
 .../src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java     | 2 +-
 .../apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java
index 564ef3a..23bde19 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java
@@ -38,7 +38,7 @@ public class FileSystemTestUtils {
 			long deadline) throws IOException, InterruptedException {
 		boolean dirExists;
 		while ((dirExists = fs.exists(path)) != expectedExists &&
-				System.nanoTime() < deadline) {
+				System.nanoTime() - deadline < 0) {
 			Thread.sleep(10);
 		}
 		assertEquals(expectedExists, dirExists);
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java
index 083a880..a7b29a7 100644
--- a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java
@@ -155,7 +155,7 @@ public abstract class AbstractHadoopFileSystemITTest extends TestLogger {
 	}
 
 	public static void cleanupDirectoryWithRetry(FileSystem fs, Path path, long deadline) throws IOException, InterruptedException {
-		while (fs.exists(path) && System.nanoTime() < deadline) {
+		while (fs.exists(path) && System.nanoTime() - deadline < 0) {
 			fs.delete(path, true);
 			Thread.sleep(50L);
 		}


[flink] 02/02: [FLINK-17721][filesystems][test] Use independent timeout for each file status checking.

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e92b2bf19bdf03ad3bae906dc5fa3781aeddb3ee
Author: Xintong Song <to...@gmail.com>
AuthorDate: Thu May 21 16:44:18 2020 +0800

    [FLINK-17721][filesystems][test] Use independent timeout for each file status checking.
---
 .../apache/flink/core/fs/FileSystemTestUtils.java  |  5 +++--
 .../fs/hdfs/AbstractHadoopFileSystemITTest.java    | 24 ++++++++++++----------
 .../fs/osshadoop/HadoopOSSFileSystemITCase.java    |  2 +-
 .../fs/s3hadoop/HadoopS3FileSystemITCase.java      |  2 +-
 .../fs/s3presto/PrestoS3FileSystemITCase.java      |  2 +-
 5 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java
index 23bde19..f1e58d5 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java
@@ -29,14 +29,15 @@ public class FileSystemTestUtils {
 
 	/**
 	 * Verifies that the given path eventually appears on / disappears from <tt>fs</tt> within
-	 * <tt>deadline</tt> nanoseconds.
+	 * <tt>consistencyToleranceNS</tt> nanoseconds.
 	 */
 	public static void checkPathEventualExistence(
 			FileSystem fs,
 			Path path,
 			boolean expectedExists,
-			long deadline) throws IOException, InterruptedException {
+			long consistencyToleranceNS) throws IOException, InterruptedException {
 		boolean dirExists;
+		long deadline = System.nanoTime() + consistencyToleranceNS;
 		while ((dirExists = fs.exists(path)) != expectedExists &&
 				System.nanoTime() - deadline < 0) {
 			Thread.sleep(10);
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java
index a7b29a7..f59a91b 100644
--- a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/AbstractHadoopFileSystemITTest.java
@@ -49,22 +49,22 @@ public abstract class AbstractHadoopFileSystemITTest extends TestLogger {
 
 	protected static FileSystem fs;
 	protected static Path basePath;
-	protected static long deadline;
+	protected static long consistencyToleranceNS;
 
 	public static void checkPathExistence(Path path,
 			boolean expectedExists,
-			long deadline) throws IOException, InterruptedException {
-		if (deadline == 0) {
+			long consistencyToleranceNS) throws IOException, InterruptedException {
+		if (consistencyToleranceNS == 0) {
 			//strongly consistency
 			assertEquals(expectedExists, fs.exists(path));
 		} else {
 			//eventually consistency
-			checkPathEventualExistence(fs, path, expectedExists, deadline);
+			checkPathEventualExistence(fs, path, expectedExists, consistencyToleranceNS);
 		}
 	}
 
 	protected void checkEmptyDirectory(Path path) throws IOException, InterruptedException {
-		checkPathExistence(path, true, deadline);
+		checkPathExistence(path, true, consistencyToleranceNS);
 	}
 
 	@Test
@@ -80,7 +80,7 @@ public abstract class AbstractHadoopFileSystemITTest extends TestLogger {
 			}
 
 			// just in case, wait for the path to exist
-			checkPathExistence(path, true, deadline);
+			checkPathExistence(path, true, consistencyToleranceNS);
 
 			try (FSDataInputStream in = fs.open(path);
 				InputStreamReader ir = new InputStreamReader(in, StandardCharsets.UTF_8);
@@ -93,7 +93,7 @@ public abstract class AbstractHadoopFileSystemITTest extends TestLogger {
 			fs.delete(path, false);
 		}
 
-		checkPathExistence(path, false, deadline);
+		checkPathExistence(path, false, consistencyToleranceNS);
 	}
 
 	@Test
@@ -122,7 +122,7 @@ public abstract class AbstractHadoopFileSystemITTest extends TestLogger {
 				}
 				// just in case, wait for the file to exist (should then also be reflected in the
 				// directory's file list below)
-				checkPathExistence(file, true, deadline);
+				checkPathExistence(file, true, consistencyToleranceNS);
 			}
 
 			FileStatus[] files = fs.listStatus(directory);
@@ -138,7 +138,7 @@ public abstract class AbstractHadoopFileSystemITTest extends TestLogger {
 		}
 		finally {
 			// clean up
-			cleanupDirectoryWithRetry(fs, directory, deadline);
+			cleanupDirectoryWithRetry(fs, directory, consistencyToleranceNS);
 		}
 	}
 
@@ -146,7 +146,7 @@ public abstract class AbstractHadoopFileSystemITTest extends TestLogger {
 	public static void teardown() throws IOException, InterruptedException {
 		try {
 			if (fs != null) {
-				cleanupDirectoryWithRetry(fs, basePath, deadline);
+				cleanupDirectoryWithRetry(fs, basePath, consistencyToleranceNS);
 			}
 		}
 		finally {
@@ -154,7 +154,9 @@ public abstract class AbstractHadoopFileSystemITTest extends TestLogger {
 		}
 	}
 
-	public static void cleanupDirectoryWithRetry(FileSystem fs, Path path, long deadline) throws IOException, InterruptedException {
+	public static void cleanupDirectoryWithRetry(FileSystem fs, Path path, long consistencyToleranceNS) throws IOException, InterruptedException {
+		fs.delete(path, true);
+		long deadline = System.nanoTime() + consistencyToleranceNS;
 		while (fs.exists(path) && System.nanoTime() - deadline < 0) {
 			fs.delete(path, true);
 			Thread.sleep(50L);
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSFileSystemITCase.java b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSFileSystemITCase.java
index eaf722a..2f6178b 100644
--- a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSFileSystemITCase.java
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSFileSystemITCase.java
@@ -51,7 +51,7 @@ public class HadoopOSSFileSystemITCase extends AbstractHadoopFileSystemITTest {
 		FileSystem.initialize(conf);
 		basePath = new Path(OSSTestCredentials.getTestBucketUri() + TEST_DATA_DIR);
 		fs = basePath.getFileSystem();
-		deadline = 0;
+		consistencyToleranceNS = 0;
 	}
 
 	@Test
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
index fdfed24..23d7123 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
@@ -53,7 +53,7 @@ public class HadoopS3FileSystemITCase extends AbstractHadoopFileSystemITTest {
 
 		basePath = new Path(S3TestCredentials.getTestBucketUri() + "tests-" + UUID.randomUUID());
 		fs = basePath.getFileSystem();
-		deadline = System.nanoTime() + 90_000_000_000L;
+		consistencyToleranceNS = 30_000_000_000L; // 30 seconds
 
 		// check for uniqueness of the test directory
 		// directory must not yet exist
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java
index 5929af1..5ab4a33 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java
@@ -69,7 +69,7 @@ public class PrestoS3FileSystemITCase extends AbstractHadoopFileSystemITTest {
 
 		basePath = new Path(S3TestCredentials.getTestBucketUri() + TEST_DATA_DIR);
 		fs = basePath.getFileSystem();
-		deadline = System.nanoTime() + 90_000_000_000L;
+		consistencyToleranceNS = 30_000_000_000L; // 30 seconds
 
 		// check for uniqueness of the test directory
 		// directory must not yet exist