You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/03/17 11:28:37 UTC
[hadoop] 03/03: HADOOP-18163. hadoop-azure support for the Manifest Committer of MAPREDUCE-7341
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 8294bd5a37c0de15af576700c6cba46791eddd07
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Wed Mar 16 15:41:03 2022 +0000
HADOOP-18163. hadoop-azure support for the Manifest Committer of MAPREDUCE-7341
Follow-on patch to MAPREDUCE-7341, adding ABFS support and tests
* resilient rename
* tests for job commit through the manifest committer.
contains
- HADOOP-17976. ABFS etag extraction inconsistent between LIST and HEAD calls
- HADOOP-16204. ABFS tests to include terasort
Contributed by Steve Loughran.
Change-Id: I0a7d4043bdf19bcb00c033fc389730109b93b77f
---
hadoop-project/pom.xml | 7 +
hadoop-tools/hadoop-azure/pom.xml | 81 ++++-
.../src/config/checkstyle-suppressions.xml | 3 +
.../hadoop/fs/azurebfs/AbfsConfiguration.java | 9 +
.../hadoop/fs/azurebfs/AzureBlobFileSystem.java | 197 +++++++++---
.../fs/azurebfs/AzureBlobFileSystemStore.java | 34 +-
.../commit/AbfsManifestStoreOperations.java | 130 ++++++++
.../commit/AzureManifestCommitterFactory.java | 58 ++++
.../azurebfs/commit/ResilientCommitByRename.java | 101 ++++++
.../hadoop/fs/azurebfs/commit/package-info.java | 28 ++
.../fs/azurebfs/constants/ConfigurationKeys.java | 3 +
.../constants/FileSystemConfigurations.java | 5 +
.../hadoop/fs/azurebfs/services/AbfsClient.java | 99 +++++-
.../fs/azurebfs/services/SimpleKeyProvider.java | 10 +-
.../fs/azurebfs/AbstractAbfsIntegrationTest.java | 11 +-
.../fs/azurebfs/ITestAbfsReadWriteAndSeek.java | 13 +-
.../ITestAzureBlobFileSystemDelegationSAS.java | 5 +-
.../fs/azurebfs/ITestCustomerProvidedKey.java | 3 +-
.../fs/azurebfs/commit/AbfsCommitTestHelper.java | 49 +++
.../azurebfs/commit/AbstractAbfsClusterITest.java | 260 +++++++++++++++
.../fs/azurebfs/commit/ITestAbfsCleanupStage.java | 54 ++++
.../azurebfs/commit/ITestAbfsCommitTaskStage.java | 54 ++++
.../ITestAbfsCreateOutputDirectoriesStage.java | 54 ++++
.../ITestAbfsJobThroughManifestCommitter.java | 101 ++++++
.../commit/ITestAbfsLoadManifestsStage.java | 55 ++++
.../commit/ITestAbfsManifestCommitProtocol.java | 62 ++++
.../commit/ITestAbfsManifestStoreOperations.java | 175 ++++++++++
.../commit/ITestAbfsRenameStageFailure.java | 69 ++++
.../commit/ITestAbfsTaskManifestFileIO.java | 54 ++++
.../fs/azurebfs/commit/ITestAbfsTerasort.java | 353 +++++++++++++++++++++
.../hadoop/fs/azurebfs/commit/package-info.java | 24 ++
.../azurebfs/contract/AbfsFileSystemContract.java | 2 +-
.../hadoop-azure/src/test/resources/core-site.xml | 25 ++
33 files changed, 2106 insertions(+), 82 deletions(-)
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 193f898..80e07f0 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -386,6 +386,13 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${hadoop.version}</version>
</dependency>
diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml
index a9f58e3..40aeec0 100644
--- a/hadoop-tools/hadoop-azure/pom.xml
+++ b/hadoop-tools/hadoop-azure/pom.xml
@@ -219,6 +219,67 @@
<scope>test</scope>
</dependency>
+ <!--
+ the mapreduce-client-core module is compiled against for the
+ manifest committer support. It is not needed on the classpath
+ except when using this committer, so it is only tagged as
+ "provided".
+ It is not exported as a transitive dependency of the JAR.
+ Applications which wish to use the manifest committer
+ will need to explicitly add the mapreduce JAR to their classpath.
+ This is already done by MapReduce and Spark.
+ -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <!--
+ These are only added to the classpath for tests,
+ and are not exported by the test JAR as transitive
+ dependencies.
+ -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-tests</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-hs</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-examples</artifactId>
+ <scope>test</scope>
+ <type>jar</type>
+ </dependency>
+ <!-- artifacts needed to bring up a Mini MR Yarn cluster-->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
@@ -319,7 +380,7 @@
<test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
<test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
<hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
- <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+ <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
@@ -350,7 +411,7 @@
<test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
<test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
<hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
- <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+ <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
@@ -392,7 +453,7 @@
<!-- surefire.forkNumber won't do the parameter -->
<!-- substitution. Putting a prefix in front of it like -->
<!-- "fork-" makes it work. -->
- <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+ <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
<!-- Propagate scale parameters -->
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
@@ -482,7 +543,7 @@
<test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
<test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
<hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
- <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+ <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
@@ -526,7 +587,7 @@
<!-- surefire.forkNumber won't do the parameter -->
<!-- substitution. Putting a prefix in front of it like -->
<!-- "fork-" makes it work. -->
- <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+ <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
<!-- Propagate scale parameters -->
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
@@ -544,6 +605,7 @@
<exclude>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</exclude>
<exclude>**/azurebfs/ITestSmallWriteOptimization.java</exclude>
<exclude>**/azurebfs/services/ITestReadBufferManager.java</exclude>
+ <exclude>**/azurebfs/commit/*.java</exclude>
</excludes>
</configuration>
@@ -572,7 +634,7 @@
<!-- surefire.forkNumber won't do the parameter -->
<!-- substitution. Putting a prefix in front of it like -->
<!-- "fork-" makes it work. -->
- <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+ <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
<!-- Propagate scale parameters -->
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
@@ -585,6 +647,7 @@
<include>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</include>
<include>**/azurebfs/ITestSmallWriteOptimization.java</include>
<include>**/azurebfs/services/ITestReadBufferManager.java</include>
+ <include>**/azurebfs/commit/*.java</include>
</includes>
</configuration>
</execution>
@@ -634,7 +697,7 @@
<test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
<test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
<hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
- <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+ <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
@@ -664,7 +727,7 @@
<test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
<test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
<hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
- <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+ <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
@@ -706,7 +769,7 @@
<!-- surefire.forkNumber won't do the parameter -->
<!-- substitution. Putting a prefix in front of it like -->
<!-- "fork-" makes it work. -->
- <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+ <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
<!-- Propagate scale parameters -->
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
index 070c8c1..fd2a7c2 100644
--- a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
+++ b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
@@ -48,4 +48,7 @@
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
<suppress checks="ParameterNumber|VisibilityModifier"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]ITestSmallWriteOptimization.java"/>
+ <!-- allow tests to use _ for ordering. -->
+ <suppress checks="MethodName"
+ files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]commit[\\/]ITestAbfsTerasort.java"/>
</suppressions>
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 9719da7..fafc303 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -260,6 +260,11 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
private boolean enableAutoThrottling;
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ABFS_IO_RATE_LIMIT,
+ MinValue = 0,
+ DefaultValue = RATE_LIMIT_DEFAULT)
+ private int rateLimit;
+
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_USER_AGENT_PREFIX_KEY,
DefaultValue = DEFAULT_FS_AZURE_USER_AGENT_PREFIX)
private String userAgentId;
@@ -726,6 +731,10 @@ public class AbfsConfiguration{
return this.enableAutoThrottling;
}
+ public int getRateLimit() {
+ return rateLimit;
+ }
+
public String getCustomUserAgentPrefix() {
return this.userAgentId;
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index ae70b8d..46141e7 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -27,6 +27,7 @@ import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.AccessDeniedException;
+import java.time.Duration;
import java.util.Hashtable;
import java.util.List;
import java.util.ArrayList;
@@ -42,13 +43,17 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import org.apache.hadoop.io.IOUtils;
+import javax.annotation.Nullable;
+
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
@@ -94,9 +99,12 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.store.DataBlocks;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.RateLimiting;
+import org.apache.hadoop.util.RateLimitingFactory;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils;
@@ -143,6 +151,9 @@ public class AzureBlobFileSystem extends FileSystem
/** Maximum Active blocks per OutputStream. */
private int blockOutputActiveBlocks;
+ /** Rate limiting for operations which use it to throttle their IO. */
+ private RateLimiting rateLimiting;
+
@Override
public void initialize(URI uri, Configuration configuration)
throws IOException {
@@ -215,7 +226,7 @@ public class AzureBlobFileSystem extends FileSystem
}
AbfsClientThrottlingIntercept.initializeSingleton(abfsConfiguration.isAutoThrottlingEnabled());
-
+ rateLimiting = RateLimitingFactory.create(abfsConfiguration.getRateLimit());
LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri);
}
@@ -261,7 +272,7 @@ public class AzureBlobFileSystem extends FileSystem
InputStream inputStream = abfsStore
.openFileForRead(qualifiedPath, parameters, statistics, tracingContext);
return new FSDataInputStream(inputStream);
- } catch(AzureBlobFileSystemException ex) {
+ } catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
return null;
}
@@ -290,8 +301,13 @@ public class AzureBlobFileSystem extends FileSystem
}
@Override
- public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize,
- final short replication, final long blockSize, final Progressable progress) throws IOException {
+ public FSDataOutputStream create(final Path f,
+ final FsPermission permission,
+ final boolean overwrite,
+ final int bufferSize,
+ final short replication,
+ final long blockSize,
+ final Progressable progress) throws IOException {
LOG.debug("AzureBlobFileSystem.create path: {} permission: {} overwrite: {} bufferSize: {}",
f,
permission,
@@ -311,7 +327,7 @@ public class AzureBlobFileSystem extends FileSystem
FsPermission.getUMask(getConf()), tracingContext);
statIncrement(FILES_CREATED);
return new FSDataOutputStream(outputStream, statistics);
- } catch(AzureBlobFileSystemException ex) {
+ } catch (AzureBlobFileSystemException ex) {
checkException(f, ex);
return null;
}
@@ -340,8 +356,12 @@ public class AzureBlobFileSystem extends FileSystem
@Override
@SuppressWarnings("deprecation")
- public FSDataOutputStream createNonRecursive(final Path f, final FsPermission permission,
- final EnumSet<CreateFlag> flags, final int bufferSize, final short replication, final long blockSize,
+ public FSDataOutputStream createNonRecursive(final Path f,
+ final FsPermission permission,
+ final EnumSet<CreateFlag> flags,
+ final int bufferSize,
+ final short replication,
+ final long blockSize,
final Progressable progress) throws IOException {
// Check if file should be appended or overwritten. Assume that the file
@@ -365,7 +385,8 @@ public class AzureBlobFileSystem extends FileSystem
}
@Override
- public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException {
+ public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress)
+ throws IOException {
LOG.debug(
"AzureBlobFileSystem.append path: {} bufferSize: {}",
f.toString(),
@@ -380,7 +401,7 @@ public class AzureBlobFileSystem extends FileSystem
OutputStream outputStream = abfsStore
.openFileForWrite(qualifiedPath, statistics, false, tracingContext);
return new FSDataOutputStream(outputStream, statistics);
- } catch(AzureBlobFileSystemException ex) {
+ } catch (AzureBlobFileSystemException ex) {
checkException(f, ex);
return null;
}
@@ -403,7 +424,7 @@ public class AzureBlobFileSystem extends FileSystem
fileSystemId, FSOperationType.RENAME, true, tracingHeaderFormat,
listener);
// rename under same folder;
- if(makeQualified(parentFolder).equals(qualifiedDstPath)) {
+ if (makeQualified(parentFolder).equals(qualifiedDstPath)) {
return tryGetFileStatus(qualifiedSrcPath, tracingContext) != null;
}
@@ -438,24 +459,99 @@ public class AzureBlobFileSystem extends FileSystem
qualifiedDstPath = makeQualified(adjustedDst);
- abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext);
+ abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext, null);
return true;
- } catch(AzureBlobFileSystemException ex) {
+ } catch (AzureBlobFileSystemException ex) {
LOG.debug("Rename operation failed. ", ex);
checkException(
- src,
- ex,
- AzureServiceErrorCode.PATH_ALREADY_EXISTS,
- AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH,
- AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND,
- AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE,
- AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND,
- AzureServiceErrorCode.INTERNAL_OPERATION_ABORT);
+ src,
+ ex,
+ AzureServiceErrorCode.PATH_ALREADY_EXISTS,
+ AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH,
+ AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND,
+ AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE,
+ AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND,
+ AzureServiceErrorCode.INTERNAL_OPERATION_ABORT);
return false;
}
}
+ /**
+ * Private method to create resilient commit support.
+ * @return a new instance
+ * @param path destination path
+ * @throws IOException problem probing store capabilities
+ * @throws UnsupportedOperationException if the store lacks this support
+ */
+ @InterfaceAudience.Private
+ public ResilientCommitByRename createResilientCommitSupport(final Path path)
+ throws IOException {
+
+ if (!hasPathCapability(path,
+ CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME)) {
+ throw new UnsupportedOperationException(
+ "Resilient commit support not available for " + path);
+ }
+ return new ResilientCommitByRenameImpl();
+ }
+
+ /**
+ * Resilient commit support.
+ * Provided as a nested class to avoid contaminating the
+ * FS instance with too many private methods which end up
+ * being used widely (as has happened to the S3A FS)
+ */
+ public class ResilientCommitByRenameImpl implements ResilientCommitByRename {
+
+ /**
+ * Perform the rename.
+ * This will be rate limited, as well as able to recover
+ * from rename errors if the etag was passed in.
+ * @param source path to source file
+ * @param dest destination of rename.
+ * @param sourceEtag etag of source file. may be null or empty
+ * @return the outcome of the operation
+ * @throws IOException any rename failure which was not recovered from.
+ */
+ public Pair<Boolean, Duration> commitSingleFileByRename(
+ final Path source,
+ final Path dest,
+ @Nullable final String sourceEtag) throws IOException {
+
+ LOG.debug("renameFileWithEtag source: {} dest: {} etag {}", source, dest, sourceEtag);
+ statIncrement(CALL_RENAME);
+
+ trailingPeriodCheck(dest);
+ Path qualifiedSrcPath = makeQualified(source);
+ Path qualifiedDstPath = makeQualified(dest);
+
+ TracingContext tracingContext = new TracingContext(clientCorrelationId,
+ fileSystemId, FSOperationType.RENAME, true, tracingHeaderFormat,
+ listener);
+
+ if (qualifiedSrcPath.equals(qualifiedDstPath)) {
+ // rename to itself is forbidden
+ throw new PathIOException(qualifiedSrcPath.toString(), "cannot rename object onto self");
+ }
+
+ // acquire one IO permit
+ final Duration waitTime = rateLimiting.acquire(1);
+
+ try {
+ final boolean recovered = abfsStore.rename(qualifiedSrcPath,
+ qualifiedDstPath, tracingContext, sourceEtag);
+ return Pair.of(recovered, waitTime);
+ } catch (AzureBlobFileSystemException ex) {
+ LOG.debug("Rename operation failed. ", ex);
+ checkException(source, ex);
+ // never reached
+ return null;
+ }
+
+ }
+ }
+
@Override
public boolean delete(final Path f, final boolean recursive) throws IOException {
LOG.debug(
@@ -533,7 +629,7 @@ public class AzureBlobFileSystem extends FileSystem
* @throws IllegalArgumentException if the path has a trailing period (.)
*/
private void trailingPeriodCheck(Path path) throws IllegalArgumentException {
- while (!path.isRoot()){
+ while (!path.isRoot()) {
String pathToString = path.toString();
if (pathToString.length() != 0) {
if (pathToString.charAt(pathToString.length() - 1) == '.') {
@@ -541,8 +637,7 @@ public class AzureBlobFileSystem extends FileSystem
"ABFS does not allow files or directories to end with a dot.");
}
path = path.getParent();
- }
- else {
+ } else {
break;
}
}
@@ -601,10 +696,10 @@ public class AzureBlobFileSystem extends FileSystem
@Override
public FileStatus getFileStatus(final Path f) throws IOException {
- TracingContext tracingContext = new TracingContext(clientCorrelationId,
- fileSystemId, FSOperationType.GET_FILESTATUS, tracingHeaderFormat,
- listener);
- return getFileStatus(f, tracingContext);
+ TracingContext tracingContext = new TracingContext(clientCorrelationId,
+ fileSystemId, FSOperationType.GET_FILESTATUS, tracingHeaderFormat,
+ listener);
+ return getFileStatus(f, tracingContext);
}
private FileStatus getFileStatus(final Path path,
@@ -615,7 +710,7 @@ public class AzureBlobFileSystem extends FileSystem
try {
return abfsStore.getFileStatus(qualifiedPath, tracingContext);
- } catch(AzureBlobFileSystemException ex) {
+ } catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
return null;
}
@@ -639,7 +734,7 @@ public class AzureBlobFileSystem extends FileSystem
fileSystemId, FSOperationType.BREAK_LEASE, tracingHeaderFormat,
listener);
abfsStore.breakLease(qualifiedPath, tracingContext);
- } catch(AzureBlobFileSystemException ex) {
+ } catch (AzureBlobFileSystemException ex) {
checkException(f, ex);
}
}
@@ -666,7 +761,6 @@ public class AzureBlobFileSystem extends FileSystem
return super.makeQualified(path);
}
-
@Override
public Path getWorkingDirectory() {
return this.workingDir;
@@ -689,8 +783,8 @@ public class AzureBlobFileSystem extends FileSystem
@Override
public Path getHomeDirectory() {
return makeQualified(new Path(
- FileSystemConfigurations.USER_HOME_DIRECTORY_PREFIX
- + "/" + abfsStore.getUser()));
+ FileSystemConfigurations.USER_HOME_DIRECTORY_PREFIX
+ + "/" + abfsStore.getUser()));
}
/**
@@ -714,8 +808,8 @@ public class AzureBlobFileSystem extends FileSystem
}
final String blobLocationHost = abfsStore.getAbfsConfiguration().getAzureBlockLocationHost();
- final String[] name = { blobLocationHost };
- final String[] host = { blobLocationHost };
+ final String[] name = {blobLocationHost};
+ final String[] host = {blobLocationHost};
long blockSize = file.getBlockSize();
if (blockSize <= 0) {
throw new IllegalArgumentException(
@@ -790,15 +884,14 @@ public class AzureBlobFileSystem extends FileSystem
}
});
}
- }
- finally {
+ } finally {
executorService.shutdownNow();
}
return true;
}
- /**
+ /**
* Set owner of a path (i.e. a file or a directory).
* The parameters owner and group cannot both be null.
*
@@ -828,9 +921,9 @@ public class AzureBlobFileSystem extends FileSystem
try {
abfsStore.setOwner(qualifiedPath,
- owner,
- group,
- tracingContext);
+ owner,
+ group,
+ tracingContext);
} catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
}
@@ -847,7 +940,10 @@ public class AzureBlobFileSystem extends FileSystem
* @throws IllegalArgumentException If name is null or empty or if value is null
*/
@Override
- public void setXAttr(final Path path, final String name, final byte[] value, final EnumSet<XAttrSetFlag> flag)
+ public void setXAttr(final Path path,
+ final String name,
+ final byte[] value,
+ final EnumSet<XAttrSetFlag> flag)
throws IOException {
LOG.debug("AzureBlobFileSystem.setXAttr path: {}", path);
@@ -971,7 +1067,7 @@ public class AzureBlobFileSystem extends FileSystem
if (!getIsNamespaceEnabled(tracingContext)) {
throw new UnsupportedOperationException(
"modifyAclEntries is only supported by storage accounts with the "
- + "hierarchical namespace enabled.");
+ + "hierarchical namespace enabled.");
}
if (aclSpec == null || aclSpec.isEmpty()) {
@@ -1006,7 +1102,7 @@ public class AzureBlobFileSystem extends FileSystem
if (!getIsNamespaceEnabled(tracingContext)) {
throw new UnsupportedOperationException(
"removeAclEntries is only supported by storage accounts with the "
- + "hierarchical namespace enabled.");
+ + "hierarchical namespace enabled.");
}
if (aclSpec == null || aclSpec.isEmpty()) {
@@ -1038,7 +1134,7 @@ public class AzureBlobFileSystem extends FileSystem
if (!getIsNamespaceEnabled(tracingContext)) {
throw new UnsupportedOperationException(
"removeDefaultAcl is only supported by storage accounts with the "
- + "hierarchical namespace enabled.");
+ + "hierarchical namespace enabled.");
}
Path qualifiedPath = makeQualified(path);
@@ -1068,7 +1164,7 @@ public class AzureBlobFileSystem extends FileSystem
if (!getIsNamespaceEnabled(tracingContext)) {
throw new UnsupportedOperationException(
"removeAcl is only supported by storage accounts with the "
- + "hierarchical namespace enabled.");
+ + "hierarchical namespace enabled.");
}
Path qualifiedPath = makeQualified(path);
@@ -1101,7 +1197,7 @@ public class AzureBlobFileSystem extends FileSystem
if (!getIsNamespaceEnabled(tracingContext)) {
throw new UnsupportedOperationException(
"setAcl is only supported by storage accounts with the hierarchical "
- + "namespace enabled.");
+ + "namespace enabled.");
}
if (aclSpec == null || aclSpec.size() == 0) {
@@ -1133,7 +1229,7 @@ public class AzureBlobFileSystem extends FileSystem
if (!getIsNamespaceEnabled(tracingContext)) {
throw new UnsupportedOperationException(
"getAclStatus is only supported by storage account with the "
- + "hierarchical namespace enabled.");
+ + "hierarchical namespace enabled.");
}
Path qualifiedPath = makeQualified(path);
@@ -1243,7 +1339,7 @@ public class AzureBlobFileSystem extends FileSystem
private boolean fileSystemExists() throws IOException {
LOG.debug(
- "AzureBlobFileSystem.fileSystemExists uri: {}", uri);
+ "AzureBlobFileSystem.fileSystemExists uri: {}", uri);
try {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.TEST_OP, tracingHeaderFormat, listener);
@@ -1534,8 +1630,9 @@ public class AzureBlobFileSystem extends FileSystem
case CommonPathCapabilities.FS_PERMISSIONS:
case CommonPathCapabilities.FS_APPEND:
case CommonPathCapabilities.ETAGS_AVAILABLE:
- case CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME:
return true;
+
+ case CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME:
case CommonPathCapabilities.FS_ACLS:
return getIsNamespaceEnabled(
new TracingContext(clientCorrelationId, fileSystemId,
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index f4f8959..046f9f0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Listenable
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -878,7 +879,22 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
client.breakLease(getRelativePath(path), tracingContext);
}
- public void rename(final Path source, final Path destination, TracingContext tracingContext) throws
+ /**
+ * Rename a file or directory.
+ * If a source etag is passed in, the operation will attempt to recover
+ * from a missing source file by probing the destination for
+ * existence and comparing etags.
+ * @param source path to source file
+ * @param destination destination of rename.
+ * @param tracingContext trace context
+ * @param sourceEtag etag of source file. may be null or empty
+ * @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures.
+ * @return true if recovery was needed and succeeded.
+ */
+ public boolean rename(final Path source,
+ final Path destination,
+ final TracingContext tracingContext,
+ final String sourceEtag) throws
AzureBlobFileSystemException {
final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
long countAggregate = 0;
@@ -898,23 +914,29 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
String sourceRelativePath = getRelativePath(source);
String destinationRelativePath = getRelativePath(destination);
+ // was any operation recovered from?
+ boolean recovered = false;
do {
try (AbfsPerfInfo perfInfo = startTracking("rename", "renamePath")) {
- AbfsRestOperation op = client
- .renamePath(sourceRelativePath, destinationRelativePath,
- continuation, tracingContext);
+ final Pair<AbfsRestOperation, Boolean> pair =
+ client.renamePath(sourceRelativePath, destinationRelativePath,
+ continuation, tracingContext, sourceEtag);
+
+ AbfsRestOperation op = pair.getLeft();
perfInfo.registerResult(op.getResult());
continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
perfInfo.registerSuccess(true);
countAggregate++;
shouldContinue = continuation != null && !continuation.isEmpty();
-
+ // update the recovery flag.
+ recovered |= pair.getRight();
if (!shouldContinue) {
perfInfo.registerAggregates(startAggregate, countAggregate);
}
}
} while (shouldContinue);
+ return recovered;
}
public void delete(final Path path, final boolean recursive,
@@ -1932,7 +1954,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
* @param result response to process.
* @return the quote-unwrapped etag.
*/
- private static String extractEtagHeader(AbfsHttpOperation result) {
+ public static String extractEtagHeader(AbfsHttpOperation result) {
String etag = result.getResponseHeader(HttpHeaderConfigurations.ETAG);
if (etag != null) {
// strip out any wrapper "" quotes which come back, for consistency with
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AbfsManifestStoreOperations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AbfsManifestStoreOperations.java
new file mode 100644
index 0000000..efba924
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AbfsManifestStoreOperations.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.IOException;
+import java.time.Duration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem;
+
+/**
+ * Extension of StoreOperationsThroughFileSystem with ABFS awareness.
+ * Purely for use by jobs committing work through the manifest committer.
+ * The {@link AzureManifestCommitterFactory} will configure
+ * this as the binding to the FS.
+ *
+ * ADLS Gen2 stores support etag-recovery on renames, but not WASB
+ * stores.
+ */
+@InterfaceAudience.LimitedPrivate("mapreduce")
+@InterfaceStability.Unstable
+public class AbfsManifestStoreOperations extends
+ ManifestStoreOperationsThroughFileSystem {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbfsManifestStoreOperations.class);
+
+ /**
+   * Classname, which can be declared in job configurations.
+ */
+ public static final String NAME = AbfsManifestStoreOperations.class.getName();
+
+ /**
+ * Resilient rename calls; only available on an ADLS Gen2 store.
+ * Will be null after binding if the FS isn't compatible.
+ */
+ private ResilientCommitByRename resilientCommitByRename;
+
+ @Override
+ public AzureBlobFileSystem getFileSystem() {
+ return (AzureBlobFileSystem) super.getFileSystem();
+ }
+
+ /**
+ * Bind to the store.
+ *
+ * @param filesystem FS.
+ * @param path path to work under
+ * @throws IOException binding problems.
+ */
+ @Override
+ public void bindToFileSystem(FileSystem filesystem, Path path) throws IOException {
+ if (!(filesystem instanceof AzureBlobFileSystem)) {
+ throw new PathIOException(path.toString(),
+ "Not an abfs filesystem: " + filesystem.getClass());
+ }
+ super.bindToFileSystem(filesystem, path);
+ try {
+ resilientCommitByRename = getFileSystem().createResilientCommitSupport(path);
+ LOG.debug("Bonded to filesystem with resilient commits under path {}", path);
+ } catch (UnsupportedOperationException e) {
+ LOG.debug("No resilient commit support under path {}", path);
+ }
+ }
+
+ @Override
+ public boolean storePreservesEtagsThroughRenames(final Path path) {
+ return true;
+ }
+
+ /**
+ * Resilient commits available on hierarchical stores.
+ * @return true if the FS can use etags on renames.
+ */
+ @Override
+ public boolean storeSupportsResilientCommit() {
+ return resilientCommitByRename != null;
+ }
+
+ /**
+ * Commit a file through an internal ABFS operation.
+ * If resilient commit is unavailable, invokes the superclass, which
+ * will raise an UnsupportedOperationException
+ * @param entry entry to commit
+ * @return the outcome
+ * @throws IOException any failure in resilient commit.
+ * @throws UnsupportedOperationException if not available.
+ */
+ @Override
+ public CommitFileResult commitFile(final FileEntry entry) throws IOException {
+
+ if (resilientCommitByRename != null) {
+ final Pair<Boolean, Duration> result =
+ resilientCommitByRename.commitSingleFileByRename(
+ entry.getSourcePath(),
+ entry.getDestPath(),
+ entry.getEtag());
+ return CommitFileResult.fromResilientCommit(result.getLeft(),
+ result.getRight());
+ } else {
+ return super.commitFile(entry);
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AzureManifestCommitterFactory.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AzureManifestCommitterFactory.java
new file mode 100644
index 0000000..b760fa7
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AzureManifestCommitterFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory;
+
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_STORE_OPERATIONS_CLASS;
+
+/**
+ * A Committer for the manifest committer which performs all bindings needed
+ * to work best with abfs.
+ * This includes, at a minimum, switching to the abfs-specific manifest store operations.
+ *
+ * This classname is referenced in configurations, so MUST NOT change.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class AzureManifestCommitterFactory extends ManifestCommitterFactory {
+
+  /**
+   * Classname, which can be declared in job configurations.
+   * Must reference this class, not the superclass, so that jobs
+   * configured with it pick up the ABFS store-operations binding.
+   */
+  public static final String NAME = AzureManifestCommitterFactory.class.getName();
+
+  @Override
+  public ManifestCommitter createOutputCommitter(final Path outputPath,
+      final TaskAttemptContext context) throws IOException {
+    final Configuration conf = context.getConfiguration();
+    // use ABFS Store operations
+    conf.set(OPT_STORE_OPERATIONS_CLASS,
+        AbfsManifestStoreOperations.NAME);
+    return super.createOutputCommitter(outputPath, context);
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/ResilientCommitByRename.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/ResilientCommitByRename.java
new file mode 100644
index 0000000..2e91392
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/ResilientCommitByRename.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.time.Duration;
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+
+/**
+ * API exclusively for committing files.
+ *
+ * This is only for use by {@link AbfsManifestStoreOperations},
+ * and is intended to be implemented by ABFS.
+ * To ensure that there is no need to add mapreduce JARs to the
+ * classpath just to work with ABFS, this interface
+ * MUST NOT refer to anything in the
+ * {@code org.apache.hadoop.mapreduce} package.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface ResilientCommitByRename extends IOStatisticsSource {
+
+ /**
+ * Rename source file to dest path *Exactly*; no subdirectory games here.
+ * If the method does not raise an exception, then
+ * the data at dest is the data which was at source.
+ *
+ * Requirements
+ *
+ * <pre>
+ * exists(FS, source) else raise FileNotFoundException
+ * source != dest else raise PathIOException
+ * not exists(FS, dest)
+ * isDir(FS, dest.getParent)
+ * </pre>
+ * <ol>
+ * <li>source != dest else raise PathIOException</li>
+ * <li>source must exist else raise FileNotFoundException</li>
+ * <li>source must exist and be a file</li>
+ * <li>dest must not exist; </li>
+ * <li>dest.getParent() must be a dir</li>
+ * <li>if sourceEtag is non-empty, it MAY be used to qualify/validate the rename.</li>
+ * </ol>
+ *
+ * The outcome of the operation is undefined if source is not a file, dest exists,
+ * dest.getParent() doesn't exist/is a file.
+ * That is: implementations SHOULD assume that the code calling this method has
+ * set up the destination directory tree and is only invoking this call on a file.
+ * Accordingly: <i>implementations MAY skip validation checks</i>
+ *
+ * Post Conditions on a successful operation:
+ * <pre>
+ * FS' where:
+ * not exists(FS', source)
+ * and exists(FS', dest)
+ * and data(FS', dest) == data (FS, source)
+ * </pre>
+ * This is exactly the same outcome as `FileSystem.rename()` when the same preconditions
+ * are met. This API call simply restricts the operation to file rename with strict
+ * conditions, (no need to be 'clever' about dest path calculation) and the ability
+ * to pass in the etag of the source file.
+ *
+ * @param source path to source file
+ * @param dest destination of rename.
+ * @param sourceEtag etag of source file. may be null or empty
+ * @return true if recovery was needed.
+ * @throws FileNotFoundException source file not found
+ * @throws PathIOException failure, including source and dest being the same path
+ * @throws IOException any other exception
+ */
+ Pair<Boolean, Duration> commitSingleFileByRename(
+ Path source,
+ Path dest,
+ @Nullable String sourceEtag) throws IOException;
+
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java
new file mode 100644
index 0000000..3567377
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Support for manifest committer.
+ * Unless otherwise stated: classes are private.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 12beb5a..9d3b2d5 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -220,6 +220,9 @@ public final class ConfigurationKeys {
/** Key for enabling the tracking of ABFS API latency and sending the latency numbers to the ABFS API service */
public static final String FS_AZURE_ABFS_LATENCY_TRACK = "fs.azure.abfs.latency.track";
+ /** Key for rate limit capacity, as used by IO operations which try to throttle themselves. */
+ public static final String FS_AZURE_ABFS_IO_RATE_LIMIT = "fs.azure.io.rate.limit";
+
public static String accountProperty(String property, String account) {
return property + "." + account;
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index f58c61e..63d62a3 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -133,5 +133,10 @@ public final class FileSystemConfigurations {
public static final String DATA_BLOCKS_BUFFER_DEFAULT =
DATA_BLOCKS_BUFFER_DISK;
+ /**
+ * IO rate limit. Value: {@value}
+ */
+ public static final int RATE_LIMIT_DEFAULT = 10_000;
+
private FileSystemConfigurations() {}
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 08142a1..b701037 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFact
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
@@ -67,6 +68,8 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import static org.apache.commons.lang3.StringUtils.isNotEmpty;
+import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM;
@@ -478,8 +481,30 @@ public class AbfsClient implements Closeable {
return op;
}
- public AbfsRestOperation renamePath(String source, final String destination,
- final String continuation, TracingContext tracingContext)
+
+ /**
+ * Rename a file or directory.
+ * If a source etag is passed in, the operation will attempt to recover
+ * from a missing source file by probing the destination for
+ * existence and comparing etags.
+ * The second value in the result will be true to indicate that this
+ * took place.
+ * As rename recovery is only attempted if the source etag is non-empty,
+ * in normal rename operations rename recovery will never happen.
+ * @param source path to source file
+ * @param destination destination of rename.
+ * @param continuation continuation.
+ * @param tracingContext trace context
+ * @param sourceEtag etag of source file. may be null or empty
+ * @return pair of (the rename operation, flag indicating recovery took place)
+ * @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures.
+ */
+ public Pair<AbfsRestOperation, Boolean> renamePath(
+ final String source,
+ final String destination,
+ final String continuation,
+ final TracingContext tracingContext,
+ final String sourceEtag)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
@@ -505,9 +530,73 @@ public class AbfsClient implements Closeable {
HTTP_METHOD_PUT,
url,
requestHeaders);
- // no attempt at recovery using timestamps as it was not reliable.
- op.execute(tracingContext);
- return op;
+ try {
+ op.execute(tracingContext);
+ return Pair.of(op, false);
+ } catch (AzureBlobFileSystemException e) {
+ // If we have no HTTP response, throw the original exception.
+ if (!op.hasResult()) {
+ throw e;
+ }
+ boolean etagCheckSucceeded = renameIdempotencyCheckOp(
+ source,
+ sourceEtag, op, destination, tracingContext);
+ if (!etagCheckSucceeded) {
+ // idempotency did not return different result
+ // throw back the exception
+ throw e;
+ }
+ return Pair.of(op, true);
+ }
+ }
+
+ /**
+ * Check if the rename request failure is post a retry and if earlier rename
+ * request might have succeeded at back-end.
+ *
+ * If a source etag was passed in, and the error was 404, get the
+ * etag of any file at the destination.
+ * If it matches the source etag, then the rename is considered
+ * a success.
+ * Exceptions raised in the probe of the destination are swallowed,
+ * so that they do not interfere with the original rename failures.
+ * @param source source path
+ * @param op Rename request REST operation response with non-null HTTP response
+ * @param destination rename destination path
+ * @param sourceEtag etag of source file. may be null or empty
+ * @param tracingContext Tracks identifiers for request header
+ * @return true if the file was successfully copied
+ */
+ public boolean renameIdempotencyCheckOp(
+ final String source,
+ final String sourceEtag,
+ final AbfsRestOperation op,
+ final String destination,
+ TracingContext tracingContext) {
+ Preconditions.checkArgument(op.hasResult(), "Operations has null HTTP response");
+
+ if ((op.isARetriedRequest())
+ && (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND)
+ && isNotEmpty(sourceEtag)) {
+
+ // Server has returned HTTP 404, which means rename source no longer
+ // exists. Check on destination status and if its etag matches
+ // that of the source, consider it to be a success.
+ LOG.debug("rename {} to {} failed, checking etag of destination",
+ source, destination);
+
+ try {
+ final AbfsRestOperation destStatusOp = getPathStatus(destination,
+ false, tracingContext);
+ final AbfsHttpOperation result = destStatusOp.getResult();
+
+ return result.getStatusCode() == HttpURLConnection.HTTP_OK
+ && sourceEtag.equals(extractEtagHeader(result));
+ } catch (AzureBlobFileSystemException ignored) {
+ // GetFileStatus on the destination failed, the rename did not take place
+ }
+ }
+ return false;
}
public AbfsRestOperation append(final String path, final byte[] buffer,
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java
index bb1ec9e..e3adc59 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java
@@ -48,9 +48,15 @@ public class SimpleKeyProvider implements KeyProvider {
// Validating the key.
validateStorageAccountKey(key);
} catch (IllegalAccessException | InvalidConfigurationValueException e) {
- throw new KeyProviderException("Failure to initialize configuration", e);
+ LOG.debug("Failure to retrieve storage account key for {}", accountName,
+ e);
+ throw new KeyProviderException("Failure to initialize configuration for "
+ + accountName
+ + " key =\"" + key + "\""
+ + ": " + e, e);
} catch(IOException ioe) {
- LOG.warn("Unable to get key from credential providers. {}", ioe);
+ LOG.warn("Unable to get key for {} from credential providers. {}",
+ accountName, ioe, ioe);
}
return key;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index 56d5538..4a55075 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -271,11 +271,12 @@ public abstract class AbstractAbfsIntegrationTest extends
// The SAS tests do not have permission to create a filesystem
// so first create temporary instance of the filesystem using SharedKey
// then re-use the filesystem it creates with SAS auth instead of SharedKey.
- AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig);
- ContractTestUtils.assertPathExists(tempFs, "This path should exist",
- new Path("/"));
- abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name());
- usingFilesystemForSASTests = true;
+ try (AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig)){
+ ContractTestUtils.assertPathExists(tempFs, "This path should exist",
+ new Path("/"));
+ abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name());
+ usingFilesystemForSASTests = true;
+ }
}
public AzureBlobFileSystem getFileSystem() throws IOException {
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java
index b0e8244..5bd6eaf 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java
@@ -32,7 +32,10 @@ import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
+import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
@@ -76,13 +79,19 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
new Random().nextBytes(b);
Path testPath = path(TEST_PATH);
- try (FSDataOutputStream stream = fs.create(testPath)) {
+ FSDataOutputStream stream = fs.create(testPath);
+ try {
stream.write(b);
+ } finally{
+ stream.close();
}
+ IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
final byte[] readBuffer = new byte[2 * bufferSize];
int result;
+ IOStatisticsSource statisticsSource = null;
try (FSDataInputStream inputStream = fs.open(testPath)) {
+ statisticsSource = inputStream;
((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.READ, true, 0,
@@ -100,6 +109,8 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
inputStream.seek(0);
result = inputStream.read(readBuffer, 0, bufferSize);
}
+ IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource);
+
assertNotEquals("data read in final read()", -1, result);
assertArrayEquals(readBuffer, b);
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java
index ea9fba6..965e02a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java
@@ -401,7 +401,8 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
fs.create(new Path(src)).close();
AbfsRestOperation abfsHttpRestOperation = fs.getAbfsClient()
.renamePath(src, "/testABC" + "/abc.txt", null,
- getTestTracingContext(fs, false));
+ getTestTracingContext(fs, false), null)
+ .getLeft();
AbfsHttpOperation result = abfsHttpRestOperation.getResult();
String url = result.getMaskedUrl();
String encodedUrl = result.getMaskedEncodedUrl();
@@ -418,7 +419,7 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
intercept(IOException.class, "sig=XXXX",
() -> getFileSystem().getAbfsClient()
.renamePath("testABC/test.xt", "testABC/abc.txt", null,
- getTestTracingContext(getFileSystem(), false)));
+ getTestTracingContext(getFileSystem(), false), null));
}
@Test
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java
index 2198a6a..0226031 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java
@@ -526,7 +526,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
AbfsClient abfsClient = fs.getAbfsClient();
AbfsRestOperation abfsRestOperation = abfsClient
.renamePath(testFileName, newName, null,
- getTestTracingContext(fs, false));
+ getTestTracingContext(fs, false), null)
+ .getLeft();
assertCPKHeaders(abfsRestOperation, false);
assertNoCPKResponseHeadersPresent(abfsRestOperation);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbfsCommitTestHelper.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbfsCommitTestHelper.java
new file mode 100644
index 0000000..8160cdc
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbfsCommitTestHelper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_STORE_OPERATIONS_CLASS;
+
+/**
+ * Helper methods for committer tests on ABFS.
+ */
+final class AbfsCommitTestHelper {
+ private AbfsCommitTestHelper() {
+ }
+
+ /**
+ * Prepare the test configuration.
+ * @param contractTestBinding test binding
+ * @return an extracted and patched configuration.
+ */
+ static Configuration prepareTestConfiguration(
+ ABFSContractTestBinding contractTestBinding) {
+ final Configuration conf =
+ contractTestBinding.getRawConfiguration();
+
+ // use ABFS Store operations
+ conf.set(OPT_STORE_OPERATIONS_CLASS,
+ AbfsManifestStoreOperations.NAME);
+
+ return conf;
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbstractAbfsClusterITest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbstractAbfsClusterITest.java
new file mode 100644
index 0000000..5575205
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbstractAbfsClusterITest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.util.DurationInfo;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assumeScaleTestsEnabled;
+import static org.apache.hadoop.io.IOUtils.closeStream;
+
+/**
+ * Tests which create a yarn minicluster.
+ * These are all considered scale tests; the probe for
+ * scale tests being enabled is executed before the cluster
+ * is set up to avoid wasting time on non-scale runs.
+ */
+public abstract class AbstractAbfsClusterITest extends
+ AbstractManifestCommitterTest {
+
+ /** Number of NodeManagers to launch in the mini YARN cluster. */
+ public static final int NO_OF_NODEMANAGERS = 2;
+
+ /** ABFS contract binding; source of the filesystem configuration. */
+ private final ABFSContractTestBinding binding;
+
+
+ /**
+ * The static cluster binding with the lifecycle of this test; served
+ * through instance-level methods for sharing across methods in the
+ * suite.
+ */
+ @SuppressWarnings("StaticNonFinalField")
+ private static ClusterBinding clusterBinding;
+
+ /**
+ * Instantiate; creates the ABFS contract binding.
+ * @throws Exception binding instantiation failure
+ */
+ protected AbstractAbfsClusterITest() throws Exception {
+ binding = new ABFSContractTestBinding();
+ }
+
+ @Override
+ protected int getTestTimeoutMillis() {
+ // minicluster suites get the extended Azure scale-test timeout
+ return AzureTestConstants.SCALE_TEST_TIMEOUT_MILLIS;
+ }
+
+ @Override
+ public void setup() throws Exception {
+ // the ABFS binding is set up before the superclass setup,
+ // as createConfiguration() reads it.
+ binding.setup();
+ super.setup();
+ // fail fast (skip) before any cluster is created on non-scale runs
+ requireScaleTestsEnabled();
+ // lazily create the shared cluster binding on first use in the suite
+ if (getClusterBinding() == null) {
+ clusterBinding = demandCreateClusterBinding();
+ }
+ assertNotNull("cluster is not bound", getClusterBinding());
+ }
+
+ /** Stop the shared YARN cluster, if any, once the suite completes. */
+ @AfterClass
+ public static void teardownClusters() throws IOException {
+ terminateCluster(clusterBinding);
+ clusterBinding = null;
+ }
+
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new AbfsFileSystemContract(conf, binding.isSecureMode());
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+ }
+
+ /**
+ * This is the cluster binding which every subclass must create.
+ * Wraps the mini YARN cluster and its name; closed via {@link #terminate()}.
+ */
+ protected static final class ClusterBinding {
+
+ /** Unique (timestamp-based) name of the cluster. */
+ private String clusterName;
+
+ /** The YARN minicluster; never null. */
+ private final MiniMRYarnCluster yarn;
+
+ public ClusterBinding(
+ final String clusterName,
+ final MiniMRYarnCluster yarn) {
+ this.clusterName = clusterName;
+ this.yarn = requireNonNull(yarn);
+ }
+
+
+ /**
+ * Get the cluster FS, which will either be HDFS or the local FS.
+ * This implementation returns the local FS built from the YARN config.
+ * @return a filesystem.
+ * @throws IOException failure
+ */
+ public FileSystem getClusterFS() throws IOException {
+ return FileSystem.getLocal(yarn.getConfig());
+ }
+
+ public MiniMRYarnCluster getYarn() {
+ return yarn;
+ }
+
+ public Configuration getConf() {
+ return getYarn().getConfig();
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ /** Terminate the YARN cluster, swallowing close() exceptions. */
+ public void terminate() {
+ closeStream(getYarn());
+ }
+ }
+
+ /**
+ * Create the cluster binding.
+ * The configuration will be patched by propagating down options
+ * from the maven build and turning off unwanted
+ * YARN features.
+ *
+ * If an HDFS cluster is requested,
+ * the HDFS and YARN clusters will share the same configuration, so
+ * the HDFS cluster binding is implicitly propagated to YARN.
+ * If one is not requested, the local filesystem is used as the cluster FS.
+ * @param conf configuration to start with.
+ * @return the cluster binding.
+ * @throws IOException failure.
+ */
+ protected static ClusterBinding createCluster(
+ final JobConf conf) throws IOException {
+ try (DurationInfo d = new DurationInfo(LOG, "Creating YARN MiniCluster")) {
+ conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
+ // create a unique cluster name based on the current time in millis.
+ String timestamp = LocalDateTime.now().format(
+ DateTimeFormatter.ofPattern("yyyy-MM-dd-HH.mm.ss.SS"));
+ String clusterName = "yarn-" + timestamp;
+ MiniMRYarnCluster yarnCluster =
+ new MiniMRYarnCluster(clusterName, NO_OF_NODEMANAGERS);
+ yarnCluster.init(conf);
+ yarnCluster.start();
+ return new ClusterBinding(clusterName, yarnCluster);
+ }
+ }
+
+ /**
+ * Terminate the cluster if it is not null.
+ * @param cluster the cluster
+ */
+ protected static void terminateCluster(ClusterBinding cluster) {
+ if (cluster != null) {
+ cluster.terminate();
+ }
+ }
+
+ /**
+ * Get the cluster binding for this subclass.
+ * @return the cluster binding
+ */
+ protected ClusterBinding getClusterBinding() {
+ return clusterBinding;
+ }
+
+ /** @return the YARN minicluster of the current binding. */
+ protected MiniMRYarnCluster getYarn() {
+ return getClusterBinding().getYarn();
+ }
+
+
+ /**
+ * We stage work into a temporary directory rather than directly under
+ * the user's home directory, as that is often rejected by CI test
+ * runners.
+ */
+ @Rule
+ public final TemporaryFolder stagingFilesDir = new TemporaryFolder();
+
+
+ /**
+ * Binding on demand rather than in a BeforeClass static method.
+ * Subclasses can override this to change the binding options.
+ * @return the cluster binding
+ */
+ protected ClusterBinding demandCreateClusterBinding() throws Exception {
+ return createCluster(new JobConf());
+ }
+
+ /**
+ * Create a job configuration.
+ * This creates a new job conf from the yarn
+ * cluster configuration then calls
+ * {@link #applyCustomConfigOptions(JobConf)} to allow it to be customized.
+ * @return the new job configuration.
+ * @throws IOException failure
+ */
+ protected JobConf newJobConf() throws IOException {
+ JobConf jobConf = new JobConf(getYarn().getConfig());
+ jobConf.addResource(getConfiguration());
+ applyCustomConfigOptions(jobConf);
+ return jobConf;
+ }
+
+ /**
+ * Patch the (job) configuration for this committer.
+ * Enables the manifest committer for the job.
+ * @param jobConf configuration to patch
+ * @return a configuration which will run this configuration.
+ */
+ protected Configuration patchConfigurationForCommitter(
+ final Configuration jobConf) {
+ enableManifestCommitter(jobConf);
+ return jobConf;
+ }
+
+ /**
+ * Override point to let implementations tune the MR Job conf.
+ * Default: no-op.
+ * @param jobConf configuration
+ */
+ protected void applyCustomConfigOptions(JobConf jobConf) throws IOException {
+
+ }
+
+
+ /**
+ * Assume that scale tests are enabled; skips the test run otherwise.
+ */
+ protected void requireScaleTestsEnabled() {
+ assumeScaleTestsEnabled(getConfiguration());
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCleanupStage.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCleanupStage.java
new file mode 100644
index 0000000..a597c35
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCleanupStage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestCleanupStage;
+
+/**
+ * Cleanup logic on ABFS: runs the manifest committer cleanup stage
+ * tests against an ABFS store.
+ */
+public class ITestAbfsCleanupStage extends TestCleanupStage {
+
+ /** ABFS contract binding; supplies the store configuration. */
+ private final ABFSContractTestBinding binding;
+
+ public ITestAbfsCleanupStage() throws Exception {
+ binding = new ABFSContractTestBinding();
+ }
+
+ @Override
+ public void setup() throws Exception {
+ // the binding is set up before the superclass setup,
+ // as createConfiguration() reads it.
+ binding.setup();
+ super.setup();
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ // configuration patched to use the ABFS manifest store operations
+ return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+ }
+
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new AbfsFileSystemContract(conf, binding.isSecureMode());
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCommitTaskStage.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCommitTaskStage.java
new file mode 100644
index 0000000..a0aaec8
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCommitTaskStage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestCommitTaskStage;
+
+/**
+ * ABFS storage test of task committer: runs the manifest committer
+ * task commit stage tests against an ABFS store.
+ */
+public class ITestAbfsCommitTaskStage extends TestCommitTaskStage {
+
+ /** ABFS contract binding; supplies the store configuration. */
+ private final ABFSContractTestBinding binding;
+
+ public ITestAbfsCommitTaskStage() throws Exception {
+ binding = new ABFSContractTestBinding();
+ }
+
+ @Override
+ public void setup() throws Exception {
+ // the binding is set up before the superclass setup,
+ // as createConfiguration() reads it.
+ binding.setup();
+ super.setup();
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ // configuration patched to use the ABFS manifest store operations
+ return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+ }
+
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new AbfsFileSystemContract(conf, binding.isSecureMode());
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCreateOutputDirectoriesStage.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCreateOutputDirectoriesStage.java
new file mode 100644
index 0000000..6621b80
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsCreateOutputDirectoriesStage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestCreateOutputDirectoriesStage;
+
+/**
+ * ABFS storage test of directory creation: runs the manifest committer
+ * output-directory creation stage tests against an ABFS store.
+ */
+public class ITestAbfsCreateOutputDirectoriesStage extends TestCreateOutputDirectoriesStage {
+
+ /** ABFS contract binding; supplies the store configuration. */
+ private final ABFSContractTestBinding binding;
+
+ public ITestAbfsCreateOutputDirectoriesStage() throws Exception {
+ binding = new ABFSContractTestBinding();
+ }
+
+ @Override
+ public void setup() throws Exception {
+ // the binding is set up before the superclass setup,
+ // as createConfiguration() reads it.
+ binding.setup();
+ super.setup();
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ // configuration patched to use the ABFS manifest store operations
+ return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+ }
+
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new AbfsFileSystemContract(conf, binding.isSecureMode());
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsJobThroughManifestCommitter.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsJobThroughManifestCommitter.java
new file mode 100644
index 0000000..4e4c4f5
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsJobThroughManifestCommitter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.FixMethodOrder;
+import org.junit.runners.MethodSorters;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestJobThroughManifestCommitter;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
+
+import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration;
+
+/**
+ * Test the Manifest committer stages against ABFS.
+ * Extends the base suite with etag validation of the
+ * generated task manifests.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class ITestAbfsJobThroughManifestCommitter
+ extends TestJobThroughManifestCommitter {
+
+ /** ABFS contract binding; supplies the store configuration. */
+ private final ABFSContractTestBinding binding;
+
+ public ITestAbfsJobThroughManifestCommitter() throws Exception {
+ binding = new ABFSContractTestBinding();
+ }
+
+ @Override
+ public void setup() throws Exception {
+ // the binding is set up before the superclass setup,
+ // as createConfiguration() reads it.
+ binding.setup();
+ super.setup();
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ return enableManifestCommitter(prepareTestConfiguration(binding));
+ }
+
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new AbfsFileSystemContract(conf, binding.isSecureMode());
+ }
+
+ @Override
+ protected boolean shouldDeleteTestRootAtEndOfTestRun() {
+ return true;
+ }
+
+ /**
+ * Add read of manifest and validate of output's etags.
+ * Every entry in the manifest must have a non-empty etag, and that
+ * etag must match the one served up by a fresh getFileStatus call
+ * on the source path.
+ * @param attemptId attempt ID
+ * @param files files which were created.
+ * @param manifest manifest
+ * @throws IOException failure
+ */
+ @Override
+ protected void validateTaskAttemptManifest(String attemptId,
+ List<Path> files,
+ TaskManifest manifest) throws IOException {
+ super.validateTaskAttemptManifest(attemptId, files, manifest);
+ final List<FileEntry> commit = manifest.getFilesToCommit();
+ final ManifestStoreOperations operations = getStoreOperations();
+ for (FileEntry entry : commit) {
+ // the manifest entry must carry an etag
+ Assertions.assertThat(entry.getEtag())
+ .describedAs("Etag of %s", entry)
+ .isNotEmpty();
+ // and it must agree with the etag of a HEAD probe of the source
+ final FileStatus sourceStatus = operations.getFileStatus(entry.getSourcePath());
+ final String etag = ManifestCommitterSupport.getEtag(sourceStatus);
+ Assertions.assertThat(etag)
+ .describedAs("Etag of %s", sourceStatus)
+ .isEqualTo(entry.getEtag());
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsLoadManifestsStage.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsLoadManifestsStage.java
new file mode 100644
index 0000000..acd693e3
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsLoadManifestsStage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestLoadManifestsStage;
+
+/**
+ * ABFS storage test of saving and loading a large number
+ * of manifests.
+ */
+public class ITestAbfsLoadManifestsStage extends TestLoadManifestsStage {
+
+ /** ABFS contract binding; supplies the store configuration. */
+ private final ABFSContractTestBinding binding;
+
+ public ITestAbfsLoadManifestsStage() throws Exception {
+ binding = new ABFSContractTestBinding();
+ }
+
+ @Override
+ public void setup() throws Exception {
+ // the binding is set up before the superclass setup,
+ // as createConfiguration() reads it.
+ binding.setup();
+ super.setup();
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ // configuration patched to use the ABFS manifest store operations
+ return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+ }
+
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new AbfsFileSystemContract(conf, binding.isSecureMode());
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestCommitProtocol.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestCommitProtocol.java
new file mode 100644
index 0000000..aac06f9
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestCommitProtocol.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestManifestCommitProtocol;
+
+import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration;
+
+/**
+ * Test the Manifest protocol against ABFS: full job/task commit
+ * protocol test run with the manifest committer enabled.
+ */
+public class ITestAbfsManifestCommitProtocol extends
+ TestManifestCommitProtocol {
+
+ /** ABFS contract binding; supplies the store configuration. */
+ private final ABFSContractTestBinding binding;
+
+ public ITestAbfsManifestCommitProtocol() throws Exception {
+ binding = new ABFSContractTestBinding();
+ }
+
+ @Override
+ public void setup() throws Exception {
+ // the binding is set up before the superclass setup,
+ // as createConfiguration() reads it.
+ binding.setup();
+ super.setup();
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ return enableManifestCommitter(prepareTestConfiguration(binding));
+ }
+
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new AbfsFileSystemContract(conf, binding.isSecureMode());
+ }
+
+
+ /**
+ * Suite name used in reporting/paths for this test run.
+ */
+ @Override
+ protected String suitename() {
+ return "ITestAbfsManifestCommitProtocol";
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestStoreOperations.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestStoreOperations.java
new file mode 100644
index 0000000..922782d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestStoreOperations.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.nio.charset.StandardCharsets;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
+
+import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME;
+import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Test {@link AbfsManifestStoreOperations}.
+ * As this looks at etag handling through FS operations, it is actually testing
+ * how etags work in ABFS (preservation across renames) and in the client
+ * (are they consistent in LIST and HEAD calls).
+ *
+ * Skipped when tested against wasb-compatible stores.
+ */
+public class ITestAbfsManifestStoreOperations extends AbstractManifestCommitterTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestAbfsManifestStoreOperations.class);
+
+ /** ABFS contract binding; supplies the store configuration. */
+ private final ABFSContractTestBinding binding;
+
+ public ITestAbfsManifestStoreOperations() throws Exception {
+ binding = new ABFSContractTestBinding();
+ }
+
+ @Override
+ public void setup() throws Exception {
+ // the binding is set up before the superclass setup,
+ // as createConfiguration() reads it.
+ binding.setup();
+ super.setup();
+
+ // skip tests on stores which do not preserve etags across renames
+ // (non-HNS / wasb-compatible stores)
+ assumeTrue("Resilient rename not available",
+ getFileSystem().hasPathCapability(getContract().getTestPath(),
+ ETAGS_PRESERVED_IN_RENAME));
+
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ return enableManifestCommitter(prepareTestConfiguration(binding));
+ }
+
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new AbfsFileSystemContract(conf, binding.isSecureMode());
+ }
+
+ /**
+ * Basic consistency across operations, as well as being non-empty:
+ * the etag from a HEAD (getFileStatus) must equal the etag of the
+ * same file's entry in a LIST (listStatus) result.
+ */
+ @Test
+ public void testEtagConsistencyAcrossListAndHead() throws Throwable {
+ describe("Etag values must be non-empty and consistent across LIST and HEAD Calls.");
+ final Path path = methodPath();
+ final FileSystem fs = getFileSystem();
+ ContractTestUtils.touch(fs, path);
+ // the operations instance is loaded via the patched Configuration,
+ // so verify the ABFS implementation was actually picked up
+ final ManifestStoreOperations operations = createManifestStoreOperations();
+ Assertions.assertThat(operations)
+ .describedAs("Store operations class loaded via Configuration")
+ .isInstanceOf(AbfsManifestStoreOperations.class);
+
+ // HEAD probe
+ final FileStatus st = operations.getFileStatus(path);
+ final String etag = operations.getEtag(st);
+ Assertions.assertThat(etag)
+ .describedAs("Etag of %s", st)
+ .isNotBlank();
+ LOG.info("etag of empty file is \"{}\"", etag);
+
+ // LIST probe of the same (single-entry) path
+ final FileStatus[] statuses = fs.listStatus(path);
+ Assertions.assertThat(statuses)
+ .describedAs("List(%s)", path)
+ .hasSize(1);
+ final FileStatus lsStatus = statuses[0];
+ Assertions.assertThat(operations.getEtag(lsStatus))
+ .describedAs("etag of list status (%s) compared to HEAD value of %s", lsStatus, st)
+ .isEqualTo(etag);
+ }
+
+ /**
+ * Overwriting a file with different data of the same length must
+ * change its etag.
+ */
+ @Test
+ public void testEtagsOfDifferentDataDifferent() throws Throwable {
+ describe("Verify that two different blocks of data written have different tags");
+
+ final Path path = methodPath();
+ final FileSystem fs = getFileSystem();
+ Path src = new Path(path, "src");
+
+ ContractTestUtils.createFile(fs, src, true,
+ "data1234".getBytes(StandardCharsets.UTF_8));
+ final ManifestStoreOperations operations = createManifestStoreOperations();
+ final FileStatus srcStatus = operations.getFileStatus(src);
+ final String srcTag = operations.getEtag(srcStatus);
+ LOG.info("etag of file 1 is \"{}\"", srcTag);
+
+ // now overwrite with data of same length
+ // (ensure that path or length aren't used exclusively as tag)
+ ContractTestUtils.createFile(fs, src, true,
+ "1234data".getBytes(StandardCharsets.UTF_8));
+
+ // validate
+ final String tag2 = operations.getEtag(operations.getFileStatus(src));
+ LOG.info("etag of file 2 is \"{}\"", tag2);
+
+ Assertions.assertThat(tag2)
+ .describedAs("etag of updated file")
+ .isNotEqualTo(srcTag);
+ }
+
+ /**
+ * Commit (rename) a file through the store operations and verify the
+ * destination carries the source's etag.
+ */
+ @Test
+ public void testEtagConsistencyAcrossRename() throws Throwable {
+ describe("Verify that when a file is renamed, the etag remains unchanged");
+ final Path path = methodPath();
+ final FileSystem fs = getFileSystem();
+ Path src = new Path(path, "src");
+ Path dest = new Path(path, "dest");
+
+ ContractTestUtils.createFile(fs, src, true,
+ "sample data".getBytes(StandardCharsets.UTF_8));
+ final ManifestStoreOperations operations = createManifestStoreOperations();
+ final FileStatus srcStatus = operations.getFileStatus(src);
+ final String srcTag = operations.getEtag(srcStatus);
+ LOG.info("etag of short file is \"{}\"", srcTag);
+
+ Assertions.assertThat(srcTag)
+ .describedAs("Etag of %s", srcStatus)
+ .isNotBlank();
+
+ // rename via the commit operation, passing the known source etag
+ operations.commitFile(new FileEntry(src, dest, 0, srcTag));
+
+ // validate: destination etag matches the pre-rename source etag
+ FileStatus destStatus = operations.getFileStatus(dest);
+ final String destTag = operations.getEtag(destStatus);
+ Assertions.assertThat(destTag)
+ .describedAs("etag of list status (%s) compared to HEAD value of %s", destStatus, srcStatus)
+ .isEqualTo(srcTag);
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsRenameStageFailure.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsRenameStageFailure.java
new file mode 100644
index 0000000..5547d08
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsRenameStageFailure.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestRenameStageFailure;
+
+/**
+ * Rename failure logic on ABFS.
+ * This will go through the resilient rename operation.
+ */
+public class ITestAbfsRenameStageFailure extends TestRenameStageFailure {
+
+ /**
+ * How many files to create.
+ */
+ private static final int FILES_TO_CREATE = 20;
+
+ /** Binding which supplies the ABFS test configuration. */
+ private final ABFSContractTestBinding binding;
+
+ /**
+ * Create the binding; throws if the azure test configuration
+ * is unavailable.
+ * @throws Exception failure to create the binding.
+ */
+ public ITestAbfsRenameStageFailure() throws Exception {
+ binding = new ABFSContractTestBinding();
+ }
+
+ /**
+ * Set up the binding before the superclass setup runs,
+ * as the superclass needs the configuration.
+ */
+ @Override
+ public void setup() throws Exception {
+ binding.setup();
+ super.setup();
+ }
+
+ /** @return a configuration patched for the manifest committer tests. */
+ @Override
+ protected Configuration createConfiguration() {
+ return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+ }
+
+ /** @return the ABFS contract for the (possibly secure) test account. */
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new AbfsFileSystemContract(conf, binding.isSecureMode());
+ }
+
+ /**
+ * ABFS provides resilient rename, so require the superclass
+ * to test through that path.
+ * @return true always.
+ */
+ @Override
+ protected boolean requireRenameResilience() {
+ return true;
+ }
+
+ /** @return number of files to create in the rename stage. */
+ @Override
+ protected int filesToCreate() {
+ return FILES_TO_CREATE;
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTaskManifestFileIO.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTaskManifestFileIO.java
new file mode 100644
index 0000000..d2fe9de
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTaskManifestFileIO.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestTaskManifestFileIO;
+
+/**
+ * Test Reading/writing manifest file through ABFS.
+ */
+public class ITestAbfsTaskManifestFileIO extends TestTaskManifestFileIO {
+
+ /** Binding which supplies the ABFS test configuration. */
+ private final ABFSContractTestBinding binding;
+
+ /**
+ * Create the binding; throws if the azure test configuration
+ * is unavailable.
+ * @throws Exception failure to create the binding.
+ */
+ public ITestAbfsTaskManifestFileIO() throws Exception {
+ binding = new ABFSContractTestBinding();
+ }
+
+ /**
+ * Set up the binding before the superclass setup runs,
+ * as the superclass needs the configuration.
+ */
+ @Override
+ public void setup() throws Exception {
+ binding.setup();
+ super.setup();
+ }
+
+ /** @return a configuration patched for the manifest committer tests. */
+ @Override
+ protected Configuration createConfiguration() {
+ return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+ }
+
+ /** @return the ABFS contract for the (possibly secure) test account. */
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new AbfsFileSystemContract(conf, binding.isSecureMode());
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTerasort.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTerasort.java
new file mode 100644
index 0000000..4b21b83
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTerasort.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import org.junit.Assume;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.examples.terasort.TeraGen;
+import org.apache.hadoop.examples.terasort.TeraSort;
+import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;
+import org.apache.hadoop.examples.terasort.TeraValidate;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.functional.RemoteIterators;
+
+import static java.util.Optional.empty;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.loadSuccessFile;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.validateSuccessFile;
+
+/**
+ * Runs Terasort against ABFS using the manifest committer.
+ * The tests run in sequence, so each operation is isolated.
+ * Scale test only (it is big and slow)
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@SuppressWarnings({"StaticNonFinalField", "OptionalUsedAsFieldOrParameterType"})
+public class ITestAbfsTerasort extends AbstractAbfsClusterITest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestAbfsTerasort.class);
+
+ /** Number of terasort partitions to request. */
+ public static final int EXPECTED_PARTITION_COUNT = 10;
+
+ /** Sample size per partition; kept small for test speed. */
+ public static final int PARTITION_SAMPLE_SIZE = 1000;
+
+ /** Number of rows teragen creates. */
+ public static final int ROW_COUNT = 1000;
+
+ /**
+ * This has to be common across all test methods.
+ */
+ private static final Path TERASORT_PATH = new Path("/ITestAbfsTerasort");
+
+ /**
+ * Duration tracker created in the first of the test cases and closed
+ * in {@link #test_140_teracomplete()}.
+ * NOTE: static mutable state shared across the name-ordered test
+ * methods; the @FixMethodOrder annotation is what makes this safe.
+ */
+ private static Optional<DurationInfo> terasortDuration = empty();
+
+ /**
+ * Tracker of which stages are completed and how long they took.
+ */
+ private static final Map<String, DurationInfo> COMPLETED_STAGES = new HashMap<>();
+
+ /**
+ * FileSystem statistics are collected from the _SUCCESS markers.
+ */
+ protected static final IOStatisticsSnapshot JOB_IOSTATS =
+ snapshotIOStatistics();
+
+ /** Base path for all the terasort input and output paths. */
+ private Path terasortPath;
+
+ /** Input (teragen) path. */
+ private Path sortInput;
+
+ /** Path where sorted data goes. */
+ private Path sortOutput;
+
+ /** Path for validated job's output. */
+ private Path sortValidate;
+
+ /** Constructor declares the exception the superclass may raise. */
+ public ITestAbfsTerasort() throws Exception {
+ }
+
+
+ @Override
+ public void setup() throws Exception {
+ // superclass calls requireScaleTestsEnabled();
+ super.setup();
+ prepareToTerasort();
+ }
+
+ /**
+ * Set up the job conf with the options for terasort chosen by the scale
+ * options.
+ * @param conf configuration
+ */
+ @Override
+ protected void applyCustomConfigOptions(JobConf conf) {
+ // small sample size for faster runs
+ conf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(),
+ getSampleSizeForEachPartition());
+ conf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(),
+ getExpectedPartitionCount());
+ conf.setBoolean(
+ TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
+ false);
+ }
+
+ /** @return number of partitions to expect. */
+ private int getExpectedPartitionCount() {
+ return EXPECTED_PARTITION_COUNT;
+ }
+
+ /** @return sample size per partition. */
+ private int getSampleSizeForEachPartition() {
+ return PARTITION_SAMPLE_SIZE;
+ }
+
+ /** @return number of rows teragen generates; overridable for scale. */
+ protected int getRowCount() {
+ return ROW_COUNT;
+ }
+
+ /**
+ * Set up the terasort by initializing paths variables
+ * The paths used must be unique across parameterized runs but
+ * common across all test cases in a single parameterized run.
+ */
+ private void prepareToTerasort() {
+ terasortPath = getFileSystem().makeQualified(TERASORT_PATH);
+ sortInput = new Path(terasortPath, "sortin");
+ sortOutput = new Path(terasortPath, "sortout");
+ sortValidate = new Path(terasortPath, "validate");
+ }
+
+ /**
+ * Declare that a stage has completed.
+ * @param stage stage name/key in the map
+ * @param d duration.
+ */
+ private static void completedStage(final String stage,
+ final DurationInfo d) {
+ COMPLETED_STAGES.put(stage, d);
+ }
+
+ /**
+ * Declare a stage which is required for this test case;
+ * skips (not fails) the test if the stage did not complete.
+ * @param stage stage name
+ */
+ private static void requireStage(final String stage) {
+ Assume.assumeTrue(
+ "Required stage was not completed: " + stage,
+ COMPLETED_STAGES.get(stage) != null);
+ }
+
+ /**
+ * Execute a single stage in the terasort.
+ * Updates the completed stages map with the stage duration -if successful.
+ * @param stage Stage name for the stages map.
+ * @param jobConf job conf
+ * @param dest destination directory -the _SUCCESS file will be expected here.
+ * @param tool tool to run.
+ * @param args args for the tool.
+ * @param minimumFileCount minimum number of files to have been created
+ * @throws Exception any failure
+ */
+ private void executeStage(
+ final String stage,
+ final JobConf jobConf,
+ final Path dest,
+ final Tool tool,
+ final String[] args,
+ final int minimumFileCount) throws Exception {
+ int result;
+
+ // the duration info is created outside a try-with-resources
+ // clause as it is used later.
+ DurationInfo d = new DurationInfo(LOG, stage);
+ try {
+ result = ToolRunner.run(jobConf, tool, args);
+ } finally {
+ d.close();
+ }
+ // dump the tree before asserting, so diagnostics survive a failure
+ dumpOutputTree(dest);
+ assertEquals(stage
+ + "(" + StringUtils.join(", ", args) + ")"
+ + " failed", 0, result);
+ // validate the _SUCCESS marker and merge its IO statistics
+ final ManifestSuccessData successFile = validateSuccessFile(getFileSystem(), dest,
+ minimumFileCount, "");
+ JOB_IOSTATS.aggregate(successFile.getIOStatistics());
+
+ completedStage(stage, d);
+ }
+
+ /**
+ * Set up terasort by cleaning out the destination, and note the initial
+ * time before any of the jobs are executed.
+ *
+ * This is executed first <i>for each parameterized run</i>.
+ * It is where all variables which need to be reset for each run need
+ * to be reset.
+ */
+ @Test
+ public void test_100_terasort_setup() throws Throwable {
+ describe("Setting up for a terasort");
+
+ getFileSystem().delete(terasortPath, true);
+ terasortDuration = Optional.of(new DurationInfo(LOG, false, "Terasort"));
+ }
+
+ /** Generate the input data for the sort. */
+ @Test
+ public void test_110_teragen() throws Throwable {
+ describe("Teragen to %s", sortInput);
+ getFileSystem().delete(sortInput, true);
+
+ JobConf jobConf = newJobConf();
+ patchConfigurationForCommitter(jobConf);
+ executeStage("teragen",
+ jobConf,
+ sortInput,
+ new TeraGen(),
+ new String[]{Integer.toString(getRowCount()), sortInput.toString()},
+ 1);
+ }
+
+
+ /** Sort the teragen output; requires the teragen stage. */
+ @Test
+ public void test_120_terasort() throws Throwable {
+ describe("Terasort from %s to %s", sortInput, sortOutput);
+ requireStage("teragen");
+ getFileSystem().delete(sortOutput, true);
+
+ // verify the input job's _SUCCESS file is loadable
+ loadSuccessFile(getFileSystem(), sortInput);
+ JobConf jobConf = newJobConf();
+ patchConfigurationForCommitter(jobConf);
+ executeStage("terasort",
+ jobConf,
+ sortOutput,
+ new TeraSort(),
+ new String[]{sortInput.toString(), sortOutput.toString()},
+ 1);
+ }
+
+ /** Validate the sorted output; requires the terasort stage. */
+ @Test
+ public void test_130_teravalidate() throws Throwable {
+ describe("TeraValidate from %s to %s", sortOutput, sortValidate);
+ requireStage("terasort");
+ getFileSystem().delete(sortValidate, true);
+ loadSuccessFile(getFileSystem(), sortOutput);
+ JobConf jobConf = newJobConf();
+ patchConfigurationForCommitter(jobConf);
+ executeStage("teravalidate",
+ jobConf,
+ sortValidate,
+ new TeraValidate(),
+ new String[]{sortOutput.toString(), sortValidate.toString()},
+ 1);
+ }
+
+ /**
+ * Print the results, and save to the base dir as a CSV file.
+ * Why there? Makes it easy to list and compare.
+ */
+ @Test
+ public void test_140_teracomplete() throws Throwable {
+ // close the overall duration tracker, if the setup stage created one
+ terasortDuration.ifPresent(d -> {
+ d.close();
+ completedStage("overall", d);
+ });
+
+ // IO Statistics
+ IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, JOB_IOSTATS);
+
+ // and the summary
+ final StringBuilder results = new StringBuilder();
+ results.append("\"Operation\"\t\"Duration\"\n");
+
+ // this is how you dynamically create a function in a method
+ // for use afterwards.
+ // Works because there's no IOEs being raised in this sequence.
+ Consumer<String> stage = (s) -> {
+ DurationInfo duration = COMPLETED_STAGES.get(s);
+ results.append(String.format("\"%s\"\t\"%s\"\n",
+ s,
+ duration == null ? "" : duration));
+ };
+
+ stage.accept("teragen");
+ stage.accept("terasort");
+ stage.accept("teravalidate");
+ stage.accept("overall");
+ String text = results.toString();
+ File resultsFile = File.createTempFile("results", ".csv");
+ FileUtils.write(resultsFile, text, StandardCharsets.UTF_8);
+ LOG.info("Results are in {}\n{}", resultsFile, text);
+ }
+
+ /**
+ * Reset the duration so if two committer tests are run sequentially.
+ * Without this the total execution time is reported as from the start of
+ * the first test suite to the end of the second.
+ */
+ @Test
+ public void test_150_teracleanup() throws Throwable {
+ terasortDuration = Optional.empty();
+ }
+
+ /** Delete all the terasort data at the end of the run. */
+ @Test
+ public void test_200_directory_deletion() throws Throwable {
+ getFileSystem().delete(terasortPath, true);
+ }
+
+ /**
+ * Dump the files under a path, but do not fail if the path is not present.
+ * @param path path to dump
+ * @throws Exception any failure.
+ */
+ protected void dumpOutputTree(Path path) throws Exception {
+ LOG.info("Files under output directory {}", path);
+ try {
+ RemoteIterators.foreach(getFileSystem().listFiles(path, true),
+ (status) -> LOG.info("{}", status));
+ } catch (FileNotFoundException e) {
+ LOG.info("Output directory {} not found", path);
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java
new file mode 100644
index 0000000..3d49d62
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Unit and integration tests for the manifest committer.
+ * JSON job reports will be saved to
+ * {@code target/reports}
+ */
+package org.apache.hadoop.fs.azurebfs.commit;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java
index 62bcca1..1319ea4 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java
@@ -34,7 +34,7 @@ public class AbfsFileSystemContract extends AbstractBondedFSContract {
public static final String CONTRACT_XML = "abfs.xml";
private final boolean isSecure;
- protected AbfsFileSystemContract(final Configuration conf, boolean secure) {
+ public AbfsFileSystemContract(final Configuration conf, boolean secure) {
super(conf);
//insert the base features
addConfResource(CONTRACT_XML);
diff --git a/hadoop-tools/hadoop-azure/src/test/resources/core-site.xml b/hadoop-tools/hadoop-azure/src/test/resources/core-site.xml
new file mode 100644
index 0000000..7d2d11c
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/resources/core-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <include xmlns="http://www.w3.org/2001/XInclude" href="azure-test.xml">
+ <fallback/>
+ </include>
+</configuration>
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org