You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/04/22 20:40:47 UTC
[11/17] flink git commit: [FLINK-8402] [tests] Fix hadoop/presto S3
IT cases for eventually-consistent operations
[FLINK-8402] [tests] Fix hadoop/presto S3 IT cases for eventually-consistent operations
Also see
https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel
This closes #5624
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65819147
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65819147
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65819147
Branch: refs/heads/master
Commit: 6581914749c3d7d06b18bfb1454937048ce4dad8
Parents: 29894d0
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Mar 2 17:26:48 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Apr 22 16:28:35 2018 +0200
----------------------------------------------------------------------
.../flink/core/fs/FileSystemTestUtils.java | 46 ++++++++++++++++++++
.../fs/s3hadoop/HadoopS3FileSystemITCase.java | 34 +++++++++++----
.../fs/s3presto/PrestoS3FileSystemITCase.java | 19 +++++---
3 files changed, 85 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/65819147/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java
new file mode 100644
index 0000000..564ef3a
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTestUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Various utility functions for testing {@link FileSystem} implementations.
+ */
+public class FileSystemTestUtils {
+
+	/**
+	 * Verifies that the given path eventually appears on / disappears from <tt>fs</tt> before
+	 * <tt>deadline</tt> is reached.
+	 *
+	 * <p>The existence of <tt>path</tt> is polled every 10 milliseconds until it matches
+	 * <tt>expectedExists</tt> or the deadline has passed. The last observed state is then
+	 * asserted. The path is always checked at least once, even if the deadline has already
+	 * passed when this method is called.
+	 *
+	 * @param fs file system to query
+	 * @param path path whose existence is checked
+	 * @param expectedExists <tt>true</tt> to wait for the path to appear, <tt>false</tt> to wait
+	 *                       for it to disappear
+	 * @param deadline absolute point in time on the {@link System#nanoTime()} time line after
+	 *                 which polling stops (not a duration), e.g.
+	 *                 <tt>System.nanoTime() + 30_000_000_000L</tt> for 30 seconds from now
+	 *
+	 * @throws IOException propagated from {@link FileSystem#exists(Path)}
+	 * @throws InterruptedException if the thread is interrupted while sleeping between two polls
+	 */
+	public static void checkPathEventualExistence(
+			FileSystem fs,
+			Path path,
+			boolean expectedExists,
+			long deadline) throws IOException, InterruptedException {
+		boolean pathExists;
+		// System.nanoTime() values may numerically overflow, so two of them must be compared
+		// via their difference instead of directly with '<'
+		while ((pathExists = fs.exists(path)) != expectedExists &&
+				System.nanoTime() - deadline < 0) {
+			Thread.sleep(10);
+		}
+		assertEquals("Unexpected existence state of path " + path, expectedExists, pathExists);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/65819147/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
index 8c646f0..e640603 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
@@ -39,6 +39,7 @@ import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
+import static org.apache.flink.core.fs.FileSystemTestUtils.checkPathEventualExistence;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -46,8 +47,11 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
- * Unit tests for the S3 file system support via Presto's PrestoS3FileSystem.
- * These tests do not actually read from or write to S3.
+ * Unit tests for the S3 file system support via Hadoop's {@link org.apache.hadoop.fs.s3a.S3AFileSystem}.
+ *
+ * <p><strong>BEWARE</strong>: tests must take special care of S3's
+ * <a href="https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel">consistency guarantees</a>
+ * and what the {@link org.apache.hadoop.fs.s3a.S3AFileSystem} offers.
*/
public class HadoopS3FileSystemITCase extends TestLogger {
@@ -90,8 +94,9 @@ public class HadoopS3FileSystemITCase extends TestLogger {
}
@AfterClass
- public static void cleanUp() throws IOException {
+ public static void cleanUp() throws IOException, InterruptedException {
if (!skipTest) {
+ final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
// initialize configuration with valid credentials
final Configuration conf = new Configuration();
conf.setString("s3.access.key", ACCESS_KEY);
@@ -105,7 +110,7 @@ public class HadoopS3FileSystemITCase extends TestLogger {
fs.delete(directory, true);
// now directory must be gone
- assertFalse(fs.exists(directory));
+ checkPathEventualExistence(fs, directory, false, deadline);
// reset configuration
FileSystem.initialize(new Configuration());
@@ -167,6 +172,7 @@ public class HadoopS3FileSystemITCase extends TestLogger {
@Test
public void testSimpleFileWriteAndRead() throws Exception {
+ final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
final Configuration conf = new Configuration();
conf.setString("s3.access.key", ACCESS_KEY);
conf.setString("s3.secret.key", SECRET_KEY);
@@ -184,6 +190,9 @@ public class HadoopS3FileSystemITCase extends TestLogger {
writer.write(testLine);
}
+ // just in case, wait for the path to exist
+ checkPathEventualExistence(fs, path, true, deadline);
+
try (FSDataInputStream in = fs.open(path);
InputStreamReader ir = new InputStreamReader(in, StandardCharsets.UTF_8);
BufferedReader reader = new BufferedReader(ir)) {
@@ -194,10 +203,14 @@ public class HadoopS3FileSystemITCase extends TestLogger {
finally {
fs.delete(path, false);
}
+
+ // now file must be gone (this is eventually-consistent!)
+ checkPathEventualExistence(fs, path, false, deadline);
}
@Test
public void testDirectoryListing() throws Exception {
+ final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
final Configuration conf = new Configuration();
conf.setString("s3.access.key", ACCESS_KEY);
conf.setString("s3.secret.key", SECRET_KEY);
@@ -214,8 +227,7 @@ public class HadoopS3FileSystemITCase extends TestLogger {
// create directory
assertTrue(fs.mkdirs(directory));
- // seems the presto file system does not assume existence of empty directories in S3
- assertTrue(fs.exists(directory));
+ checkPathEventualExistence(fs, directory, true, deadline);
// directory empty
assertEquals(0, fs.listStatus(directory).length);
@@ -224,10 +236,13 @@ public class HadoopS3FileSystemITCase extends TestLogger {
final int numFiles = 3;
for (int i = 0; i < numFiles; i++) {
Path file = new Path(directory, "/file-" + i);
- try (FSDataOutputStream out = fs.create(file, WriteMode.NO_OVERWRITE);
+ try (FSDataOutputStream out = fs.create(file, WriteMode.OVERWRITE);
OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
writer.write("hello-" + i + "\n");
}
+ // just in case, wait for the file to exist (should then also be reflected in the
+ // directory's file list below)
+ checkPathEventualExistence(fs, file, true, deadline);
}
FileStatus[] files = fs.listStatus(directory);
@@ -246,7 +261,8 @@ public class HadoopS3FileSystemITCase extends TestLogger {
fs.delete(directory, true);
}
- // now directory must be gone
- assertFalse(fs.exists(directory));
+ // now directory must be gone (this is eventually-consistent, though!)
+ checkPathEventualExistence(fs, directory, false, deadline);
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/65819147/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java
index b593208..caf1636 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java
@@ -37,14 +37,18 @@ import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
+import static org.apache.flink.core.fs.FileSystemTestUtils.checkPathEventualExistence;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
- * Unit tests for the S3 file system support via Presto's PrestoS3FileSystem.
- * These tests do not actually read from or write to S3.
+ * Unit tests for the S3 file system support via Presto's {@link com.facebook.presto.hive.PrestoS3FileSystem}.
+ *
+ * <p><strong>BEWARE</strong>: tests must take special care of S3's
+ * <a href="https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel">consistency guarantees</a>
+ * and what the {@link com.facebook.presto.hive.PrestoS3FileSystem} offers.
*/
public class PrestoS3FileSystemITCase extends TestLogger {
@@ -64,6 +68,7 @@ public class PrestoS3FileSystemITCase extends TestLogger {
@Test
public void testSimpleFileWriteAndRead() throws Exception {
+ final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
final Configuration conf = new Configuration();
conf.setString("s3.access-key", ACCESS_KEY);
conf.setString("s3.secret-key", SECRET_KEY);
@@ -91,10 +96,14 @@ public class PrestoS3FileSystemITCase extends TestLogger {
finally {
fs.delete(path, false);
}
+
+ // now file must be gone (this is eventually-consistent!)
+ checkPathEventualExistence(fs, path, false, deadline);
}
@Test
public void testDirectoryListing() throws Exception {
+ final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
final Configuration conf = new Configuration();
conf.setString("s3.access-key", ACCESS_KEY);
conf.setString("s3.secret-key", SECRET_KEY);
@@ -121,7 +130,7 @@ public class PrestoS3FileSystemITCase extends TestLogger {
final int numFiles = 3;
for (int i = 0; i < numFiles; i++) {
Path file = new Path(directory, "/file-" + i);
- try (FSDataOutputStream out = fs.create(file, WriteMode.NO_OVERWRITE);
+ try (FSDataOutputStream out = fs.create(file, WriteMode.OVERWRITE);
OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
writer.write("hello-" + i + "\n");
}
@@ -143,7 +152,7 @@ public class PrestoS3FileSystemITCase extends TestLogger {
fs.delete(directory, true);
}
- // now directory must be gone
- assertFalse(fs.exists(directory));
+ // now directory must be gone (this is eventually-consistent!)
+ checkPathEventualExistence(fs, directory, false, deadline);
}
}