Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/18 14:34:10 UTC

[1/2] flink git commit: [FLINK-4228][yarn/s3] fix for yarn staging with s3a defaultFs

Repository: flink
Updated Branches:
  refs/heads/master b00f1b326 -> 51b5b53c7


[FLINK-4228][yarn/s3] fix for yarn staging with s3a defaultFs

+ includes a new unit test for recursive uploads to hdfs:// targets
+ adds a unit test for recursive file uploads to s3:// via s3a
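
A minimal sketch of the staging pattern behind the fix, assuming a Hadoop FileSystem on
the classpath (the class name, paths and main() wiring below are made up for
illustration; the actual logic lives in uploadAndRegisterFiles and
Utils.setupLocalResource in the diff): walk the local directory tree and copy each file
to the target file system, preserving its path relative to the shipped directory.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

public class RecursiveStagingSketch {

	static void stageDirectory(
			final java.nio.file.Path localDir,
			final FileSystem targetFs,
			final Path targetBaseDir) throws IOException {

		Files.walkFileTree(localDir, new SimpleFileVisitor<java.nio.file.Path>() {
			@Override
			public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
					throws IOException {
				// keep the path relative to the shipped directory, e.g. "nested/4/5"
				String relative = localDir.relativize(file).toString();
				// overwrite existing files at the target, keep the local source
				targetFs.copyFromLocalFile(false, true, new Path(file.toUri()),
					new Path(targetBaseDir, relative));
				return FileVisitResult.CONTINUE;
			}
		});
	}

	public static void main(String[] args) throws IOException {
		// hypothetical local ship directory and staging target
		FileSystem fs = FileSystem.get(new Configuration());
		stageDirectory(java.nio.file.Paths.get("/tmp/ship"), fs, new Path(".flink/app_example"));
	}
}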

[FLINK-4228][yarn/s3] turn the dependencies around

Instead of having flink-s3-fs-hadoop depend on flink-yarn_<scala_version>,
let flink-yarn depend on the s3 filesystem and implement the test there.
This is safer with regard to the Scala-independent flink-s3-fs-hadoop project.

[FLINK-4228][yarn] change the S3 upload tests to use Hadoop's S3 implementations

This is how YARN would use it and what should really be tested.

[FLINK-4228][yarn] enable S3 tests for newer Hadoop versions

- requires the 'include_hadoop_aws' build profile (or property) to be set
- requires a newer aws-sdk version (than Hadoop pulls in) to work with our
  httpcomponents version
- we also add a check that at least one S3 implementation is tested, so that a missing
  dependency does not silently disable all tests
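- the profile/property can be enabled on the Maven command line, e.g. via
  -Dinclude_hadoop_aws (as the Travis 'misc' build below now passes alongside
  -Dhadoop.version=2.8.0)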

This closes #4939.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf8504db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf8504db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf8504db

Branch: refs/heads/master
Commit: cf8504dba606ee758ac16867423e65dbf6afc23a
Parents: b00f1b3
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Nov 9 15:04:50 2016 -0500
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Nov 18 10:58:07 2017 +0100

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 flink-yarn/pom.xml                              |  63 ++++++
 .../yarn/AbstractYarnClusterDescriptor.java     | 192 +++++++++++-----
 .../main/java/org/apache/flink/yarn/Utils.java  |  86 +++++---
 .../apache/flink/yarn/YarnFileStageTest.java    | 218 ++++++++++++++++++
 .../flink/yarn/YarnFileStageTestS3ITCase.java   | 220 +++++++++++++++++++
 6 files changed, 698 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cf8504db/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index daf2186..5e2ef74 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -59,7 +59,7 @@ matrix:
     - jdk: "oraclejdk8"
       env:
         - TEST="misc"
-        - PROFILE="-Dhadoop.version=2.8.0"
+        - PROFILE="-Dhadoop.version=2.8.0 -Dinclude_hadoop_aws"
         - CACHE_NAME=JDK8_H280_M
     - jdk: "openjdk8"
       env:

http://git-wip-us.apache.org/repos/asf/flink/blob/cf8504db/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index 5eafcc4..6fc589e 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -31,6 +31,12 @@ under the License.
 	<name>flink-yarn</name>
 	<packaging>jar</packaging>
 
+	<properties>
+		<!-- for testing (will override Hadoop's default dependency on too low SDK versions that
+			do not work with our httpcomponents version) -->
+		<aws.sdk.version>1.11.171</aws.sdk.version>
+	</properties>
+
 	<dependencies>
 
 		<!-- core dependencies -->
@@ -153,6 +159,63 @@ under the License.
 				</plugins>
 			</build>
 		</profile>
+
+		<profile>
+			<!-- Hadoop >= 2.6 moved the S3 file systems from hadoop-common into hadoop-aws artifact
+				(see https://issues.apache.org/jira/browse/HADOOP-11074)
+				We can add the (test) dependency by default once 2.6 is the minimum required version.
+			-->
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<!-- for the S3 tests of YarnFileStageTestS3ITCase -->
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
+						<!-- The aws-java-sdk-core requires jackson 2.6, but
+							hadoop pulls in 2.3 -->
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-annotations</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-core</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-databind</artifactId>
+						</exclusion>
+					</exclusions>
+				</dependency>
+				<!-- override Hadoop's default dependency on too low SDK versions that do not work
+					with our httpcomponents version when initialising the s3a file system -->
+				<dependency>
+					<groupId>com.amazonaws</groupId>
+					<artifactId>aws-java-sdk-s3</artifactId>
+					<version>${aws.sdk.version}</version>
+					<scope>test</scope>
+				</dependency>
+				<dependency>
+					<groupId>com.amazonaws</groupId>
+					<artifactId>aws-java-sdk-sts</artifactId>
+					<version>${aws.sdk.version}</version>
+					<scope>test</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+
 	</profiles>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/cf8504db/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 8ecc371..5ac5c4e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.ConfigConstants;
@@ -624,6 +625,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// Copy the application master jar to the filesystem
 		// Create a local resource to point to the destination jar path
 		final FileSystem fs = FileSystem.get(conf);
+		final Path homeDir = fs.getHomeDirectory();
 
 		// hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
 		if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
@@ -705,11 +707,25 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		StringBuilder envShipFileList = new StringBuilder();
 
 		// upload and register ship files
-		List<String> systemClassPaths = uploadAndRegisterFiles(systemShipFiles, fs, appId.toString(), paths, localResources, envShipFileList);
+		List<String> systemClassPaths = uploadAndRegisterFiles(
+			systemShipFiles,
+			fs,
+			homeDir,
+			appId,
+			paths,
+			localResources,
+			envShipFileList);
 
 		List<String> userClassPaths;
 		if (userJarInclusion != YarnConfigOptions.UserJarInclusion.DISABLED) {
-			userClassPaths = uploadAndRegisterFiles(userJarFiles, fs, appId.toString(), paths, localResources, envShipFileList);
+			userClassPaths = uploadAndRegisterFiles(
+				userJarFiles,
+				fs,
+				homeDir,
+				appId,
+				paths,
+				localResources,
+				envShipFileList);
 		} else {
 			userClassPaths = Collections.emptyList();
 		}
@@ -739,32 +755,29 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 
 		// Setup jar for ApplicationMaster
-		LocalResource appMasterJar = Records.newRecord(LocalResource.class);
-		Path remotePathJar = Utils.setupLocalResource(
+		Path remotePathJar = setupSingleLocalResource(
+			"flink.jar",
 			fs,
-			appId.toString(),
+			appId,
 			flinkJarPath,
-			appMasterJar,
-			fs.getHomeDirectory());
-
-		localResources.put("flink.jar", appMasterJar);
+			localResources,
+			homeDir,
+			"");
 
 		// Upload the flink configuration
-		LocalResource flinkConf = Records.newRecord(LocalResource.class);
-
 		// write out configuration file
 		File tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
 		tmpConfigurationFile.deleteOnExit();
 		BootstrapTools.writeConfiguration(flinkConfiguration, tmpConfigurationFile);
 
-		Path remotePathConf = Utils.setupLocalResource(
+		Path remotePathConf = setupSingleLocalResource(
+			"flink-conf.yaml",
 			fs,
-			appId.toString(),
+			appId,
 			new Path(tmpConfigurationFile.getAbsolutePath()),
-			flinkConf,
-			fs.getHomeDirectory());
-
-		localResources.put("flink-conf.yaml", flinkConf);
+			localResources,
+			homeDir,
+			"");
 
 		paths.add(remotePathJar);
 		classPathBuilder.append("flink.jar").append(File.pathSeparator);
@@ -781,11 +794,16 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 					ObjectOutputStream obOutput = new ObjectOutputStream(output);){
 					obOutput.writeObject(jobGraph);
 				}
-				LocalResource jobgraph = Records.newRecord(LocalResource.class);
-				Path remoteJobGraph =
-						Utils.setupLocalResource(fs, appId.toString(), new Path(fp.toURI()), jobgraph, fs.getHomeDirectory());
-				localResources.put("job.graph", jobgraph);
-				paths.add(remoteJobGraph);
+
+				Path pathFromYarnURL = setupSingleLocalResource(
+					"job.graph",
+					fs,
+					appId,
+					new Path(fp.toURI()),
+					localResources,
+					homeDir,
+					"");
+				paths.add(pathFromYarnURL);
 				classPathBuilder.append("job.graph").append(File.pathSeparator);
 			} catch (Exception e) {
 				LOG.warn("Add job graph to local resource fail");
@@ -793,7 +811,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 		}
 
-		Path yarnFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId + '/');
+		Path yarnFilesDir = new Path(homeDir, ".flink/" + appId + '/');
 
 		FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
 		fs.setPermission(yarnFilesDir, permission); // set permission for path.
@@ -810,32 +828,44 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			if (krb5Config != null && krb5Config.length() != 0) {
 				File krb5 = new File(krb5Config);
 				LOG.info("Adding KRB5 configuration {} to the AM container local resource bucket", krb5.getAbsolutePath());
-				LocalResource krb5ConfResource = Records.newRecord(LocalResource.class);
 				Path krb5ConfPath = new Path(krb5.getAbsolutePath());
-				remoteKrb5Path = Utils.setupLocalResource(fs, appId.toString(), krb5ConfPath, krb5ConfResource, fs.getHomeDirectory());
-				localResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource);
+				remoteKrb5Path = setupSingleLocalResource(
+					Utils.KRB5_FILE_NAME,
+					fs,
+					appId,
+					krb5ConfPath,
+					localResources,
+					homeDir,
+					"");
 
 				File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
 				LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", f.getAbsolutePath());
-				LocalResource yarnConfResource = Records.newRecord(LocalResource.class);
 				Path yarnSitePath = new Path(f.getAbsolutePath());
-				remoteYarnSiteXmlPath = Utils.setupLocalResource(fs, appId.toString(), yarnSitePath, yarnConfResource, fs.getHomeDirectory());
-				localResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource);
-
+				remoteYarnSiteXmlPath = setupSingleLocalResource(
+					Utils.YARN_SITE_FILE_NAME,
+					fs,
+					appId,
+					yarnSitePath,
+					localResources,
+					homeDir,
+					"");
 				hasKrb5 = true;
 			}
 		}
 
 		// setup security tokens
-		LocalResource keytabResource = null;
 		Path remotePathKeytab = null;
 		String keytab = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
 		if (keytab != null) {
 			LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
-			keytabResource = Records.newRecord(LocalResource.class);
-			Path keytabPath = new Path(keytab);
-			remotePathKeytab = Utils.setupLocalResource(fs, appId.toString(), keytabPath, keytabResource, fs.getHomeDirectory());
-			localResources.put(Utils.KEYTAB_FILE_NAME, keytabResource);
+			remotePathKeytab = setupSingleLocalResource(
+				Utils.KEYTAB_FILE_NAME,
+				fs,
+				appId,
+				new Path(keytab),
+				localResources,
+				homeDir,
+				"");
 		}
 
 		final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
@@ -866,7 +896,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(clusterSpecification.getTaskManagerMemoryMB()));
 		appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString());
 		appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
-		appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
+		appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, homeDir.toString());
 		appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
 		appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(clusterSpecification.getSlotsPerTaskManager()));
 		appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
@@ -876,7 +906,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
 		appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
 
-		if (keytabResource != null) {
+		if (remotePathKeytab != null) {
 			appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString());
 			String principal = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
 			appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
@@ -981,25 +1011,54 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		return report;
 	}
 
-	private static List<String> uploadAndRegisterFiles(
-			Collection<File> shipFiles,
+	/**
+	 * Uploads and registers a single resource and adds it to <tt>localResources</tt>.
+	 *
+	 * @param key
+	 * 		the key to add the resource under
+	 * @param fs
+	 * 		the remote file system to upload to
+	 * @param appId
+	 * 		application ID
+	 * @param localSrcPath
+	 * 		local path to the file
+	 * @param localResources
+	 * 		map of resources
+	 *
+	 * @return the remote path to the uploaded resource
+	 */
+	private static Path setupSingleLocalResource(
+			String key,
 			FileSystem fs,
-			String appId,
-			List<Path> remotePaths,
+			ApplicationId appId,
+			Path localSrcPath,
 			Map<String, LocalResource> localResources,
-			StringBuilder envShipFileList) throws IOException {
-		final List<String> classPaths = new ArrayList<>(2 + shipFiles.size());
-		for (File shipFile : shipFiles) {
-			LocalResource shipResources = Records.newRecord(LocalResource.class);
+			Path targetHomeDir,
+			String relativeTargetPath) throws IOException, URISyntaxException {
 
-			Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
-			Path remotePath =
-				Utils.setupLocalResource(fs, appId, shipLocalPath, shipResources, fs.getHomeDirectory());
+		Tuple2<Path, LocalResource> resource = Utils.setupLocalResource(
+			fs,
+			appId.toString(),
+			localSrcPath,
+			targetHomeDir,
+			relativeTargetPath);
 
-			remotePaths.add(remotePath);
+		localResources.put(key, resource.f1);
 
-			localResources.put(shipFile.getName(), shipResources);
+		return resource.f0;
+	}
+
+	static List<String> uploadAndRegisterFiles(
+			Collection<File> shipFiles,
+			FileSystem fs,
+			Path targetHomeDir,
+			ApplicationId appId,
+			List<Path> remotePaths,
+			Map<String, LocalResource> localResources,
+			StringBuilder envShipFileList) throws IOException, URISyntaxException {
 
+		final List<String> classPaths = new ArrayList<>(2 + shipFiles.size());
+		for (File shipFile : shipFiles) {
 			if (shipFile.isDirectory()) {
 				// add directories to the classpath
 				java.nio.file.Path shipPath = shipFile.toPath();
@@ -1011,17 +1070,40 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 						throws IOException {
 						java.nio.file.Path relativePath = parentPath.relativize(file);
 
-						classPaths.add(relativePath.toString());
-
-						return FileVisitResult.CONTINUE;
+						String key = relativePath.toString();
+						try {
+							Path remotePath = setupSingleLocalResource(
+								key,
+								fs,
+								appId,
+								new Path(file.toUri()),
+								localResources,
+								targetHomeDir,
+								relativePath.getParent().toString());
+							remotePaths.add(remotePath);
+							envShipFileList.append(key).append("=").append(remotePath).append(",");
+
+							// add files to the classpath
+							classPaths.add(key);
+
+							return FileVisitResult.CONTINUE;
+						} catch (URISyntaxException e) {
+							throw new IOException(e);
+						}
 					}
 				});
 			} else {
+				Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
+				String key = shipFile.getName();
+				Path remotePath = setupSingleLocalResource(
+					key, fs, appId, shipLocalPath, localResources, targetHomeDir, "");
+				remotePaths.add(remotePath);
+				envShipFileList.append(key).append("=").append(remotePath).append(",");
+
 				// add files to the classpath
-				classPaths.add(shipFile.getName());
+				classPaths.add(key);
 			}
 
-			envShipFileList.append(remotePath).append(",");
 		}
 		return classPaths;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf8504db/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 32cbb64..652afec 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -117,33 +118,60 @@ public final class Utils {
 	}
 
 	/**
+	 * Copy a local file to a remote file system.
+	 *
+	 * @param fs
+	 * 		remote filesystem
+	 * @param appId
+	 * 		application ID
+	 * @param localSrcPath
+	 * 		path to the local file
+	 * @param homedir
+	 * 		remote home directory base (will be extended)
+	 * @param relativeTargetPath
+	 * 		relative target path of the file (will be prefixed by the full home directory we set up)
+	 *
 	 * @return Path to remote file (usually hdfs)
-	 * @throws IOException
 	 */
-	public static Path setupLocalResource(
-			FileSystem fs,
-			String appId, Path localRsrcPath,
-			LocalResource appMasterJar,
-			Path homedir) throws IOException {
+	static Tuple2<Path, LocalResource> setupLocalResource(
+		FileSystem fs,
+		String appId,
+		Path localSrcPath,
+		Path homedir,
+		String relativeTargetPath) throws IOException {
+
+		if (new File(localSrcPath.toUri().getPath()).isDirectory()) {
+			throw new IllegalArgumentException("File to copy must not be a directory: " +
+				localSrcPath);
+		}
 
 		// copy resource to HDFS
-		String suffix = ".flink/" + appId + "/" + localRsrcPath.getName();
+		String suffix =
+			".flink/"
+				+ appId
+				+ (relativeTargetPath.isEmpty() ? "" : "/" + relativeTargetPath)
+				+ "/" + localSrcPath.getName();
 
 		Path dst = new Path(homedir, suffix);
 
-		LOG.info("Copying from " + localRsrcPath + " to " + dst);
-		fs.copyFromLocalFile(localRsrcPath, dst);
-		registerLocalResource(fs, dst, appMasterJar);
-		return dst;
+		LOG.info("Copying from " + localSrcPath + " to " + dst);
+
+		fs.copyFromLocalFile(false, true, localSrcPath, dst);
+
+		// now create the resource instance
+		LocalResource resource = registerLocalResource(fs, dst);
+		return Tuple2.of(dst, resource);
 	}
 
-	public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, LocalResource localResource) throws IOException {
+	private static LocalResource registerLocalResource(FileSystem fs, Path remoteRsrcPath) throws IOException {
+		LocalResource localResource = Records.newRecord(LocalResource.class);
 		FileStatus jarStat = fs.getFileStatus(remoteRsrcPath);
 		localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri()));
 		localResource.setSize(jarStat.getLen());
 		localResource.setTimestamp(jarStat.getModificationTime());
 		localResource.setType(LocalResourceType.FILE);
 		localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+		return localResource;
 	}
 
 	public static void setTokensFor(ContainerLaunchContext amContainer, List<Path> paths, Configuration conf) throws IOException {
@@ -340,10 +368,9 @@ public final class Utils {
 		LocalResource keytabResource = null;
 		if (remoteKeytabPath != null) {
 			log.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath);
-			keytabResource = Records.newRecord(LocalResource.class);
 			Path keytabPath = new Path(remoteKeytabPath);
 			FileSystem fs = keytabPath.getFileSystem(yarnConfig);
-			registerLocalResource(fs, keytabPath, keytabResource);
+			keytabResource = registerLocalResource(fs, keytabPath);
 		}
 
 		//To support Yarn Secure Integration Test Scenario
@@ -352,30 +379,28 @@ public final class Utils {
 		boolean hasKrb5 = false;
 		if (remoteYarnConfPath != null && remoteKrb5Path != null) {
 			log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
-			yarnConfResource = Records.newRecord(LocalResource.class);
 			Path yarnConfPath = new Path(remoteYarnConfPath);
 			FileSystem fs = yarnConfPath.getFileSystem(yarnConfig);
-			registerLocalResource(fs, yarnConfPath, yarnConfResource);
+			yarnConfResource = registerLocalResource(fs, yarnConfPath);
 
 			log.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path);
-			krb5ConfResource = Records.newRecord(LocalResource.class);
 			Path krb5ConfPath = new Path(remoteKrb5Path);
 			fs = krb5ConfPath.getFileSystem(yarnConfig);
-			registerLocalResource(fs, krb5ConfPath, krb5ConfResource);
+			krb5ConfResource = registerLocalResource(fs, krb5ConfPath);
 
 			hasKrb5 = true;
 		}
 
 		// register Flink Jar with remote HDFS
-		LocalResource flinkJar = Records.newRecord(LocalResource.class);
+		final LocalResource flinkJar;
 		{
 			Path remoteJarPath = new Path(remoteFlinkJarPath);
 			FileSystem fs = remoteJarPath.getFileSystem(yarnConfig);
-			registerLocalResource(fs, remoteJarPath, flinkJar);
+			flinkJar = registerLocalResource(fs, remoteJarPath);
 		}
 
 		// register conf with local fs
-		LocalResource flinkConf = Records.newRecord(LocalResource.class);
+		final LocalResource flinkConf;
 		{
 			// write the TaskManager configuration to a local file
 			final File taskManagerConfigFile =
@@ -385,8 +410,13 @@ public final class Utils {
 
 			Path homeDirPath = new Path(clientHomeDir);
 			FileSystem fs = homeDirPath.getFileSystem(yarnConfig);
-			setupLocalResource(fs, appId,
-					new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir));
+
+			flinkConf = setupLocalResource(
+				fs,
+				appId,
+				new Path(taskManagerConfigFile.toURI()),
+				homeDirPath,
+				"").f1;
 
 			log.info("Prepared local resource for modified yaml: {}", flinkConf);
 		}
@@ -408,10 +438,11 @@ public final class Utils {
 		// prepare additional files to be shipped
 		for (String pathStr : shipListString.split(",")) {
 			if (!pathStr.isEmpty()) {
-				LocalResource resource = Records.newRecord(LocalResource.class);
-				Path path = new Path(pathStr);
-				registerLocalResource(path.getFileSystem(yarnConfig), path, resource);
-				taskManagerLocalResources.put(path.getName(), resource);
+				String[] keyAndPath = pathStr.split("=");
+				require(keyAndPath.length == 2, "Invalid entry in ship file list: %s", pathStr);
+				Path path = new Path(keyAndPath[1]);
+				LocalResource resource = registerLocalResource(path.getFileSystem(yarnConfig), path);
+				taskManagerLocalResources.put(keyAndPath[0], resource);
 			}
 		}
 
@@ -488,4 +519,5 @@ public final class Utils {
 			throw new RuntimeException(String.format(message, values));
 		}
 	}
+
 }
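
For reference, the ship file list that the cluster descriptor builds (envShipFileList)
and that Utils parses above (the split on "=") is a comma-separated list of
key=remotePath entries, where the key is the file's path relative to the shipped
directory. A small, self-contained sketch of that format, with made-up example paths:

import java.util.LinkedHashMap;
import java.util.Map;

public class ShipFileListSketch {

	public static void main(String[] args) {
		// client side: append one "key=remotePath" entry per uploaded file
		StringBuilder envShipFileList = new StringBuilder();
		envShipFileList.append("lib/dep.jar").append("=")
			.append("hdfs:///user/flink/.flink/app_example/lib/dep.jar").append(",");
		envShipFileList.append("log4j.properties").append("=")
			.append("hdfs:///user/flink/.flink/app_example/log4j.properties").append(",");

		// TaskManager side: split the list back into (key, remote path) pairs
		Map<String, String> shippedFiles = new LinkedHashMap<>();
		for (String entry : envShipFileList.toString().split(",")) {
			if (!entry.isEmpty()) {
				String[] keyAndPath = entry.split("=");
				shippedFiles.put(keyAndPath[0], keyAndPath[1]);
			}
		}

		shippedFiles.forEach((key, path) -> System.out.println(key + " -> " + path));
	}
}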

http://git-wip-us.apache.org/repos/asf/flink/blob/cf8504db/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
new file mode 100644
index 0000000..4d38253
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests for verifying that file staging during submission to YARN works.
+ */
+public class YarnFileStageTest extends TestLogger {
+
+	@ClassRule
+	public static final TemporaryFolder CLASS_TEMP_DIR = new TemporaryFolder();
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static MiniDFSCluster hdfsCluster;
+
+	private static Path hdfsRootPath;
+
+	private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+	// ------------------------------------------------------------------------
+	//  Test setup and shutdown
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void createHDFS() throws Exception {
+		Assume.assumeTrue(!OperatingSystem.isWindows());
+
+		final File tempDir = CLASS_TEMP_DIR.newFolder();
+
+		org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+		hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath());
+
+		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+		hdfsCluster = builder.build();
+		hdfsRootPath = new Path(hdfsCluster.getURI());
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		if (hdfsCluster != null) {
+			hdfsCluster.shutdown();
+		}
+		hdfsCluster = null;
+		hdfsRootPath = null;
+	}
+
+	@Before
+	public void initConfig() {
+		hadoopConfig = new org.apache.hadoop.conf.Configuration();
+		hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, hdfsRootPath.toString());
+	}
+
+	/**
+	 * Verifies that nested directories are properly copied with a <tt>hdfs://</tt> file
+	 * system (from a <tt>file:///absolute/path</tt> source path).
+	 */
+	@Test
+	public void testCopyFromLocalRecursiveWithScheme() throws Exception {
+		final FileSystem targetFileSystem = hdfsRootPath.getFileSystem(hadoopConfig);
+		final Path targetDir = targetFileSystem.getWorkingDirectory();
+
+		testCopyFromLocalRecursive(targetFileSystem, targetDir, tempFolder, true);
+	}
+
+	/**
+	 * Verifies that nested directories are properly copied with a <tt>hdfs://</tt> file
+	 * system (from a <tt>/absolute/path</tt> source path).
+	 */
+	@Test
+	public void testCopyFromLocalRecursiveWithoutScheme() throws Exception {
+		final FileSystem targetFileSystem = hdfsRootPath.getFileSystem(hadoopConfig);
+		final Path targetDir = targetFileSystem.getWorkingDirectory();
+
+		testCopyFromLocalRecursive(targetFileSystem, targetDir, tempFolder, false);
+	}
+
+	/**
+	 * Verifies that nested directories are properly copied with the given filesystem and paths.
+	 *
+	 * @param targetFileSystem
+	 * 		file system of the target path
+	 * @param targetDir
+	 * 		target path (URI like <tt>hdfs://...</tt>)
+	 * @param tempFolder
+	 * 		JUnit temporary folder rule to create the source directory with
+	 * @param addSchemeToLocalPath
+	 * 		whether to add the <tt>file://</tt> scheme to the local path to copy from
+	 */
+	public static void testCopyFromLocalRecursive(
+			FileSystem targetFileSystem,
+			Path targetDir,
+			TemporaryFolder tempFolder,
+			boolean addSchemeToLocalPath) throws Exception {
+
+		// directory must not yet exist
+		assertFalse(targetFileSystem.exists(targetDir));
+
+		final File srcDir = tempFolder.newFolder();
+		final Path srcPath;
+		if (addSchemeToLocalPath) {
+			srcPath = new Path("file://" + srcDir.getAbsolutePath());
+		} else {
+			srcPath = new Path(srcDir.getAbsolutePath());
+		}
+
+		HashMap<String /* (relative) path */, /* contents */ String> srcFiles = new HashMap<>(4);
+
+		// create and fill source files
+		srcFiles.put("1", "Hello 1");
+		srcFiles.put("2", "Hello 2");
+		srcFiles.put("nested/3", "Hello nested/3");
+		srcFiles.put("nested/4/5", "Hello nested/4/5");
+		for (Map.Entry<String, String> src : srcFiles.entrySet()) {
+			File file = new File(srcDir, src.getKey());
+			//noinspection ResultOfMethodCallIgnored
+			file.getParentFile().mkdirs();
+			try (DataOutputStream out = new DataOutputStream(new FileOutputStream(file))) {
+				out.writeUTF(src.getValue());
+			}
+		}
+
+		// copy the created directory recursively:
+		try {
+			List<Path> remotePaths = new ArrayList<>();
+			HashMap<String, LocalResource> localResources = new HashMap<>();
+			AbstractYarnClusterDescriptor.uploadAndRegisterFiles(
+				Collections.singletonList(new File(srcPath.toUri().getPath())),
+				targetFileSystem,
+				targetDir,
+				ApplicationId.newInstance(0, 0),
+				remotePaths,
+				localResources,
+				new StringBuilder());
+			assertEquals(srcFiles.size(), localResources.size());
+
+			Path workDir = ConverterUtils
+				.getPathFromYarnURL(localResources.get(srcPath.getName() + "/1").getResource())
+				.getParent();
+
+			RemoteIterator<LocatedFileStatus> targetFilesIterator =
+				targetFileSystem.listFiles(workDir, true);
+			HashMap<String /* (relative) path */, /* contents */ String> targetFiles =
+				new HashMap<>(4);
+
+			final int workDirPrefixLength =
+				workDir.toString().length() + 1; // one more for the concluding "/"
+			while (targetFilesIterator.hasNext()) {
+				LocatedFileStatus targetFile = targetFilesIterator.next();
+
+				try (FSDataInputStream in = targetFileSystem.open(targetFile.getPath())) {
+					String absolutePathString = targetFile.getPath().toString();
+					String relativePath = absolutePathString.substring(workDirPrefixLength);
+					targetFiles.put(relativePath, in.readUTF());
+
+					assertEquals("extraneous data in file " + relativePath, -1, in.read());
+				}
+			}
+
+			assertThat(targetFiles, equalTo(srcFiles));
+		} finally {
+			// clean up
+			targetFileSystem.delete(targetDir, true);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf8504db/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java
new file mode 100644
index 0000000..74fb596
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assume.assumeFalse;
+import static org.junit.Assume.assumeNoException;
+
+/**
+ * Tests for verifying that file staging during submission to YARN works with the S3A file system.
+ *
+ * <p>Note that the setup is similar to <tt>org.apache.flink.fs.s3hadoop.HadoopS3FileSystemITCase</tt>.
+ */
+public class YarnFileStageTestS3ITCase extends TestLogger {
+
+	private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET");
+
+	private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
+
+	private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
+	private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+	@Rule
+	public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+	/**
+	 * Number of tests executed.
+	 */
+	private static int numRecursiveUploadTests = 0;
+
+	/**
+	 * Will be updated by {@link #checkCredentialsAndSetup()} if the test is not skipped.
+	 */
+	private static boolean skipTest = true;
+
+	@BeforeClass
+	public static void checkCredentialsAndSetup() throws IOException {
+		// check whether credentials exist
+		Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null);
+		Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null);
+		Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null);
+
+		skipTest = false;
+
+		setupCustomHadoopConfig();
+	}
+
+	@AfterClass
+	public static void resetFileSystemConfiguration() throws IOException {
+		FileSystem.initialize(new Configuration());
+	}
+
+	@AfterClass
+	public static void checkAtLeastOneTestRun() {
+		if (!skipTest) {
+			assertThat(
+				"No S3 filesystem upload test executed. Please activate the " +
+					"'include_hadoop_aws' build profile or set '-Dinclude_hadoop_aws' during build " +
+					"(Hadoop >= 2.6 moved S3 filesystems out of hadoop-common).",
+				numRecursiveUploadTests, greaterThan(0));
+		}
+	}
+
+	/**
+	 * Create a Hadoop config file containing S3 access credentials.
+	 *
+	 * <p>Note that we cannot use them as part of the URL since this may fail if the credentials
+	 * contain a "/" (see <a href="https://issues.apache.org/jira/browse/HADOOP-3733">HADOOP-3733</a>).
+	 */
+	private static void setupCustomHadoopConfig() throws IOException {
+		File hadoopConfig = TEMP_FOLDER.newFile();
+		Map<String /* key */, String /* value */> parameters = new HashMap<>();
+
+		// set all different S3 fs implementation variants' configuration keys
+		parameters.put("fs.s3a.access.key", ACCESS_KEY);
+		parameters.put("fs.s3a.secret.key", SECRET_KEY);
+
+		parameters.put("fs.s3.awsAccessKeyId", ACCESS_KEY);
+		parameters.put("fs.s3.awsSecretAccessKey", SECRET_KEY);
+
+		parameters.put("fs.s3n.awsAccessKeyId", ACCESS_KEY);
+		parameters.put("fs.s3n.awsSecretAccessKey", SECRET_KEY);
+
+		try (PrintStream out = new PrintStream(new FileOutputStream(hadoopConfig))) {
+			out.println("<?xml version=\"1.0\"?>");
+			out.println("<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>");
+			out.println("<configuration>");
+			for (Map.Entry<String, String> entry : parameters.entrySet()) {
+				out.println("\t<property>");
+				out.println("\t\t<name>" + entry.getKey() + "</name>");
+				out.println("\t\t<value>" + entry.getValue() + "</value>");
+				out.println("\t</property>");
+			}
+			out.println("</configuration>");
+		}
+
+		final Configuration conf = new Configuration();
+		conf.setString(ConfigConstants.HDFS_SITE_CONFIG, hadoopConfig.getAbsolutePath());
+
+		FileSystem.initialize(conf);
+	}
+
+	/**
+	 * Verifies that nested directories are properly copied to the given S3 path (using the
+	 * appropriate file system) during resource uploads for YARN.
+	 *
+	 * @param scheme
+	 * 		file system scheme
+	 * @param pathSuffix
+	 * 		test path suffix which will be the test's target path
+	 */
+	private void testRecursiveUploadForYarn(String scheme, String pathSuffix) throws Exception {
+		++numRecursiveUploadTests;
+
+		final Path basePath = new Path(scheme + "://" + BUCKET + '/' + TEST_DATA_DIR);
+		final HadoopFileSystem fs = (HadoopFileSystem) basePath.getFileSystem();
+
+		assumeFalse(fs.exists(basePath));
+
+		try {
+			final Path directory = new Path(basePath, pathSuffix);
+
+			YarnFileStageTest.testCopyFromLocalRecursive(fs.getHadoopFileSystem(),
+				new org.apache.hadoop.fs.Path(directory.toUri()), tempFolder, true);
+
+			// now directory must be gone
+			assertFalse(fs.exists(directory));
+		} finally {
+			// clean up
+			fs.delete(basePath, true);
+		}
+	}
+
+	/**
+	 * Verifies that nested directories are properly copied with an <tt>s3a://</tt> file
+	 * system during resource uploads for YARN.
+	 */
+	@Test
+	public void testRecursiveUploadForYarnS3() throws Exception {
+		try {
+			Class.forName("org.apache.hadoop.fs.s3.S3FileSystem");
+		} catch (ClassNotFoundException e) {
+			// not in the classpath, cannot run this test
+			String msg = "Skipping test because S3FileSystem is not in the class path";
+			log.info(msg);
+			assumeNoException(msg, e);
+		}
+		testRecursiveUploadForYarn("s3", "testYarn-s3");
+	}
+
+	@Test
+	public void testRecursiveUploadForYarnS3n() throws Exception {
+		try {
+			Class.forName("org.apache.hadoop.fs.s3native.NativeS3FileSystem");
+		} catch (ClassNotFoundException e) {
+			// not in the classpath, cannot run this test
+			String msg = "Skipping test because NativeS3FileSystem is not in the class path";
+			log.info(msg);
+			assumeNoException(msg, e);
+		}
+		testRecursiveUploadForYarn("s3n", "testYarn-s3n");
+	}
+
+	@Test
+	public void testRecursiveUploadForYarnS3a() throws Exception {
+		try {
+			Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem");
+		} catch (ClassNotFoundException e) {
+			// not in the classpath, cannot run this test
+			String msg = "Skipping test because S3AFileSystem is not in the class path";
+			log.info(msg);
+			assumeNoException(msg, e);
+		}
+		testRecursiveUploadForYarn("s3a", "testYarn-s3a");
+	}
+}


[2/2] flink git commit: [FLINK-7988][s3] fix HadoopS3FileSystemITCase leaving test directories behind in S3

Posted by tr...@apache.org.
[FLINK-7988][s3] fix HadoopS3FileSystemITCase leaving test directories behind in S3

This closes #4950.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51b5b53c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51b5b53c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51b5b53c

Branch: refs/heads/master
Commit: 51b5b53c7cd7781959011ba48559c5361ac93ff9
Parents: cf8504d
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Nov 2 19:38:48 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Nov 18 10:58:08 2017 +0100

----------------------------------------------------------------------
 .../fs/s3hadoop/HadoopS3FileSystemITCase.java   | 50 +++++++++++++++++++-
 1 file changed, 49 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/51b5b53c/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
index 88f13ed..8c646f0 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -57,11 +58,58 @@ public class HadoopS3FileSystemITCase extends TestLogger {
 	private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
 	private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");
 
+	/**
+	 * Will be updated by {@link #checkCredentialsAndSetup()} if the test is not skipped.
+	 */
+	private static boolean skipTest = true;
+
 	@BeforeClass
-	public static void checkIfCredentialsArePresent() {
+	public static void checkCredentialsAndSetup() throws IOException {
+		// check whether credentials exist
 		Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null);
 		Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null);
 		Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null);
+
+		// initialize configuration with valid credentials
+		final Configuration conf = new Configuration();
+		conf.setString("s3.access.key", ACCESS_KEY);
+		conf.setString("s3.secret.key", SECRET_KEY);
+		FileSystem.initialize(conf);
+
+		// check for uniqueness of the test directory
+		final Path directory = new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR);
+		final FileSystem fs = directory.getFileSystem();
+
+		// directory must not yet exist
+		assertFalse(fs.exists(directory));
+
+		// reset configuration
+		FileSystem.initialize(new Configuration());
+
+		skipTest = false;
+	}
+
+	@AfterClass
+	public static void cleanUp() throws IOException {
+		if (!skipTest) {
+			// initialize configuration with valid credentials
+			final Configuration conf = new Configuration();
+			conf.setString("s3.access.key", ACCESS_KEY);
+			conf.setString("s3.secret.key", SECRET_KEY);
+			FileSystem.initialize(conf);
+
+			final Path directory = new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR);
+			final FileSystem fs = directory.getFileSystem();
+
+			// clean up
+			fs.delete(directory, true);
+
+			// now directory must be gone
+			assertFalse(fs.exists(directory));
+
+			// reset configuration
+			FileSystem.initialize(new Configuration());
+		}
 	}
 
 	@Test