You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/04/22 20:40:47 UTC

[11/17] flink git commit: [FLINK-8402] [tests] Fix hadoop/presto S3 IT cases for eventually-consistent operations

[FLINK-8402] [tests] Fix hadoop/presto S3 IT cases for eventually-consistent operations

Also see
https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel

This closes #5624


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65819147
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65819147
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65819147

Branch: refs/heads/master
Commit: 6581914749c3d7d06b18bfb1454937048ce4dad8
Parents: 29894d0
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Mar 2 17:26:48 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Apr 22 16:28:35 2018 +0200

----------------------------------------------------------------------
 .../flink/core/fs/FileSystemTestUtils.java      | 46 ++++++++++++++++++++
 .../fs/s3hadoop/HadoopS3FileSystemITCase.java   | 34 +++++++++++----
 .../fs/s3presto/PrestoS3FileSystemITCase.java   | 19 +++++---
 3 files changed, 85 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/65819147/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java
new file mode 100644
index 0000000..564ef3a
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java
@@ -0,0 +1,46 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.core.fs;

import java.io.IOException;

import static org.junit.Assert.assertEquals;

/**
 * Various utility functions for testing {@link FileSystem} implementations.
 */
public class FileSystemTestUtils {

	/** Utility class, not meant to be instantiated. */
	private FileSystemTestUtils() {
	}

