Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/18 14:34:10 UTC
[1/2] flink git commit: [FLINK-4228][yarn/s3] fix for yarn staging with s3a defaultFs
Repository: flink
Updated Branches:
refs/heads/master b00f1b326 -> 51b5b53c7
[FLINK-4228][yarn/s3] fix for yarn staging with s3a defaultFs
+ includes a new unit test for recursive uploads to hdfs:// targets
+ adds a unit test for recursive file uploads to s3:// via s3a
[FLINK-4228][yarn/s3] turn the dependencies around
Instead of having flink-s3-fs-hadoop depend on flink-yarn_<scala_version>,
let flink-yarn depend on the s3 filesystem and implement the test there.
This is safer with regard to the Scala-independent flink-s3-fs-hadoop project.
[FLINK-4228][yarn] change the S3 upload tests to use Hadoop's S3 implementations
This is how YARN would use it and what should really be tested.
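For illustration, going through Hadoop's own FileSystem resolution, as YARN
staging does, looks roughly like the sketch below (a sketch only; it assumes
hadoop-aws and a compatible aws-java-sdk on the test classpath, and
"my-bucket" is a hypothetical bucket name):

    // Sketch: resolve Hadoop's S3A implementation the way YARN staging would.
    org.apache.hadoop.conf.Configuration conf =
        new org.apache.hadoop.conf.Configuration();
    org.apache.hadoop.fs.FileSystem fs =
        org.apache.hadoop.fs.FileSystem.get(
            java.net.URI.create("s3a://my-bucket/"), conf);
    // with hadoop-aws on the classpath, fs is an
    // org.apache.hadoop.fs.s3a.S3AFileSystem instance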
[FLINK-4228][yarn] enable S3 tests for newer Hadoop versions
- requires the 'include_hadoop_aws' build profile (or property) to be set
- requires a newer aws-sdk version than the one Hadoop pulls in, to work with our
httpcomponents version
- we also add a check that at least one S3 implementation is tested, so that a
missing dependency does not silently skip all of the tests
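One detail worth calling out in the staging changes below: each shipped file
is now registered under an explicit key, and the client encodes the list into
the container environment as comma-separated "key=remotePath" pairs, which
the TaskManager side in Utils.java splits apart again. A minimal sketch of
that round trip, with hypothetical paths:

    // client side: append one "key=remotePath," entry per uploaded file
    StringBuilder envShipFileList = new StringBuilder();
    envShipFileList.append("nested/3").append("=")
        .append("hdfs:///user/flink/.flink/app_1/nested/3").append(",");

    // TaskManager side: split the list back into key and remote path
    for (String pathStr : envShipFileList.toString().split(",")) {
        if (!pathStr.isEmpty()) {
            String[] keyAndPath = pathStr.split("=");
            // keyAndPath[0] -> resource key, keyAndPath[1] -> remote path
        }
    }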
This closes #4939.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf8504db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf8504db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf8504db
Branch: refs/heads/master
Commit: cf8504dba606ee758ac16867423e65dbf6afc23a
Parents: b00f1b3
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Nov 9 15:04:50 2016 -0500
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Nov 18 10:58:07 2017 +0100
----------------------------------------------------------------------
.travis.yml | 2 +-
flink-yarn/pom.xml | 63 ++++++
.../yarn/AbstractYarnClusterDescriptor.java | 192 +++++++++++-----
.../main/java/org/apache/flink/yarn/Utils.java | 86 +++++---
.../apache/flink/yarn/YarnFileStageTest.java | 218 ++++++++++++++++++
.../flink/yarn/YarnFileStageTestS3ITCase.java | 220 +++++++++++++++++++
6 files changed, 698 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cf8504db/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index daf2186..5e2ef74 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -59,7 +59,7 @@ matrix:
- jdk: "oraclejdk8"
env:
- TEST="misc"
- - PROFILE="-Dhadoop.version=2.8.0"
+ - PROFILE="-Dhadoop.version=2.8.0 -Dinclude_hadoop_aws"
- CACHE_NAME=JDK8_H280_M
- jdk: "openjdk8"
env:
http://git-wip-us.apache.org/repos/asf/flink/blob/cf8504db/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index 5eafcc4..6fc589e 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -31,6 +31,12 @@ under the License.
<name>flink-yarn</name>
<packaging>jar</packaging>
+ <properties>
+ <!-- for testing (will override Hadoop's default dependency on SDK versions that are
+ too old to work with our httpcomponents version) -->
+ <aws.sdk.version>1.11.171</aws.sdk.version>
+ </properties>
+
<dependencies>
<!-- core dependencies -->
@@ -153,6 +159,63 @@ under the License.
</plugins>
</build>
</profile>
+
+ <profile>
+ <!-- Hadoop >= 2.6 moved the S3 file systems from hadoop-common into hadoop-aws artifact
+ (see https://issues.apache.org/jira/browse/HADOOP-11074)
+ We can add the (test) dependency by default once 2.6 is the minimum required version.
+ -->
+ <id>include_hadoop_aws</id>
+ <activation>
+ <property>
+ <name>include_hadoop_aws</name>
+ </property>
+ </activation>
+ <dependencies>
+ <!-- for the S3 tests of YarnFileStageTestS3ITCase -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <!-- The aws-java-sdk-core requires jackson 2.6, but
+ hadoop pulls in 2.3 -->
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- override Hadoop's default dependency on SDK versions that are too old to work
+ with our httpcomponents version when initialising the s3a file system -->
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-s3</artifactId>
+ <version>${aws.sdk.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-sts</artifactId>
+ <version>${aws.sdk.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+
</profiles>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/cf8504db/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 8ecc371..5ac5c4e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -18,6 +18,7 @@
package org.apache.flink.yarn;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.ConfigConstants;
@@ -624,6 +625,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// Copy the application master jar to the filesystem
// Create a local resource to point to the destination jar path
final FileSystem fs = FileSystem.get(conf);
+ final Path homeDir = fs.getHomeDirectory();
// hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
@@ -705,11 +707,25 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
StringBuilder envShipFileList = new StringBuilder();
// upload and register ship files
- List<String> systemClassPaths = uploadAndRegisterFiles(systemShipFiles, fs, appId.toString(), paths, localResources, envShipFileList);
+ List<String> systemClassPaths = uploadAndRegisterFiles(
+ systemShipFiles,
+ fs,
+ homeDir,
+ appId,
+ paths,
+ localResources,
+ envShipFileList);
List<String> userClassPaths;
if (userJarInclusion != YarnConfigOptions.UserJarInclusion.DISABLED) {
- userClassPaths = uploadAndRegisterFiles(userJarFiles, fs, appId.toString(), paths, localResources, envShipFileList);
+ userClassPaths = uploadAndRegisterFiles(
+ userJarFiles,
+ fs,
+ homeDir,
+ appId,
+ paths,
+ localResources,
+ envShipFileList);
} else {
userClassPaths = Collections.emptyList();
}
@@ -739,32 +755,29 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
// Setup jar for ApplicationMaster
- LocalResource appMasterJar = Records.newRecord(LocalResource.class);
- Path remotePathJar = Utils.setupLocalResource(
+ Path remotePathJar = setupSingleLocalResource(
+ "flink.jar",
fs,
- appId.toString(),
+ appId,
flinkJarPath,
- appMasterJar,
- fs.getHomeDirectory());
-
- localResources.put("flink.jar", appMasterJar);
+ localResources,
+ homeDir,
+ "");
// Upload the flink configuration
- LocalResource flinkConf = Records.newRecord(LocalResource.class);
-
// write out configuration file
File tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
tmpConfigurationFile.deleteOnExit();
BootstrapTools.writeConfiguration(flinkConfiguration, tmpConfigurationFile);
- Path remotePathConf = Utils.setupLocalResource(
+ Path remotePathConf = setupSingleLocalResource(
+ "flink-conf.yaml",
fs,
- appId.toString(),
+ appId,
new Path(tmpConfigurationFile.getAbsolutePath()),
- flinkConf,
- fs.getHomeDirectory());
-
- localResources.put("flink-conf.yaml", flinkConf);
+ localResources,
+ homeDir,
+ "");
paths.add(remotePathJar);
classPathBuilder.append("flink.jar").append(File.pathSeparator);
@@ -781,11 +794,16 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
ObjectOutputStream obOutput = new ObjectOutputStream(output);){
obOutput.writeObject(jobGraph);
}
- LocalResource jobgraph = Records.newRecord(LocalResource.class);
- Path remoteJobGraph =
- Utils.setupLocalResource(fs, appId.toString(), new Path(fp.toURI()), jobgraph, fs.getHomeDirectory());
- localResources.put("job.graph", jobgraph);
- paths.add(remoteJobGraph);
+
+ Path pathFromYarnURL = setupSingleLocalResource(
+ "job.graph",
+ fs,
+ appId,
+ new Path(fp.toURI()),
+ localResources,
+ homeDir,
+ "");
+ paths.add(pathFromYarnURL);
classPathBuilder.append("job.graph").append(File.pathSeparator);
} catch (Exception e) {
LOG.warn("Add job graph to local resource fail");
@@ -793,7 +811,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
}
- Path yarnFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId + '/');
+ Path yarnFilesDir = new Path(homeDir, ".flink/" + appId + '/');
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
fs.setPermission(yarnFilesDir, permission); // set permission for path.
@@ -810,32 +828,44 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
if (krb5Config != null && krb5Config.length() != 0) {
File krb5 = new File(krb5Config);
LOG.info("Adding KRB5 configuration {} to the AM container local resource bucket", krb5.getAbsolutePath());
- LocalResource krb5ConfResource = Records.newRecord(LocalResource.class);
Path krb5ConfPath = new Path(krb5.getAbsolutePath());
- remoteKrb5Path = Utils.setupLocalResource(fs, appId.toString(), krb5ConfPath, krb5ConfResource, fs.getHomeDirectory());
- localResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource);
+ remoteKrb5Path = setupSingleLocalResource(
+ Utils.KRB5_FILE_NAME,
+ fs,
+ appId,
+ krb5ConfPath,
+ localResources,
+ homeDir,
+ "");
File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", f.getAbsolutePath());
- LocalResource yarnConfResource = Records.newRecord(LocalResource.class);
Path yarnSitePath = new Path(f.getAbsolutePath());
- remoteYarnSiteXmlPath = Utils.setupLocalResource(fs, appId.toString(), yarnSitePath, yarnConfResource, fs.getHomeDirectory());
- localResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource);
-
+ remoteYarnSiteXmlPath = setupSingleLocalResource(
+ Utils.YARN_SITE_FILE_NAME,
+ fs,
+ appId,
+ yarnSitePath,
+ localResources,
+ homeDir,
+ "");
hasKrb5 = true;
}
}
// setup security tokens
- LocalResource keytabResource = null;
Path remotePathKeytab = null;
String keytab = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
if (keytab != null) {
LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
- keytabResource = Records.newRecord(LocalResource.class);
- Path keytabPath = new Path(keytab);
- remotePathKeytab = Utils.setupLocalResource(fs, appId.toString(), keytabPath, keytabResource, fs.getHomeDirectory());
- localResources.put(Utils.KEYTAB_FILE_NAME, keytabResource);
+ remotePathKeytab = setupSingleLocalResource(
+ Utils.KEYTAB_FILE_NAME,
+ fs,
+ appId,
+ new Path(keytab),
+ localResources,
+ homeDir,
+ "");
}
final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
@@ -866,7 +896,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(clusterSpecification.getTaskManagerMemoryMB()));
appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString());
appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
- appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
+ appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, homeDir.toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(clusterSpecification.getSlotsPerTaskManager()));
appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
@@ -876,7 +906,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
- if (keytabResource != null) {
+ if (remotePathKeytab != null) {
appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString());
String principal = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
@@ -981,25 +1011,54 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
return report;
}
- private static List<String> uploadAndRegisterFiles(
- Collection<File> shipFiles,
+ /**
+ * Uploads and registers a single resource and adds it to <tt>localResources</tt>.
+ *
+ * @param key
+ * the key to add the resource under
+ * @param fs
+ * the remote file system to upload to
+ * @param appId
+ * application ID
+ * @param localSrcPath
+ * local path to the file
+ * @param localResources
+ * map of resources
+ *
+ * @return the remote path to the uploaded resource
+ */
+ private static Path setupSingleLocalResource(
+ String key,
FileSystem fs,
- String appId,
- List<Path> remotePaths,
+ ApplicationId appId,
+ Path localSrcPath,
Map<String, LocalResource> localResources,
- StringBuilder envShipFileList) throws IOException {
- final List<String> classPaths = new ArrayList<>(2 + shipFiles.size());
- for (File shipFile : shipFiles) {
- LocalResource shipResources = Records.newRecord(LocalResource.class);
+ Path targetHomeDir,
+ String relativeTargetPath) throws IOException, URISyntaxException {
- Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
- Path remotePath =
- Utils.setupLocalResource(fs, appId, shipLocalPath, shipResources, fs.getHomeDirectory());
+ Tuple2<Path, LocalResource> resource = Utils.setupLocalResource(
+ fs,
+ appId.toString(),
+ localSrcPath,
+ targetHomeDir,
+ relativeTargetPath);
- remotePaths.add(remotePath);
+ localResources.put(key, resource.f1);
- localResources.put(shipFile.getName(), shipResources);
+ return resource.f0;
+ }
+
+ static List<String> uploadAndRegisterFiles(
+ Collection<File> shipFiles,
+ FileSystem fs,
+ Path targetHomeDir,
+ ApplicationId appId,
+ List<Path> remotePaths,
+ Map<String, LocalResource> localResources,
+ StringBuilder envShipFileList) throws IOException, URISyntaxException {
+ final List<String> classPaths = new ArrayList<>(2 + shipFiles.size());
+ for (File shipFile : shipFiles) {
if (shipFile.isDirectory()) {
// add directories to the classpath
java.nio.file.Path shipPath = shipFile.toPath();
@@ -1011,17 +1070,40 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
throws IOException {
java.nio.file.Path relativePath = parentPath.relativize(file);
- classPaths.add(relativePath.toString());
-
- return FileVisitResult.CONTINUE;
+ String key = relativePath.toString();
+ try {
+ Path remotePath = setupSingleLocalResource(
+ key,
+ fs,
+ appId,
+ new Path(file.toUri()),
+ localResources,
+ targetHomeDir,
+ relativePath.getParent().toString());
+ remotePaths.add(remotePath);
+ envShipFileList.append(key).append("=").append(remotePath).append(",");
+
+ // add files to the classpath
+ classPaths.add(key);
+
+ return FileVisitResult.CONTINUE;
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
}
});
} else {
+ Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
+ String key = shipFile.getName();
+ Path remotePath = setupSingleLocalResource(
+ key, fs, appId, shipLocalPath, localResources, targetHomeDir, "");
+ remotePaths.add(remotePath);
+ envShipFileList.append(key).append("=").append(remotePath).append(",");
+
// add files to the classpath
- classPaths.add(shipFile.getName());
+ classPaths.add(key);
}
- envShipFileList.append(remotePath).append(",");
}
return classPaths;
}
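Note that uploadAndRegisterFiles is now package-private rather than private,
precisely so that the new YarnFileStageTest below can drive it directly. A
minimal calling sketch (fs, homeDir and shipDir are hypothetical and assumed
to be set up elsewhere):

    // Mirrors how YarnFileStageTest exercises the refactored upload path.
    List<Path> remotePaths = new ArrayList<>();
    Map<String, LocalResource> localResources = new HashMap<>();
    StringBuilder envShipFileList = new StringBuilder();

    List<String> classPaths = AbstractYarnClusterDescriptor.uploadAndRegisterFiles(
        Collections.singletonList(shipDir),  // java.io.File, may be a directory
        fs,                                  // org.apache.hadoop.fs.FileSystem
        homeDir,                             // upload target base directory
        ApplicationId.newInstance(0, 0),     // dummy application ID
        remotePaths,                         // collects uploaded remote paths
        localResources,                      // collects registered YARN resources
        envShipFileList);                    // collects "key=remotePath," entries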
http://git-wip-us.apache.org/repos/asf/flink/blob/cf8504db/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 32cbb64..652afec 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -18,6 +18,7 @@
package org.apache.flink.yarn;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -117,33 +118,60 @@ public final class Utils {
}
/**
+ * Copy a local file to a remote file system.
+ *
+ * @param fs
+ * remote filesystem
+ * @param appId
+ * application ID
+ * @param localSrcPath
+ * path to the local file
+ * @param homedir
+ * remote home directory base (will be extended)
+ * @param relativeTargetPath
+ * relative target path of the file (will be prefixed by the full home directory we set up)
+ *
* @return Path to remote file (usually hdfs)
- * @throws IOException
*/
- public static Path setupLocalResource(
- FileSystem fs,
- String appId, Path localRsrcPath,
- LocalResource appMasterJar,
- Path homedir) throws IOException {
+ static Tuple2<Path, LocalResource> setupLocalResource(
+ FileSystem fs,
+ String appId,
+ Path localSrcPath,
+ Path homedir,
+ String relativeTargetPath) throws IOException {
+
+ if (new File(localSrcPath.toUri().getPath()).isDirectory()) {
+ throw new IllegalArgumentException("File to copy must not be a directory: " +
+ localSrcPath);
+ }
// copy resource to HDFS
- String suffix = ".flink/" + appId + "/" + localRsrcPath.getName();
+ String suffix =
+ ".flink/"
+ + appId
+ + (relativeTargetPath.isEmpty() ? "" : "/" + relativeTargetPath)
+ + "/" + localSrcPath.getName();
Path dst = new Path(homedir, suffix);
- LOG.info("Copying from " + localRsrcPath + " to " + dst);
- fs.copyFromLocalFile(localRsrcPath, dst);
- registerLocalResource(fs, dst, appMasterJar);
- return dst;
+ LOG.info("Copying from " + localSrcPath + " to " + dst);
+
+ fs.copyFromLocalFile(false, true, localSrcPath, dst);
+
+ // now create the resource instance
+ LocalResource resource = registerLocalResource(fs, dst);
+ return Tuple2.of(dst, resource);
}
- public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, LocalResource localResource) throws IOException {
+ private static LocalResource registerLocalResource(FileSystem fs, Path remoteRsrcPath) throws IOException {
+ LocalResource localResource = Records.newRecord(LocalResource.class);
FileStatus jarStat = fs.getFileStatus(remoteRsrcPath);
localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri()));
localResource.setSize(jarStat.getLen());
localResource.setTimestamp(jarStat.getModificationTime());
localResource.setType(LocalResourceType.FILE);
localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+ return localResource;
}
public static void setTokensFor(ContainerLaunchContext amContainer, List<Path> paths, Configuration conf) throws IOException {
@@ -340,10 +368,9 @@ public final class Utils {
LocalResource keytabResource = null;
if (remoteKeytabPath != null) {
log.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath);
- keytabResource = Records.newRecord(LocalResource.class);
Path keytabPath = new Path(remoteKeytabPath);
FileSystem fs = keytabPath.getFileSystem(yarnConfig);
- registerLocalResource(fs, keytabPath, keytabResource);
+ keytabResource = registerLocalResource(fs, keytabPath);
}
//To support Yarn Secure Integration Test Scenario
@@ -352,30 +379,28 @@ public final class Utils {
boolean hasKrb5 = false;
if (remoteYarnConfPath != null && remoteKrb5Path != null) {
log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
- yarnConfResource = Records.newRecord(LocalResource.class);
Path yarnConfPath = new Path(remoteYarnConfPath);
FileSystem fs = yarnConfPath.getFileSystem(yarnConfig);
- registerLocalResource(fs, yarnConfPath, yarnConfResource);
+ yarnConfResource = registerLocalResource(fs, yarnConfPath);
log.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path);
- krb5ConfResource = Records.newRecord(LocalResource.class);
Path krb5ConfPath = new Path(remoteKrb5Path);
fs = krb5ConfPath.getFileSystem(yarnConfig);
- registerLocalResource(fs, krb5ConfPath, krb5ConfResource);
+ krb5ConfResource = registerLocalResource(fs, krb5ConfPath);
hasKrb5 = true;
}
// register Flink Jar with remote HDFS
- LocalResource flinkJar = Records.newRecord(LocalResource.class);
+ final LocalResource flinkJar;
{
Path remoteJarPath = new Path(remoteFlinkJarPath);
FileSystem fs = remoteJarPath.getFileSystem(yarnConfig);
- registerLocalResource(fs, remoteJarPath, flinkJar);
+ flinkJar = registerLocalResource(fs, remoteJarPath);
}
// register conf with local fs
- LocalResource flinkConf = Records.newRecord(LocalResource.class);
+ final LocalResource flinkConf;
{
// write the TaskManager configuration to a local file
final File taskManagerConfigFile =
@@ -385,8 +410,13 @@ public final class Utils {
Path homeDirPath = new Path(clientHomeDir);
FileSystem fs = homeDirPath.getFileSystem(yarnConfig);
- setupLocalResource(fs, appId,
- new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir));
+
+ flinkConf = setupLocalResource(
+ fs,
+ appId,
+ new Path(taskManagerConfigFile.toURI()),
+ homeDirPath,
+ "").f1;
log.info("Prepared local resource for modified yaml: {}", flinkConf);
}
@@ -408,10 +438,11 @@ public final class Utils {
// prepare additional files to be shipped
for (String pathStr : shipListString.split(",")) {
if (!pathStr.isEmpty()) {
- LocalResource resource = Records.newRecord(LocalResource.class);
- Path path = new Path(pathStr);
- registerLocalResource(path.getFileSystem(yarnConfig), path, resource);
- taskManagerLocalResources.put(path.getName(), resource);
+ String[] keyAndPath = pathStr.split("=");
+ require(keyAndPath.length == 2, "Invalid entry in ship file list: %s", pathStr);
+ Path path = new Path(keyAndPath[1]);
+ LocalResource resource = registerLocalResource(path.getFileSystem(yarnConfig), path);
+ taskManagerLocalResources.put(keyAndPath[0], resource);
}
}
@@ -488,4 +519,5 @@ public final class Utils {
throw new RuntimeException(String.format(message, values));
}
}
+
}
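For reference, the remote layout produced by the reworked setupLocalResource
is ".flink/<appId>[/<relativeTargetPath>]/<fileName>" under the target home
directory; a small sketch of the suffix construction with hypothetical values:

    String appId = "application_1510000000000_0001";
    String relativeTargetPath = "nested";   // "" for files at the top level
    String fileName = "3";

    String suffix = ".flink/" + appId
        + (relativeTargetPath.isEmpty() ? "" : "/" + relativeTargetPath)
        + "/" + fileName;
    // suffix == ".flink/application_1510000000000_0001/nested/3"
    // destination = new Path(homeDir, suffix), e.g.
    // hdfs://nn:8020/user/flink/.flink/application_1510000000000_0001/nested/3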
http://git-wip-us.apache.org/repos/asf/flink/blob/cf8504db/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
new file mode 100644
index 0000000..4d38253
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests for verifying file staging during submission to YARN works.
+ */
+public class YarnFileStageTest extends TestLogger {
+
+ @ClassRule
+ public static final TemporaryFolder CLASS_TEMP_DIR = new TemporaryFolder();
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private static MiniDFSCluster hdfsCluster;
+
+ private static Path hdfsRootPath;
+
+ private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+ // ------------------------------------------------------------------------
+ // Test setup and shutdown
+ // ------------------------------------------------------------------------
+
+ @BeforeClass
+ public static void createHDFS() throws Exception {
+ Assume.assumeTrue(!OperatingSystem.isWindows());
+
+ final File tempDir = CLASS_TEMP_DIR.newFolder();
+
+ org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+ hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath());
+
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+ hdfsCluster = builder.build();
+ hdfsRootPath = new Path(hdfsCluster.getURI());
+ }
+
+ @AfterClass
+ public static void destroyHDFS() {
+ if (hdfsCluster != null) {
+ hdfsCluster.shutdown();
+ }
+ hdfsCluster = null;
+ hdfsRootPath = null;
+ }
+
+ @Before
+ public void initConfig() {
+ hadoopConfig = new org.apache.hadoop.conf.Configuration();
+ hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, hdfsRootPath.toString());
+ }
+
+ /**
+ * Verifies that nested directories are properly copied with a <tt>hdfs://</tt> file
+ * system (from a <tt>file:///absolute/path</tt> source path).
+ */
+ @Test
+ public void testCopyFromLocalRecursiveWithScheme() throws Exception {
+ final FileSystem targetFileSystem = hdfsRootPath.getFileSystem(hadoopConfig);
+ final Path targetDir = targetFileSystem.getWorkingDirectory();
+
+ testCopyFromLocalRecursive(targetFileSystem, targetDir, tempFolder, true);
+ }
+
+ /**
+ * Verifies that nested directories are properly copied with a <tt>hdfs://</tt> file
+ * system (from a <tt>/absolute/path</tt> source path).
+ */
+ @Test
+ public void testCopyFromLocalRecursiveWithoutScheme() throws Exception {
+ final FileSystem targetFileSystem = hdfsRootPath.getFileSystem(hadoopConfig);
+ final Path targetDir = targetFileSystem.getWorkingDirectory();
+
+ testCopyFromLocalRecursive(targetFileSystem, targetDir, tempFolder, false);
+ }
+
+ /**
+ * Verifies that nested directories are properly copied with the given filesystem and paths.
+ *
+ * @param targetFileSystem
+ * file system of the target path
+ * @param targetDir
+ * target path (URI like <tt>hdfs://...</tt>)
+ * @param tempFolder
+ * JUnit temporary folder rule to create the source directory with
+ * @param addSchemeToLocalPath
+ * whether to add the <tt>file://</tt> scheme to the local path to copy from
+ */
+ public static void testCopyFromLocalRecursive(
+ FileSystem targetFileSystem,
+ Path targetDir,
+ TemporaryFolder tempFolder,
+ boolean addSchemeToLocalPath) throws Exception {
+
+ // directory must not yet exist
+ assertFalse(targetFileSystem.exists(targetDir));
+
+ final File srcDir = tempFolder.newFolder();
+ final Path srcPath;
+ if (addSchemeToLocalPath) {
+ srcPath = new Path("file://" + srcDir.getAbsolutePath());
+ } else {
+ srcPath = new Path(srcDir.getAbsolutePath());
+ }
+
+ HashMap<String /* (relative) path */, /* contents */ String> srcFiles = new HashMap<>(4);
+
+ // create and fill source files
+ srcFiles.put("1", "Hello 1");
+ srcFiles.put("2", "Hello 2");
+ srcFiles.put("nested/3", "Hello nested/3");
+ srcFiles.put("nested/4/5", "Hello nested/4/5");
+ for (Map.Entry<String, String> src : srcFiles.entrySet()) {
+ File file = new File(srcDir, src.getKey());
+ //noinspection ResultOfMethodCallIgnored
+ file.getParentFile().mkdirs();
+ try (DataOutputStream out = new DataOutputStream(new FileOutputStream(file))) {
+ out.writeUTF(src.getValue());
+ }
+ }
+
+ // copy the created directory recursively:
+ try {
+ List<Path> remotePaths = new ArrayList<>();
+ HashMap<String, LocalResource> localResources = new HashMap<>();
+ AbstractYarnClusterDescriptor.uploadAndRegisterFiles(
+ Collections.singletonList(new File(srcPath.toUri().getPath())),
+ targetFileSystem,
+ targetDir,
+ ApplicationId.newInstance(0, 0),
+ remotePaths,
+ localResources,
+ new StringBuilder());
+ assertEquals(srcFiles.size(), localResources.size());
+
+ Path workDir = ConverterUtils
+ .getPathFromYarnURL(localResources.get(srcPath.getName() + "/1").getResource())
+ .getParent();
+
+ RemoteIterator<LocatedFileStatus> targetFilesIterator =
+ targetFileSystem.listFiles(workDir, true);
+ HashMap<String /* (relative) path */, /* contents */ String> targetFiles =
+ new HashMap<>(4);
+
+ final int workDirPrefixLength =
+ workDir.toString().length() + 1; // one more for the concluding "/"
+ while (targetFilesIterator.hasNext()) {
+ LocatedFileStatus targetFile = targetFilesIterator.next();
+
+ try (FSDataInputStream in = targetFileSystem.open(targetFile.getPath())) {
+ String absolutePathString = targetFile.getPath().toString();
+ String relativePath = absolutePathString.substring(workDirPrefixLength);
+ targetFiles.put(relativePath, in.readUTF());
+
+ assertEquals("extraneous data in file " + relativePath, -1, in.read());
+ }
+ }
+
+ assertThat(targetFiles, equalTo(srcFiles));
+ } finally {
+ // clean up
+ targetFileSystem.delete(targetDir, true);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf8504db/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java
new file mode 100644
index 0000000..74fb596
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assume.assumeFalse;
+import static org.junit.Assume.assumeNoException;
+
+/**
+ * Tests for verifying file staging during submission to YARN works with the S3A file system.
+ *
+ * <p>Note that the setup is similar to <tt>org.apache.flink.fs.s3hadoop.HadoopS3FileSystemITCase</tt>.
+ */
+public class YarnFileStageTestS3ITCase extends TestLogger {
+
+ private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET");
+
+ private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
+
+ private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
+ private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");
+
+ @ClassRule
+ public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+ @Rule
+ public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+ /**
+ * Number of tests executed.
+ */
+ private static int numRecursiveUploadTests = 0;
+
+ /**
+ * Will be updated by {@link #checkCredentialsAndSetup()} if the test is not skipped.
+ */
+ private static boolean skipTest = true;
+
+ @BeforeClass
+ public static void checkCredentialsAndSetup() throws IOException {
+ // check whether credentials exist
+ Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null);
+ Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null);
+ Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null);
+
+ skipTest = false;
+
+ setupCustomHadoopConfig();
+ }
+
+ @AfterClass
+ public static void resetFileSystemConfiguration() throws IOException {
+ FileSystem.initialize(new Configuration());
+ }
+
+ @AfterClass
+ public static void checkAtLeastOneTestRun() {
+ if (!skipTest) {
+ assertThat(
+ "No S3 filesystem upload test executed. Please activate the " +
+ "'include_hadoop_aws' build profile or set '-Dinclude_hadoop_aws' during build " +
+ "(Hadoop >= 2.6 moved S3 filesystems out of hadoop-common).",
+ numRecursiveUploadTests, greaterThan(0));
+ }
+ }
+
+ /**
+ * Create a Hadoop config file containing S3 access credentials.
+ *
+ * <p>Note that we cannot use them as part of the URL since this may fail if the credentials
+ * contain a "/" (see <a href="https://issues.apache.org/jira/browse/HADOOP-3733">HADOOP-3733</a>).
+ */
+ private static void setupCustomHadoopConfig() throws IOException {
+ File hadoopConfig = TEMP_FOLDER.newFile();
+ Map<String /* key */, String /* value */> parameters = new HashMap<>();
+
+ // set all different S3 fs implementation variants' configuration keys
+ parameters.put("fs.s3a.access.key", ACCESS_KEY);
+ parameters.put("fs.s3a.secret.key", SECRET_KEY);
+
+ parameters.put("fs.s3.awsAccessKeyId", ACCESS_KEY);
+ parameters.put("fs.s3.awsSecretAccessKey", SECRET_KEY);
+
+ parameters.put("fs.s3n.awsAccessKeyId", ACCESS_KEY);
+ parameters.put("fs.s3n.awsSecretAccessKey", SECRET_KEY);
+
+ try (PrintStream out = new PrintStream(new FileOutputStream(hadoopConfig))) {
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>");
+ out.println("<configuration>");
+ for (Map.Entry<String, String> entry : parameters.entrySet()) {
+ out.println("\t<property>");
+ out.println("\t\t<name>" + entry.getKey() + "</name>");
+ out.println("\t\t<value>" + entry.getValue() + "</value>");
+ out.println("\t</property>");
+ }
+ out.println("</configuration>");
+ }
+
+ final Configuration conf = new Configuration();
+ conf.setString(ConfigConstants.HDFS_SITE_CONFIG, hadoopConfig.getAbsolutePath());
+
+ FileSystem.initialize(conf);
+ }
+
+ /**
+ * Verifies that nested directories are properly copied to the given S3 path (using the
+ * appropriate file system) during resource uploads for YARN.
+ *
+ * @param scheme
+ * file system scheme
+ * @param pathSuffix
+ * test path suffix which will be the test's target path
+ */
+ private void testRecursiveUploadForYarn(String scheme, String pathSuffix) throws Exception {
+ ++numRecursiveUploadTests;
+
+ final Path basePath = new Path(scheme + "://" + BUCKET + '/' + TEST_DATA_DIR);
+ final HadoopFileSystem fs = (HadoopFileSystem) basePath.getFileSystem();
+
+ assumeFalse(fs.exists(basePath));
+
+ try {
+ final Path directory = new Path(basePath, pathSuffix);
+
+ YarnFileStageTest.testCopyFromLocalRecursive(fs.getHadoopFileSystem(),
+ new org.apache.hadoop.fs.Path(directory.toUri()), tempFolder, true);
+
+ // now directory must be gone
+ assertFalse(fs.exists(directory));
+ } finally {
+ // clean up
+ fs.delete(basePath, true);
+ }
+ }
+
+ /**
+ * Verifies that nested directories are properly copied with an <tt>s3://</tt> file
+ * system during resource uploads for YARN.
+ */
+ @Test
+ public void testRecursiveUploadForYarnS3() throws Exception {
+ try {
+ Class.forName("org.apache.hadoop.fs.s3.S3FileSystem");
+ } catch (ClassNotFoundException e) {
+ // not in the classpath, cannot run this test
+ String msg = "Skipping test because S3FileSystem is not in the class path";
+ log.info(msg);
+ assumeNoException(msg, e);
+ }
+ testRecursiveUploadForYarn("s3", "testYarn-s3");
+ }
+
+ @Test
+ public void testRecursiveUploadForYarnS3n() throws Exception {
+ try {
+ Class.forName("org.apache.hadoop.fs.s3native.NativeS3FileSystem");
+ } catch (ClassNotFoundException e) {
+ // not in the classpath, cannot run this test
+ String msg = "Skipping test because NativeS3FileSystem is not in the class path";
+ log.info(msg);
+ assumeNoException(msg, e);
+ }
+ testRecursiveUploadForYarn("s3n", "testYarn-s3n");
+ }
+
+ @Test
+ public void testRecursiveUploadForYarnS3a() throws Exception {
+ try {
+ Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem");
+ } catch (ClassNotFoundException e) {
+ // not in the classpath, cannot run this test
+ String msg = "Skipping test because S3AFileSystem is not in the class path";
+ log.info(msg);
+ assumeNoException(msg, e);
+ }
+ testRecursiveUploadForYarn("s3a", "testYarn-s3a");
+ }
+}
[2/2] flink git commit: [FLINK-7988][s3] fix HadoopS3FileSystemITCase leaving test directories behind in S3
Posted by tr...@apache.org.
[FLINK-7988][s3] fix HadoopS3FileSystemITCase leaving test directories behind in S3
This closes #4950.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51b5b53c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51b5b53c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51b5b53c
Branch: refs/heads/master
Commit: 51b5b53c7cd7781959011ba48559c5361ac93ff9
Parents: cf8504d
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Nov 2 19:38:48 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Nov 18 10:58:08 2017 +0100
----------------------------------------------------------------------
.../fs/s3hadoop/HadoopS3FileSystemITCase.java | 50 +++++++++++++++++++-
1 file changed, 49 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/51b5b53c/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
index 88f13ed..8c646f0 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -57,11 +58,58 @@ public class HadoopS3FileSystemITCase extends TestLogger {
private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");
+ /**
+ * Will be updated by {@link #checkCredentialsAndSetup()} if the test is not skipped.
+ */
+ private static boolean skipTest = true;
+
@BeforeClass
- public static void checkIfCredentialsArePresent() {
+ public static void checkCredentialsAndSetup() throws IOException {
+ // check whether credentials exist
Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null);
Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null);
Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null);
+
+ // initialize configuration with valid credentials
+ final Configuration conf = new Configuration();
+ conf.setString("s3.access.key", ACCESS_KEY);
+ conf.setString("s3.secret.key", SECRET_KEY);
+ FileSystem.initialize(conf);
+
+ // check for uniqueness of the test directory
+ final Path directory = new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR);
+ final FileSystem fs = directory.getFileSystem();
+
+ // directory must not yet exist
+ assertFalse(fs.exists(directory));
+
+ // reset configuration
+ FileSystem.initialize(new Configuration());
+
+ skipTest = false;
+ }
+
+ @AfterClass
+ public static void cleanUp() throws IOException {
+ if (!skipTest) {
+ // initialize configuration with valid credentials
+ final Configuration conf = new Configuration();
+ conf.setString("s3.access.key", ACCESS_KEY);
+ conf.setString("s3.secret.key", SECRET_KEY);
+ FileSystem.initialize(conf);
+
+ final Path directory = new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR);
+ final FileSystem fs = directory.getFileSystem();
+
+ // clean up
+ fs.delete(directory, true);
+
+ // now directory must be gone
+ assertFalse(fs.exists(directory));
+
+ // reset configuration
+ FileSystem.initialize(new Configuration());
+ }
}
@Test