You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/07/13 12:59:54 UTC

[hadoop] branch branch-3.3 updated: HDFS-13934. Multipart uploaders to be created through FileSystem/FileContext.

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new a51d72f  HDFS-13934. Multipart uploaders to be created through FileSystem/FileContext.
a51d72f is described below

commit a51d72f0c63a8f356826619a8c1c8de0b372a9a6
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Mon Jul 13 13:30:02 2020 +0100

    HDFS-13934. Multipart uploaders to be created through FileSystem/FileContext.
    
    Contributed by Steve Loughran.
    
    Change-Id: Iebd34140c1a0aa71f44a3f4d0fee85f6bdf123a3
---
 .../org/apache/hadoop/fs/AbstractFileSystem.java   |  30 ++
 .../apache/hadoop/fs/CommonPathCapabilities.java   |   8 +
 .../java/org/apache/hadoop/fs/FileContext.java     |  27 ++
 .../main/java/org/apache/hadoop/fs/FileSystem.java |  32 +-
 .../org/apache/hadoop/fs/FilterFileSystem.java     |  13 +-
 .../main/java/org/apache/hadoop/fs/FilterFs.java   |   6 +
 .../org/apache/hadoop/fs/InternalOperations.java   |  36 +-
 .../org/apache/hadoop/fs/MultipartUploader.java    | 123 ++----
 .../apache/hadoop/fs/MultipartUploaderBuilder.java |  83 ++++
 .../apache/hadoop/fs/MultipartUploaderFactory.java |  76 ----
 .../hadoop/fs/impl/AbstractMultipartUploader.java  | 142 +++++++
 .../fs/{ => impl}/FileSystemMultipartUploader.java | 158 ++++++--
 .../impl/FileSystemMultipartUploaderBuilder.java   |  90 +++++
 .../org/apache/hadoop/fs/impl/FutureIOSupport.java |  28 +-
 .../fs/impl/MultipartUploaderBuilderImpl.java      | 215 +++++++++++
 .../org.apache.hadoop.fs.MultipartUploaderFactory  |  16 -
 .../site/markdown/filesystem/multipartuploader.md  | 137 +++++--
 .../org/apache/hadoop/fs/TestFilterFileSystem.java |  23 ++
 .../org/apache/hadoop/fs/TestHarFileSystem.java    |   3 +
 .../AbstractContractMultipartUploaderTest.java     | 348 +++++++++++------
 .../TestLocalFSContractMultipartUploader.java      |  61 ---
 .../apache/hadoop/hdfs/DistributedFileSystem.java  |   8 +
 .../hadoop/hdfs/client/DfsPathCapabilities.java    |   1 +
 .../apache/hadoop/hdfs/web/WebHdfsFileSystem.java  |   8 +
 .../org.apache.hadoop.fs.MultipartUploaderFactory  |  16 -
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  59 ++-
 .../apache/hadoop/fs/s3a/S3AInstrumentation.java   |   9 +-
 .../apache/hadoop/fs/s3a/S3AMultipartUploader.java | 216 -----------
 .../java/org/apache/hadoop/fs/s3a/Statistic.java   |  24 +-
 .../apache/hadoop/fs/s3a/WriteOperationHelper.java |  22 +-
 .../org/apache/hadoop/fs/s3a/WriteOperations.java  | 335 ++++++++++++++++
 .../hadoop/fs/s3a/impl/ContextAccessors.java       |   8 +
 .../hadoop/fs/s3a/impl/S3AMultipartUploader.java   | 420 +++++++++++++++++++++
 .../fs/s3a/impl/S3AMultipartUploaderBuilder.java   |  66 ++++
 .../apache/hadoop/fs/s3a/impl/StoreContext.java    |  37 +-
 .../hadoop/fs/s3a/impl/StoreContextBuilder.java    | 189 ++++++++++
 .../statistics/S3AMultipartUploaderStatistics.java |  37 +-
 .../S3AMultipartUploaderStatisticsImpl.java        |  98 +++++
 .../hadoop/fs/s3a/s3guard/BulkOperationState.java  |   4 +
 .../fs/s3a/s3guard/DynamoDBMetadataStore.java      |  20 +-
 .../org.apache.hadoop.fs.MultipartUploader         |   2 +-
 .../org.apache.hadoop.fs.MultipartUploaderFactory  |  15 -
 .../s3a/ITestS3AContractMultipartUploader.java     |  52 +--
 .../fs/s3a/impl/TestPartialDeleteFailures.java     |  46 ++-
 .../TestS3AMultipartUploaderSupport.java           |  48 ++-
 45 files changed, 2582 insertions(+), 813 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java
index 1df68b6..67dbc2e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java
@@ -1383,4 +1383,34 @@ public abstract class AbstractFileSystem implements PathCapabilities {
       return false;
     }
   }