	/**
	 * Verifies that the given path eventually appears on / disappears from {@code fs}
	 * before the given deadline, polling every 10 milliseconds. The check is performed
	 * at least once, even if the deadline has already passed.
	 *
	 * <p>This is needed for eventually-consistent file systems such as S3, where a
	 * create or delete may not be immediately visible via {@link FileSystem#exists(Path)}.
	 *
	 * @param fs file system to query
	 * @param path path whose existence is verified
	 * @param expectedExists expected existence state of <tt>path</tt>
	 * @param deadline absolute deadline as a {@link System#nanoTime()} timestamp
	 *
	 * @throws IOException if the existence check fails
	 * @throws InterruptedException if the polling sleep is interrupted
	 */
	public static void checkPathEventualExistence(
			FileSystem fs,
			Path path,
			boolean expectedExists,
			long deadline) throws IOException, InterruptedException {
		// renamed from "dirExists": the helper is also used for plain files
		boolean pathExists;
		while ((pathExists = fs.exists(path)) != expectedExists &&
				System.nanoTime() < deadline) {
			Thread.sleep(10);
		}
		assertEquals(expectedExists, pathExists);
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/65819147/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
index 8c646f0..e640603 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
@@ -39,6 +39,7 @@ import java.io.OutputStreamWriter;
 import java.nio.charset.StandardCharsets;
 import java.util.UUID;
 
+import static org.apache.flink.core.fs.FileSystemTestUtils.checkPathEventualExistence;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -46,8 +47,11 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * Unit tests for the S3 file system support via Presto's PrestoS3FileSystem.
- * These tests do not actually read from or write to S3.
+ * Unit tests for the S3 file system support via Hadoop's {@link org.apache.hadoop.fs.s3a.S3AFileSystem}.
+ *
+ * <p><strong>BEWARE</strong>: tests must take special care of S3's
+ * <a href="https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel">consistency guarantees</a>
+ * and what the {@link org.apache.hadoop.fs.s3a.S3AFileSystem} offers.
  */
 public class HadoopS3FileSystemITCase extends TestLogger {
 
@@ -90,8 +94,9 @@ public class HadoopS3FileSystemITCase extends TestLogger {
 	}
 
 	@AfterClass
-	public static void cleanUp() throws IOException {
+	public static void cleanUp() throws IOException, InterruptedException {
 		if (!skipTest) {
+			final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
 			// initialize configuration with valid credentials
 			final Configuration conf = new Configuration();
 			conf.setString("s3.access.key", ACCESS_KEY);
@@ -105,7 +110,7 @@ public class HadoopS3FileSystemITCase extends TestLogger {
 			fs.delete(directory, true);
 
 			// now directory must be gone
-			assertFalse(fs.exists(directory));
+			checkPathEventualExistence(fs, directory, false, deadline);
 
 			// reset configuration
 			FileSystem.initialize(new Configuration());
@@ -167,6 +172,7 @@ public class HadoopS3FileSystemITCase extends TestLogger {
 
 	@Test
 	public void testSimpleFileWriteAndRead() throws Exception {
+		final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
 		final Configuration conf = new Configuration();
 		conf.setString("s3.access.key", ACCESS_KEY);
 		conf.setString("s3.secret.key", SECRET_KEY);
@@ -184,6 +190,9 @@ public class HadoopS3FileSystemITCase extends TestLogger {
 				writer.write(testLine);
 			}
 
+			// just in case, wait for the path to exist
+			checkPathEventualExistence(fs, path, true, deadline);
+
 			try (FSDataInputStream in = fs.open(path);
 					InputStreamReader ir = new InputStreamReader(in, StandardCharsets.UTF_8);
 					BufferedReader reader = new BufferedReader(ir)) {
@@ -194,10 +203,14 @@ public class HadoopS3FileSystemITCase extends TestLogger {
 		finally {
 			fs.delete(path, false);
 		}
+
+		// now file must be gone (this is eventually-consistent!)
+		checkPathEventualExistence(fs, path, false, deadline);
 	}
 
 	@Test
 	public void testDirectoryListing() throws Exception {
+		final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
 		final Configuration conf = new Configuration();
 		conf.setString("s3.access.key", ACCESS_KEY);
 		conf.setString("s3.secret.key", SECRET_KEY);
@@ -214,8 +227,7 @@ public class HadoopS3FileSystemITCase extends TestLogger {
 			// create directory
 			assertTrue(fs.mkdirs(directory));
 
-			// seems the presto file system does not assume existence of empty directories in S3
-			assertTrue(fs.exists(directory));
+			checkPathEventualExistence(fs, directory, true, deadline);
 
 			// directory empty
 			assertEquals(0, fs.listStatus(directory).length);
@@ -224,10 +236,13 @@ public class HadoopS3FileSystemITCase extends TestLogger {
 			final int numFiles = 3;
 			for (int i = 0; i < numFiles; i++) {
 				Path file = new Path(directory, "/file-" + i);
-				try (FSDataOutputStream out = fs.create(file, WriteMode.NO_OVERWRITE);
+				try (FSDataOutputStream out = fs.create(file, WriteMode.OVERWRITE);
 						OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
 					writer.write("hello-" + i + "\n");
 				}
+				// just in case, wait for the file to exist (should then also be reflected in the
+				// directory's file list below)
+				checkPathEventualExistence(fs, file, true, deadline);
 			}
 
 			FileStatus[] files = fs.listStatus(directory);
@@ -246,7 +261,8 @@ public class HadoopS3FileSystemITCase extends TestLogger {
 			fs.delete(directory, true);
 		}
 
-		// now directory must be gone
-		assertFalse(fs.exists(directory));
+		// now directory must be gone (this is eventually-consistent, though!)
+		checkPathEventualExistence(fs, directory, false, deadline);
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/65819147/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java
index b593208..caf1636 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java
@@ -37,14 +37,18 @@ import java.io.OutputStreamWriter;
 import java.nio.charset.StandardCharsets;
 import java.util.UUID;
 
+import static org.apache.flink.core.fs.FileSystemTestUtils.checkPathEventualExistence;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 /**
- * Unit tests for the S3 file system support via Presto's PrestoS3FileSystem.
- * These tests do not actually read from or write to S3.
+ * Unit tests for the S3 file system support via Presto's {@link com.facebook.presto.hive.PrestoS3FileSystem}.
+ *
+ * <p><strong>BEWARE</strong>: tests must take special care of S3's
+ * <a href="https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel">consistency guarantees</a>
+ * and what the {@link com.facebook.presto.hive.PrestoS3FileSystem} offers.
  */
 public class PrestoS3FileSystemITCase extends TestLogger {
 
@@ -64,6 +68,7 @@ public class PrestoS3FileSystemITCase extends TestLogger {
 
 	@Test
 	public void testSimpleFileWriteAndRead() throws Exception {
+		final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
 		final Configuration conf = new Configuration();
 		conf.setString("s3.access-key", ACCESS_KEY);
 		conf.setString("s3.secret-key", SECRET_KEY);
@@ -91,10 +96,14 @@ public class PrestoS3FileSystemITCase extends TestLogger {
 		finally {
 			fs.delete(path, false);
 		}
+
+		// now file must be gone (this is eventually-consistent!)
+		checkPathEventualExistence(fs, path, false, deadline);
 	}
 
 	@Test
 	public void testDirectoryListing() throws Exception {
+		final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
 		final Configuration conf = new Configuration();
 		conf.setString("s3.access-key", ACCESS_KEY);
 		conf.setString("s3.secret-key", SECRET_KEY);
@@ -121,7 +130,7 @@ public class PrestoS3FileSystemITCase extends TestLogger {
 			final int numFiles = 3;
 			for (int i = 0; i < numFiles; i++) {
 				Path file = new Path(directory, "/file-" + i);
-				try (FSDataOutputStream out = fs.create(file, WriteMode.NO_OVERWRITE);
+				try (FSDataOutputStream out = fs.create(file, WriteMode.OVERWRITE);
 						OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
 					writer.write("hello-" + i + "\n");
 				}
@@ -143,7 +152,7 @@ public class PrestoS3FileSystemITCase extends TestLogger {
 			fs.delete(directory, true);
 		}
 
-		// now directory must be gone
-		assertFalse(fs.exists(directory));
+		// now directory must be gone (this is eventually-consistent!)
+		checkPathEventualExistence(fs, directory, false, deadline);
 	}
 }