You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/05 18:38:53 UTC

[12/19] flink git commit: [FLINK-8373] [core, hdfs] Ensure consistent semantics of FileSystem.mkdirs() across file system implementations.

[FLINK-8373] [core, hdfs] Ensure consistent semantics of FileSystem.mkdirs() across file system implementations.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d0ed12e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d0ed12e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d0ed12e

Branch: refs/heads/master
Commit: 3d0ed12edab5e1b89db0829230e69fb6ef841b7e
Parents: fd13ed0
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Dec 7 16:11:24 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jan 5 19:38:06 2018 +0100

----------------------------------------------------------------------
 .../flink/core/fs/local/LocalFileSystem.java    |  29 ++-
 .../core/fs/FileSystemBehaviorTestSuite.java    | 210 +++++++++++++++++++
 .../java/org/apache/flink/core/fs/PathTest.java |  21 +-
 .../fs/local/LocalFileSystemBehaviorTest.java   |  51 +++++
 flink-filesystems/flink-hadoop-fs/pom.xml       |  26 +++
 .../hdfs/HadoopLocalFileSystemBehaviorTest.java |  74 +++++++
 .../flink/runtime/fs/hdfs/HdfsBehaviorTest.java |  98 +++++++++
 .../flink/runtime/fs/hdfs/HdfsKindTest.java     | 101 +++++++++
 flink-filesystems/flink-s3-fs-hadoop/pom.xml    |   9 +
 .../HadoopS3FileSystemBehaviorITCase.java       |  79 +++++++
 flink-filesystems/flink-s3-fs-presto/pom.xml    |   9 +
 .../PrestoS3FileSystemBehaviorITCase.java       |  79 +++++++
 .../flink/runtime/fs/hdfs/HdfsKindTest.java     | 101 ---------
 13 files changed, 765 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3d0ed12e/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index c3e5a2f..d16108b 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -70,11 +70,11 @@ public class LocalFileSystem extends FileSystem {
 
 	/** Path pointing to the current working directory.
 	 * Because Paths are not immutable, we cannot cache the proper path here */
-	private final String workingDir;
+	private final URI workingDir;
 
 	/** Path pointing to the current working directory.
 	 * Because Paths are not immutable, we cannot cache the proper path here. */
-	private final String homeDir;
+	private final URI homeDir;
 
 	/** The host name of this machine. */
 	private final String hostName;
@@ -83,8 +83,8 @@ public class LocalFileSystem extends FileSystem {
 	 * Constructs a new <code>LocalFileSystem</code> object.
 	 */
 	public LocalFileSystem() {
-		this.workingDir = new Path(System.getProperty("user.dir")).makeQualified(this).toString();
-		this.homeDir = new Path(System.getProperty("user.home")).toString();
+		this.workingDir = new File(System.getProperty("user.dir")).toURI();
+		this.homeDir = new File(System.getProperty("user.home")).toURI();
 
 		String tmp = "unknownHost";
 		try {
@@ -229,14 +229,25 @@ public class LocalFileSystem extends FileSystem {
 	 */
 	@Override
 	public boolean mkdirs(final Path f) throws IOException {
-		final File p2f = pathToFile(f);
+		checkNotNull(f, "path is null");
+		return mkdirsInternal(pathToFile(f));
+	}
 
-		if (p2f.isDirectory()) {
-			return true;
+	private boolean mkdirsInternal(File file) throws IOException {
+		if (file.isDirectory()) {
+				return true;
 		}
+		else if (file.exists() && !file.isDirectory()) {
+			// Important: The 'exists()' check above must come before the 'isDirectory()' check to
+			//            be safe when multiple parallel instances try to create the directory
 
-		final Path parent = f.getParent();
-		return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory());
+			// exists and is not a directory -> is a regular file
+			throw new FileAlreadyExistsException(file.getAbsolutePath());
+		}
+		else {
+			File parent = file.getParentFile();
+			return (parent == null || mkdirsInternal(parent)) && file.mkdir();
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3d0ed12e/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemBehaviorTestSuite.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemBehaviorTestSuite.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemBehaviorTestSuite.java
new file mode 100644
index 0000000..a06aff6
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemBehaviorTestSuite.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Common tests for the behavior of {@link FileSystem} methods.
+ */
+public abstract class FileSystemBehaviorTestSuite {
+
+	private static final Random RND = new Random();
+
+	/** The cached file system instance. */
+	private FileSystem fs;
+
+	/** The cached base path. */
+	private Path basePath;
+
+	// ------------------------------------------------------------------------
+	//  FileSystem-specific methods
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets an instance of the {@code FileSystem} to be tested.
+	 */
+	public abstract FileSystem getFileSystem() throws Exception;
+
+	/**
+	 * Gets the base path in the file system under which tests will place their temporary files.
+	 */
+	public abstract Path getBasePath() throws Exception;
+
+	/**
+	 * Gets the kind of the file system (file system, object store, ...).
+	 */
+	public abstract FileSystemKind getFileSystemKind();
+
+	// ------------------------------------------------------------------------
+	//  Init / Cleanup
+	// ------------------------------------------------------------------------
+
+	@Before
+	public void prepare() throws Exception {
+		fs = getFileSystem();
+		basePath = new Path(getBasePath(), randomName());
+		fs.mkdirs(basePath);
+	}
+
+	@After
+	public void cleanup() throws Exception {
+		fs.delete(basePath, true);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Suite of Tests
+	// ------------------------------------------------------------------------
+
+	// --- file system kind
+
+	@Test
+	public void testFileSystemKind() {
+		assertEquals(getFileSystemKind(), fs.getKind());
+	}
+
+	// --- access and scheme
+
+	@Test
+	public void testPathAndScheme() throws Exception {
+		assertEquals(fs.getUri(), getBasePath().getFileSystem().getUri());
+		assertEquals(fs.getUri().getScheme(), getBasePath().toUri().getScheme());
+	}
+
+	@Test
+	public void testHomeAndWorkDir() {
+		assertEquals(fs.getUri().getScheme(), fs.getWorkingDirectory().toUri().getScheme());
+		assertEquals(fs.getUri().getScheme(), fs.getHomeDirectory().toUri().getScheme());
+	}
+
+	// --- mkdirs
+
+	@Test
+	public void testMkdirsReturnsTrueWhenCreatingDirectory() throws Exception {
+		// this test applies to object stores as well, as we rely on the fact that they
+		// return true when things are not bad
+
+		final Path directory = new Path(basePath, randomName());
+		assertTrue(fs.mkdirs(directory));
+
+		if (getFileSystemKind() != FileSystemKind.OBJECT_STORE) {
+			assertTrue(fs.exists(directory));
+		}
+	}
+
+	@Test
+	public void testMkdirsCreatesParentDirectories() throws Exception {
+		// this test applies to object stores as well, as we rely on the fact that they
+		// return true when things are not bad
+
+		final Path directory = new Path(new Path(new Path(basePath, randomName()), randomName()), randomName());
+		assertTrue(fs.mkdirs(directory));
+
+		if (getFileSystemKind() != FileSystemKind.OBJECT_STORE) {
+			assertTrue(fs.exists(directory));
+		}
+	}
+
+	@Test
+	public void testMkdirsReturnsTrueForExistingDirectory() throws Exception {
+		// this test applies to object stores as well, as we rely on the fact that they
+		// return true when things are not bad
+
+		final Path directory = new Path(basePath, randomName());
+
+		// make sure the directory exists
+		createRandomFileInDirectory(directory);
+
+		assertTrue(fs.mkdirs(directory));
+	}
+
+	@Test
+	public void testMkdirsFailsForExistingFile() throws Exception {
+		// test is not defined for object stores, they have no proper notion
+		// of directories
+		assumeNotObjectStore();
+
+		final Path file = new Path(getBasePath(), randomName());
+		createFile(file);
+
+		try {
+			fs.mkdirs(file);
+			fail("should fail with an IOException");
+		}
+		catch (IOException e) {
+			// good!
+		}
+	}
+
+	@Test
+	public void testMkdirsFailsWithExistingParentFile() throws Exception {
+		// test is not defined for object stores, they have no proper notion
+		// of directories
+		assumeNotObjectStore();
+
+		final Path file = new Path(getBasePath(), randomName());
+		createFile(file);
+
+		final Path dirUnderFile = new Path(file, randomName());
+		try {
+			fs.mkdirs(dirUnderFile);
+			fail("should fail with an IOException");
+		}
+		catch (IOException e) {
+			// good!
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private static String randomName() {
+		return StringUtils.getRandomString(RND, 16, 16, 'a', 'z');
+	}
+
+	private void createFile(Path file) throws IOException {
+		try (FSDataOutputStream out = fs.create(file, WriteMode.NO_OVERWRITE)) {
+			out.write(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
+		}
+	}
+
+	private void createRandomFileInDirectory(Path directory) throws IOException {
+		fs.mkdirs(directory);
+		createFile(new Path(directory, randomName()));
+	}
+
+	private void assumeNotObjectStore() {
+		Assume.assumeTrue("Test does not apply to object stores",
+				getFileSystemKind() != FileSystemKind.OBJECT_STORE);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d0ed12e/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java
index b4da2dc..6d53adb 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/PathTest.java
@@ -287,23 +287,20 @@ public class PathTest {
 
 	@Test
 	public void testMakeQualified() throws IOException {
-		String path;
-		Path p;
-		URI u;
+		// make relative path qualified
+		String path = "test/test";
+		Path p  = new Path(path).makeQualified(FileSystem.getLocalFileSystem());
+		URI u = p.toUri();
 
-		path = "test/test";
-		p = new Path(path);
-		u = p.toUri();
-		p = p.makeQualified(FileSystem.get(u));
-		u = p.toUri();
 		assertEquals("file", u.getScheme());
 		assertEquals(null, u.getAuthority());
-		assertEquals(FileSystem.getLocalFileSystem().getWorkingDirectory().toUri().getPath() + "/" + path, u.getPath());
 
+		String q = new Path(FileSystem.getLocalFileSystem().getWorkingDirectory().getPath(), path).getPath();
+		assertEquals(q, u.getPath());
+
+		// make absolute path qualified
 		path = "/test/test";
-		p = new Path(path);
-		u = p.toUri();
-		p = p.makeQualified(FileSystem.get(u));
+		p = new Path(path).makeQualified(FileSystem.getLocalFileSystem());
 		u = p.toUri();
 		assertEquals("file", u.getScheme());
 		assertEquals(null, u.getAuthority());

http://git-wip-us.apache.org/repos/asf/flink/blob/3d0ed12e/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemBehaviorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemBehaviorTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemBehaviorTest.java
new file mode 100644
index 0000000..2a8522a
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemBehaviorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.local;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemBehaviorTestSuite;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Behavior tests for Flink's {@link LocalFileSystem}.
+ */
+public class LocalFileSystemBehaviorTest extends FileSystemBehaviorTestSuite {
+
+	@Rule
+	public final TemporaryFolder tmp = new TemporaryFolder();
+
+	@Override
+	public FileSystem getFileSystem() throws Exception {
+		return LocalFileSystem.getSharedInstance();
+	}
+
+	@Override
+	public Path getBasePath() throws Exception {
+		return new Path(tmp.newFolder().toURI());
+	}
+
+	@Override
+	public FileSystemKind getFileSystemKind() {
+		return FileSystemKind.FILE_SYSTEM;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d0ed12e/flink-filesystems/flink-hadoop-fs/pom.xml
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/pom.xml b/flink-filesystems/flink-hadoop-fs/pom.xml
index 3085e63..0e7fae3 100644
--- a/flink-filesystems/flink-hadoop-fs/pom.xml
+++ b/flink-filesystems/flink-hadoop-fs/pom.xml
@@ -57,6 +57,32 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+
+		<!-- for the behavior test suite -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<!-- for the HDFS mini cluster test suite -->
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+			<version>${hadoop.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<scope>test</scope>
+			<type>test-jar</type>
+			<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
+		</dependency>
+
 	</dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/3d0ed12e/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopLocalFileSystemBehaviorTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopLocalFileSystemBehaviorTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopLocalFileSystemBehaviorTest.java
new file mode 100644
index 0000000..644744c
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopLocalFileSystemBehaviorTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemBehaviorTestSuite;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.util.VersionInfo;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Behavior tests for Hadoop's {@code RawLocalFileSystem}, accessed through Flink's {@link HadoopFileSystem} wrapper.
+ */
+public class HadoopLocalFileSystemBehaviorTest extends FileSystemBehaviorTestSuite {
+
+	@Rule
+	public final TemporaryFolder tmp = new TemporaryFolder();
+
+	@Override
+	public FileSystem getFileSystem() throws Exception {
+		org.apache.hadoop.fs.FileSystem fs = new RawLocalFileSystem();
+		fs.initialize(LocalFileSystem.getLocalFsURI(), new Configuration());
+		return new HadoopFileSystem(fs);
+	}
+
+	@Override
+	public Path getBasePath() throws Exception {
+		return new Path(tmp.newFolder().toURI());
+	}
+
+	@Override
+	public FileSystemKind getFileSystemKind() {
+		return FileSystemKind.FILE_SYSTEM;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This test needs to be skipped for earlier Hadoop versions because those
+	 * have a bug.
+	 */
+	@Override
+	public void testMkdirsFailsForExistingFile() throws Exception {
+		final String versionString = VersionInfo.getVersion();
+		final String prefix = versionString.substring(0, 3);
+		final float version = Float.parseFloat(prefix);
+		Assume.assumeTrue("Cannot execute this test on Hadoop prior to 2.8", version >= 2.8f);
+
+		super.testMkdirsFailsForExistingFile();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d0ed12e/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsBehaviorTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsBehaviorTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsBehaviorTest.java
new file mode 100644
index 0000000..ebcc663
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsBehaviorTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemBehaviorTestSuite;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.OperatingSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+/**
+ * Behavior tests for HDFS.
+ */
+public class HdfsBehaviorTest extends FileSystemBehaviorTestSuite {
+
+	@ClassRule
+	public static final TemporaryFolder TMP = new TemporaryFolder();
+
+	private static MiniDFSCluster hdfsCluster;
+
+	private static FileSystem fs;
+
+	private static Path basePath;
+
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void verifyOS() {
+		Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows());
+	}
+
+	@BeforeClass
+	public static void createHDFS() throws Exception {
+		final File baseDir = TMP.newFolder();
+
+		Configuration hdConf = new Configuration();
+		hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+		hdfsCluster = builder.build();
+
+		org.apache.hadoop.fs.FileSystem hdfs = hdfsCluster.getFileSystem();
+		fs = new HadoopFileSystem(hdfs);
+
+		basePath = new Path(hdfs.getUri().toString() + "/tests");
+	}
+
+	@AfterClass
+	public static void destroyHDFS() throws Exception {
+		if (hdfsCluster != null) {
+			hdfsCluster.getFileSystem().delete(new org.apache.hadoop.fs.Path(basePath.toUri()), true);
+			hdfsCluster.shutdown();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public FileSystem getFileSystem() {
+		return fs;
+	}
+
+	@Override
+	public Path getBasePath() {
+		return basePath;
+	}
+
+	@Override
+	public FileSystemKind getFileSystemKind() {
+		// this test covers only HDFS, so it should be a file system
+		return FileSystemKind.FILE_SYSTEM;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d0ed12e/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
new file mode 100644
index 0000000..69ecdb8
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for extracting the {@link FileSystemKind} from file systems that Flink
+ * accesses through Hadoop's File System interface.
+ *
+ * <p>This class needs to be in this package, because it accesses package private methods
+ * from the HDFS file system wrapper class.
+ */
+public class HdfsKindTest extends TestLogger {
+
+	@Test
+	public void testHdfsKind() throws IOException {
+		final FileSystem fs = new Path("hdfs://localhost:55445/my/file").getFileSystem();
+		assertEquals(FileSystemKind.FILE_SYSTEM, fs.getKind());
+	}
+
+	@Test
+	public void testS3Kind() throws IOException {
+		try {
+			Class.forName("org.apache.hadoop.fs.s3.S3FileSystem");
+		} catch (ClassNotFoundException ignored) {
+			// not in the classpath, cannot run this test
+			log.info("Skipping test 'testS3Kind()' because the S3 file system is not in the class path");
+			return;
+		}
+
+		final FileSystem s3 = new Path("s3://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
+		assertEquals(FileSystemKind.OBJECT_STORE, s3.getKind());
+	}
+
+	@Test
+	public void testS3nKind() throws IOException {
+		try {
+			Class.forName("org.apache.hadoop.fs.s3native.NativeS3FileSystem");
+		} catch (ClassNotFoundException ignored) {
+			// not in the classpath, cannot run this test
+			log.info("Skipping test 'testS3nKind()' because the Native S3 file system is not in the class path");
+			return;
+		}
+
+		final FileSystem s3n = new Path("s3n://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
+		assertEquals(FileSystemKind.OBJECT_STORE, s3n.getKind());
+	}
+
+	@Test
+	public void testS3aKind() throws IOException {
+		try {
+			Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem");
+		} catch (ClassNotFoundException ignored) {
+			// not in the classpath, cannot run this test
+			log.info("Skipping test 'testS3aKind()' because the S3AFileSystem is not in the class path");
+			return;
+		}
+
+		final FileSystem s3a = new Path("s3a://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
+		assertEquals(FileSystemKind.OBJECT_STORE, s3a.getKind());
+	}
+
+	@Test
+	public void testS3fileSystemSchemes() {
+		assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3"));
+		assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3n"));
+		assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3a"));
+		assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("EMRFS"));
+	}
+
+	@Test
+	public void testViewFs() {
+		assertEquals(FileSystemKind.FILE_SYSTEM, HadoopFileSystem.getKindForScheme("viewfs"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d0ed12e/flink-filesystems/flink-s3-fs-hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-s3-fs-hadoop/pom.xml b/flink-filesystems/flink-s3-fs-hadoop/pom.xml
index 093ee08..76d43f8 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/pom.xml
+++ b/flink-filesystems/flink-s3-fs-hadoop/pom.xml
@@ -183,6 +183,15 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+
+		<!-- for the behavior test suite -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
 	</dependencies>
 
 	<!-- We need to bump the AWS dependencies compared to the ones referenced

http://git-wip-us.apache.org/repos/asf/flink/blob/3d0ed12e/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemBehaviorITCase.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemBehaviorITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemBehaviorITCase.java
new file mode 100644
index 0000000..c8aaaee
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemBehaviorITCase.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3hadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemBehaviorTestSuite;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * An implementation of the {@link FileSystemBehaviorTestSuite} for the s3a-based S3 file system.
+ */
+public class HadoopS3FileSystemBehaviorITCase extends FileSystemBehaviorTestSuite {
+
+	private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET");
+
+	private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
+
+	private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
+	private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");
+
+	@BeforeClass
+	public static void checkCredentialsAndSetup() throws IOException {
+		// check whether credentials exist
+		Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null);
+		Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null);
+		Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null);
+
+		// initialize configuration with valid credentials
+		final Configuration conf = new Configuration();
+		conf.setString("s3.access.key", ACCESS_KEY);
+		conf.setString("s3.secret.key", SECRET_KEY);
+		FileSystem.initialize(conf);
+	}
+
+	@AfterClass
+	public static void clearFsConfig() throws IOException {
+		FileSystem.initialize(new Configuration());
+	}
+
+	@Override
+	public FileSystem getFileSystem() throws Exception {
+		return getBasePath().getFileSystem();
+	}
+
+	@Override
+	public Path getBasePath() throws Exception {
+		return new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR);
+	}
+
+	@Override
+	public FileSystemKind getFileSystemKind() {
+		return FileSystemKind.OBJECT_STORE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d0ed12e/flink-filesystems/flink-s3-fs-presto/pom.xml
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-s3-fs-presto/pom.xml b/flink-filesystems/flink-s3-fs-presto/pom.xml
index b871032..4bfc2f1 100644
--- a/flink-filesystems/flink-s3-fs-presto/pom.xml
+++ b/flink-filesystems/flink-s3-fs-presto/pom.xml
@@ -206,6 +206,15 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+
+		<!-- for the behavior test suite -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
 	</dependencies>
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3d0ed12e/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemBehaviorITCase.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemBehaviorITCase.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemBehaviorITCase.java
new file mode 100644
index 0000000..812404c
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemBehaviorITCase.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemBehaviorTestSuite;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * An implementation of the {@link FileSystemBehaviorTestSuite} for the Presto-based S3 file system.
+ */
+public class PrestoS3FileSystemBehaviorITCase extends FileSystemBehaviorTestSuite {
+
+	private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET");
+
+	private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
+
+	private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
+	private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");
+
+	@BeforeClass
+	public static void checkCredentialsAndSetup() throws IOException {
+		// check whether credentials exist
+		Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null);
+		Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null);
+		Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null);
+
+		// initialize configuration with valid credentials
+		final Configuration conf = new Configuration();
+		conf.setString("s3.access.key", ACCESS_KEY);
+		conf.setString("s3.secret.key", SECRET_KEY);
+		FileSystem.initialize(conf);
+	}
+
+	@AfterClass
+	public static void clearFsConfig() throws IOException {
+		FileSystem.initialize(new Configuration());
+	}
+
+	@Override
+	public FileSystem getFileSystem() throws Exception {
+		return getBasePath().getFileSystem();
+	}
+
+	@Override
+	public Path getBasePath() throws Exception {
+		return new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR);
+	}
+
+	@Override
+	public FileSystemKind getFileSystemKind() {
+		return FileSystemKind.OBJECT_STORE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d0ed12e/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java b/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
deleted file mode 100644
index 69ecdb8..0000000
--- a/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.fs.hdfs;
-
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemKind;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for extracting the {@link FileSystemKind} from file systems that Flink
- * accesses through Hadoop's File System interface.
- *
- * <p>This class needs to be in this package, because it accesses package private methods
- * from the HDFS file system wrapper class.
- */
-public class HdfsKindTest extends TestLogger {
-
-	@Test
-	public void testHdfsKind() throws IOException {
-		final FileSystem fs = new Path("hdfs://localhost:55445/my/file").getFileSystem();
-		assertEquals(FileSystemKind.FILE_SYSTEM, fs.getKind());
-	}
-
-	@Test
-	public void testS3Kind() throws IOException {
-		try {
-			Class.forName("org.apache.hadoop.fs.s3.S3FileSystem");
-		} catch (ClassNotFoundException ignored) {
-			// not in the classpath, cannot run this test
-			log.info("Skipping test 'testS3Kind()' because the S3 file system is not in the class path");
-			return;
-		}
-
-		final FileSystem s3 = new Path("s3://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
-		assertEquals(FileSystemKind.OBJECT_STORE, s3.getKind());
-	}
-
-	@Test
-	public void testS3nKind() throws IOException {
-		try {
-			Class.forName("org.apache.hadoop.fs.s3native.NativeS3FileSystem");
-		} catch (ClassNotFoundException ignored) {
-			// not in the classpath, cannot run this test
-			log.info("Skipping test 'testS3nKind()' because the Native S3 file system is not in the class path");
-			return;
-		}
-
-		final FileSystem s3n = new Path("s3n://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
-		assertEquals(FileSystemKind.OBJECT_STORE, s3n.getKind());
-	}
-
-	@Test
-	public void testS3aKind() throws IOException {
-		try {
-			Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem");
-		} catch (ClassNotFoundException ignored) {
-			// not in the classpath, cannot run this test
-			log.info("Skipping test 'testS3aKind()' because the S3AFileSystem is not in the class path");
-			return;
-		}
-
-		final FileSystem s3a = new Path("s3a://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
-		assertEquals(FileSystemKind.OBJECT_STORE, s3a.getKind());
-	}
-
-	@Test
-	public void testS3fileSystemSchemes() {
-		assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3"));
-		assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3n"));
-		assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3a"));
-		assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("EMRFS"));
-	}
-
-	@Test
-	public void testViewFs() {
-		assertEquals(FileSystemKind.FILE_SYSTEM, HadoopFileSystem.getKindForScheme("viewfs"));
-	}
-}