+
+  /**
+   * Create a multipart uploader.
+   * @param basePath file path under which all files are uploaded
+   * @return a MultipartUploaderBuilder object to build the uploader
+   * @throws IOException if some early checks cause IO failures.
+   * @throws UnsupportedOperationException if support is checked early.
+   */
+  @InterfaceStability.Unstable
+  public MultipartUploaderBuilder createMultipartUploader(Path basePath)
+      throws IOException {
+    methodNotSupported();
+    return null;
+  }
+
+  /**
+   * Helper method that throws an {@link UnsupportedOperationException} for the
+   * current {@link FileSystem} method being called.
+   */
+  protected final void methodNotSupported() {
+    // The order of the stacktrace elements is (from top to bottom):
+    //   - java.lang.Thread.getStackTrace
+    //   - org.apache.hadoop.fs.FileSystem.methodNotSupported
+    //   - <the FileSystem method>
+    // therefore, to find out the current method name, we use the element at
+    // index 2.
+    String name = Thread.currentThread().getStackTrace()[2].getMethodName();
+    throw new UnsupportedOperationException(getClass().getCanonicalName() +
+        " does not support method " + name);
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java
index fb46ef8..539b3e2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java
@@ -131,4 +131,12 @@ public final class CommonPathCapabilities {
   @InterfaceStability.Unstable
   public static final String FS_EXPERIMENTAL_BATCH_LISTING =
       "fs.capability.batch.listing";
+
+  /**
+   * Does the store support multipart uploading?
+   * Value: {@value}.
+   */
+  public static final String FS_MULTIPART_UPLOADER =
+      "fs.capability.multipart.uploader";
+
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
index 364777f..e376efc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
@@ -2957,4 +2957,31 @@ public class FileContext implements PathCapabilities {
         (fs, p) -> fs.hasPathCapability(p, capability));
   }
 
+  /**
+   * Return a set of server default configuration values based on path.
+   * @param path path to fetch server defaults
+   * @return server default configuration values for path
+   * @throws IOException an I/O error occurred
+   */
+  public FsServerDefaults getServerDefaults(final Path path)
+      throws IOException {
+    return FsLinkResolution.resolve(this,
+        fixRelativePart(path),
+        (fs, p) -> fs.getServerDefaults(p));
+  }
+
+  /**
+   * Create a multipart uploader.
+   * @param basePath file path under which all files are uploaded
+   * @return a MultipartUploaderBuilder object to build the uploader
+   * @throws IOException if some early checks cause IO failures.
+   * @throws UnsupportedOperationException if support is checked early.
+   */
+  @InterfaceStability.Unstable
+  public MultipartUploaderBuilder createMultipartUploader(Path basePath)
+      throws IOException {
+    return FsLinkResolution.resolve(this,
+        fixRelativePart(basePath),
+        (fs, p) -> fs.createMultipartUploader(p));
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 358db74..86434c6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -132,22 +132,35 @@ import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapa
  * New methods may be marked as Unstable or Evolving for their initial release,
  * as a warning that they are new and may change based on the
  * experience of use in applications.
+ * <p></p>
  * <b>Important note for developers</b>
- *
- * If you're making changes here to the public API or protected methods,
+ * <p></p>
+ * If you are making changes here to the public API or protected methods,
  * you must review the following subclasses and make sure that
  * they are filtering/passing through new methods as appropriate.
+ * <p></p>
  *
- * {@link FilterFileSystem}: methods are passed through.
+ * {@link FilterFileSystem}: methods are passed through. If not,
+ * then {@code TestFilterFileSystem.MustNotImplement} must be
+ * updated with the unsupported interface.
+ * Furthermore, if the new API's support is probed for via
+ * {@link #hasPathCapability(Path, String)} then
+ * {@link FilterFileSystem#hasPathCapability(Path, String)}
+ * must return false, always.
+ * <p></p>
  * {@link ChecksumFileSystem}: checksums are created and
  * verified.
+ * <p></p>
  * {@code TestHarFileSystem} will need its {@code MustNotImplement}
  * interface updated.
+ * <p></p>
  *
  * There are some external places your changes will break things.
  * Do co-ordinate changes here.
+ * <p></p>
  *
  * HBase: HBoss
+ * <p></p>
  * Hive: HiveShim23
  * {@code shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java}
  *
@@ -4643,4 +4656,17 @@ public abstract class FileSystem extends Configured
 
   }
 
+  /**
+   * Create a multipart uploader.
+   * @param basePath file path under which all files are uploaded
+   * @return a MultipartUploaderBuilder object to build the uploader
+   * @throws IOException if some early checks cause IO failures.
+   * @throws UnsupportedOperationException if support is checked early.
+   */
+  @InterfaceStability.Unstable
+  public MultipartUploaderBuilder createMultipartUploader(Path basePath)
+      throws IOException {
+    methodNotSupported();
+    return null;
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
index cf12ea3..4241097 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.Progressable;
 
+import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
+
 /****************************************************************
  * A <code>FilterFileSystem</code> contains
  * some other file system, which it uses as
@@ -728,7 +730,16 @@ public class FilterFileSystem extends FileSystem {
   @Override
   public boolean hasPathCapability(final Path path, final String capability)
       throws IOException {
-    return fs.hasPathCapability(path, capability);
+    switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
+    case CommonPathCapabilities.FS_MULTIPART_UPLOADER:
+    case CommonPathCapabilities.FS_EXPERIMENTAL_BATCH_LISTING:
+      // operations known to be unsupported, irrespective of what
+      // the wrapped class implements.
+      return false;
+    default:
+      // the feature is not implemented.
+      return fs.hasPathCapability(path, capability);
+    }
   }
 
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java
index e197506..27e75d8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java
@@ -448,4 +448,10 @@ public abstract class FilterFs extends AbstractFileSystem {
       throws IOException {
     return myFs.hasPathCapability(path, capability);
   }
+
+  @Override
+  public MultipartUploaderBuilder createMultipartUploader(final Path basePath)
+      throws IOException {
+    return myFs.createMultipartUploader(basePath);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSMultipartUploaderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InternalOperations.java
similarity index 51%
copy from hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSMultipartUploaderFactory.java
copy to hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InternalOperations.java
index e9959c1..2db33ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSMultipartUploaderFactory.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InternalOperations.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,26 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileSystemMultipartUploader;
-import org.apache.hadoop.fs.MultipartUploader;
-import org.apache.hadoop.fs.MultipartUploaderFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
 
 /**
- * Support for HDFS multipart uploads, built on
- * {@link FileSystem#concat(Path, Path[])}.
+ *   This method allows access to Package-scoped operations from classes
+ *   in org.apache.hadoop.fs.impl and other file system implementations
+ *   in the hadoop modules.
+ *   This is absolutely not for used by any other application or library.
  */
-public class DFSMultipartUploaderFactory extends MultipartUploaderFactory {
-  protected MultipartUploader createMultipartUploader(FileSystem fs,
-      Configuration conf) {
-    if (fs.getScheme().equals(HdfsConstants.HDFS_URI_SCHEME)) {
-      return new FileSystemMultipartUploader(fs);
-    }
-    return null;
+@InterfaceAudience.Private
+public class InternalOperations {
+
+  @SuppressWarnings("deprecation") // rename w/ OVERWRITE
+  public void rename(FileSystem fs, final Path src, final Path dst,
+      final Options.Rename...options) throws IOException {
+    fs.rename(src, dst, options);
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
index 7ed987e..89848dc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,45 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.fs;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 /**
  * MultipartUploader is an interface for copying files multipart and across
- * multiple nodes. Users should:
- * <ol>
- *   <li>Initialize an upload.</li>
- *   <li>Upload parts in any order.</li>
- *   <li>Complete the upload in order to have it materialize in the destination
- *   FS.</li>
- * </ol>
+ * multiple nodes.
  */
-@InterfaceAudience.Private
+@InterfaceAudience.Public
 @InterfaceStability.Unstable
-public abstract class MultipartUploader implements Closeable {
-  public static final Logger LOG =
-      LoggerFactory.getLogger(MultipartUploader.class);
+public interface MultipartUploader extends Closeable {
 
-  /**
-   * Perform any cleanup.
-   * The upload is not required to support any operations after this.
-   * @throws IOException problems on close.
-   */
-  @Override
-  public void close() throws IOException {
-  }
 
   /**
    * Initialize a multipart upload.
@@ -61,94 +42,64 @@ public abstract class MultipartUploader implements Closeable {
    * @return unique identifier associating part uploads.
    * @throws IOException IO failure
    */
-  public abstract UploadHandle initialize(Path filePath) throws IOException;
+  CompletableFuture<UploadHandle> startUpload(Path filePath)
+      throws IOException;
 
   /**
    * Put part as part of a multipart upload.
    * It is possible to have parts uploaded in any order (or in parallel).
-   * @param filePath Target path for upload (same as {@link #initialize(Path)}).
+   * @param uploadId Identifier from {@link #startUpload(Path)}.
+   * @param partNumber Index of the part relative to others.
+   * @param filePath Target path for upload (as {@link #startUpload(Path)}).
    * @param inputStream Data for this part. Implementations MUST close this
    * stream after reading in the data.
-   * @param partNumber Index of the part relative to others.
-   * @param uploadId Identifier from {@link #initialize(Path)}.
    * @param lengthInBytes Target length to read from the stream.
    * @return unique PartHandle identifier for the uploaded part.
    * @throws IOException IO failure
    */
-  public abstract PartHandle putPart(Path filePath, InputStream inputStream,
-      int partNumber, UploadHandle uploadId, long lengthInBytes)
+  CompletableFuture<PartHandle> putPart(
+      UploadHandle uploadId,
+      int partNumber,
+      Path filePath,
+      InputStream inputStream,
+      long lengthInBytes)
       throws IOException;
 
   /**
    * Complete a multipart upload.
-   * @param filePath Target path for upload (same as {@link #initialize(Path)}.
+   * @param uploadId Identifier from {@link #startUpload(Path)}.
+   * @param filePath Target path for upload (as {@link #startUpload(Path)}.
    * @param handles non-empty map of part number to part handle.
-   *          from {@link #putPart(Path, InputStream, int, UploadHandle, long)}.
-   * @param multipartUploadId Identifier from {@link #initialize(Path)}.
+   *          from {@link #putPart(UploadHandle, int, Path, InputStream, long)}.
    * @return unique PathHandle identifier for the uploaded file.
    * @throws IOException IO failure
    */
-  public abstract PathHandle complete(Path filePath,
-      Map<Integer, PartHandle> handles,
-      UploadHandle multipartUploadId)
+  CompletableFuture<PathHandle> complete(
+      UploadHandle uploadId,
+      Path filePath,
+      Map<Integer, PartHandle> handles)
       throws IOException;
 
   /**
    * Aborts a multipart upload.
-   * @param filePath Target path for upload (same as {@link #initialize(Path)}.
-   * @param multipartUploadId Identifier from {@link #initialize(Path)}.
+   * @param uploadId Identifier from {@link #startUpload(Path)}.
+   * @param filePath Target path for upload (same as {@link #startUpload(Path)}.
    * @throws IOException IO failure
+   * @return a future; the operation will have completed
    */
-  public abstract void abort(Path filePath, UploadHandle multipartUploadId)
+  CompletableFuture<Void> abort(UploadHandle uploadId, Path filePath)
       throws IOException;
 
   /**
-   * Utility method to validate uploadIDs.
-   * @param uploadId Upload ID
-   * @throws IllegalArgumentException invalid ID
-   */
-  protected void checkUploadId(byte[] uploadId)
-      throws IllegalArgumentException {
-    checkArgument(uploadId != null, "null uploadId");
-    checkArgument(uploadId.length > 0,
-        "Empty UploadId is not valid");
-  }
-
-  /**
-   * Utility method to validate partHandles.
-   * @param partHandles handles
-   * @throws IllegalArgumentException if the parts are invalid
+   * Best effort attempt to aborts multipart uploads under a path.
+   * Not all implementations support this, and those which do may
+   * be vulnerable to eventually consistent listings of current uploads
+   * -some may be missed.
+   * @param path path to abort uploads under.
+   * @return a future to the number of entries aborted;
+   * -1 if aborting is unsupported
+   * @throws IOException IO failure
    */
-  protected void checkPartHandles(Map<Integer, PartHandle> partHandles) {
-    checkArgument(!partHandles.isEmpty(),
-        "Empty upload");
-    partHandles.keySet()
-        .stream()
-        .forEach(key ->
-            checkArgument(key > 0,
-                "Invalid part handle index %s", key));
-  }
+  CompletableFuture<Integer> abortUploadsUnderPath(Path path) throws IOException;
 
-  /**
-   * Check all the arguments to the
-   * {@link #putPart(Path, InputStream, int, UploadHandle, long)} operation.
-   * @param filePath Target path for upload (same as {@link #initialize(Path)}).
-   * @param inputStream Data for this part. Implementations MUST close this
-   * stream after reading in the data.
-   * @param partNumber Index of the part relative to others.
-   * @param uploadId Identifier from {@link #initialize(Path)}.
-   * @param lengthInBytes Target length to read from the stream.
-   * @throws IllegalArgumentException invalid argument
-   */
-  protected void checkPutArguments(Path filePath,
-      InputStream inputStream,
-      int partNumber,
-      UploadHandle uploadId,
-      long lengthInBytes) throws IllegalArgumentException {
-    checkArgument(filePath != null, "null filePath");
-    checkArgument(inputStream != null, "null inputStream");
-    checkArgument(partNumber > 0, "Invalid part number: %d", partNumber);
-    checkArgument(uploadId != null, "null uploadId");
-    checkArgument(lengthInBytes >= 0, "Invalid part length: %d", lengthInBytes);
-  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderBuilder.java
new file mode 100644
index 0000000..381bfaa
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderBuilder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.permission.FsPermission;
+
+/**
+ * Builder interface for Multipart readers.
+ * @param <S>
+ * @param <B>
+ */
+public interface MultipartUploaderBuilder<S extends MultipartUploader, B extends MultipartUploaderBuilder<S, B>>
+    extends FSBuilder<S, B> {
+
+  /**
+   * Set permission for the file.
+   */
+  B permission(@Nonnull FsPermission perm);
+
+  /**
+   * Set the size of the buffer to be used.
+   */
+  B bufferSize(int bufSize);
+
+  /**
+   * Set replication factor.
+   */
+  B replication(short replica);
+
+  /**
+   * Set block size.
+   */
+  B blockSize(long blkSize);
+
+  /**
+   * Create an FSDataOutputStream at the specified path.
+   */
+  B create();
+
+  /**
+   * Set to true to overwrite the existing file.
+   * Set it to false, an exception will be thrown when calling {@link #build()}
+   * if the file exists.
+   */
+  B overwrite(boolean overwrite);
+
+  /**
+   * Append to an existing file (optional operation).
+   */
+  B append();
+
+  /**
+   * Set checksum opt.
+   */
+  B checksumOpt(@Nonnull Options.ChecksumOpt chksumOpt);
+
+  /**
+   * Create the FSDataOutputStream to write on the file system.
+   *
+   * @throws IllegalArgumentException if the parameters are not valid.
+   * @throws IOException on errors when file system creates or appends the file.
+   */
+  S build() throws IllegalArgumentException, IOException;
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java
deleted file mode 100644
index e35b6bf..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.ServiceLoader;
-
-/**
- * {@link ServiceLoader}-driven uploader API for storage services supporting
- * multipart uploads.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public abstract class MultipartUploaderFactory {
-  public static final Logger LOG =
-      LoggerFactory.getLogger(MultipartUploaderFactory.class);
-
-  /**
-   * Multipart Uploaders listed as services.
-   */
-  private static ServiceLoader<MultipartUploaderFactory> serviceLoader =
-      ServiceLoader.load(MultipartUploaderFactory.class,
-          MultipartUploaderFactory.class.getClassLoader());
-
-  // Iterate through the serviceLoader to avoid lazy loading.
-  // Lazy loading would require synchronization in concurrent use cases.
-  static {
-    Iterator<MultipartUploaderFactory> iterServices = serviceLoader.iterator();
-    while (iterServices.hasNext()) {
-      iterServices.next();
-    }
-  }
-
-  /**
-   * Get the multipart loader for a specific filesystem.
-   * @param fs filesystem
-   * @param conf configuration
-   * @return an uploader, or null if one was found.
-   * @throws IOException failure during the creation process.
-   */
-  public static MultipartUploader get(FileSystem fs, Configuration conf)
-      throws IOException {
-    MultipartUploader mpu = null;
-    for (MultipartUploaderFactory factory : serviceLoader) {
-      mpu = factory.createMultipartUploader(fs, conf);
-      if (mpu != null) {
-        break;
-      }
-    }
-    return mpu;
-  }
-
-  protected abstract MultipartUploader createMultipartUploader(FileSystem fs,
-      Configuration conf) throws IOException;
-}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractMultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractMultipartUploader.java
new file mode 100644
index 0000000..d8b7fe0
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractMultipartUploader.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.fs.MultipartUploader;
+import org.apache.hadoop.fs.PartHandle;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UploadHandle;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Standard base class for Multipart Uploaders.
+ */
+public abstract class AbstractMultipartUploader implements MultipartUploader {
+
+  /**
+   * Base path of upload.
+   */
+  private final Path basePath;
+
+  /**
+   * Instantiate.
+   * @param basePath base path
+   */
+  protected AbstractMultipartUploader(final Path basePath) {
+    this.basePath = Objects.requireNonNull(basePath, "null path");
+  }
+
+  /**
+   * Perform any cleanup.
+   * The upload is not required to support any operations after this.
+   * @throws IOException problems on close.
+   */
+  @Override
+  public void close() throws IOException {
+  }
+
+  protected Path getBasePath() {
+    return basePath;
+  }
+
+  /**
+   * Validate a path.
+   * @param path path to check.
+   */
+  protected void checkPath(Path path) {
+    Objects.requireNonNull(path, "null path");
+    Preconditions.checkArgument(path.toString().startsWith(basePath.toString()),
+        "Path %s is not under %s", path, basePath);
+  }
+
+  /**
+   * Utility method to validate uploadIDs.
+   * @param uploadId Upload ID
+   * @throws IllegalArgumentException invalid ID
+   */
+  protected void checkUploadId(byte[] uploadId)
+      throws IllegalArgumentException {
+    checkArgument(uploadId != null, "null uploadId");
+    checkArgument(uploadId.length > 0,
+        "Empty UploadId is not valid");
+  }
+
+  /**
+   * Utility method to validate partHandles.
+   * @param partHandles handles
+   * @throws IllegalArgumentException if the parts are invalid
+   */
+  protected void checkPartHandles(Map<Integer, PartHandle> partHandles) {
+    checkArgument(!partHandles.isEmpty(),
+        "Empty upload");
+    partHandles.keySet()
+        .stream()
+        .forEach(key ->
+            checkArgument(key > 0,
+                "Invalid part handle index %s", key));
+  }
+
+  /**
+   * Check all the arguments to the
+   * {@link MultipartUploader#putPart(UploadHandle, int, Path, InputStream, long)}
+   * operation.
+   * @param filePath Target path for upload (as {@link #startUpload(Path)}).
+   * @param inputStream Data for this part. Implementations MUST close this
+   * stream after reading in the data.
+   * @param partNumber Index of the part relative to others.
+   * @param uploadId Identifier from {@link #startUpload(Path)}.
+   * @param lengthInBytes Target length to read from the stream.
+   * @throws IllegalArgumentException invalid argument
+   */
+  protected void checkPutArguments(Path filePath,
+      InputStream inputStream,
+      int partNumber,
+      UploadHandle uploadId,
+      long lengthInBytes) throws IllegalArgumentException {
+    checkPath(filePath);
+    checkArgument(inputStream != null, "null inputStream");
+    checkArgument(partNumber > 0, "Invalid part number: %d", partNumber);
+    checkArgument(uploadId != null, "null uploadId");
+    checkArgument(lengthInBytes >= 0, "Invalid part length: %d", lengthInBytes);
+  }
+
+  /**
+   * {@inheritDoc}.
+   * @param path path to abort uploads under.
+   * @return a future to -1.
+   * @throws IOException
+   */
+  public CompletableFuture<Integer> abortUploadsUnderPath(Path path)
+      throws IOException {
+    checkPath(path);
+    CompletableFuture<Integer> f = new CompletableFuture<>();
+    f.complete(-1);
+    return f;
+  }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java
similarity index 52%
rename from hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java
rename to hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java
index b77c244..ae0def0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java
@@ -14,24 +14,42 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.fs;
+
+package org.apache.hadoop.fs.impl;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.commons.compress.utils.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BBPartHandle;
+import org.apache.hadoop.fs.BBUploadHandle;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.InternalOperations;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.PartHandle;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathHandle;
+import org.apache.hadoop.fs.UploadHandle;
 import org.apache.hadoop.fs.permission.FsPermission;
 
 import static org.apache.hadoop.fs.Path.mergePaths;
@@ -50,40 +68,82 @@ import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class FileSystemMultipartUploader extends MultipartUploader {
+public class FileSystemMultipartUploader extends AbstractMultipartUploader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      FileSystemMultipartUploader.class);
 
   private final FileSystem fs;
 
-  public FileSystemMultipartUploader(FileSystem fs) {
+  private final FileSystemMultipartUploaderBuilder builder;
+
+  private final FsPermission permission;
+
+  private final long blockSize;
+
+  private final Options.ChecksumOpt checksumOpt;
+
+  public FileSystemMultipartUploader(
+      final FileSystemMultipartUploaderBuilder builder,
+      FileSystem fs) {
+    super(builder.getPath());
+    this.builder = builder;
     this.fs = fs;
+    blockSize = builder.getBlockSize();
+    checksumOpt = builder.getChecksumOpt();
+    permission = builder.getPermission();
   }
 
   @Override
-  public UploadHandle initialize(Path filePath) throws IOException {
-    Path collectorPath = createCollectorPath(filePath);
-    fs.mkdirs(collectorPath, FsPermission.getDirDefault());
+  public CompletableFuture<UploadHandle> startUpload(Path filePath)
+      throws IOException {
+    checkPath(filePath);
+    return FutureIOSupport.eval(() -> {
+      Path collectorPath = createCollectorPath(filePath);
+      fs.mkdirs(collectorPath, FsPermission.getDirDefault());
 
-    ByteBuffer byteBuffer = ByteBuffer.wrap(
-        collectorPath.toString().getBytes(Charsets.UTF_8));
-    return BBUploadHandle.from(byteBuffer);
+      ByteBuffer byteBuffer = ByteBuffer.wrap(
+          collectorPath.toString().getBytes(Charsets.UTF_8));
+      return BBUploadHandle.from(byteBuffer);
+    });
   }
 
   @Override
-  public PartHandle putPart(Path filePath, InputStream inputStream,
-      int partNumber, UploadHandle uploadId, long lengthInBytes)
+  public CompletableFuture<PartHandle> putPart(UploadHandle uploadId,
+      int partNumber, Path filePath,
+      InputStream inputStream,
+      long lengthInBytes)
       throws IOException {
     checkPutArguments(filePath, inputStream, partNumber, uploadId,
         lengthInBytes);
+    return FutureIOSupport.eval(() -> innerPutPart(filePath,
+        inputStream, partNumber, uploadId, lengthInBytes));
+  }
+
+  private PartHandle innerPutPart(Path filePath,
+      InputStream inputStream,
+      int partNumber,
+      UploadHandle uploadId,
+      long lengthInBytes)
+      throws IOException {
     byte[] uploadIdByteArray = uploadId.toByteArray();
     checkUploadId(uploadIdByteArray);
     Path collectorPath = new Path(new String(uploadIdByteArray, 0,
         uploadIdByteArray.length, Charsets.UTF_8));
     Path partPath =
         mergePaths(collectorPath, mergePaths(new Path(Path.SEPARATOR),
-            new Path(Integer.toString(partNumber) + ".part")));
-    try(FSDataOutputStream fsDataOutputStream =
-            fs.createFile(partPath).build()) {
-      IOUtils.copy(inputStream, fsDataOutputStream, 4096);
+            new Path(partNumber + ".part")));
+    final FSDataOutputStreamBuilder fileBuilder = fs.createFile(partPath);
+    if (checksumOpt != null) {
+      fileBuilder.checksumOpt(checksumOpt);
+    }
+    if (permission != null) {
+      fileBuilder.permission(permission);
+    }
+    try (FSDataOutputStream fsDataOutputStream =
+             fileBuilder.blockSize(blockSize).build()) {
+      IOUtils.copy(inputStream, fsDataOutputStream,
+          this.builder.getBufferSize());
     } finally {
       cleanupWithLogger(LOG, inputStream);
     }
@@ -106,16 +166,36 @@ public class FileSystemMultipartUploader extends MultipartUploader {
 
   private long totalPartsLen(List<Path> partHandles) throws IOException {
     long totalLen = 0;
-    for (Path p: partHandles) {
+    for (Path p : partHandles) {
       totalLen += fs.getFileStatus(p).getLen();
     }
     return totalLen;
   }
 
   @Override
-  @SuppressWarnings("deprecation") // rename w/ OVERWRITE
-  public PathHandle complete(Path filePath, Map<Integer, PartHandle> handleMap,
-      UploadHandle multipartUploadId) throws IOException {
+  public CompletableFuture<PathHandle> complete(
+      UploadHandle uploadId,
+      Path filePath,
+      Map<Integer, PartHandle> handleMap) throws IOException {
+
+    checkPath(filePath);
+    return FutureIOSupport.eval(() ->
+        innerComplete(uploadId, filePath, handleMap));
+  }
+
+  /**
+   * The upload complete operation.
+   * @param multipartUploadId the ID of the upload
+   * @param filePath path
+   * @param handleMap map of handles
+   * @return the path handle
+   * @throws IOException failure
+   */
+  private PathHandle innerComplete(
+      UploadHandle multipartUploadId, Path filePath,
+      Map<Integer, PartHandle> handleMap) throws IOException {
+
+    checkPath(filePath);
 
     checkUploadId(multipartUploadId.toByteArray());
 
@@ -133,6 +213,13 @@ public class FileSystemMultipartUploader extends MultipartUploader {
         })
         .collect(Collectors.toList());
 
+    int count = partHandles.size();
+    // built up to identify duplicates -if the size of this set is
+    // below that of the number of parts, then there's a duplicate entry.
+    Set<Path> values = new HashSet<>(count);
+    values.addAll(partHandles);
+    Preconditions.checkArgument(values.size() == count,
+        "Duplicate PartHandles");
     byte[] uploadIdByteArray = multipartUploadId.toByteArray();
     Path collectorPath = new Path(new String(uploadIdByteArray, 0,
         uploadIdByteArray.length, Charsets.UTF_8));
@@ -146,35 +233,30 @@ public class FileSystemMultipartUploader extends MultipartUploader {
       fs.create(filePathInsideCollector).close();
       fs.concat(filePathInsideCollector,
           partHandles.toArray(new Path[handles.size()]));
-      fs.rename(filePathInsideCollector, filePath, Options.Rename.OVERWRITE);
+      new InternalOperations()
+          .rename(fs, filePathInsideCollector, filePath,
+              Options.Rename.OVERWRITE);
     }
     fs.delete(collectorPath, true);
     return getPathHandle(filePath);
   }
 
   @Override
-  public void abort(Path filePath, UploadHandle uploadId) throws IOException {
+  public CompletableFuture<Void> abort(UploadHandle uploadId,
+      Path filePath)
+      throws IOException {
+    checkPath(filePath);
     byte[] uploadIdByteArray = uploadId.toByteArray();
     checkUploadId(uploadIdByteArray);
     Path collectorPath = new Path(new String(uploadIdByteArray, 0,
         uploadIdByteArray.length, Charsets.UTF_8));
 
-    // force a check for a file existing; raises FNFE if not found
-    fs.getFileStatus(collectorPath);
-    fs.delete(collectorPath, true);
-  }
-
-  /**
-   * Factory for creating MultipartUploaderFactory objects for file://
-   * filesystems.
-   */
-  public static class Factory extends MultipartUploaderFactory {
-    protected MultipartUploader createMultipartUploader(FileSystem fs,
-        Configuration conf) {
-      if (fs.getScheme().equals("file")) {
-        return new FileSystemMultipartUploader(fs);
-      }
+    return FutureIOSupport.eval(() -> {
+      // force a check for a file existing; raises FNFE if not found
+      fs.getFileStatus(collectorPath);
+      fs.delete(collectorPath, true);
       return null;
-    }
+    });
   }
+
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploaderBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploaderBuilder.java
new file mode 100644
index 0000000..7c4d995
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploaderBuilder.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.EnumSet;
+
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+/**
+ * Builder for {@link FileSystemMultipartUploader}.
+ */
+public class FileSystemMultipartUploaderBuilder extends
+    MultipartUploaderBuilderImpl<FileSystemMultipartUploader, FileSystemMultipartUploaderBuilder> {
+
+  public FileSystemMultipartUploaderBuilder(
+      @Nonnull final FileSystem fileSystem,
+      @Nonnull final Path path) {
+    super(fileSystem, path);
+  }
+
+  @Override
+  public FileSystemMultipartUploaderBuilder getThisBuilder() {
+    return this;
+  }
+
+  @Override
+  public FileSystemMultipartUploader build()
+      throws IllegalArgumentException, IOException {
+    return new FileSystemMultipartUploader(this, getFS());
+  }
+
+  @Override
+  public FileSystem getFS() {
+    return super.getFS();
+  }
+
+  @Override
+  public FsPermission getPermission() {
+    return super.getPermission();
+  }
+
+  @Override
+  public int getBufferSize() {
+    return super.getBufferSize();
+  }
+
+  @Override
+  public short getReplication() {
+    return super.getReplication();
+  }
+
+  @Override
+  public EnumSet<CreateFlag> getFlags() {
+    return super.getFlags();
+  }
+
+  @Override
+  public Options.ChecksumOpt getChecksumOpt() {
+    return super.getChecksumOpt();
+  }
+
+  @Override
+  protected long getBlockSize() {
+    return super.getBlockSize();
+  }
+
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java
index 26856e5..f13d701 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.impl;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -52,7 +53,7 @@ public final class FutureIOSupport {
    * @throws IOException if something went wrong
    * @throws RuntimeException any nested RTE thrown
    */
-  public static <T> T awaitFuture(final Future<T> future)
+  public static <T> T  awaitFuture(final Future<T> future)
       throws InterruptedIOException, IOException, RuntimeException {
     try {
       return future.get();
@@ -224,4 +225,29 @@ public final class FutureIOSupport {
       }
     }
   }
+
+  /**
+   * Evaluate a CallableRaisingIOE in the current thread,
+   * converting IOEs to RTEs and propagating.
+   * @param callable callable to invoke
+   * @param <T> Return type.
+   * @return the evaluated result.
+   * @throws UnsupportedOperationException fail fast if unsupported
+   * @throws IllegalArgumentException invalid argument
+   */
+  public static <T> CompletableFuture<T> eval(
+      FunctionsRaisingIOE.CallableRaisingIOE<T> callable) {
+    CompletableFuture<T> result = new CompletableFuture<>();
+    try {
+      result.complete(callable.apply());
+    } catch (UnsupportedOperationException | IllegalArgumentException tx) {
+      // fail fast here
+      throw tx;
+    } catch (Throwable tx) {
+      // fail lazily here to ensure callers expect all File IO operations to
+      // surface later
+      result.completeExceptionally(tx);
+    }
+    return result;
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/MultipartUploaderBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/MultipartUploaderBuilderImpl.java
new file mode 100644
index 0000000..6c3336e
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/MultipartUploaderBuilderImpl.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.MultipartUploader;
+import org.apache.hadoop.fs.MultipartUploaderBuilder;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+
+/**
+ * Builder for {@link MultipartUploader} implementations.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class MultipartUploaderBuilderImpl
+    <S extends MultipartUploader, B extends MultipartUploaderBuilder<S, B>>
+    extends AbstractFSBuilderImpl<S, B>
+    implements MultipartUploaderBuilder<S, B> {
+
+  private final FileSystem fs;
+
+  private FsPermission permission;
+
+  private int bufferSize;
+
+  private short replication;
+
+  private long blockSize;
+
+  private final EnumSet<CreateFlag> flags = EnumSet.noneOf(CreateFlag.class);
+
+  private ChecksumOpt checksumOpt;
+
+  /**
+   * Return the concrete implementation of the builder instance.
+   */
+  public abstract B getThisBuilder();
+
+  /**
+   * Construct from a {@link FileContext}.
+   *
+   * @param fc FileContext
+   * @param p path.
+   * @throws IOException failure
+   */
+  protected MultipartUploaderBuilderImpl(@Nonnull FileContext fc,
+      @Nonnull Path p) throws IOException {
+    super(checkNotNull(p));
+    checkNotNull(fc);
+    this.fs = null;
+
+    FsServerDefaults defaults = fc.getServerDefaults(p);
+    bufferSize = defaults.getFileBufferSize();
+    replication = defaults.getReplication();
+    blockSize = defaults.getBlockSize();
+  }
+
+  /**
+   * Constructor.
+   */
+  protected MultipartUploaderBuilderImpl(@Nonnull FileSystem fileSystem,
+      @Nonnull Path p) {
+    super(fileSystem.makeQualified(checkNotNull(p)));
+    checkNotNull(fileSystem);
+    fs = fileSystem;
+    bufferSize = fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+        IO_FILE_BUFFER_SIZE_DEFAULT);
+    replication = fs.getDefaultReplication(p);
+    blockSize = fs.getDefaultBlockSize(p);
+  }
+
+  protected FileSystem getFS() {
+    checkNotNull(fs);
+    return fs;
+  }
+
+  protected FsPermission getPermission() {
+    if (permission == null) {
+      permission = FsPermission.getFileDefault();
+    }
+    return permission;
+  }
+
+  /**
+   * Set permission for the file.
+   */
+  @Override
+  public B permission(@Nonnull final FsPermission perm) {
+    checkNotNull(perm);
+    permission = perm;
+    return getThisBuilder();
+  }
+
+  protected int getBufferSize() {
+    return bufferSize;
+  }
+
+  /**
+   * Set the size of the buffer to be used.
+   */
+  @Override
+  public B bufferSize(int bufSize) {
+    bufferSize = bufSize;
+    return getThisBuilder();
+  }
+
+  protected short getReplication() {
+    return replication;
+  }
+
+  /**
+   * Set replication factor.
+   */
+  @Override
+  public B replication(short replica) {
+    replication = replica;
+    return getThisBuilder();
+  }
+
+  protected long getBlockSize() {
+    return blockSize;
+  }
+
+  /**
+   * Set block size.
+   */
+  @Override
+  public B blockSize(long blkSize) {
+    blockSize = blkSize;
+    return getThisBuilder();
+  }
+
+  protected EnumSet<CreateFlag> getFlags() {
+    return flags;
+  }
+
+  /**
+   * Create an FSDataOutputStream at the specified path.
+   */
+  @Override
+  public B create() {
+    flags.add(CreateFlag.CREATE);
+    return getThisBuilder();
+  }
+
+  /**
+   * Set to true to overwrite the existing file.
+   * Set it to false, an exception will be thrown when calling {@link #build()}
+   * if the file exists.
+   */
+  @Override
+  public B overwrite(boolean overwrite) {
+    if (overwrite) {
+      flags.add(CreateFlag.OVERWRITE);
+    } else {
+      flags.remove(CreateFlag.OVERWRITE);
+    }
+    return getThisBuilder();
+  }
+
+  /**
+   * Append to an existing file (optional operation).
+   */
+  @Override
+  public B append() {
+    flags.add(CreateFlag.APPEND);
+    return getThisBuilder();
+  }
+
+  protected ChecksumOpt getChecksumOpt() {
+    return checksumOpt;
+  }
+
+  /**
+   * Set checksum opt.
+   */
+  @Override
+  public B checksumOpt(@Nonnull final ChecksumOpt chksumOpt) {
+    checkNotNull(chksumOpt);
+    checksumOpt = chksumOpt;
+    return getThisBuilder();
+  }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory
deleted file mode 100644
index f0054fe..0000000
--- a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.hadoop.fs.FileSystemMultipartUploader$Factory
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/multipartuploader.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/multipartuploader.md
index 629c0c4..906c592 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/multipartuploader.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/multipartuploader.md
@@ -14,14 +14,14 @@
 
 
 <!--  ============================================================= -->
-<!--  CLASS: MultipartUploader -->
+<!--  INTERFACE: MultipartUploader -->
 <!--  ============================================================= -->
 
-# class `org.apache.hadoop.fs.MultipartUploader`
+# interface `org.apache.hadoop.fs.MultipartUploader`
 
 <!-- MACRO{toc|fromDepth=1|toDepth=2} -->
 
-The abstract `MultipartUploader` class is the original class to upload a file
+The `MultipartUploader` can upload a file
 using multiple parts to Hadoop-supported filesystems. The benefits of a
 multipart upload is that the file can be uploaded from multiple clients or
 processes in parallel and the results will not be visible to other clients until
@@ -30,13 +30,12 @@ the `complete` function is called.
 When implemented by an object store, uploaded data may incur storage charges,
 even before it is visible in the filesystems. Users of this API must be diligent
 and always perform best-effort attempts to complete or abort the upload.
+The `abortUploadsUnderPath(path)` operation can help here.
 
 ## Invariants
 
-All the requirements of a valid MultipartUploader are considered implicit
+All the requirements of a valid `MultipartUploader` are considered implicit
 econditions and postconditions:
-all operations on a valid MultipartUploader MUST result in a new
-MultipartUploader that is also valid.
 
 The operations of a single multipart upload may take place across different
 instance of a multipart uploader, across different processes and hosts.
@@ -45,16 +44,28 @@ It is therefore a requirement that:
 1. All state needed to upload a part, complete an upload or abort an upload
 must be contained within or retrievable from an upload handle.
 
-1. If an upload handle is marshalled to another process, then, if the
-receiving process has the correct permissions, it may participate in the
-upload, by uploading one or more parts, by completing an upload, and/or by
-aborting the upload.
+1. That handle MUST be serializable; it MUST be deserializable to different
+processes executing the exact same version of Hadoop.
+
+1. different hosts/processes MAY upload different parts, sequentially or
+simultaneously. The order in which they are uploaded to the filesystem
+MUST NOT constrain the order in which the data is stored in the final file.
+
+1. An upload MAY be completed on a different instance than any which uploaded
+parts.
+
+1. The output of an upload MUST NOT be visible at the final destination
+until the upload may complete.
+
+1. It is not an error if a single multipart uploader instance initiates
+or completes multiple uploads files to the same destination sequentially,
+irrespective of whether or not the store supports concurrent uploads.
 
 ## Concurrency
 
 Multiple processes may upload parts of a multipart upload simultaneously.
 
-If a call is made to `initialize(path)` to a destination where an active
+If a call is made to `startUpload(path)` to a destination where an active
 upload is in progress, implementations MUST perform one of the two operations.
 
 * Reject the call as a duplicate.
@@ -70,9 +81,17 @@ the in-progress upload, if it has not completed, must not be included in
 the final file, in whole or in part. Implementations SHOULD raise an error
 in the `putPart()` operation.
 
+# Serialization Compatibility
+
+Users MUST NOT expect that serialized PathHandle versions are compatible across
+* different multipart uploader implementations.
+* different versions of the same implementation.
+
+That is: all clients MUST use the exact same version of Hadoop.
+
 ## Model
 
-A File System which supports Multipart Uploads extends the existing model
+A FileSystem/FileContext which supports Multipart Uploads extends the existing model
 `(Directories, Files, Symlinks)` to one of `(Directories, Files, Symlinks, Uploads)`
 `Uploads` of type `Map[UploadHandle -> Map[PartHandle -> UploadPart]`.
 
@@ -112,11 +131,40 @@ However, if Part Handles are rapidly recycled, there is a risk that the nominall
 idempotent operation `abort(FS, uploadHandle)` could unintentionally cancel a
 successor operation which used the same Upload Handle.
 
+## Asynchronous API
+
+All operations return `CompletableFuture<>` types which must be
+subsequently evaluated to get their return values.
+
+1. The execution of the operation MAY be a blocking operation in on the call thread.
+1. If not, it SHALL be executed in a separate thread and MUST complete by the time the
+future evaluation returns.
+1. Some/All preconditions MAY be evaluated at the time of initial invocation,
+1. All those which are not evaluated at that time, MUST Be evaluated during the execution
+of the future.
+
+
+What this means is that when an implementation interacts with a fast file system/store all preconditions
+including the existence of files MAY be evaluated early, whereas and implementation interacting with a
+remote object store whose probes are slow MAY verify preconditions in the asynchronous phase -especially
+those which interact with the remote store.
+
+Java CompletableFutures do not work well with checked exceptions. The Hadoop codease is still evolving the
+details of the exception handling here, as more use is made of the asynchronous APIs. Assume that any
+precondition failure which declares that an `IOException` MUST be raised may have that operation wrapped in a
+`RuntimeException` of some form if evaluated in the future; this also holds for any other `IOException`
+raised during the operations.
+
+### `close()`
+
+Applications MUST call `close()` after using an uploader; this is so it may release other
+objects, update statistics, etc.
+
 ## State Changing Operations
 
-### `UploadHandle initialize(Path path)`
+### `CompletableFuture<UploadHandle> startUpload(Path)`
 
-Initialized a Multipart Upload, returning an upload handle for use in
+Starts a Multipart Upload, ultimately returning an `UploadHandle` for use in
 subsequent operations.
 
 #### Preconditions
@@ -128,17 +176,15 @@ if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOE
 ```
 
 If a filesystem does not support concurrent uploads to a destination,
-then the following precondition is added
+then the following precondition is added:
 
 ```python
 if path in values(FS.Uploads) raise PathExistsException, IOException
-
 ```
 
-
 #### Postconditions
 
-The outcome of this operation is that the filesystem state is updated with a new
+Once the initialization operation completes, the filesystem state is updated with a new
 active upload, with a new handle, this handle being returned to the caller.
 
 ```python
@@ -147,9 +193,10 @@ FS' = FS where FS'.Uploads(handle') == {}
 result = handle'
 ```
 
-### `PartHandle putPart(Path path, InputStream inputStream, int partNumber, UploadHandle uploadHandle, long lengthInBytes)`
+### `CompletableFuture<PartHandle> putPart(UploadHandle uploadHandle, int partNumber, Path filePath, InputStream inputStream, long lengthInBytes)`
 
-Upload a part for the multipart upload.
+Upload a part for the specific multipart upload, eventually being returned an opaque part handle
+represting this part of the specified upload.
 
 #### Preconditions
 
@@ -170,10 +217,12 @@ FS' = FS where FS'.uploads(uploadHandle).parts(partHandle') == data'
 result = partHandle'
 ```
 
-The data is stored in the filesystem, pending completion.
+The data is stored in the filesystem, pending completion. It MUST NOT be visible at the destination path.
+It MAY be visible in a temporary path somewhere in the file system;
+This is implementation-specific and MUST NOT be relied upon.
 
 
-### `PathHandle complete(Path path, Map<Integer, PartHandle> parts, UploadHandle multipartUploadId)`
+### ` CompletableFuture<PathHandle> complete(UploadHandle uploadId, Path filePath, Map<Integer, PartHandle> handles)`
 
 Complete the multipart upload.
 
@@ -188,11 +237,23 @@ uploadHandle in keys(FS.Uploads) else raise FileNotFoundException
 FS.Uploads(uploadHandle).path == path
 if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException
 parts.size() > 0
+forall k in keys(parts): k > 0
+forall k in keys(parts):
+  not exists(k2 in keys(parts)) where (parts[k] == parts[k2])
 ```
 
-If there are handles in the MPU which aren't included in the map, then the omitted
-parts will not be a part of the resulting file. It is up to the implementation
-of the MultipartUploader to make sure the leftover parts are cleaned up.
+All keys MUST be greater than zero, and there MUST not be any duplicate
+references to the same parthandle.
+These validations MAY be performed at any point during the operation.
+After a failure, there is no guarantee that a `complete()` call for this
+upload with a valid map of paths will complete.
+Callers SHOULD invoke `abort()` after any such failure to ensure cleanup.
+
+if `putPart()` operations For this `uploadHandle` were performed But whose
+`PathHandle` Handles were not included in this request -the omitted
+parts SHALL NOT be a part of the resulting file.
+
+The MultipartUploader MUST clean up any such outstanding entries.
 
 In the case of backing stores that support directories (local filesystem, HDFS,
 etc), if, at the point of completion, there is now a directory at the
@@ -206,14 +267,14 @@ exists(FS', path') and result = PathHandle(path')
 FS' = FS where FS.Files(path) == UploadData' and not uploadHandle in keys(FS'.uploads)
 ```
 
-The PathHandle is returned by the complete operation so subsequent operations
+The `PathHandle` is returned by the complete operation so subsequent operations
 will be able to identify that the data has not changed in the meantime.
 
 The order of parts in the uploaded by file is that of the natural order of
-parts: part 1 is ahead of part 2, etc.
+parts in the map: part 1 is ahead of part 2, etc.
 
 
-### `void abort(Path path, UploadHandle multipartUploadId)`
+### `CompletableFuture<Void> abort(UploadHandle uploadId, Path filePath)`
 
 Abort a multipart upload. The handle becomes invalid and not subject to reuse.
 
@@ -233,3 +294,23 @@ FS' = FS where not uploadHandle in keys(FS'.uploads)
 ```
 A subsequent call to `abort()` with the same handle will fail, unless
 the handle has been recycled.
+
+### `CompletableFuture<Integer> abortUploadsUnderPath(Path path)`
+
+Perform a best-effort cleanup of all uploads under a path.
+
+returns a future which resolves to.
+
+    -1 if unsuppported
+    >= 0 if supported
+
+Because it is best effort a strict postcondition isn't possible.
+The ideal postcondition is all uploads under the path are aborted,
+and the count is the number of uploads aborted:
+
+```python
+FS'.uploads forall upload in FS.uploads:
+    not isDescendant(FS, path, upload.path)
+return len(forall upload in FS.uploads:
+               isDescendant(FS, path, upload.path))
+```
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
index f0057a6..6cd4506 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
@@ -137,6 +137,12 @@ public class TestFilterFileSystem {
     void setQuota(Path f, long namespaceQuota, long storagespaceQuota);
     void setQuotaByStorageType(Path f, StorageType type, long quota);
     StorageStatistics getStorageStatistics();
+
+    /*
+    Not passed through as the inner implementation will miss features
+    of the filter such as checksums.
+     */
+    MultipartUploaderBuilder createMultipartUploader(Path basePath);
   }
 
   @Test
@@ -278,6 +284,23 @@ public class TestFilterFileSystem {
     verify(mockFs).rename(eq(src), eq(dst), eq(opt));
   }
 
+  /**
+   * Verify that filterFS always returns false, even if local/rawlocal
+   * ever implement multipart uploads.
+   */
+  @Test
+  public void testFilterPathCapabilites() throws Exception {
+    try (FilterFileSystem flfs = new FilterLocalFileSystem()) {
+      flfs.initialize(URI.create("filter:/"), conf);
+      Path src = new Path("/src");
+      assertFalse(
+          "hasPathCapability(FS_MULTIPART_UPLOADER) should have failed for "
+              + flfs,
+          flfs.hasPathCapability(src,
+              CommonPathCapabilities.FS_MULTIPART_UPLOADER));
+    }
+  }
+
   private void checkInit(FilterFileSystem fs, boolean expectInit)
       throws Exception {
     URI uri = URI.create("filter:/");
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
index 2097633..8050ce6 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
@@ -248,6 +248,9 @@ public class TestHarFileSystem {
     CompletableFuture<FSDataInputStream> openFileWithOptions(
         Path path,
         OpenFileParameters parameters) throws IOException;
+
+    MultipartUploaderBuilder createMultipartUploader(Path basePath)
+        throws IOException;
   }
 
   @Test
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
index 7a8f083..3192696 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
@@ -26,8 +26,10 @@ import java.security.MessageDigest;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.CompletableFuture;
 
 import com.google.common.base.Charsets;
+import org.assertj.core.api.Assertions;
 import org.junit.Assume;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -35,22 +37,31 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BBUploadHandle;
+import org.apache.hadoop.fs.CommonPathCapabilities;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.MultipartUploader;
-import org.apache.hadoop.fs.MultipartUploaderFactory;
 import org.apache.hadoop.fs.PartHandle;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathHandle;
 import org.apache.hadoop.fs.UploadHandle;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.DurationInfo;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
+import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 import static org.apache.hadoop.test.LambdaTestUtils.eventually;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
+/**
+ * Tests of multipart uploads.
+ * <p></p>
+ * <i>Note</i>: some of the tests get a random uploader between
+ * the two which are available. If tests fail intermittently,
+ * it may be because different uploaders are being selected.
+ */
 public abstract class AbstractContractMultipartUploaderTest extends
     AbstractFSContractTestBase {
 
@@ -63,36 +74,44 @@ public abstract class AbstractContractMultipartUploaderTest extends
    */
   protected static final int SMALL_FILE = 100;
 
-  private MultipartUploader mpu;
-  private MultipartUploader mpu2;
+  protected static final int CONSISTENCY_INTERVAL = 1000;
+
+  private MultipartUploader uploader0;
+  private MultipartUploader uploader1;
   private final Random random = new Random();
   private UploadHandle activeUpload;
   private Path activeUploadPath;
 
-  protected String getMethodName() {
-    return methodName.getMethodName();
-  }
-
   @Override
   public void setup() throws Exception {
     super.setup();
-    Configuration conf = getContract().getConf();
-    mpu = MultipartUploaderFactory.get(getFileSystem(), conf);
-    mpu2 = MultipartUploaderFactory.get(getFileSystem(), conf);
+
+    final FileSystem fs = getFileSystem();
+    Path testPath = getContract().getTestPath();
+    uploader0 = fs.createMultipartUploader(testPath).build();
+    uploader1 = fs.createMultipartUploader(testPath).build();
   }
 
   @Override
   public void teardown() throws Exception {
-    if (mpu!= null && activeUpload != null) {
+    MultipartUploader uploader = getUploader(1);
+    if (uploader != null) {
+      if (activeUpload != null) {
+          abortUploadQuietly(activeUpload, activeUploadPath);
+      }
       try {
-        mpu.abort(activeUploadPath, activeUpload);
-      } catch (FileNotFoundException ignored) {
-        /* this is fine */
+        // round off with an abort of all uploads
+        Path teardown = getContract().getTestPath();
+        LOG.info("Teardown: aborting outstanding uploads under {}", teardown);
+        CompletableFuture<Integer> f
+            = uploader.abortUploadsUnderPath(teardown);
+        f.get();
       } catch (Exception e) {
-        LOG.info("in teardown", e);
+        LOG.warn("Exeception in teardown", e);
       }
     }
-    cleanupWithLogger(LOG, mpu, mpu2);
+
+    cleanupWithLogger(LOG, uploader0, uploader1);
     super.teardown();
   }
 
@@ -192,16 +211,16 @@ public abstract class AbstractContractMultipartUploaderTest extends
    * @param index index of upload
    * @return an uploader
    */
-  protected MultipartUploader mpu(int index) {
-    return (index % 2 == 0) ? mpu : mpu2;
+  protected MultipartUploader getUploader(int index) {
+    return (index % 2 == 0) ? uploader0 : uploader1;
   }
 
   /**
    * Pick a multipart uploader at random.
    * @return an uploader
    */
-  protected MultipartUploader randomMpu() {
-    return mpu(random.nextInt(10));
+  protected MultipartUploader getRandomUploader() {
+    return getUploader(random.nextInt(10));
   }
 
   /**
@@ -211,39 +230,71 @@ public abstract class AbstractContractMultipartUploaderTest extends
   @Test
   public void testSingleUpload() throws Exception {
     Path file = methodPath();
-    UploadHandle uploadHandle = initializeUpload(file);
+    UploadHandle uploadHandle = startUpload(file);
     Map<Integer, PartHandle> partHandles = new HashMap<>();
     MessageDigest origDigest = DigestUtils.getMd5Digest();
     int size = SMALL_FILE;
     byte[] payload = generatePayload(1, size);
     origDigest.update(payload);
+    // use a single uploader
+    // note: the same is used here as it found a bug in the S3Guard
+    // DDB bulk operation state upload -the previous operation had
+    // added an entry to the ongoing state; this second call
+    // was interpreted as an inconsistent write.
+    MultipartUploader completer = uploader0;
+    // and upload with uploader 1 to validate cross-uploader uploads
     PartHandle partHandle = putPart(file, uploadHandle, 1, payload);
     partHandles.put(1, partHandle);
-    PathHandle fd = completeUpload(file, uploadHandle, partHandles,
-        origDigest,
-        size);
+    PathHandle fd = complete(completer, uploadHandle, file,
+        partHandles);
+
+    validateUpload(file, origDigest, size);
 
+    // verify that if the implementation processes data immediately
+    // then a second attempt at the upload will fail.
     if (finalizeConsumesUploadIdImmediately()) {
       intercept(FileNotFoundException.class,
-          () -> mpu.complete(file, partHandles, uploadHandle));
+          () -> complete(completer, uploadHandle, file, partHandles));
     } else {
-      PathHandle fd2 = mpu.complete(file, partHandles, uploadHandle);
+      // otherwise, the same or other uploader can try again.
+      PathHandle fd2 = complete(completer, uploadHandle, file, partHandles);
       assertArrayEquals("Path handles differ", fd.toByteArray(),
           fd2.toByteArray());
     }
   }
 
   /**
-   * Initialize an upload.
+   * Complete IO for a specific uploader; await the response.
+   * @param uploader uploader
+   * @param uploadHandle Identifier
+   * @param file  Target path for upload
+   * @param partHandles handles map of part number to part handle
+   * @return unique PathHandle identifier for the uploaded file.
+   */
+  protected PathHandle complete(
+      final MultipartUploader uploader,
+      final UploadHandle uploadHandle,
+      final Path file,
+      final Map<Integer, PartHandle> partHandles)
+      throws IOException {
+    try (DurationInfo d =
+             new DurationInfo(LOG, "Complete upload to %s", file)) {
+      return awaitFuture(
+          uploader.complete(uploadHandle, file, partHandles));
+    }
+  }
+
+  /**
+   * start an upload.
    * This saves the path and upload handle as the active
    * upload, for aborting in teardown
    * @param dest destination
    * @return the handle
    * @throws IOException failure to initialize
    */
-  protected UploadHandle initializeUpload(final Path dest) throws IOException {
+  protected UploadHandle startUpload(final Path dest) throws IOException {
     activeUploadPath = dest;
-    activeUpload = randomMpu().initialize(dest);
+    activeUpload = awaitFuture(getRandomUploader().startUpload(dest));
     return activeUpload;
   }
 
@@ -283,12 +334,17 @@ public abstract class AbstractContractMultipartUploaderTest extends
       final int index,
       final byte[] payload) throws IOException {
     ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
-    PartHandle partHandle = mpu(index)
-        .putPart(file,
-            new ByteArrayInputStream(payload),
-            index,
-            uploadHandle,
-            payload.length);
+    PartHandle partHandle;
+    try (DurationInfo d =
+             new DurationInfo(LOG, "Put part %d (size %s) %s",
+                 index,
+                 payload.length,
+                 file)) {
+      partHandle = awaitFuture(getUploader(index)
+          .putPart(uploadHandle, index, file,
+              new ByteArrayInputStream(payload),
+              payload.length));
+    }
     timer.end("Uploaded part %s", index);
     LOG.info("Upload bandwidth {} MB/s",
         timer.bandwidthDescription(payload.length));
@@ -296,7 +352,7 @@ public abstract class AbstractContractMultipartUploaderTest extends
   }
 
   /**
-   * Complete an upload with the active MPU instance.
+   * Complete an upload with a random uploader.
    * @param file destination
    * @param uploadHandle handle
    * @param partHandles map of handles
@@ -312,37 +368,65 @@ public abstract class AbstractContractMultipartUploaderTest extends
       final int expectedLength) throws IOException {
     PathHandle fd = complete(file, uploadHandle, partHandles);
 
-    FileStatus status = verifyPathExists(getFileSystem(),
-        "Completed file", file);
-    assertEquals("length of " + status,
-        expectedLength, status.getLen());
+    validateUpload(file, origDigest, expectedLength);
+    return fd;
+  }
+
+  /**
+   * Complete an upload with a random uploader.
+   * @param file destination
+   * @param origDigest digest of source data (may be null)
+   * @param expectedLength expected length of result.
+   * @throws IOException IO failure
+   */
+  private void validateUpload(final Path file,
+      final MessageDigest origDigest,
+      final int expectedLength) throws IOException {
+    verifyPathExists(getFileSystem(),
+         "Completed file", file);
+    verifyFileLength(file, expectedLength);
 
     if (origDigest != null) {
       verifyContents(file, origDigest, expectedLength);
     }
-    return fd;
   }
 
   /**
    * Verify the contents of a file.
    * @param file path
    * @param origDigest digest
-   * @param expectedLength expected length (for logging B/W)
+   * @param expectedLength expected length (for logging download bandwidth)
    * @throws IOException IO failure
    */
   protected void verifyContents(final Path file,
                                 final MessageDigest origDigest,
                                 final int expectedLength) throws IOException {
     ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer();
-    assertArrayEquals("digest of source and " + file
-            + " differ",
-        origDigest.digest(), digest(file));
+    Assertions.assertThat(digest(file))
+        .describedAs("digest of uploaded file %s", file)
+        .isEqualTo(origDigest.digest());
     timer2.end("Completed digest", file);
     LOG.info("Download bandwidth {} MB/s",
         timer2.bandwidthDescription(expectedLength));
   }
 
   /**
+   * Verify the length of a file.
+   * @param file path
+   * @param expectedLength expected length
+   * @throws IOException IO failure
+   */
+  private void verifyFileLength(final Path file, final long expectedLength)
+      throws IOException {
+    FileStatus st = getFileSystem().getFileStatus(file);
+    Assertions.assertThat(st)
+        .describedAs("Uploaded file %s", st)
+        .matches(FileStatus::isFile)
+        .extracting(FileStatus::getLen)
+        .isEqualTo(expectedLength);
+  }
+
+  /**
    * Perform the inner complete without verification.
    * @param file destination path
    * @param uploadHandle upload handle
@@ -353,21 +437,37 @@ public abstract class AbstractContractMultipartUploaderTest extends
   private PathHandle complete(final Path file,
       final UploadHandle uploadHandle,
       final Map<Integer, PartHandle> partHandles) throws IOException {
-    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
-    PathHandle fd = randomMpu().complete(file, partHandles, uploadHandle);
-    timer.end("Completed upload to %s", file);
-    return fd;
+    return complete(getRandomUploader(), uploadHandle, file,
+        partHandles);
   }
 
   /**
    * Abort an upload.
-   * @param file path
    * @param uploadHandle handle
+   * @param file path
    * @throws IOException failure
    */
-  private void abortUpload(final Path file, UploadHandle uploadHandle)
+  private void abortUpload(UploadHandle uploadHandle,
+      final Path file)
       throws IOException {
-    randomMpu().abort(file, uploadHandle);
+    try (DurationInfo d =
+             new DurationInfo(LOG, "Abort upload to %s", file)) {
+      awaitFuture(getRandomUploader().abort(uploadHandle, file));
+    }
+  }
+
+  /**
+   * Abort an upload; swallows exceptions.
+   * @param uploadHandle handle
+   * @param file path
+   */
+  private void abortUploadQuietly(UploadHandle uploadHandle, Path file) {
+    try {
+      abortUpload(uploadHandle, file);
+    } catch (FileNotFoundException ignored) {
+    } catch (Exception e) {
+      LOG.info("aborting {}: {}", file, e.toString());
+    }
   }
 
   /**
@@ -377,10 +477,10 @@ public abstract class AbstractContractMultipartUploaderTest extends
   @Test
   public void testMultipartUpload() throws Exception {
     Path file = methodPath();
-    UploadHandle uploadHandle = initializeUpload(file);
+    UploadHandle uploadHandle = startUpload(file);
     Map<Integer, PartHandle> partHandles = new HashMap<>();
     MessageDigest origDigest = DigestUtils.getMd5Digest();
-    final int payloadCount = getTestPayloadCount();
+    int payloadCount = getTestPayloadCount();
     for (int i = 1; i <= payloadCount; ++i) {
       PartHandle partHandle = buildAndPutPart(file, uploadHandle, i,
           origDigest);
@@ -400,16 +500,16 @@ public abstract class AbstractContractMultipartUploaderTest extends
     FileSystem fs = getFileSystem();
     Path file = path("testMultipartUpload");
     try (MultipartUploader uploader =
-             MultipartUploaderFactory.get(fs, null)) {
-      UploadHandle uploadHandle = uploader.initialize(file);
+        fs.createMultipartUploader(file).build()) {
+      UploadHandle uploadHandle = uploader.startUpload(file).get();
 
       Map<Integer, PartHandle> partHandles = new HashMap<>();
       MessageDigest origDigest = DigestUtils.getMd5Digest();
       byte[] payload = new byte[0];
       origDigest.update(payload);
       InputStream is = new ByteArrayInputStream(payload);
-      PartHandle partHandle = uploader.putPart(file, is, 1, uploadHandle,
-          payload.length);
+      PartHandle partHandle = awaitFuture(
+          uploader.putPart(uploadHandle, 1, file, is, payload.length));
       partHandles.put(1, partHandle);
       completeUpload(file, uploadHandle, partHandles, origDigest, 0);
     }
@@ -422,7 +522,7 @@ public abstract class AbstractContractMultipartUploaderTest extends
   @Test
   public void testUploadEmptyBlock() throws Exception {
     Path file = methodPath();
-    UploadHandle uploadHandle = initializeUpload(file);
+    UploadHandle uploadHandle = startUpload(file);
     Map<Integer, PartHandle> partHandles = new HashMap<>();
     partHandles.put(1, putPart(file, uploadHandle, 1, new byte[0]));
     completeUpload(file, uploadHandle, partHandles, null, 0);
@@ -435,10 +535,10 @@ public abstract class AbstractContractMultipartUploaderTest extends
   @Test
   public void testMultipartUploadReverseOrder() throws Exception {
     Path file = methodPath();
-    UploadHandle uploadHandle = initializeUpload(file);
+    UploadHandle uploadHandle = startUpload(file);
     Map<Integer, PartHandle> partHandles = new HashMap<>();
     MessageDigest origDigest = DigestUtils.getMd5Digest();
-    final int payloadCount = getTestPayloadCount();
+    int payloadCount = getTestPayloadCount();
     for (int i = 1; i <= payloadCount; ++i) {
       byte[] payload = generatePayload(i);
       origDigest.update(payload);
@@ -459,7 +559,7 @@ public abstract class AbstractContractMultipartUploaderTest extends
       throws Exception {
     describe("Upload in reverse order and the part numbers are not contiguous");
     Path file = methodPath();
-    UploadHandle uploadHandle = initializeUpload(file);
+    UploadHandle uploadHandle = startUpload(file);
     MessageDigest origDigest = DigestUtils.getMd5Digest();
     int payloadCount = 2 * getTestPayloadCount();
     for (int i = 2; i <= payloadCount; i += 2) {
@@ -482,22 +582,22 @@ public abstract class AbstractContractMultipartUploaderTest extends
   public void testMultipartUploadAbort() throws Exception {
     describe("Upload and then abort it before completing");
     Path file = methodPath();
-    UploadHandle uploadHandle = initializeUpload(file);
-    int end = 10;
+    UploadHandle uploadHandle = startUpload(file);
     Map<Integer, PartHandle> partHandles = new HashMap<>();
     for (int i = 12; i > 10; i--) {
       partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
     }
-    abortUpload(file, uploadHandle);
+    abortUpload(uploadHandle, file);
 
     String contents = "ThisIsPart49\n";
     int len = contents.getBytes(Charsets.UTF_8).length;
     InputStream is = IOUtils.toInputStream(contents, "UTF-8");
 
     intercept(IOException.class,
-        () -> mpu.putPart(file, is, 49, uploadHandle, len));
+        () -> awaitFuture(
+            uploader0.putPart(uploadHandle, 49, file, is, len)));
     intercept(IOException.class,
-        () -> mpu.complete(file, partHandles, uploadHandle));
+        () -> complete(uploader0, uploadHandle, file, partHandles));
 
     assertPathDoesNotExist("Uploaded file should not exist", file);
 
@@ -505,9 +605,9 @@ public abstract class AbstractContractMultipartUploaderTest extends
     // consumed by finalization operations (complete, abort).
     if (finalizeConsumesUploadIdImmediately()) {
       intercept(FileNotFoundException.class,
-          () -> abortUpload(file, uploadHandle));
+          () -> abortUpload(uploadHandle, file));
     } else {
-      abortUpload(file, uploadHandle);
+      abortUpload(uploadHandle, file);
     }
   }
 
@@ -519,31 +619,55 @@ public abstract class AbstractContractMultipartUploaderTest extends
     Path file = methodPath();
     ByteBuffer byteBuffer = ByteBuffer.wrap(
         "invalid-handle".getBytes(Charsets.UTF_8));
-    UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer);
     intercept(FileNotFoundException.class,
-        () -> abortUpload(file, uploadHandle));
+        () -> abortUpload(BBUploadHandle.from(byteBuffer), file));
   }
 
   /**
-   * Trying to abort with a handle of size 0 must fail.
+   * Trying to abort an upload with no data does not create a file.
    */
   @Test
   public void testAbortEmptyUpload() throws Exception {
     describe("initialize upload and abort before uploading data");
     Path file = methodPath();
-    abortUpload(file, initializeUpload(file));
+    abortUpload(startUpload(file), file);
     assertPathDoesNotExist("Uploaded file should not exist", file);
   }
 
+
+  /**
+   * Trying to abort an upload with no data does not create a file.
+   */
+  @Test
+  public void testAbortAllPendingUploads() throws Exception {
+    describe("initialize upload and abort the pending upload");
+    Path path = methodPath();
+    Path file = new Path(path, "child");
+    UploadHandle upload = startUpload(file);
+    try {
+      CompletableFuture<Integer> oF
+          = getRandomUploader().abortUploadsUnderPath(path.getParent());
+      int abortedUploads = awaitFuture(oF);
+      if (abortedUploads >= 0) {
+        // uploads can be aborted
+        Assertions.assertThat(abortedUploads)
+            .describedAs("Number of uploads aborted")
+            .isGreaterThanOrEqualTo(1);
+        assertPathDoesNotExist("Uploaded file should not exist", file);
+      }
+    } finally {
+      abortUploadQuietly(upload, file);
+    }
+  }
+
   /**
    * Trying to abort with a handle of size 0 must fail.
    */
   @Test
   public void testAbortEmptyUploadHandle() throws Exception {
     ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[0]);
-    UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer);
     intercept(IllegalArgumentException.class,
-        () -> abortUpload(methodPath(), uploadHandle));
+        () -> abortUpload(BBUploadHandle.from(byteBuffer), methodPath()));
   }
 
   /**
@@ -553,10 +677,10 @@ public abstract class AbstractContractMultipartUploaderTest extends
   public void testCompleteEmptyUpload() throws Exception {
     describe("Expect an empty MPU to fail, but still be abortable");
     Path dest = methodPath();
-    UploadHandle handle = initializeUpload(dest);
+    UploadHandle handle = startUpload(dest);
     intercept(IllegalArgumentException.class,
-        () -> mpu.complete(dest, new HashMap<>(), handle));
-    abortUpload(dest, handle);
+        () -> complete(uploader0, handle, dest, new HashMap<>()));
+    abortUpload(handle, dest);
   }
 
   /**
@@ -571,7 +695,7 @@ public abstract class AbstractContractMultipartUploaderTest extends
     byte[] payload = generatePayload(1);
     InputStream is = new ByteArrayInputStream(payload);
     intercept(IllegalArgumentException.class,
-        () -> mpu.putPart(dest, is, 1, emptyHandle, payload.length));
+        () -> uploader0.putPart(emptyHandle, 1, dest, is, payload.length));
   }
 
   /**
@@ -581,7 +705,7 @@ public abstract class AbstractContractMultipartUploaderTest extends
   public void testCompleteEmptyUploadID() throws Exception {
     describe("Expect IllegalArgumentException when complete uploadID is empty");
     Path dest = methodPath();
-    UploadHandle realHandle = initializeUpload(dest);
+    UploadHandle realHandle = startUpload(dest);
     UploadHandle emptyHandle =
         BBUploadHandle.from(ByteBuffer.wrap(new byte[0]));
     Map<Integer, PartHandle> partHandles = new HashMap<>();
@@ -590,14 +714,14 @@ public abstract class AbstractContractMultipartUploaderTest extends
     partHandles.put(1, partHandle);
 
     intercept(IllegalArgumentException.class,
-        () -> mpu.complete(dest, partHandles, emptyHandle));
+        () -> complete(uploader0, emptyHandle, dest, partHandles));
 
     // and, while things are setup, attempt to complete with
     // a part index of 0
     partHandles.clear();
     partHandles.put(0, partHandle);
     intercept(IllegalArgumentException.class,
-        () -> mpu.complete(dest, partHandles, realHandle));
+        () -> complete(uploader0, realHandle, dest, partHandles));
   }
 
   /**
@@ -610,7 +734,7 @@ public abstract class AbstractContractMultipartUploaderTest extends
   public void testDirectoryInTheWay() throws Exception {
     FileSystem fs = getFileSystem();
     Path file = methodPath();
-    UploadHandle uploadHandle = initializeUpload(file);
+    UploadHandle uploadHandle = startUpload(file);
     Map<Integer, PartHandle> partHandles = new HashMap<>();
     int size = SMALL_FILE;
     PartHandle partHandle = putPart(file, uploadHandle, 1,
@@ -622,7 +746,7 @@ public abstract class AbstractContractMultipartUploaderTest extends
         () -> completeUpload(file, uploadHandle, partHandles, null,
             size));
     // abort should still work
-    abortUpload(file, uploadHandle);
+    abortUpload(uploadHandle, file);
   }
 
   @Test
@@ -630,46 +754,44 @@ public abstract class AbstractContractMultipartUploaderTest extends
 
     // if the FS doesn't support concurrent uploads, this test is
     // required to fail during the second initialization.
-    final boolean concurrent = supportsConcurrentUploadsToSamePath();
+    boolean concurrent = supportsConcurrentUploadsToSamePath();
 
     describe("testing concurrent uploads, MPU support for this is "
         + concurrent);
-    final FileSystem fs = getFileSystem();
-    final Path file = methodPath();
-    final int size1 = SMALL_FILE;
-    final int partId1 = 1;
-    final byte[] payload1 = generatePayload(partId1, size1);
-    final MessageDigest digest1 = DigestUtils.getMd5Digest();
+    Path file = methodPath();
+    int size1 = SMALL_FILE;
+    int partId1 = 1;
+    byte[] payload1 = generatePayload(partId1, size1);
+    MessageDigest digest1 = DigestUtils.getMd5Digest();
     digest1.update(payload1);
-    final UploadHandle upload1 = initializeUpload(file);
-    final Map<Integer, PartHandle> partHandles1 = new HashMap<>();
+    UploadHandle upload1 = startUpload(file);
+    Map<Integer, PartHandle> partHandles1 = new HashMap<>();
 
     // initiate part 2
     // by using a different size, it's straightforward to see which
     // version is visible, before reading/digesting the contents
-    final int size2 = size1 * 2;
-    final int partId2 = 2;
-    final byte[] payload2 = generatePayload(partId1, size2);
-    final MessageDigest digest2 = DigestUtils.getMd5Digest();
+    int size2 = size1 * 2;
+    int partId2 = 2;
+    byte[] payload2 = generatePayload(partId1, size2);
+    MessageDigest digest2 = DigestUtils.getMd5Digest();
     digest2.update(payload2);
 
-    final UploadHandle upload2;
+    UploadHandle upload2;
     try {
-      upload2 = initializeUpload(file);
+      upload2 = startUpload(file);
       Assume.assumeTrue(
           "The Filesystem is unexpectedly supporting concurrent uploads",
           concurrent);
     } catch (IOException e) {
       if (!concurrent) {
         // this is expected, so end the test
-        LOG.debug("Expected exception raised on concurrent uploads {}", e);
+        LOG.debug("Expected exception raised on concurrent uploads", e);
         return;
       } else {
         throw e;
       }
     }
-    final Map<Integer, PartHandle> partHandles2 = new HashMap<>();
-
+    Map<Integer, PartHandle> partHandles2 = new HashMap<>();
 
     assertNotEquals("Upload handles match", upload1, upload2);
 
@@ -686,13 +808,21 @@ public abstract class AbstractContractMultipartUploaderTest extends
     // now upload part 2.
     complete(file, upload2, partHandles2);
     // and await the visible length to match
-    eventually(timeToBecomeConsistentMillis(), 500,
-        () -> {
-          FileStatus status = fs.getFileStatus(file);
-          assertEquals("File length in " + status,
-              size2, status.getLen());
-        });
+    eventually(timeToBecomeConsistentMillis(),
+        () -> verifyFileLength(file, size2),
+        new LambdaTestUtils.ProportionalRetryInterval(
+            CONSISTENCY_INTERVAL,
+            timeToBecomeConsistentMillis()));
 
     verifyContents(file, digest2, size2);
   }
+
+  @Test
+  public void testPathCapabilities() throws Throwable {
+    FileSystem fs = getFileSystem();
+    Assertions.assertThat(fs.hasPathCapability(getContract().getTestPath(),
+        CommonPathCapabilities.FS_MULTIPART_UPLOADER))
+        .describedAs("fs %s, lacks multipart upload capability", fs)
+        .isTrue();
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java
deleted file mode 100644
index f675ddf..0000000
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs.contract.localfs;
-
-import org.junit.Assume;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
-import org.apache.hadoop.fs.contract.AbstractFSContract;
-
-/**
- * Test the FileSystemMultipartUploader on local file system.
- */
-public class TestLocalFSContractMultipartUploader
-    extends AbstractContractMultipartUploaderTest {
-
-  @Override
-  public void setup() throws Exception {
-    Assume.assumeTrue("Skipping until HDFS-13934", false);
-    super.setup();
-  }
-
-  @Override
-  protected AbstractFSContract createContract(Configuration conf) {
-    return new LocalFSContract(conf);
-  }
-
-  /**
-   * There is no real need to upload any particular size.
-   * @return 1 kilobyte
-   */
-  @Override
-  protected int partSizeInBytes() {
-    return 1024;
-  }
-
-  @Override
-  protected boolean finalizeConsumesUploadIdImmediately() {
-    return true;
-  }
-
-  @Override
-  protected boolean supportsConcurrentUploadsToSamePath() {
-    return true;
-  }
-}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 68476a5..7af89ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.fs.GlobalStorageStatistics;
 import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
 import org.apache.hadoop.fs.InvalidPathHandleException;
 import org.apache.hadoop.fs.PartialListing;
+import org.apache.hadoop.fs.MultipartUploaderBuilder;
 import org.apache.hadoop.fs.PathHandle;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Options;
@@ -66,6 +67,7 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.impl.FileSystemMultipartUploaderBuilder;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -3608,4 +3610,10 @@ public class DistributedFileSystem extends FileSystem
 
     return super.hasPathCapability(p, capability);
   }
+
+  @Override
+  public MultipartUploaderBuilder createMultipartUploader(final Path basePath)
+      throws IOException {
+    return new FileSystemMultipartUploaderBuilder(this, basePath);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/DfsPathCapabilities.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/DfsPathCapabilities.java
index 6cad69a..30e7e00 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/DfsPathCapabilities.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/DfsPathCapabilities.java
@@ -47,6 +47,7 @@ public final class DfsPathCapabilities {
     case CommonPathCapabilities.FS_CHECKSUMS:
     case CommonPathCapabilities.FS_CONCAT:
     case CommonPathCapabilities.FS_LIST_CORRUPT_FILE_BLOCKS:
+    case CommonPathCapabilities.FS_MULTIPART_UPLOADER:
     case CommonPathCapabilities.FS_PATHHANDLES:
     case CommonPathCapabilities.FS_PERMISSIONS:
     case CommonPathCapabilities.FS_SNAPSHOTS:
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
index 783b337..0d1ce76 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -76,10 +76,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.GlobalStorageStatistics;
 import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
+import org.apache.hadoop.fs.MultipartUploaderBuilder;
 import org.apache.hadoop.fs.QuotaUsage;
 import org.apache.hadoop.fs.PathCapabilities;
 import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.impl.FileSystemMultipartUploaderBuilder;
 import org.apache.hadoop.fs.permission.FsCreateModes;
 import org.apache.hadoop.hdfs.DFSOpsCountStatistics;
 import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
@@ -2117,6 +2119,12 @@ public class WebHdfsFileSystem extends FileSystem
     return super.hasPathCapability(p, capability);
   }
 
+  @Override
+  public MultipartUploaderBuilder createMultipartUploader(final Path basePath)
+      throws IOException {
+    return new FileSystemMultipartUploaderBuilder(this, basePath);
+  }
+
   /**
    * This class is used for opening, reading, and seeking files while using the
    * WebHdfsFileSystem. This class will invoke the retry policy when performing
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory
deleted file mode 100644
index b153fd9..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.hadoop.hdfs.DFSMultipartUploaderFactory
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index fa0251a..e5b08f1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -108,8 +108,11 @@ import org.apache.hadoop.fs.s3a.impl.InternalConstants;
 import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport;
 import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
 import org.apache.hadoop.fs.s3a.impl.RenameOperation;
+import org.apache.hadoop.fs.s3a.impl.S3AMultipartUploaderBuilder;
 import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
+import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
+import org.apache.hadoop.fs.s3a.impl.statistics.S3AMultipartUploaderStatisticsImpl;
 import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
 import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
 import org.apache.hadoop.io.IOUtils;
@@ -4493,6 +4496,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       return getConf().getBoolean(ETAG_CHECKSUM_ENABLED,
           ETAG_CHECKSUM_ENABLED_DEFAULT);
 
+    case CommonPathCapabilities.FS_MULTIPART_UPLOADER:
+      return true;
+
     default:
       return super.hasPathCapability(p, capability);
     }
@@ -4722,6 +4728,18 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     return result;
   }
 
+  @Override
+  public S3AMultipartUploaderBuilder createMultipartUploader(
+      final Path basePath)
+      throws IOException {
+    StoreContext ctx = createStoreContext();
+    return new S3AMultipartUploaderBuilder(this,
+        getWriteOperationHelper(),
+        ctx,
+        basePath,
+        new S3AMultipartUploaderStatisticsImpl(ctx::incrementStatistic));
+  }
+
   /**
    * Build an immutable store context.
    * If called while the FS is being initialized,
@@ -4731,24 +4749,24 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   @InterfaceAudience.Private
   public StoreContext createStoreContext() {
-    return new StoreContext(
-        getUri(),
-        getBucket(),
-        getConf(),
-        getUsername(),
-        owner,
-        boundedThreadPool,
-        executorCapacity,
-        invoker,
-        getInstrumentation(),
-        getStorageStatistics(),
-        getInputPolicy(),
-        changeDetectionPolicy,
-        enableMultiObjectsDelete,
-        metadataStore,
-        useListV1,
-        new ContextAccessorsImpl(),
-        getTtlTimeProvider());
+    return new StoreContextBuilder().setFsURI(getUri())
+        .setBucket(getBucket())
+        .setConfiguration(getConf())
+        .setUsername(getUsername())
+        .setOwner(owner)
+        .setExecutor(boundedThreadPool)
+        .setExecutorCapacity(executorCapacity)
+        .setInvoker(invoker)
+        .setInstrumentation(getInstrumentation())
+        .setStorageStatistics(getStorageStatistics())
+        .setInputPolicy(getInputPolicy())
+        .setChangeDetectionPolicy(changeDetectionPolicy)
+        .setMultiObjectDeleteEnabled(enableMultiObjectsDelete)
+        .setMetadataStore(metadataStore)
+        .setUseListV1(useListV1)
+        .setContextAccessors(new ContextAccessorsImpl())
+        .setTimeProvider(getTtlTimeProvider())
+        .build();
   }
 
   /**
@@ -4776,5 +4794,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     public String getBucketLocation() throws IOException {
       return S3AFileSystem.this.getBucketLocation();
     }
+
+    @Override
+    public Path makeQualified(final Path path) {
+      return S3AFileSystem.this.makeQualified(path);
+    }
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index b9918b5..cb0a434 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -193,7 +193,14 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
       S3GUARD_METADATASTORE_AUTHORITATIVE_DIRECTORIES_UPDATED,
       STORE_IO_THROTTLED,
       DELEGATION_TOKENS_ISSUED,
-      FILES_DELETE_REJECTED
+      FILES_DELETE_REJECTED,
+      MULTIPART_INSTANTIATED,
+      MULTIPART_PART_PUT,
+      MULTIPART_PART_PUT_BYTES,
+      MULTIPART_UPLOAD_ABORTED,
+      MULTIPART_UPLOAD_ABORT_UNDER_PATH_INVOKED,
+      MULTIPART_UPLOAD_COMPLETED,
+      MULTIPART_UPLOAD_STARTED
   };
 
   private static final Statistic[] GAUGES_TO_CREATE = {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java
deleted file mode 100644
index cf58751..0000000
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs.s3a;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
-import com.amazonaws.services.s3.model.PartETag;
-import com.amazonaws.services.s3.model.UploadPartRequest;
-import com.amazonaws.services.s3.model.UploadPartResult;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BBPartHandle;
-import org.apache.hadoop.fs.BBUploadHandle;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.MultipartUploader;
-import org.apache.hadoop.fs.MultipartUploaderFactory;
-import org.apache.hadoop.fs.PartHandle;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathHandle;
-import org.apache.hadoop.fs.UploadHandle;
-
-import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
-
-/**
- * MultipartUploader for S3AFileSystem. This uses the S3 multipart
- * upload mechanism.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class S3AMultipartUploader extends MultipartUploader {
-
-  private final S3AFileSystem s3a;
-
-  /** Header for Parts: {@value}. */
-
-  public static final String HEADER = "S3A-part01";
-
-  public S3AMultipartUploader(FileSystem fs, Configuration conf) {
-    Preconditions.checkArgument(fs instanceof S3AFileSystem,
-        "Wrong filesystem: expected S3A but got %s", fs);
-    s3a = (S3AFileSystem) fs;
-  }
-
-  @Override
-  public UploadHandle initialize(Path filePath) throws IOException {
-    final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
-    String key = s3a.pathToKey(filePath);
-    String uploadId = writeHelper.initiateMultiPartUpload(key);
-    return BBUploadHandle.from(ByteBuffer.wrap(
-        uploadId.getBytes(Charsets.UTF_8)));
-  }
-
-  @Override
-  public PartHandle putPart(Path filePath, InputStream inputStream,
-      int partNumber, UploadHandle uploadId, long lengthInBytes)
-      throws IOException {
-    checkPutArguments(filePath, inputStream, partNumber, uploadId,
-        lengthInBytes);
-    byte[] uploadIdBytes = uploadId.toByteArray();
-    checkUploadId(uploadIdBytes);
-    String key = s3a.pathToKey(filePath);
-    final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
-    String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length,
-        Charsets.UTF_8);
-    UploadPartRequest request = writeHelper.newUploadPartRequest(key,
-        uploadIdString, partNumber, (int) lengthInBytes, inputStream, null, 0L);
-    UploadPartResult result = writeHelper.uploadPart(request);
-    String eTag = result.getETag();
-    return BBPartHandle.from(
-        ByteBuffer.wrap(
-            buildPartHandlePayload(eTag, lengthInBytes)));
-  }
-
-  @Override
-  public PathHandle complete(Path filePath,
-      Map<Integer, PartHandle> handleMap,
-      UploadHandle uploadId)
-      throws IOException {
-    byte[] uploadIdBytes = uploadId.toByteArray();
-    checkUploadId(uploadIdBytes);
-
-    checkPartHandles(handleMap);
-    List<Map.Entry<Integer, PartHandle>> handles =
-        new ArrayList<>(handleMap.entrySet());
-    handles.sort(Comparator.comparingInt(Map.Entry::getKey));
-    final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
-    String key = s3a.pathToKey(filePath);
-
-    String uploadIdStr = new String(uploadIdBytes, 0, uploadIdBytes.length,
-        Charsets.UTF_8);
-    ArrayList<PartETag> eTags = new ArrayList<>();
-    eTags.ensureCapacity(handles.size());
-    long totalLength = 0;
-    for (Map.Entry<Integer, PartHandle> handle : handles) {
-      byte[] payload = handle.getValue().toByteArray();
-      Pair<Long, String> result = parsePartHandlePayload(payload);
-      totalLength += result.getLeft();
-      eTags.add(new PartETag(handle.getKey(), result.getRight()));
-    }
-    AtomicInteger errorCount = new AtomicInteger(0);
-    CompleteMultipartUploadResult result = writeHelper.completeMPUwithRetries(
-        key, uploadIdStr, eTags, totalLength, errorCount);
-
-    byte[] eTag = result.getETag().getBytes(Charsets.UTF_8);
-    return (PathHandle) () -> ByteBuffer.wrap(eTag);
-  }
-
-  @Override
-  public void abort(Path filePath, UploadHandle uploadId) throws IOException {
-    final byte[] uploadIdBytes = uploadId.toByteArray();
-    checkUploadId(uploadIdBytes);
-    final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
-    String key = s3a.pathToKey(filePath);
-    String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length,
-        Charsets.UTF_8);
-    writeHelper.abortMultipartCommit(key, uploadIdString);
-  }
-
-  /**
-   * Factory for creating MultipartUploader objects for s3a:// FileSystems.
-   */
-  public static class Factory extends MultipartUploaderFactory {
-    @Override
-    protected MultipartUploader createMultipartUploader(FileSystem fs,
-        Configuration conf) {
-      if (FS_S3A.equals(fs.getScheme())) {
-        return new S3AMultipartUploader(fs, conf);
-      }
-      return null;
-    }
-  }
-
-  /**
-   * Build the payload for marshalling.
-   * @param eTag upload etag
-   * @param len length
-   * @return a byte array to marshall.
-   * @throws IOException error writing the payload
-   */
-  @VisibleForTesting
-  static byte[] buildPartHandlePayload(String eTag, long len)
-      throws IOException {
-    Preconditions.checkArgument(StringUtils.isNotEmpty(eTag),
-        "Empty etag");
-    Preconditions.checkArgument(len >= 0,
-        "Invalid length");
-
-    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-    try(DataOutputStream output = new DataOutputStream(bytes)) {
-      output.writeUTF(HEADER);
-      output.writeLong(len);
-      output.writeUTF(eTag);
-    }
-    return bytes.toByteArray();
-  }
-
-  /**
-   * Parse the payload marshalled as a part handle.
-   * @param data handle data
-   * @return the length and etag
-   * @throws IOException error reading the payload
-   */
-  @VisibleForTesting
-  static Pair<Long, String> parsePartHandlePayload(byte[] data)
-      throws IOException {
-
-    try(DataInputStream input =
-            new DataInputStream(new ByteArrayInputStream(data))) {
-      final String header = input.readUTF();
-      if (!HEADER.equals(header)) {
-        throw new IOException("Wrong header string: \"" + header + "\"");
-      }
-      final long len = input.readLong();
-      final String etag = input.readUTF();
-      if (len < 0) {
-        throw new IOException("Negative length");
-      }
-      return Pair.of(len, etag);
-    }
-  }
-
-}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 1d3d475..8153169 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -234,7 +234,29 @@ public enum Statistic {
       "Rate of S3 request throttling"),
 
   DELEGATION_TOKENS_ISSUED("delegation_tokens_issued",
-      "Number of delegation tokens issued");
+      "Number of delegation tokens issued"),
+
+  MULTIPART_INSTANTIATED(
+      "multipart_instantiated",
+      "Multipart Uploader Instantiated"),
+  MULTIPART_PART_PUT(
+      "multipart_part_put",
+      "Multipart Part Put Operation"),
+  MULTIPART_PART_PUT_BYTES(
+      "multipart_part_put_bytes",
+      "Multipart Part Put Bytes"),
+  MULTIPART_UPLOAD_ABORTED(
+      "multipart_upload_aborted",
+      "Multipart Upload Aborted"),
+  MULTIPART_UPLOAD_ABORT_UNDER_PATH_INVOKED(
+      "multipart_upload_abort_under_path_invoked",
+      "Multipart Upload Abort Udner Path Invoked"),
+  MULTIPART_UPLOAD_COMPLETED(
+      "multipart_upload_completed",
+      "Multipart Upload Completed"),
+  MULTIPART_UPLOAD_STARTED(
+      "multipart_upload_started",
+      "Multipart Upload Started");
 
   private static final Map<String, Statistic> SYMBOL_MAP =
       new HashMap<>(Statistic.values().length);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
index ab53486..26d0942 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -87,7 +87,7 @@ import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class WriteOperationHelper {
+public class WriteOperationHelper implements WriteOperations {
   private static final Logger LOG =
       LoggerFactory.getLogger(WriteOperationHelper.class);
 
@@ -254,11 +254,11 @@ public class WriteOperationHelper {
       Retried retrying,
       @Nullable BulkOperationState operationState) throws IOException {
     if (partETags.isEmpty()) {
-      throw new IOException(
-          "No upload parts in multipart upload to " + destKey);
+      throw new PathIOException(destKey,
+          "No upload parts in multipart upload");
     }
     CompleteMultipartUploadResult uploadResult =
-        invoker.retry("Completing multipart commit", destKey,
+        invoker.retry("Completing multipart upload", destKey,
             true,
             retrying,
             () -> {
@@ -560,8 +560,20 @@ public class WriteOperationHelper {
    */
   public BulkOperationState initiateCommitOperation(
       Path path) throws IOException {
+    return initiateOperation(path, BulkOperationState.OperationType.Commit);
+  }
+
+  /**
+   * Initiate a commit operation through any metastore.
+   * @param path path under which the writes will all take place.
+   * @param operationType operation to initiate
+   * @return an possibly null operation state from the metastore.
+   * @throws IOException failure to instantiate.
+   */
+  public BulkOperationState initiateOperation(final Path path,
+      final BulkOperationState.OperationType operationType) throws IOException {
     return S3Guard.initiateBulkWrite(owner.getMetadataStore(),
-        BulkOperationState.OperationType.Commit, path);
+        operationType, path);
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
new file mode 100644
index 0000000..95cbd7e
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.MultipartUpload;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.SelectObjectContentRequest;
+import com.amazonaws.services.s3.model.SelectObjectContentResult;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
+import com.amazonaws.services.s3.transfer.model.UploadResult;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
+
+/**
+ * Operations to update the store.
+ * This is effectively a private internal API for classes used as part
+ * of the S3A implementation.
+ * New extension points SHOULD use this interface -provided there
+ * is no plan to backport to previous versions. In those situations,
+ * use `WriteOperationHelper` directly.
+ * @since Hadoop 3.3.0
+ */
+public interface WriteOperations {
+
+  /**
+   * Execute a function with retry processing.
+   * @param action action to execute (used in error messages)
+   * @param path path of work (used in error messages)
+   * @param idempotent does the operation have semantics
+   * which mean that it can be retried even if was already executed?
+   * @param operation operation to execute
+   * @param <T> type of return value
+   * @return the result of the call
+   * @throws IOException any IOE raised, or translated exception
+   */
+  <T> T retry(String action,
+      String path,
+      boolean idempotent,
+      Invoker.Operation<T> operation)
+      throws IOException;
+
+  /**
+   * Create a {@link PutObjectRequest} request against the specific key.
+   * @param destKey destination key
+   * @param inputStream source data.
+   * @param length size, if known. Use -1 for not known
+   * @return the request
+   */
+  PutObjectRequest createPutObjectRequest(String destKey,
+      InputStream inputStream, long length);
+
+  /**
+   * Create a {@link PutObjectRequest} request to upload a file.
+   * @param dest key to PUT to.
+   * @param sourceFile source file
+   * @return the request
+   */
+  PutObjectRequest createPutObjectRequest(String dest,
+      File sourceFile);
+
+  /**
+   * Callback on a successful write.
+   * @param length length of the write
+   */
+  void writeSuccessful(long length);
+
+  /**
+   * Callback on a write failure.
+   * @param ex Any exception raised which triggered the failure.
+   */
+  void writeFailed(Exception ex);
+
+  /**
+   * Create a new object metadata instance.
+   * Any standard metadata headers are added here, for example:
+   * encryption.
+   * @param length size, if known. Use -1 for not known
+   * @return a new metadata instance
+   */
+  ObjectMetadata newObjectMetadata(long length);
+
+  /**
+   * Start the multipart upload process.
+   * Retry policy: retrying, translated.
+   * @param destKey destination of upload
+   * @return the upload result containing the ID
+   * @throws IOException IO problem
+   */
+  @Retries.RetryTranslated
+  String initiateMultiPartUpload(String destKey) throws IOException;
+
+  /**
+   * This completes a multipart upload to the destination key via
+   * {@code finalizeMultipartUpload()}.
+   * Retry policy: retrying, translated.
+   * Retries increment the {@code errorCount} counter.
+   * @param destKey destination
+   * @param uploadId multipart operation Id
+   * @param partETags list of partial uploads
+   * @param length length of the upload
+   * @param errorCount a counter incremented by 1 on every error; for
+   * use in statistics
+   * @return the result of the operation.
+   * @throws IOException if problems arose which could not be retried, or
+   * the retry count was exceeded
+   */
+  @Retries.RetryTranslated
+  CompleteMultipartUploadResult completeMPUwithRetries(
+      String destKey,
+      String uploadId,
+      List<PartETag> partETags,
+      long length,
+      AtomicInteger errorCount)
+      throws IOException;
+
+  /**
+   * Abort a multipart upload operation.
+   * @param destKey destination key of the upload
+   * @param uploadId multipart operation Id
+   * @param retrying callback invoked on every retry
+   * @throws IOException failure to abort
+   * @throws FileNotFoundException if the abort ID is unknown
+   */
+  @Retries.RetryTranslated
+  void abortMultipartUpload(String destKey, String uploadId,
+      Invoker.Retried retrying)
+      throws IOException;
+
+  /**
+   * Abort a multipart commit operation.
+   * @param upload upload to abort.
+   * @throws IOException on problems.
+   */
+  @Retries.RetryTranslated
+  void abortMultipartUpload(MultipartUpload upload)
+      throws IOException;
+
+  /**
+   * Abort multipart uploads under a path: limited to the first
+   * few hundred.
+   * @param prefix prefix for uploads to abort
+   * @return a count of aborts
+   * @throws IOException trouble; FileNotFoundExceptions are swallowed.
+   */
+  @Retries.RetryTranslated
+  int abortMultipartUploadsUnderPath(String prefix)
+      throws IOException;
+
+  /**
+   * Abort a multipart commit operation.
+   * @param destKey destination key of ongoing operation
+   * @param uploadId multipart operation Id
+   * @throws IOException on problems.
+   * @throws FileNotFoundException if the abort ID is unknown
+   */
+  @Retries.RetryTranslated
+  void abortMultipartCommit(String destKey, String uploadId)
+      throws IOException;
+
+  /**
+   * Create and initialize a part request of a multipart upload.
+   * Exactly one of: {@code uploadStream} or {@code sourceFile}
+   * must be specified.
+   * A subset of the file may be posted, by providing the starting point
+   * in {@code offset} and a length of block in {@code size} equal to
+   * or less than the remaining bytes.
+   * @param destKey destination key of ongoing operation
+   * @param uploadId ID of ongoing upload
+   * @param partNumber current part number of the upload
+   * @param size amount of data
+   * @param uploadStream source of data to upload
+   * @param sourceFile optional source file.
+   * @param offset offset in file to start reading.
+   * @return the request.
+   * @throws IllegalArgumentException if the parameters are invalid -including
+   * @throws PathIOException if the part number is out of range.
+   */
+  UploadPartRequest newUploadPartRequest(
+      String destKey,
+      String uploadId,
+      int partNumber,
+      int size,
+      InputStream uploadStream,
+      File sourceFile,
+      Long offset) throws PathIOException;
+
+  /**
+   * PUT an object directly (i.e. not via the transfer manager).
+   * Byte length is calculated from the file length, or, if there is no
+   * file, from the content length of the header.
+   * @param putObjectRequest the request
+   * @return the upload initiated
+   * @throws IOException on problems
+   */
+  @Retries.RetryTranslated
+  PutObjectResult putObject(PutObjectRequest putObjectRequest)
+      throws IOException;
+
+  /**
+   * PUT an object via the transfer manager.
+   * @param putObjectRequest the request
+   * @return the result of the operation
+   * @throws IOException on problems
+   */
+  @Retries.RetryTranslated
+  UploadResult uploadObject(PutObjectRequest putObjectRequest)
+      throws IOException;
+
+  /**
+   * Revert a commit by deleting the file.
+   * Relies on retry code in filesystem
+   * @throws IOException on problems
+   * @param destKey destination key
+   * @param operationState operational state for a bulk update
+   */
+  @Retries.OnceTranslated
+  void revertCommit(String destKey,
+      @Nullable BulkOperationState operationState) throws IOException;
+
+  /**
+   * This completes a multipart upload to the destination key via
+   * {@code finalizeMultipartUpload()}.
+   * Retry policy: retrying, translated.
+   * Retries increment the {@code errorCount} counter.
+   * @param destKey destination
+   * @param uploadId multipart operation Id
+   * @param partETags list of partial uploads
+   * @param length length of the upload
+   * @param operationState operational state for a bulk update
+   * @return the result of the operation.
+   * @throws IOException if problems arose which could not be retried, or
+   * the retry count was exceeded
+   */
+  @Retries.RetryTranslated
+  CompleteMultipartUploadResult commitUpload(
+      String destKey,
+      String uploadId,
+      List<PartETag> partETags,
+      long length,
+      @Nullable BulkOperationState operationState)
+      throws IOException;
+
+  /**
+   * Initiate a commit operation through any metastore.
+   * @param path path under which the writes will all take place.
+   * @return an possibly null operation state from the metastore.
+   * @throws IOException failure to instantiate.
+   */
+  BulkOperationState initiateCommitOperation(
+      Path path) throws IOException;
+
+  /**
+   * Initiate a commit operation through any metastore.
+   * @param path path under which the writes will all take place.
+   * @param operationType operation to initiate
+   * @return an possibly null operation state from the metastore.
+   * @throws IOException failure to instantiate.
+   */
+  BulkOperationState initiateOperation(Path path,
+      BulkOperationState.OperationType operationType) throws IOException;
+
+  /**
+   * Upload part of a multi-partition file.
+   * @param request request
+   * @return the result of the operation.
+   * @throws IOException on problems
+   */
+  @Retries.RetryTranslated
+  UploadPartResult uploadPart(UploadPartRequest request)
+      throws IOException;
+
+  /**
+   * Get the configuration of this instance; essentially the owning
+   * filesystem configuration.
+   * @return the configuration.
+   */
+  Configuration getConf();
+
+  /**
+   * Create a S3 Select request for the destination path.
+   * This does not build the query.
+   * @param path pre-qualified path for query
+   * @return the request
+   */
+  SelectObjectContentRequest newSelectRequest(Path path);
+
+  /**
+   * Execute an S3 Select operation.
+   * On a failure, the request is only logged at debug to avoid the
+   * select exception being printed.
+   * @param source source for selection
+   * @param request Select request to issue.
+   * @param action the action for use in exception creation
+   * @return response
+   * @throws IOException failure
+   */
+  @Retries.RetryTranslated
+  SelectObjectContentResult select(
+      Path source,
+      SelectObjectContentRequest request,
+      String action)
+      throws IOException;
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ContextAccessors.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ContextAccessors.java
index b10cc6d..d39c649 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ContextAccessors.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ContextAccessors.java
@@ -73,4 +73,12 @@ public interface ContextAccessors {
    */
   @Retries.RetryTranslated
   String getBucketLocation() throws IOException;
+
+  /**
+   * Qualify a path.
+   *
+   * @param path path to qualify/normalize
+   * @return possibly new path.
+   */
+  Path makeQualified(Path path);
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java
new file mode 100644
index 0000000..9f131dd
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.BBPartHandle;
+import org.apache.hadoop.fs.BBUploadHandle;
+import org.apache.hadoop.fs.PartHandle;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathHandle;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.UploadHandle;
+import org.apache.hadoop.fs.impl.AbstractMultipartUploader;
+import org.apache.hadoop.fs.s3a.WriteOperations;
+import org.apache.hadoop.fs.s3a.impl.statistics.S3AMultipartUploaderStatistics;
+import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
+
+/**
+ * MultipartUploader for S3AFileSystem. This uses the S3 multipart
+ * upload mechanism.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class S3AMultipartUploader extends AbstractMultipartUploader {
+
+  private final S3AMultipartUploaderBuilder builder;
+
+  /** Header for serialized Parts: {@value}. */
+
+  public static final String HEADER = "S3A-part01";
+
+  private final WriteOperations writeOperations;
+
+  private final StoreContext context;
+
+  private final S3AMultipartUploaderStatistics statistics;
+
+  /**
+   * Bulk state; demand created and then retained.
+   */
+  private BulkOperationState operationState;
+
+  /**
+   * Was an operation state requested but not returned?
+   */
+  private boolean noOperationState;
+
+  /**
+   * Instatiate; this is called by the builder.
+   * @param builder builder
+   * @param writeOperations writeOperations
+   * @param context s3a context
+   * @param statistics statistics callbacks
+   */
+  S3AMultipartUploader(
+      final S3AMultipartUploaderBuilder builder,
+      final WriteOperations writeOperations,
+      final StoreContext context,
+      final S3AMultipartUploaderStatistics statistics) {
+    super(context.makeQualified(builder.getPath()));
+    this.builder = builder;
+    this.writeOperations = writeOperations;
+    this.context = context;
+    this.statistics = statistics;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (operationState != null) {
+      operationState.close();
+    }
+    super.close();
+  }
+
+  /**
+   * Retrieve the operation state; create one on demand if needed
+   * <i>and there has been no unsuccessful attempt to create one.</i>
+   * @return an active operation state.
+   * @throws IOException failure
+   */
+  private synchronized BulkOperationState retrieveOperationState()
+      throws IOException {
+    if (operationState == null && !noOperationState) {
+      operationState = writeOperations.initiateOperation(getBasePath(),
+          BulkOperationState.OperationType.Upload);
+      noOperationState = operationState != null;
+    }
+    return operationState;
+  }
+
+  @Override
+  public CompletableFuture<UploadHandle> startUpload(
+      final Path filePath)
+      throws IOException {
+    Path dest = context.makeQualified(filePath);
+    checkPath(dest);
+    String key = context.pathToKey(dest);
+    return context.submit(new CompletableFuture<>(),
+        () -> {
+          String uploadId = writeOperations.initiateMultiPartUpload(key);
+          statistics.uploadStarted();
+          return BBUploadHandle.from(ByteBuffer.wrap(
+              uploadId.getBytes(Charsets.UTF_8)));
+        });
+  }
+
+  @Override
+  public CompletableFuture<PartHandle> putPart(
+      final UploadHandle uploadId,
+      final int partNumber,
+      final Path filePath,
+      final InputStream inputStream,
+      final long lengthInBytes)
+      throws IOException {
+    Path dest = context.makeQualified(filePath);
+    checkPutArguments(dest, inputStream, partNumber, uploadId,
+        lengthInBytes);
+    byte[] uploadIdBytes = uploadId.toByteArray();
+    checkUploadId(uploadIdBytes);
+    String key = context.pathToKey(dest);
+    String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length,
+        Charsets.UTF_8);
+    return context.submit(new CompletableFuture<>(),
+        () -> {
+          UploadPartRequest request = writeOperations.newUploadPartRequest(key,
+              uploadIdString, partNumber, (int) lengthInBytes, inputStream,
+              null, 0L);
+          UploadPartResult result = writeOperations.uploadPart(request);
+          statistics.partPut(lengthInBytes);
+          String eTag = result.getETag();
+          return BBPartHandle.from(
+              ByteBuffer.wrap(
+                  buildPartHandlePayload(
+                      filePath.toUri().toString(),
+                      uploadIdString,
+                      result.getPartNumber(),
+                      eTag,
+                      lengthInBytes)));
+        });
+  }
+
+  @Override
+  public CompletableFuture<PathHandle> complete(
+      final UploadHandle uploadHandle,
+      final Path filePath,
+      final Map<Integer, PartHandle> handleMap)
+      throws IOException {
+    Path dest = context.makeQualified(filePath);
+    checkPath(dest);
+    byte[] uploadIdBytes = uploadHandle.toByteArray();
+    checkUploadId(uploadIdBytes);
+    checkPartHandles(handleMap);
+    List<Map.Entry<Integer, PartHandle>> handles =
+        new ArrayList<>(handleMap.entrySet());
+    handles.sort(Comparator.comparingInt(Map.Entry::getKey));
+    int count = handles.size();
+    String key = context.pathToKey(dest);
+
+    String uploadIdStr = new String(uploadIdBytes, 0, uploadIdBytes.length,
+        Charsets.UTF_8);
+    ArrayList<PartETag> eTags = new ArrayList<>();
+    eTags.ensureCapacity(handles.size());
+    long totalLength = 0;
+    // built up to identify duplicates -if the size of this set is
+    // below that of the number of parts, then there's a duplicate entry.
+    Set<Integer> ids = new HashSet<>(count);
+
+    for (Map.Entry<Integer, PartHandle> handle : handles) {
+      PartHandlePayload payload = parsePartHandlePayload(
+          handle.getValue().toByteArray());
+      payload.validate(uploadIdStr, filePath);
+      ids.add(payload.getPartNumber());
+      totalLength += payload.getLen();
+      eTags.add(new PartETag(handle.getKey(), payload.getEtag()));
+    }
+    Preconditions.checkArgument(ids.size() == count,
+        "Duplicate PartHandles");
+
+    // retrieve/create operation state for scalability of completion.
+    final BulkOperationState state = retrieveOperationState();
+    long finalLen = totalLength;
+    return context.submit(new CompletableFuture<>(),
+        () -> {
+          CompleteMultipartUploadResult result =
+              writeOperations.commitUpload(
+                  key,
+                  uploadIdStr,
+                  eTags,
+                  finalLen,
+                  state);
+
+          byte[] eTag = result.getETag().getBytes(Charsets.UTF_8);
+          statistics.uploadCompleted();
+          return (PathHandle) () -> ByteBuffer.wrap(eTag);
+        });
+  }
+
+  @Override
+  public CompletableFuture<Void> abort(
+      final UploadHandle uploadId,
+      final Path filePath)
+      throws IOException {
+    Path dest = context.makeQualified(filePath);
+    checkPath(dest);
+    final byte[] uploadIdBytes = uploadId.toByteArray();
+    checkUploadId(uploadIdBytes);
+    String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length,
+        Charsets.UTF_8);
+    return context.submit(new CompletableFuture<>(),
+        () -> {
+          writeOperations.abortMultipartCommit(
+              context.pathToKey(dest),
+              uploadIdString);
+          statistics.uploadAborted();
+          return null;
+        });
+  }
+
+  /**
+   * Upload all MPUs under the path.
+   * @param path path to abort uploads under.
+   * @return a future which eventually returns the number of entries found
+   * @throws IOException submission failure
+   */
+  @Override
+  public CompletableFuture<Integer> abortUploadsUnderPath(final Path path)
+      throws IOException {
+    statistics.abortUploadsUnderPathInvoked();
+    return context.submit(new CompletableFuture<>(),
+        () ->
+            writeOperations.abortMultipartUploadsUnderPath(
+                context.pathToKey(path)));
+  }
+
+  /**
+   * Build the payload for marshalling.
+   *
+   * @param partNumber part number from response
+   * @param etag upload etag
+   * @param len length
+   * @return a byte array to marshall.
+   * @throws IOException error writing the payload
+   */
+  @VisibleForTesting
+  static byte[] buildPartHandlePayload(
+      final String path,
+      final String uploadId,
+      final int partNumber,
+      final String etag,
+      final long len)
+      throws IOException {
+
+    return new PartHandlePayload(path, uploadId, partNumber, len, etag)
+        .toBytes();
+  }
+
+  /**
+   * Parse the payload marshalled as a part handle.
+   * @param data handle data
+   * @return the length and etag
+   * @throws IOException error reading the payload
+   */
+  @VisibleForTesting
+  static PartHandlePayload parsePartHandlePayload(
+      final byte[] data)
+      throws IOException {
+
+    try (DataInputStream input =
+             new DataInputStream(new ByteArrayInputStream(data))) {
+      final String header = input.readUTF();
+      if (!HEADER.equals(header)) {
+        throw new IOException("Wrong header string: \"" + header + "\"");
+      }
+      final String path = input.readUTF();
+      final String uploadId = input.readUTF();
+      final int partNumber = input.readInt();
+      final long len = input.readLong();
+      final String etag = input.readUTF();
+      if (len < 0) {
+        throw new IOException("Negative length");
+      }
+      return new PartHandlePayload(path, uploadId, partNumber, len, etag);
+    }
+  }
+
+  /**
+   * Payload of a part handle; serializes
+   * the fields using DataInputStream and DataOutputStream.
+   */
+  @VisibleForTesting
+  static final class PartHandlePayload {
+
+    private final String path;
+
+    private final String uploadId;
+
+    private final int partNumber;
+
+    private final long len;
+
+    private final String etag;
+
+    private PartHandlePayload(
+        final String path,
+        final String uploadId,
+        final int partNumber,
+        final long len,
+        final String etag) {
+      Preconditions.checkArgument(StringUtils.isNotEmpty(etag),
+          "Empty etag");
+      Preconditions.checkArgument(StringUtils.isNotEmpty(path),
+          "Empty path");
+      Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId),
+          "Empty uploadId");
+      Preconditions.checkArgument(len >= 0,
+          "Invalid length");
+
+      this.path = path;
+      this.uploadId = uploadId;
+      this.partNumber = partNumber;
+      this.len = len;
+      this.etag = etag;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public int getPartNumber() {
+      return partNumber;
+    }
+
+    public long getLen() {
+      return len;
+    }
+
+    public String getEtag() {
+      return etag;
+    }
+
+    public String getUploadId() {
+      return uploadId;
+    }
+
+    public byte[] toBytes()
+        throws IOException {
+      Preconditions.checkArgument(StringUtils.isNotEmpty(etag),
+          "Empty etag");
+      Preconditions.checkArgument(len >= 0,
+          "Invalid length");
+
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+      try (DataOutputStream output = new DataOutputStream(bytes)) {
+        output.writeUTF(HEADER);
+        output.writeUTF(path);
+        output.writeUTF(uploadId);
+        output.writeInt(partNumber);
+        output.writeLong(len);
+        output.writeUTF(etag);
+      }
+      return bytes.toByteArray();
+    }
+
+    public void validate(String uploadIdStr, Path filePath)
+        throws PathIOException {
+      String destUri = filePath.toUri().toString();
+      if (!destUri.equals(path)) {
+        throw new PathIOException(destUri,
+            "Multipart part path mismatch: " + path);
+      }
+      if (!uploadIdStr.equals(uploadId)) {
+        throw new PathIOException(destUri,
+            "Multipart part ID mismatch: " + uploadId);
+      }
+    }
+  }
+
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploaderBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploaderBuilder.java
new file mode 100644
index 0000000..3bf1a7d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploaderBuilder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.MultipartUploaderBuilderImpl;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.WriteOperations;
+import org.apache.hadoop.fs.s3a.impl.statistics.S3AMultipartUploaderStatistics;
+
+/**
+ * Builder for S3A multipart uploaders.
+ */
+public class S3AMultipartUploaderBuilder extends
+    MultipartUploaderBuilderImpl<S3AMultipartUploader, S3AMultipartUploaderBuilder> {
+
+  private final WriteOperations writeOperations;
+
+  private final StoreContext context;
+
+  private final S3AMultipartUploaderStatistics statistics;
+
+  public S3AMultipartUploaderBuilder(
+      @Nonnull final S3AFileSystem fileSystem,
+      @Nonnull final WriteOperations writeOperations,
+      @Nonnull final StoreContext context,
+      @Nonnull final Path p,
+      @Nonnull final S3AMultipartUploaderStatistics statistics) {
+    super(fileSystem, p);
+    this.writeOperations = writeOperations;
+    this.context = context;
+    this.statistics = statistics;
+  }
+
+  @Override
+  public S3AMultipartUploaderBuilder getThisBuilder() {
+    return this;
+  }
+
+  @Override
+  public S3AMultipartUploader build()
+      throws IllegalArgumentException, IOException {
+    return new S3AMultipartUploader(this, writeOperations, context, statistics);
+  }
+
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
index 88480db..e307c8d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.fs.s3a.impl;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 
 import com.google.common.util.concurrent.ListeningExecutorService;
 
@@ -37,6 +39,7 @@ import org.apache.hadoop.fs.s3a.Statistic;
 import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.LambdaUtils;
 import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
 
 /**
@@ -49,9 +52,10 @@ import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
  * their own.
  *
  * <i>Warning:</i> this really is private and unstable. Do not use
- * outside the org.apache.hadoop.fs.s3a package.
+ * outside the org.apache.hadoop.fs.s3a package, or in extension points
+ * such as DelegationTokens.
  */
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate("S3A Filesystem and extensions")
 @InterfaceStability.Unstable
 public class StoreContext {
 
@@ -114,8 +118,7 @@ public class StoreContext {
 
   /**
    * Instantiate.
-   * No attempt to use a builder here as outside tests
-   * this should only be created in the S3AFileSystem.
+   * @deprecated as public method: use {@link StoreContextBuilder}.
    */
   public StoreContext(
       final URI fsURI,
@@ -227,6 +230,16 @@ public class StoreContext {
   }
 
   /**
+   * Qualify a path.
+   *
+   * @param path path to qualify/normalize
+   * @return possibly new path.
+   */
+  public Path makeQualified(Path path) {
+    return contextAccessors.makeQualified(path);
+  }
+
+  /**
    * Get the storage statistics of this filesystem.
    * @return the storage statistics
    */
@@ -351,4 +364,20 @@ public class StoreContext {
         ? k + "/"
         : k;
   }
+
+  /**
+   * Submit a closure for execution in the executor
+   * returned by {@link #getExecutor()}.
+   * @param <T> type of future
+   * @param future future for the result.
+   * @param call callable to invoke.
+   * @return the future passed in
+   */
+  public <T> CompletableFuture<T> submit(
+      final CompletableFuture<T> future,
+      final Callable<T> call) {
+    getExecutor().submit(() ->
+        LambdaUtils.eval(future, call));
+    return future;
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java
new file mode 100644
index 0000000..a5e0dad
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.net.URI;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
+import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Builder for the store context.
+ */
+public class StoreContextBuilder {
+
+  private URI fsURI;
+
+  private String bucket;
+
+  private Configuration configuration;
+
+  private String username;
+
+  private UserGroupInformation owner;
+
+  private ListeningExecutorService executor;
+
+  private int executorCapacity;
+
+  private Invoker invoker;
+
+  private S3AInstrumentation instrumentation;
+
+  private S3AStorageStatistics storageStatistics;
+
+  private S3AInputPolicy inputPolicy = S3AInputPolicy.Normal;
+
+  private ChangeDetectionPolicy changeDetectionPolicy;
+
+  private boolean multiObjectDeleteEnabled = true;
+
+  private MetadataStore metadataStore;
+
+  private boolean useListV1 = false;
+
+  private ContextAccessors contextAccessors;
+
+  private ITtlTimeProvider timeProvider;
+
+  public StoreContextBuilder setFsURI(final URI fsURI) {
+    this.fsURI = fsURI;
+    return this;
+  }
+
+  public StoreContextBuilder setBucket(final String b) {
+    this.bucket = b;
+    return this;
+  }
+
+  public StoreContextBuilder setConfiguration(final Configuration conf) {
+    this.configuration = conf;
+    return this;
+  }
+
+  public StoreContextBuilder setUsername(final String user) {
+    this.username = user;
+    return this;
+  }
+
+  public StoreContextBuilder setOwner(final UserGroupInformation ugi) {
+    this.owner = ugi;
+    return this;
+  }
+
+  public StoreContextBuilder setExecutor(
+      final ListeningExecutorService ex) {
+    this.executor = ex;
+    return this;
+  }
+
+  public StoreContextBuilder setExecutorCapacity(
+      final int capacity) {
+    this.executorCapacity = capacity;
+    return this;
+  }
+
+  public StoreContextBuilder setInvoker(final Invoker invoke) {
+    this.invoker = invoke;
+    return this;
+  }
+
+  public StoreContextBuilder setInstrumentation(
+      final S3AInstrumentation instr) {
+    this.instrumentation = instr;
+    return this;
+  }
+
+  public StoreContextBuilder setStorageStatistics(
+      final S3AStorageStatistics sstats) {
+    this.storageStatistics = sstats;
+    return this;
+  }
+
+  public StoreContextBuilder setInputPolicy(
+      final S3AInputPolicy policy) {
+    this.inputPolicy = policy;
+    return this;
+  }
+
+  public StoreContextBuilder setChangeDetectionPolicy(
+      final ChangeDetectionPolicy policy) {
+    this.changeDetectionPolicy = policy;
+    return this;
+  }
+
+  public StoreContextBuilder setMultiObjectDeleteEnabled(
+      final boolean enabled) {
+    this.multiObjectDeleteEnabled = enabled;
+    return this;
+  }
+
+  public StoreContextBuilder setMetadataStore(
+      final MetadataStore store) {
+    this.metadataStore = store;
+    return this;
+  }
+
+  public StoreContextBuilder setUseListV1(
+      final boolean useV1) {
+    this.useListV1 = useV1;
+    return this;
+  }
+
+  public StoreContextBuilder setContextAccessors(
+      final ContextAccessors accessors) {
+    this.contextAccessors = accessors;
+    return this;
+  }
+
+  public StoreContextBuilder setTimeProvider(
+      final ITtlTimeProvider provider) {
+    this.timeProvider = provider;
+    return this;
+  }
+
+  @SuppressWarnings("deprecation")
+  public StoreContext build() {
+    return new StoreContext(fsURI,
+        bucket,
+        configuration,
+        username,
+        owner,
+        executor,
+        executorCapacity,
+        invoker,
+        instrumentation,
+        storageStatistics,
+        inputPolicy,
+        changeDetectionPolicy,
+        multiObjectDeleteEnabled,
+        metadataStore,
+        useListV1,
+        contextAccessors,
+        timeProvider);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSMultipartUploaderFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/statistics/S3AMultipartUploaderStatistics.java
similarity index 51%
rename from hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSMultipartUploaderFactory.java
rename to hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/statistics/S3AMultipartUploaderStatistics.java
index e9959c1..2cd74ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSMultipartUploaderFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/statistics/S3AMultipartUploaderStatistics.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,26 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileSystemMultipartUploader;
-import org.apache.hadoop.fs.MultipartUploader;
-import org.apache.hadoop.fs.MultipartUploaderFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+package org.apache.hadoop.fs.s3a.impl.statistics;
+
+import java.io.Closeable;
 
 /**
- * Support for HDFS multipart uploads, built on
- * {@link FileSystem#concat(Path, Path[])}.
+ * Statistics for the S3A multipart uploader.
  */
-public class DFSMultipartUploaderFactory extends MultipartUploaderFactory {
-  protected MultipartUploader createMultipartUploader(FileSystem fs,
-      Configuration conf) {
-    if (fs.getScheme().equals(HdfsConstants.HDFS_URI_SCHEME)) {
-      return new FileSystemMultipartUploader(fs);
-    }
-    return null;
-  }
+public interface S3AMultipartUploaderStatistics extends Closeable {
+
+  void instantiated();
+
+  void uploadStarted();
+
+  void partPut(long lengthInBytes);
+
+  void uploadCompleted();
+
+  void uploadAborted();
+
+  void abortUploadsUnderPathInvoked();
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/statistics/S3AMultipartUploaderStatisticsImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/statistics/S3AMultipartUploaderStatisticsImpl.java
new file mode 100644
index 0000000..70e4785
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/statistics/S3AMultipartUploaderStatisticsImpl.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.statistics;
+
+
+import java.io.IOException;
+import java.util.function.BiConsumer;
+
+import org.apache.hadoop.fs.s3a.Statistic;
+
+import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_ABORT_UNDER_PATH_INVOKED;
+import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_INSTANTIATED;
+import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_PART_PUT;
+import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_PART_PUT_BYTES;
+import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_ABORTED;
+import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_COMPLETED;
+import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_STARTED;
+
+/**
+ * Implementation of the uploader statistics.
+ * This takes a function to update some counter and will update
+ * this value when things change, so it can be bonded to arbitrary
+ * statistic collectors.
+ */
+public final class S3AMultipartUploaderStatisticsImpl implements
+    S3AMultipartUploaderStatistics {
+
+  /**
+   * The operation to increment a counter/statistic by a value.
+   */
+  private final BiConsumer<Statistic, Long> incrementCallback;
+
+  /**
+   * Constructor.
+   * @param incrementCallback  The operation to increment a
+   * counter/statistic by a value.
+   */
+  public S3AMultipartUploaderStatisticsImpl(
+      final BiConsumer<Statistic, Long> incrementCallback) {
+    this.incrementCallback = incrementCallback;
+  }
+
+  private void inc(Statistic op, long count) {
+    incrementCallback.accept(op, count);
+  }
+
+  @Override
+  public void instantiated() {
+    inc(MULTIPART_INSTANTIATED, 1);
+  }
+
+  @Override
+  public void uploadStarted() {
+    inc(MULTIPART_UPLOAD_STARTED, 1);
+  }
+
+  @Override
+  public void partPut(final long lengthInBytes) {
+    inc(MULTIPART_PART_PUT, 1);
+    inc(MULTIPART_PART_PUT_BYTES, lengthInBytes);
+  }
+
+  @Override
+  public void uploadCompleted() {
+    inc(MULTIPART_UPLOAD_COMPLETED, 1);
+  }
+
+  @Override
+  public void uploadAborted() {
+    inc(MULTIPART_UPLOAD_ABORTED, 1);
+  }
+
+  @Override
+  public void abortUploadsUnderPathInvoked() {
+    inc(MULTIPART_UPLOAD_ABORT_UNDER_PATH_INVOKED, 1);
+  }
+
+  @Override
+  public void close() throws IOException {
+
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/BulkOperationState.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/BulkOperationState.java
index fcb3dce..b4974b7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/BulkOperationState.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/BulkOperationState.java
@@ -102,5 +102,9 @@ public class BulkOperationState implements Closeable {
      * Mkdir operation.
      */
     Mkdir,
+    /**
+     * Multipart upload operation.
+     */
+    Upload
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
index 38b38fb..b131320 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -912,17 +912,27 @@ public class DynamoDBMetadataStore implements MetadataStore,
       DDBPathMetadata oldEntry = ancestorState.put(path, entry);
       boolean addAncestors = true;
       if (oldEntry != null) {
-        if (!oldEntry.getFileStatus().isDirectory()
-            || !entry.getFileStatus().isDirectory()) {
-          // check for and warn if the existing bulk operation overwrote it.
-          // this should never occur outside tests explicitly creating it
+        // check for and warn if the existing bulk operation has an inconsistent
+        // entry.
+        // two directories or two files are both allowed.
+        // file-over-file can happen in multipart uploaders when the same
+        // uploader is overwriting file entries to the same destination as
+        // part of its bulk operation.
+        boolean oldWasDir = oldEntry.getFileStatus().isDirectory();
+        boolean newIsDir = entry.getFileStatus().isDirectory();
+        if ((oldWasDir && !newIsDir)
+            || (!oldWasDir && newIsDir)) {
           LOG.warn("Overwriting a S3Guard file created in the operation: {}",
               oldEntry);
           LOG.warn("With new entry: {}", entry);
           // restore the old state
           ancestorState.put(path, oldEntry);
           // then raise an exception
-          throw new PathIOException(path.toString(), E_INCONSISTENT_UPDATE);
+          throw new PathIOException(path.toString(),
+              String.format("%s old %s new %s",
+                  E_INCONSISTENT_UPDATE,
+                  oldEntry,
+                  entry));
         } else {
           // a directory is already present. Log and continue.
           LOG.debug("Directory at {} being updated with value {}",
diff --git a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploader b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploader
index d16846b..68a4c79 100644
--- a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploader
+++ b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploader
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.hadoop.fs.s3a.S3AMultipartUploader
+org.apache.hadoop.fs.s3a.impl.S3AMultipartUploader
diff --git a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory
deleted file mode 100644
index 2e4bc24..0000000
--- a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory
+++ /dev/null
@@ -1,15 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-org.apache.hadoop.fs.s3a.S3AMultipartUploader$Factory
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
index 059312a..8222fff 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
@@ -15,25 +15,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.fs.contract.s3a;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+package org.apache.hadoop.fs.contract.s3a;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
-import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
-import org.apache.hadoop.fs.s3a.WriteOperationHelper;
 
-import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.DEFAULT_SCALE_TESTS_ENABLED;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.KEY_HUGE_PARTITION_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.KEY_SCALE_TESTS_ENABLED;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
 import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE;
 
 /**
  * Test MultipartUploader with S3A.
+ * <p></p>
  * Although not an S3A Scale test subclass, it uses the -Dscale option
  * to enable it, and partition size option to control the size of
  * parts uploaded.
@@ -41,14 +44,11 @@ import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_H
 public class ITestS3AContractMultipartUploader extends
     AbstractContractMultipartUploaderTest {
 
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ITestS3AContractMultipartUploader.class);
-
   private int partitionSize;
 
   /**
    * S3 requires a minimum part size of 5MB (except the last part).
-   * @return 5MB
+   * @return 5MB+ value
    */
   @Override
   protected int partSizeInBytes() {
@@ -127,36 +127,14 @@ public class ITestS3AContractMultipartUploader extends
   }
 
   /**
-   * Extend superclass teardown with actions to help clean up the S3 store,
-   * including aborting uploads under the test path.
-   */
-  @Override
-  public void teardown() throws Exception {
-    Path teardown = path("teardown").getParent();
-    S3AFileSystem fs = getFileSystem();
-    if (fs != null) {
-      WriteOperationHelper helper = fs.getWriteOperationHelper();
-      try {
-        LOG.info("Teardown: aborting outstanding uploads under {}", teardown);
-        int count = helper.abortMultipartUploadsUnderPath(
-            fs.pathToKey(teardown));
-        LOG.info("Found {} incomplete uploads", count);
-      } catch (Exception e) {
-        LOG.warn("Exeception in teardown", e);
-      }
-    }
-    super.teardown();
-  }
-
-  /**
    * S3 has no concept of directories, so this test does not apply.
    */
   public void testDirectoryInTheWay() throws Exception {
-    // no-op
+    skip("unsupported");
   }
 
   @Override
   public void testMultipartUploadReverseOrder() throws Exception {
-    ContractTestUtils.skip("skipped for speed");
+    skip("skipped for speed");
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
index 244d2ee..c9d872e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
@@ -203,29 +203,31 @@ public class TestPartialDeleteFailures {
       OperationTrackingStore store) throws URISyntaxException, IOException {
     URI name = new URI("s3a://bucket");
     Configuration conf = new Configuration();
-    return new StoreContext(
-        name,
-        "bucket",
-        conf,
-        "alice",
-        UserGroupInformation.getCurrentUser(),
-        BlockingThreadPoolExecutorService.newInstance(
+    return new StoreContextBuilder().setFsURI(name)
+        .setBucket("bucket")
+        .setConfiguration(conf)
+        .setUsername("alice")
+        .setOwner(UserGroupInformation.getCurrentUser())
+        .setExecutor(BlockingThreadPoolExecutorService.newInstance(
             4,
             4,
             10, TimeUnit.SECONDS,
-            "s3a-transfer-shared"),
-        Constants.DEFAULT_EXECUTOR_CAPACITY,
-        new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, Invoker.LOG_EVENT),
-        new S3AInstrumentation(name),
-        new S3AStorageStatistics(),
-        S3AInputPolicy.Normal,
-        ChangeDetectionPolicy.createPolicy(ChangeDetectionPolicy.Mode.None,
-            ChangeDetectionPolicy.Source.ETag, false),
-        multiDelete,
-        store,
-        false,
-        CONTEXT_ACCESSORS,
-        new S3Guard.TtlTimeProvider(conf));
+            "s3a-transfer-shared"))
+        .setExecutorCapacity(Constants.DEFAULT_EXECUTOR_CAPACITY)
+        .setInvoker(
+            new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, Invoker.LOG_EVENT))
+        .setInstrumentation(new S3AInstrumentation(name))
+        .setStorageStatistics(new S3AStorageStatistics())
+        .setInputPolicy(S3AInputPolicy.Normal)
+        .setChangeDetectionPolicy(
+            ChangeDetectionPolicy.createPolicy(ChangeDetectionPolicy.Mode.None,
+                ChangeDetectionPolicy.Source.ETag, false))
+        .setMultiObjectDeleteEnabled(multiDelete)
+        .setMetadataStore(store)
+        .setUseListV1(false)
+        .setContextAccessors(CONTEXT_ACCESSORS)
+        .setTimeProvider(new S3Guard.TtlTimeProvider(conf))
+        .build();
   }
 
   private static class MinimalContextAccessor implements ContextAccessors {
@@ -251,6 +253,10 @@ public class TestPartialDeleteFailures {
       return null;
     }
 
+    @Override
+    public Path makeQualified(final Path path) {
+      return path;
+    }
   }
   /**
    * MetadataStore which tracks what is deleted and added.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestS3AMultipartUploaderSupport.java
similarity index 56%
rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java
rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestS3AMultipartUploaderSupport.java
index 4825d26..71305aa 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestS3AMultipartUploaderSupport.java
@@ -16,51 +16,60 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.fs.s3a;
+package org.apache.hadoop.fs.s3a.impl;
 
 import java.io.EOFException;
 import java.io.IOException;
 
 import org.junit.Test;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.test.HadoopTestBase;
 
-import static org.apache.hadoop.fs.s3a.S3AMultipartUploader.*;
-import static org.apache.hadoop.fs.s3a.S3AMultipartUploader.parsePartHandlePayload;
+import static org.apache.hadoop.fs.s3a.impl.S3AMultipartUploader.PartHandlePayload;
+import static org.apache.hadoop.fs.s3a.impl.S3AMultipartUploader.buildPartHandlePayload;
+import static org.apache.hadoop.fs.s3a.impl.S3AMultipartUploader.parsePartHandlePayload;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
- * Test multipart upload support methods and classes.
+ * Unit test of multipart upload support methods and classes.
  */
 public class TestS3AMultipartUploaderSupport extends HadoopTestBase {
 
+  public static final String PATH = "s3a://bucket/path";
+
+  public static final String UPLOAD = "01";
+
   @Test
   public void testRoundTrip() throws Throwable {
-    Pair<Long, String> result = roundTrip("tag", 1);
-    assertEquals("tag", result.getRight());
-    assertEquals(1, result.getLeft().longValue());
+    PartHandlePayload result = roundTrip(999, "tag", 1);
+    assertEquals(PATH, result.getPath());
+    assertEquals(UPLOAD, result.getUploadId());
+    assertEquals(999, result.getPartNumber());
+    assertEquals("tag", result.getEtag());
+    assertEquals(1, result.getLen());
   }
 
   @Test
   public void testRoundTrip2() throws Throwable {
     long len = 1L + Integer.MAX_VALUE;
-    Pair<Long, String> result = roundTrip("11223344",
-        len);
-    assertEquals("11223344", result.getRight());
-    assertEquals(len, result.getLeft().longValue());
+    PartHandlePayload result =
+        roundTrip(1, "11223344", len);
+    assertEquals(1, result.getPartNumber());
+    assertEquals("11223344", result.getEtag());
+    assertEquals(len, result.getLen());
   }
 
   @Test
   public void testNoEtag() throws Throwable {
     intercept(IllegalArgumentException.class,
-        () -> buildPartHandlePayload("", 1));
+        () -> buildPartHandlePayload(PATH, UPLOAD,
+            0, "", 1));
   }
 
   @Test
   public void testNoLen() throws Throwable {
     intercept(IllegalArgumentException.class,
-        () -> buildPartHandlePayload("tag", -1));
+        () -> buildPartHandlePayload(PATH, UPLOAD, 0, "tag", -1));
   }
 
   @Test
@@ -71,14 +80,17 @@ public class TestS3AMultipartUploaderSupport extends HadoopTestBase {
 
   @Test
   public void testBadHeader() throws Throwable {
-    byte[] bytes = buildPartHandlePayload("tag", 1);
-    bytes[2]='f';
+    byte[] bytes = buildPartHandlePayload(PATH, UPLOAD, 0, "tag", 1);
+    bytes[2] = 'f';
     intercept(IOException.class, "header",
         () -> parsePartHandlePayload(bytes));
   }
 
-  private Pair<Long, String> roundTrip(final String tag, final long len) throws IOException {
-    byte[] bytes = buildPartHandlePayload(tag, len);
+  private PartHandlePayload roundTrip(
+      int partNumber,
+      String tag,
+      long len) throws IOException {
+    byte[] bytes = buildPartHandlePayload(PATH, UPLOAD, partNumber, tag, len);
     return parsePartHandlePayload(bytes);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org