Posted to common-commits@hadoop.apache.org by st...@apache.org on 2019/02/04 17:10:33 UTC

[hadoop] branch branch-3.2 updated: HDFS-13713. Add specification of Multipart Upload API to FS specification, with contract tests.

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new bdd17be  HDFS-13713. Add specification of Multipart Upload API to FS specification, with contract tests.
bdd17be is described below

commit bdd17be9eceb144697b9cd5bb1f785567ec633be
Author: Steve Loughran <st...@apache.org>
AuthorDate: Mon Feb 4 17:10:19 2019 +0000

    HDFS-13713. Add specification of Multipart Upload API to FS specification, with contract tests.
    
    Contributed by Ewan Higgs and Steve Loughran.
    
    (cherry picked from commit c1d24f848345f6d34a2ac2d570d49e9787a0df6a)
---
 .../hadoop/fs/FileSystemMultipartUploader.java     |  36 +-
 .../org/apache/hadoop/fs/MultipartUploader.java    |  88 +++-
 .../apache/hadoop/fs/MultipartUploaderFactory.java |   7 +
 .../src/site/markdown/filesystem/index.md          |   1 +
 .../site/markdown/filesystem/multipartuploader.md  | 235 +++++++++
 .../AbstractContractMultipartUploaderTest.java     | 565 ++++++++++++++++-----
 .../TestLocalFSContractMultipartUploader.java      |  10 +
 .../hdfs/TestHDFSContractMultipartUploader.java    |  15 +
 .../apache/hadoop/fs/s3a/S3AMultipartUploader.java |  31 +-
 .../s3a/ITestS3AContractMultipartUploader.java     |  63 ++-
 .../fs/s3a/TestS3AMultipartUploaderSupport.java    |   2 +-
 11 files changed, 873 insertions(+), 180 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java
index 94c7861..b77c244 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java
@@ -19,21 +19,23 @@ package org.apache.hadoop.fs;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
 
 import org.apache.commons.compress.utils.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 
 import static org.apache.hadoop.fs.Path.mergePaths;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 
 /**
  * A MultipartUploader that uses the basic FileSystem commands.
@@ -70,7 +72,8 @@ public class FileSystemMultipartUploader extends MultipartUploader {
   public PartHandle putPart(Path filePath, InputStream inputStream,
       int partNumber, UploadHandle uploadId, long lengthInBytes)
       throws IOException {
-
+    checkPutArguments(filePath, inputStream, partNumber, uploadId,
+        lengthInBytes);
     byte[] uploadIdByteArray = uploadId.toByteArray();
     checkUploadId(uploadIdByteArray);
     Path collectorPath = new Path(new String(uploadIdByteArray, 0,
@@ -82,16 +85,17 @@ public class FileSystemMultipartUploader extends MultipartUploader {
             fs.createFile(partPath).build()) {
       IOUtils.copy(inputStream, fsDataOutputStream, 4096);
     } finally {
-      org.apache.hadoop.io.IOUtils.cleanupWithLogger(LOG, inputStream);
+      cleanupWithLogger(LOG, inputStream);
     }
     return BBPartHandle.from(ByteBuffer.wrap(
         partPath.toString().getBytes(Charsets.UTF_8)));
   }
 
   private Path createCollectorPath(Path filePath) {
+    String uuid = UUID.randomUUID().toString();
     return mergePaths(filePath.getParent(),
         mergePaths(new Path(filePath.getName().split("\\.")[0]),
-            mergePaths(new Path("_multipart"),
+            mergePaths(new Path("_multipart_" + uuid),
                 new Path(Path.SEPARATOR))));
   }
 
@@ -110,21 +114,16 @@ public class FileSystemMultipartUploader extends MultipartUploader {
 
   @Override
   @SuppressWarnings("deprecation") // rename w/ OVERWRITE
-  public PathHandle complete(Path filePath,
-      List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId)
-      throws IOException {
+  public PathHandle complete(Path filePath, Map<Integer, PartHandle> handleMap,
+      UploadHandle multipartUploadId) throws IOException {
 
     checkUploadId(multipartUploadId.toByteArray());
 
-    if (handles.isEmpty()) {
-      throw new IOException("Empty upload");
-    }
-    // If destination already exists, we believe we already completed it.
-    if (fs.exists(filePath)) {
-      return getPathHandle(filePath);
-    }
+    checkPartHandles(handleMap);
+    List<Map.Entry<Integer, PartHandle>> handles =
+        new ArrayList<>(handleMap.entrySet());
+    handles.sort(Comparator.comparingInt(Map.Entry::getKey));
 
-    handles.sort(Comparator.comparing(Pair::getKey));
     List<Path> partHandles = handles
         .stream()
         .map(pair -> {
@@ -134,7 +133,10 @@ public class FileSystemMultipartUploader extends MultipartUploader {
         })
         .collect(Collectors.toList());
 
-    Path collectorPath = createCollectorPath(filePath);
+    byte[] uploadIdByteArray = multipartUploadId.toByteArray();
+    Path collectorPath = new Path(new String(uploadIdByteArray, 0,
+        uploadIdByteArray.length, Charsets.UTF_8));
+
     boolean emptyFile = totalPartsLen(partHandles) == 0;
     if (emptyFile) {
       fs.create(filePath).close();
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
index b56dbf8..7ed987e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
@@ -17,40 +17,45 @@
  */
 package org.apache.hadoop.fs;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.List;
+import java.util.Map;
 
-import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.commons.lang3.tuple.Pair;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 /**
  * MultipartUploader is an interface for copying files multipart and across
  * multiple nodes. Users should:
  * <ol>
- *   <li>Initialize an upload</li>
- *   <li>Upload parts in any order</li>
+ *   <li>Initialize an upload.</li>
+ *   <li>Upload parts in any order.</li>
  *   <li>Complete the upload in order to have it materialize in the destination
- *   FS</li>
+ *   FS.</li>
  * </ol>
- *
- * Implementers should make sure that the complete function should make sure
- * that 'complete' will reorder parts if the destination FS doesn't already
- * do it for them.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public abstract class MultipartUploader {
+public abstract class MultipartUploader implements Closeable {
   public static final Logger LOG =
       LoggerFactory.getLogger(MultipartUploader.class);
 
   /**
+   * Perform any cleanup.
+   * The upload is not required to support any operations after this.
+   * @throws IOException problems on close.
+   */
+  @Override
+  public void close() throws IOException {
+  }
+
+  /**
    * Initialize a multipart upload.
    * @param filePath Target path for upload.
    * @return unique identifier associating part uploads.
@@ -59,8 +64,8 @@ public abstract class MultipartUploader {
   public abstract UploadHandle initialize(Path filePath) throws IOException;
 
   /**
-   * Put part as part of a multipart upload. It should be possible to have
-   * parts uploaded in any order (or in parallel).
+   * Put part as part of a multipart upload.
+   * It is possible to have parts uploaded in any order (or in parallel).
    * @param filePath Target path for upload (same as {@link #initialize(Path)}).
    * @param inputStream Data for this part. Implementations MUST close this
    * stream after reading in the data.
@@ -77,15 +82,15 @@ public abstract class MultipartUploader {
   /**
    * Complete a multipart upload.
    * @param filePath Target path for upload (same as {@link #initialize(Path)}.
-   * @param handles non-empty list of identifiers with associated part numbers
+   * @param handles non-empty map of part number to part handle.
    *          from {@link #putPart(Path, InputStream, int, UploadHandle, long)}.
-   *          Depending on the backend, the list order may be significant.
    * @param multipartUploadId Identifier from {@link #initialize(Path)}.
    * @return unique PathHandle identifier for the uploaded file.
-   * @throws IOException IO failure or the handle list is empty.
+   * @throws IOException IO failure
    */
   public abstract PathHandle complete(Path filePath,
-      List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId)
+      Map<Integer, PartHandle> handles,
+      UploadHandle multipartUploadId)
       throws IOException;
 
   /**
@@ -98,13 +103,52 @@ public abstract class MultipartUploader {
       throws IOException;
 
   /**
-   * Utility method to validate uploadIDs
-   * @param uploadId
-   * @throws IllegalArgumentException
+   * Utility method to validate uploadIDs.
+   * @param uploadId Upload ID
+   * @throws IllegalArgumentException invalid ID
    */
   protected void checkUploadId(byte[] uploadId)
       throws IllegalArgumentException {
-    Preconditions.checkArgument(uploadId.length > 0,
+    checkArgument(uploadId != null, "null uploadId");
+    checkArgument(uploadId.length > 0,
         "Empty UploadId is not valid");
   }
+
+  /**
+   * Utility method to validate partHandles.
+   * @param partHandles handles
+   * @throws IllegalArgumentException if the parts are invalid
+   */
+  protected void checkPartHandles(Map<Integer, PartHandle> partHandles) {
+    checkArgument(!partHandles.isEmpty(),
+        "Empty upload");
+    partHandles.keySet()
+        .stream()
+        .forEach(key ->
+            checkArgument(key > 0,
+                "Invalid part handle index %s", key));
+  }
+
+  /**
+   * Check all the arguments to the
+   * {@link #putPart(Path, InputStream, int, UploadHandle, long)} operation.
+   * @param filePath Target path for upload (same as {@link #initialize(Path)}).
+   * @param inputStream Data for this part. Implementations MUST close this
+   * stream after reading in the data.
+   * @param partNumber Index of the part relative to others.
+   * @param uploadId Identifier from {@link #initialize(Path)}.
+   * @param lengthInBytes Target length to read from the stream.
+   * @throws IllegalArgumentException invalid argument
+   */
+  protected void checkPutArguments(Path filePath,
+      InputStream inputStream,
+      int partNumber,
+      UploadHandle uploadId,
+      long lengthInBytes) throws IllegalArgumentException {
+    checkArgument(filePath != null, "null filePath");
+    checkArgument(inputStream != null, "null inputStream");
+    checkArgument(partNumber > 0, "Invalid part number: %s", partNumber);
+    checkArgument(uploadId != null, "null uploadId");
+    checkArgument(lengthInBytes >= 0, "Invalid part length: %s", lengthInBytes);
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java
index 8b35232..e35b6bf 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java
@@ -52,6 +52,13 @@ public abstract class MultipartUploaderFactory {
     }
   }
 
+  /**
+   * Get the multipart uploader for a specific filesystem.
+   * @param fs filesystem
+   * @param conf configuration
+   * @return an uploader, or null if none was found.
+   * @throws IOException failure during the creation process.
+   */
   public static MultipartUploader get(FileSystem fs, Configuration conf)
       throws IOException {
     MultipartUploader mpu = null;
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
index 532b6c7..6b4399e 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
@@ -36,3 +36,4 @@ HDFS as these are commonly expected by Hadoop client applications.
 1. [FSDataOutputStreamBuilder class](fsdataoutputstreambuilder.html)
 2. [Testing with the Filesystem specification](testing.html)
 2. [Extending the specification and its tests](extending.html)
+1. [Uploading a file using Multiple Parts](multipartuploader.html)
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/multipartuploader.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/multipartuploader.md
new file mode 100644
index 0000000..629c0c4
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/multipartuploader.md
@@ -0,0 +1,235 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+
+<!--  ============================================================= -->
+<!--  CLASS: MultipartUploader -->
+<!--  ============================================================= -->
+
+# class `org.apache.hadoop.fs.MultipartUploader`
+
+<!-- MACRO{toc|fromDepth=1|toDepth=2} -->
+
+The abstract `MultipartUploader` class is the mechanism for uploading a file
+in multiple parts to Hadoop-supported filesystems. The benefit of a
+multipart upload is that the file can be uploaded from multiple clients or
+processes in parallel, and the results will not be visible to other clients
+until the `complete` function is called.
+
+When implemented by an object store, uploaded data may incur storage charges,
+even before it is visible in the filesystem. Users of this API must be diligent
+and always perform best-effort attempts to complete or abort the upload.
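+
+The following is a minimal usage sketch (hypothetical client code, not part of
+the specification; imports omitted). It creates an uploader for a filesystem,
+uploads a single part and completes the upload. The method name and the `fs`,
+`conf` and `destination` parameters are illustrative only.
+
+```java
+// Hedged sketch: upload one part and complete the upload.
+PathHandle uploadSmallFile(FileSystem fs, Configuration conf,
+    Path destination, byte[] data) throws IOException {
+  // MultipartUploaderFactory.get() may return null if the filesystem has no
+  // uploader; that case is not handled in this sketch.
+  try (MultipartUploader uploader = MultipartUploaderFactory.get(fs, conf)) {
+    UploadHandle upload = uploader.initialize(destination);
+    Map<Integer, PartHandle> parts = new HashMap<>();
+    // Part numbers start at 1; parts may be uploaded in any order.
+    parts.put(1, uploader.putPart(destination,
+        new ByteArrayInputStream(data), 1, upload, data.length));
+    // The destination file only becomes visible once complete() returns.
+    return uploader.complete(destination, parts, upload);
+  }
+}
+```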
+
+## Invariants
+
+All the requirements of a valid MultipartUploader are considered implicit
+preconditions and postconditions:
+all operations on a valid MultipartUploader MUST result in a new
+MultipartUploader that is also valid.
+
+The operations of a single multipart upload may take place across different
+instances of a multipart uploader, across different processes and hosts.
+It is therefore a requirement that:
+
+1. All state needed to upload a part, complete an upload or abort an upload
+must be contained within or retrievable from an upload handle.
+
+1. If an upload handle is marshalled to another process, then, if the
+receiving process has the correct permissions, it may participate in the
+upload, by uploading one or more parts, by completing an upload, and/or by
+aborting the upload.
+
+## Concurrency
+
+Multiple processes may upload parts of a multipart upload simultaneously.
+
+If a call is made to `initialize(path)` to a destination where an active
+upload is in progress, implementations MUST perform one of these two operations:
+
+* Reject the call as a duplicate.
+* Permit both to proceed, with the final output of the file being
+that of _exactly one of the two uploads_.
+
+Which upload succeeds is undefined. Users must not expect consistent
+behavior across filesystems, across filesystem instances, or even
+across different requests.
+
+If a multipart upload is completed or aborted while a part upload is in progress,
+the in-progress upload, if it has not completed, must not be included in
+the final file, in whole or in part. Implementations SHOULD raise an error
+in the `putPart()` operation.
+
+## Model
+
+A filesystem which supports Multipart Uploads extends the existing model
+`(Directories, Files, Symlinks)` to `(Directories, Files, Symlinks, Uploads)`,
+with `Uploads` of type `Map[UploadHandle -> Map[PartHandle -> UploadPart]]`.
+
+
+The `Uploads` element of the state tuple is a map of all active uploads.
+
+```python
+Uploads: Map[UploadHandle -> Map[PartHandle -> UploadPart]]
+```
+
+An UploadHandle is a non-empty list of bytes.
+```python
+UploadHandle: List[byte]
+len(UploadHandle) > 0
+```
+
+Clients *MUST* treat this as opaque. What is core to this feature's design is
+that the handle is valid across clients: the handle may be serialized on host
+`hostA`, deserialized on `hostB` and still used to extend or complete the upload.
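+
+As a hedged illustration of that marshalling (variable names are illustrative;
+`BBUploadHandle` is the byte-buffer-backed handle implementation used in the
+contract tests):
+
+```java
+// On hostA: marshall the opaque handle for transmission.
+byte[] wireFormat = uploadHandle.toByteArray();
+
+// On hostB: rebuild a handle from the received bytes and continue the upload.
+UploadHandle rebuilt = BBUploadHandle.from(ByteBuffer.wrap(wireFormat));
+PartHandle part = uploader.putPart(destination, partStream, 2, rebuilt, partLength);
+```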
+
+```python
+UploadPart = (Path: path, parts: Map[PartHandle -> byte[]])
+```
+
+Similarly, the `PartHandle` type is also a non-empty list of opaque bytes, again, marshallable between hosts.
+
+```python
+PartHandle: List[byte]
+```
+
+It is implicit that each `UploadHandle` in `FS.Uploads` is unique.
+Similarly, each `PartHandle` in the map of `[PartHandle -> UploadPart]` must also be unique.
+
+1. There is no requirement that Part Handles are unique across uploads.
+1. There is no requirement that Upload Handles are unique over time.
+However, if Part Handles are rapidly recycled, there is a risk that the nominally
+idempotent operation `abort(FS, uploadHandle)` could unintentionally cancel a
+successor operation which used the same Upload Handle.
+
+## State Changing Operations
+
+### `UploadHandle initialize(Path path)`
+
+Initializes a Multipart Upload, returning an upload handle for use in
+subsequent operations.
+
+#### Preconditions
+
+```python
+if path == "/" : raise IOException
+
+if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException
+```
+
+If a filesystem does not support concurrent uploads to a destination,
+then the following precondition is added
+
+```python
+if path in values(FS.Uploads) raise PathExistsException, IOException
+
+```
+
+
+#### Postconditions
+
+The outcome of this operation is that the filesystem state is updated with a new
+active upload under a new handle; this handle is returned to the caller.
+
+```python
+handle' = UploadHandle where not handle' in keys(FS.Uploads)
+FS' = FS where FS'.Uploads(handle') == {}
+result = handle'
+```
+
+### `PartHandle putPart(Path path, InputStream inputStream, int partNumber, UploadHandle uploadHandle, long lengthInBytes)`
+
+Upload a part for the multipart upload.
+
+#### Preconditions
+
+
+```python
+uploadHandle in keys(FS.Uploads)
+partNumber >= 1
+lengthInBytes >= 0
+len(inputStream) >= lengthInBytes
+```
+
+#### Postconditions
+
+```python
+data' = inputStream(0..lengthInBytes)
+partHandle' = byte[] where not partHandle' in keys(FS.uploads(uploadHandle).parts)
+FS' = FS where FS'.uploads(uploadHandle).parts(partHandle') == data'
+result = partHandle'
+```
+
+The data is stored in the filesystem, pending completion.
+
+
+### `PathHandle complete(Path path, Map<Integer, PartHandle> parts, UploadHandle multipartUploadId)`
+
+Complete the multipart upload.
+
+A Filesystem may enforce a minimum size of each part, excluding the last part uploaded.
+
+If a part is out of this range, an `IOException` MUST be raised.
+
+#### Preconditions
+
+```python
+uploadHandle in keys(FS.Uploads) else raise FileNotFoundException
+FS.Uploads(uploadHandle).path == path
+if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException
+parts.size() > 0
+```
+
+If there are handles in the MPU which aren't included in the map, then the omitted
+parts will not be a part of the resulting file. It is up to the implementation
+of the MultipartUploader to make sure the leftover parts are cleaned up.
+
+In the case of backing stores that support directories (local filesystem, HDFS,
+etc), if, at the point of completion, there is now a directory at the
+destination then a `PathIsDirectoryException` or other `IOException` must be thrown.
+
+#### Postconditions
+
+```python
+UploadData' == ordered concatenation of all data in the map of parts, ordered by key
+exists(FS', path') and result = PathHandle(path')
+FS' = FS where FS.Files(path) == UploadData' and not uploadHandle in keys(FS'.uploads)
+```
+
+The PathHandle is returned by the complete operation so subsequent operations
+will be able to identify that the data has not changed in the meantime.
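+
+For example (a sketch assuming the filesystem supports opening a file by its
+`PathHandle`; variable names are illustrative), the returned handle can be used
+to re-open exactly the file which was just materialized, failing if it has
+since been replaced:
+
+```java
+PathHandle fd = uploader.complete(destination, parts, upload);
+// Opening by handle fails if the file at the path is no longer the same entity.
+try (FSDataInputStream in = fs.open(fd)) {
+  // ... read back the uploaded data
+}
+```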
+
+The order of parts in the uploaded file is the natural order of
+parts: part 1 is ahead of part 2, etc.
+
+
+### `void abort(Path path, UploadHandle multipartUploadId)`
+
+Abort a multipart upload. The handle becomes invalid and not subject to reuse.
+
+#### Preconditions
+
+
+```python
+uploadHandle in keys(FS.Uploads) else raise FileNotFoundException
+```
+
+#### Postconditions
+
+The upload handle is no longer known.
+
+```python
+FS' = FS where not uploadHandle in keys(FS'.uploads)
+```
+A subsequent call to `abort()` with the same handle will fail, unless
+the handle has been recycled.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
index 7cee5a6..7a8f083 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
@@ -23,15 +23,19 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
 
 import com.google.common.base.Charsets;
+import org.junit.Assume;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BBUploadHandle;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -43,11 +47,64 @@ import org.apache.hadoop.fs.PathHandle;
 import org.apache.hadoop.fs.UploadHandle;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+import static org.apache.hadoop.test.LambdaTestUtils.eventually;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 public abstract class AbstractContractMultipartUploaderTest extends
     AbstractFSContractTestBase {
 
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(AbstractContractMultipartUploaderTest.class);
+
+  /**
+   * Size of very small uploads.
+   * Enough to be non-empty, not big enough to cause delays on uploads.
+   */
+  protected static final int SMALL_FILE = 100;
+
+  private MultipartUploader mpu;
+  private MultipartUploader mpu2;
+  private final Random random = new Random();
+  private UploadHandle activeUpload;
+  private Path activeUploadPath;
+
+  protected String getMethodName() {
+    return methodName.getMethodName();
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    Configuration conf = getContract().getConf();
+    mpu = MultipartUploaderFactory.get(getFileSystem(), conf);
+    mpu2 = MultipartUploaderFactory.get(getFileSystem(), conf);
+  }
+
+  @Override
+  public void teardown() throws Exception {
+    if (mpu != null && activeUpload != null) {
+      try {
+        mpu.abort(activeUploadPath, activeUpload);
+      } catch (FileNotFoundException ignored) {
+        /* this is fine */
+      } catch (Exception e) {
+        LOG.info("in teardown", e);
+      }
+    }
+    cleanupWithLogger(LOG, mpu, mpu2);
+    super.teardown();
+  }
+
+  /**
+   * Get a test path based on the method name.
+   * @return a path to use in the test
+   * @throws IOException failure to build the path name up.
+   */
+  protected Path methodPath() throws IOException {
+    return path(getMethodName());
+  }
+
   /**
    * The payload is the part number repeated for the length of the part.
    * This makes checking the correctness of the upload straightforward.
@@ -55,9 +112,19 @@ public abstract class AbstractContractMultipartUploaderTest extends
    * @return the bytes to upload.
    */
   private byte[] generatePayload(int partNumber) {
-    int sizeInBytes = partSizeInBytes();
-    ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes);
-    for (int i=0 ; i < sizeInBytes/(Integer.SIZE/Byte.SIZE); ++i) {
+    return generatePayload(partNumber, partSizeInBytes());
+  }
+
+  /**
+   * Generate a payload of a given size; part number is used
+   * for all the values.
+   * @param partNumber part number
+   * @param size size in bytes
+   * @return the bytes to upload.
+   */
+  private byte[] generatePayload(final int partNumber, final int size) {
+    ByteBuffer buffer = ByteBuffer.allocate(size);
+    for (int i = 0; i < size / (Integer.SIZE / Byte.SIZE); ++i) {
       buffer.putInt(partNumber);
     }
     return buffer.array();
@@ -70,11 +137,14 @@ public abstract class AbstractContractMultipartUploaderTest extends
    * @throws IOException failure to read or digest the file.
    */
   protected byte[] digest(Path path) throws IOException {
-    FileSystem fs = getFileSystem();
-    try (InputStream in = fs.open(path)) {
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    try (InputStream in = getFileSystem().open(path)) {
       byte[] fdData = IOUtils.toByteArray(in);
       MessageDigest newDigest = DigestUtils.getMd5Digest();
-      return newDigest.digest(fdData);
+      byte[] digest = newDigest.digest(fdData);
+      return digest;
+    } finally {
+      timer.end("Download and digest of path %s", path);
     }
   }
 
@@ -93,74 +163,230 @@ public abstract class AbstractContractMultipartUploaderTest extends
   }
 
   /**
+   * How long, in milliseconds, it takes for store changes
+   * (including update/delete/list) to propagate everywhere.
+   * If 0: the FS is consistent.
+   * @return a time in milliseconds.
+   */
+  protected int timeToBecomeConsistentMillis() {
+    return 0;
+  }
+
+  /**
+   * Does a call to finalize an upload (either complete or abort) consume the
+   * uploadID immediately or is it reaped at a later point in time?
+   * @return true if the uploadID will be consumed immediately (and no longer
+   * reusable).
+   */
+  protected abstract boolean finalizeConsumesUploadIdImmediately();
+
+  /**
+   * Does the store support concurrent uploads to the same destination path?
+   * @return true if concurrent uploads are supported.
+   */
+  protected abstract boolean supportsConcurrentUploadsToSamePath();
+
+  /**
+   * Pick a multipart uploader from the index value.
+   * @param index index of upload
+   * @return an uploader
+   */
+  protected MultipartUploader mpu(int index) {
+    return (index % 2 == 0) ? mpu : mpu2;
+  }
+
+  /**
+   * Pick a multipart uploader at random.
+   * @return an uploader
+   */
+  protected MultipartUploader randomMpu() {
+    return mpu(random.nextInt(10));
+  }
+
+  /**
    * Assert that a multipart upload is successful.
    * @throws Exception failure
    */
   @Test
   public void testSingleUpload() throws Exception {
-    FileSystem fs = getFileSystem();
-    Path file = path("testSingleUpload");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle uploadHandle = mpu.initialize(file);
-    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+    Path file = methodPath();
+    UploadHandle uploadHandle = initializeUpload(file);
+    Map<Integer, PartHandle> partHandles = new HashMap<>();
     MessageDigest origDigest = DigestUtils.getMd5Digest();
-    byte[] payload = generatePayload(1);
+    int size = SMALL_FILE;
+    byte[] payload = generatePayload(1, size);
     origDigest.update(payload);
-    InputStream is = new ByteArrayInputStream(payload);
-    PartHandle partHandle = mpu.putPart(file, is, 1, uploadHandle,
-        payload.length);
-    partHandles.add(Pair.of(1, partHandle));
-    PathHandle fd = completeUpload(file, mpu, uploadHandle, partHandles,
+    PartHandle partHandle = putPart(file, uploadHandle, 1, payload);
+    partHandles.put(1, partHandle);
+    PathHandle fd = completeUpload(file, uploadHandle, partHandles,
         origDigest,
-        payload.length);
+        size);
+
+    if (finalizeConsumesUploadIdImmediately()) {
+      intercept(FileNotFoundException.class,
+          () -> mpu.complete(file, partHandles, uploadHandle));
+    } else {
+      PathHandle fd2 = mpu.complete(file, partHandles, uploadHandle);
+      assertArrayEquals("Path handles differ", fd.toByteArray(),
+          fd2.toByteArray());
+    }
+  }
+
+  /**
+   * Initialize an upload.
+   * This saves the path and upload handle as the active
+   * upload, for aborting in teardown
+   * @param dest destination
+   * @return the handle
+   * @throws IOException failure to initialize
+   */
+  protected UploadHandle initializeUpload(final Path dest) throws IOException {
+    activeUploadPath = dest;
+    activeUpload = randomMpu().initialize(dest);
+    return activeUpload;
+  }
 
-    // Complete is idempotent
-    PathHandle fd2 = mpu.complete(file, partHandles, uploadHandle);
-    assertArrayEquals("Path handles differ", fd.toByteArray(),
-        fd2.toByteArray());
+  /**
+   * Generate then upload a part.
+   * @param file destination
+   * @param uploadHandle handle
+   * @param index index of part
+   * @param origDigest digest to build up. May be null
+   * @return the part handle
+   * @throws IOException IO failure.
+   */
+  protected PartHandle buildAndPutPart(
+      final Path file,
+      final UploadHandle uploadHandle,
+      final int index,
+      final MessageDigest origDigest) throws IOException {
+    byte[] payload = generatePayload(index);
+    if (origDigest != null) {
+      origDigest.update(payload);
+    }
+    return putPart(file, uploadHandle, index, payload);
   }
 
+  /**
+   * Put a part.
+   * The entire byte array is uploaded.
+   * @param file destination
+   * @param uploadHandle handle
+   * @param index index of part
+   * @param payload byte array of payload
+   * @return the part handle
+   * @throws IOException IO failure.
+   */
+  protected PartHandle putPart(final Path file,
+      final UploadHandle uploadHandle,
+      final int index,
+      final byte[] payload) throws IOException {
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    PartHandle partHandle = mpu(index)
+        .putPart(file,
+            new ByteArrayInputStream(payload),
+            index,
+            uploadHandle,
+            payload.length);
+    timer.end("Uploaded part %s", index);
+    LOG.info("Upload bandwidth {} MB/s",
+        timer.bandwidthDescription(payload.length));
+    return partHandle;
+  }
+
+  /**
+   * Complete an upload with the active MPU instance.
+   * @param file destination
+   * @param uploadHandle handle
+   * @param partHandles map of handles
+   * @param origDigest digest of source data (may be null)
+   * @param expectedLength expected length of result.
+   * @return the path handle from the upload.
+   * @throws IOException IO failure
+   */
   private PathHandle completeUpload(final Path file,
-      final MultipartUploader mpu,
       final UploadHandle uploadHandle,
-      final List<Pair<Integer, PartHandle>> partHandles,
+      final Map<Integer, PartHandle> partHandles,
       final MessageDigest origDigest,
       final int expectedLength) throws IOException {
-    PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
+    PathHandle fd = complete(file, uploadHandle, partHandles);
 
     FileStatus status = verifyPathExists(getFileSystem(),
         "Completed file", file);
     assertEquals("length of " + status,
         expectedLength, status.getLen());
 
+    if (origDigest != null) {
+      verifyContents(file, origDigest, expectedLength);
+    }
+    return fd;
+  }
+
+  /**
+   * Verify the contents of a file.
+   * @param file path
+   * @param origDigest digest
+   * @param expectedLength expected length (for logging B/W)
+   * @throws IOException IO failure
+   */
+  protected void verifyContents(final Path file,
+                                final MessageDigest origDigest,
+                                final int expectedLength) throws IOException {
+    ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer();
     assertArrayEquals("digest of source and " + file
             + " differ",
         origDigest.digest(), digest(file));
+    timer2.end("Completed digest", file);
+    LOG.info("Download bandwidth {} MB/s",
+        timer2.bandwidthDescription(expectedLength));
+  }
+
+  /**
+   * Perform the inner complete without verification.
+   * @param file destination path
+   * @param uploadHandle upload handle
+   * @param partHandles map of parts
+   * @return the path handle from the upload.
+   * @throws IOException IO failure
+   */
+  private PathHandle complete(final Path file,
+      final UploadHandle uploadHandle,
+      final Map<Integer, PartHandle> partHandles) throws IOException {
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    PathHandle fd = randomMpu().complete(file, partHandles, uploadHandle);
+    timer.end("Completed upload to %s", file);
     return fd;
   }
 
   /**
+   * Abort an upload.
+   * @param file path
+   * @param uploadHandle handle
+   * @throws IOException failure
+   */
+  private void abortUpload(final Path file, UploadHandle uploadHandle)
+      throws IOException {
+    randomMpu().abort(file, uploadHandle);
+  }
+
+  /**
    * Assert that a multipart upload is successful.
    * @throws Exception failure
    */
   @Test
   public void testMultipartUpload() throws Exception {
-    FileSystem fs = getFileSystem();
-    Path file = path("testMultipartUpload");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle uploadHandle = mpu.initialize(file);
-    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+    Path file = methodPath();
+    UploadHandle uploadHandle = initializeUpload(file);
+    Map<Integer, PartHandle> partHandles = new HashMap<>();
     MessageDigest origDigest = DigestUtils.getMd5Digest();
     final int payloadCount = getTestPayloadCount();
     for (int i = 1; i <= payloadCount; ++i) {
-      byte[] payload = generatePayload(i);
-      origDigest.update(payload);
-      InputStream is = new ByteArrayInputStream(payload);
-      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
-          payload.length);
-      partHandles.add(Pair.of(i, partHandle));
+      PartHandle partHandle = buildAndPutPart(file, uploadHandle, i,
+          origDigest);
+      partHandles.put(i, partHandle);
     }
-    completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
+    completeUpload(file, uploadHandle, partHandles, origDigest,
         payloadCount * partSizeInBytes());
   }
 
@@ -173,17 +399,33 @@ public abstract class AbstractContractMultipartUploaderTest extends
   public void testMultipartUploadEmptyPart() throws Exception {
     FileSystem fs = getFileSystem();
     Path file = path("testMultipartUpload");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle uploadHandle = mpu.initialize(file);
-    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
-    MessageDigest origDigest = DigestUtils.getMd5Digest();
-    byte[] payload = new byte[0];
-    origDigest.update(payload);
-    InputStream is = new ByteArrayInputStream(payload);
-    PartHandle partHandle = mpu.putPart(file, is, 0, uploadHandle,
-        payload.length);
-      partHandles.add(Pair.of(0, partHandle));
-    completeUpload(file, mpu, uploadHandle, partHandles, origDigest, 0);
+    try (MultipartUploader uploader =
+             MultipartUploaderFactory.get(fs, null)) {
+      UploadHandle uploadHandle = uploader.initialize(file);
+
+      Map<Integer, PartHandle> partHandles = new HashMap<>();
+      MessageDigest origDigest = DigestUtils.getMd5Digest();
+      byte[] payload = new byte[0];
+      origDigest.update(payload);
+      InputStream is = new ByteArrayInputStream(payload);
+      PartHandle partHandle = uploader.putPart(file, is, 1, uploadHandle,
+          payload.length);
+      partHandles.put(1, partHandle);
+      completeUpload(file, uploadHandle, partHandles, origDigest, 0);
+    }
+  }
+
+  /**
+   * Assert that a multipart upload is successful.
+   * @throws Exception failure
+   */
+  @Test
+  public void testUploadEmptyBlock() throws Exception {
+    Path file = methodPath();
+    UploadHandle uploadHandle = initializeUpload(file);
+    Map<Integer, PartHandle> partHandles = new HashMap<>();
+    partHandles.put(1, putPart(file, uploadHandle, 1, new byte[0]));
+    completeUpload(file, uploadHandle, partHandles, null, 0);
   }
 
   /**
@@ -192,11 +434,9 @@ public abstract class AbstractContractMultipartUploaderTest extends
    */
   @Test
   public void testMultipartUploadReverseOrder() throws Exception {
-    FileSystem fs = getFileSystem();
-    Path file = path("testMultipartUploadReverseOrder");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle uploadHandle = mpu.initialize(file);
-    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+    Path file = methodPath();
+    UploadHandle uploadHandle = initializeUpload(file);
+    Map<Integer, PartHandle> partHandles = new HashMap<>();
     MessageDigest origDigest = DigestUtils.getMd5Digest();
     final int payloadCount = getTestPayloadCount();
     for (int i = 1; i <= payloadCount; ++i) {
@@ -204,13 +444,9 @@ public abstract class AbstractContractMultipartUploaderTest extends
       origDigest.update(payload);
     }
     for (int i = payloadCount; i > 0; --i) {
-      byte[] payload = generatePayload(i);
-      InputStream is = new ByteArrayInputStream(payload);
-      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
-          payload.length);
-      partHandles.add(Pair.of(i, partHandle));
+      partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
     }
-    completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
+    completeUpload(file, uploadHandle, partHandles, origDigest,
         payloadCount * partSizeInBytes());
   }
 
@@ -222,25 +458,19 @@ public abstract class AbstractContractMultipartUploaderTest extends
   public void testMultipartUploadReverseOrderNonContiguousPartNumbers()
       throws Exception {
     describe("Upload in reverse order and the part numbers are not contiguous");
-    FileSystem fs = getFileSystem();
-    Path file = path("testMultipartUploadReverseOrderNonContiguousPartNumbers");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle uploadHandle = mpu.initialize(file);
-    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+    Path file = methodPath();
+    UploadHandle uploadHandle = initializeUpload(file);
     MessageDigest origDigest = DigestUtils.getMd5Digest();
     int payloadCount = 2 * getTestPayloadCount();
     for (int i = 2; i <= payloadCount; i += 2) {
       byte[] payload = generatePayload(i);
       origDigest.update(payload);
     }
+    Map<Integer, PartHandle> partHandles = new HashMap<>();
     for (int i = payloadCount; i > 0; i -= 2) {
-      byte[] payload = generatePayload(i);
-      InputStream is = new ByteArrayInputStream(payload);
-      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
-          payload.length);
-      partHandles.add(Pair.of(i, partHandle));
+      partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
     }
-    completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
+    completeUpload(file, uploadHandle, partHandles, origDigest,
         getTestPayloadCount() * partSizeInBytes());
   }
 
@@ -251,19 +481,14 @@ public abstract class AbstractContractMultipartUploaderTest extends
   @Test
   public void testMultipartUploadAbort() throws Exception {
     describe("Upload and then abort it before completing");
-    FileSystem fs = getFileSystem();
-    Path file = path("testMultipartUploadAbort");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle uploadHandle = mpu.initialize(file);
-    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
-    for (int i = 20; i >= 10; --i) {
-      byte[] payload = generatePayload(i);
-      InputStream is = new ByteArrayInputStream(payload);
-      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
-          payload.length);
-      partHandles.add(Pair.of(i, partHandle));
+    Path file = methodPath();
+    UploadHandle uploadHandle = initializeUpload(file);
+    int end = 10;
+    Map<Integer, PartHandle> partHandles = new HashMap<>();
+    for (int i = 12; i > end; i--) {
+      partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
     }
-    mpu.abort(file, uploadHandle);
+    abortUpload(file, uploadHandle);
 
     String contents = "ThisIsPart49\n";
     int len = contents.getBytes(Charsets.UTF_8).length;
@@ -275,6 +500,15 @@ public abstract class AbstractContractMultipartUploaderTest extends
         () -> mpu.complete(file, partHandles, uploadHandle));
 
     assertPathDoesNotExist("Uploaded file should not exist", file);
+
+    // A second abort should raise a FileNotFoundException if the UploadHandle is
+    // consumed by finalization operations (complete, abort).
+    if (finalizeConsumesUploadIdImmediately()) {
+      intercept(FileNotFoundException.class,
+          () -> abortUpload(file, uploadHandle));
+    } else {
+      abortUpload(file, uploadHandle);
+    }
   }
 
   /**
@@ -282,13 +516,23 @@ public abstract class AbstractContractMultipartUploaderTest extends
    */
   @Test
   public void testAbortUnknownUpload() throws Exception {
-    FileSystem fs = getFileSystem();
-    Path file = path("testAbortUnknownUpload");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+    Path file = methodPath();
     ByteBuffer byteBuffer = ByteBuffer.wrap(
         "invalid-handle".getBytes(Charsets.UTF_8));
     UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer);
-    intercept(FileNotFoundException.class, () -> mpu.abort(file, uploadHandle));
+    intercept(FileNotFoundException.class,
+        () -> abortUpload(file, uploadHandle));
+  }
+
+  /**
+   * Trying to abort with a handle of size 0 must fail.
+   */
+  @Test
+  public void testAbortEmptyUpload() throws Exception {
+    describe("initialize upload and abort before uploading data");
+    Path file = methodPath();
+    abortUpload(file, initializeUpload(file));
+    assertPathDoesNotExist("Uploaded file should not exist", file);
   }
 
   /**
@@ -296,13 +540,10 @@ public abstract class AbstractContractMultipartUploaderTest extends
    */
   @Test
   public void testAbortEmptyUploadHandle() throws Exception {
-    FileSystem fs = getFileSystem();
-    Path file = path("testAbortEmptyUpload");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
     ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[0]);
     UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer);
     intercept(IllegalArgumentException.class,
-        () -> mpu.abort(file, uploadHandle));
+        () -> abortUpload(methodPath(), uploadHandle));
   }
 
   /**
@@ -311,26 +552,20 @@ public abstract class AbstractContractMultipartUploaderTest extends
   @Test
   public void testCompleteEmptyUpload() throws Exception {
     describe("Expect an empty MPU to fail, but still be abortable");
-    FileSystem fs = getFileSystem();
-    Path dest = path("testCompleteEmptyUpload");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle handle = mpu.initialize(dest);
-    intercept(IOException.class,
-        () -> mpu.complete(dest, new ArrayList<>(), handle));
-    mpu.abort(dest, handle);
+    Path dest = methodPath();
+    UploadHandle handle = initializeUpload(dest);
+    intercept(IllegalArgumentException.class,
+        () -> mpu.complete(dest, new HashMap<>(), handle));
+    abortUpload(dest, handle);
   }
 
   /**
    * When we pass empty uploadID, putPart throws IllegalArgumentException.
-   * @throws Exception
    */
   @Test
   public void testPutPartEmptyUploadID() throws Exception {
     describe("Expect IllegalArgumentException when putPart uploadID is empty");
-    FileSystem fs = getFileSystem();
-    Path dest = path("testCompleteEmptyUpload");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    mpu.initialize(dest);
+    Path dest = methodPath();
     UploadHandle emptyHandle =
         BBUploadHandle.from(ByteBuffer.wrap(new byte[0]));
     byte[] payload = generatePayload(1);
@@ -341,25 +576,123 @@ public abstract class AbstractContractMultipartUploaderTest extends
 
   /**
    * When we pass empty uploadID, complete throws IllegalArgumentException.
-   * @throws Exception
    */
   @Test
   public void testCompleteEmptyUploadID() throws Exception {
     describe("Expect IllegalArgumentException when complete uploadID is empty");
-    FileSystem fs = getFileSystem();
-    Path dest = path("testCompleteEmptyUpload");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle realHandle = mpu.initialize(dest);
+    Path dest = methodPath();
+    UploadHandle realHandle = initializeUpload(dest);
     UploadHandle emptyHandle =
         BBUploadHandle.from(ByteBuffer.wrap(new byte[0]));
-    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
-    byte[] payload = generatePayload(1);
-    InputStream is = new ByteArrayInputStream(payload);
-    PartHandle partHandle = mpu.putPart(dest, is, 1, realHandle,
-        payload.length);
-    partHandles.add(Pair.of(1, partHandle));
+    Map<Integer, PartHandle> partHandles = new HashMap<>();
+    PartHandle partHandle = putPart(dest, realHandle, 1,
+        generatePayload(1, SMALL_FILE));
+    partHandles.put(1, partHandle);
 
     intercept(IllegalArgumentException.class,
         () -> mpu.complete(dest, partHandles, emptyHandle));
+
+    // and, while things are setup, attempt to complete with
+    // a part index of 0
+    partHandles.clear();
+    partHandles.put(0, partHandle);
+    intercept(IllegalArgumentException.class,
+        () -> mpu.complete(dest, partHandles, realHandle));
+  }
+
+  /**
+   * Assert that upon completion, a directory in the way of the file will
+   * result in a failure. This test only applies to backing stores with a
+   * concept of directories.
+   * @throws Exception failure
+   */
+  @Test
+  public void testDirectoryInTheWay() throws Exception {
+    FileSystem fs = getFileSystem();
+    Path file = methodPath();
+    UploadHandle uploadHandle = initializeUpload(file);
+    Map<Integer, PartHandle> partHandles = new HashMap<>();
+    int size = SMALL_FILE;
+    PartHandle partHandle = putPart(file, uploadHandle, 1,
+        generatePayload(1, size));
+    partHandles.put(1, partHandle);
+
+    fs.mkdirs(file);
+    intercept(IOException.class,
+        () -> completeUpload(file, uploadHandle, partHandles, null,
+            size));
+    // abort should still work
+    abortUpload(file, uploadHandle);
+  }
+
+  @Test
+  public void testConcurrentUploads() throws Throwable {
+
+    // if the FS doesn't support concurrent uploads, this test is
+    // required to fail during the second initialization.
+    final boolean concurrent = supportsConcurrentUploadsToSamePath();
+
+    describe("testing concurrent uploads, MPU support for this is "
+        + concurrent);
+    final FileSystem fs = getFileSystem();
+    final Path file = methodPath();
+    final int size1 = SMALL_FILE;
+    final int partId1 = 1;
+    final byte[] payload1 = generatePayload(partId1, size1);
+    final MessageDigest digest1 = DigestUtils.getMd5Digest();
+    digest1.update(payload1);
+    final UploadHandle upload1 = initializeUpload(file);
+    final Map<Integer, PartHandle> partHandles1 = new HashMap<>();
+
+    // initiate part 2
+    // by using a different size, it's straightforward to see which
+    // version is visible, before reading/digesting the contents
+    final int size2 = size1 * 2;
+    final int partId2 = 2;
+    final byte[] payload2 = generatePayload(partId1, size2);
+    final MessageDigest digest2 = DigestUtils.getMd5Digest();
+    digest2.update(payload2);
+
+    final UploadHandle upload2;
+    try {
+      upload2 = initializeUpload(file);
+      Assume.assumeTrue(
+          "The Filesystem is unexpectedly supporting concurrent uploads",
+          concurrent);
+    } catch (IOException e) {
+      if (!concurrent) {
+        // this is expected, so end the test
+        LOG.debug("Expected exception raised on concurrent uploads {}", e);
+        return;
+      } else {
+        throw e;
+      }
+    }
+    final Map<Integer, PartHandle> partHandles2 = new HashMap<>();
+
+
+    assertNotEquals("Upload handles match", upload1, upload2);
+
+    // put part 1
+    partHandles1.put(partId1, putPart(file, upload1, partId1, payload1));
+
+    // put part2
+    partHandles2.put(partId2, putPart(file, upload2, partId2, payload2));
+
+    // complete part u1. expect its size and digest to
+    // be as expected.
+    completeUpload(file, upload1, partHandles1, digest1, size1);
+
+    // now upload part 2.
+    complete(file, upload2, partHandles2);
+    // and await the visible length to match
+    eventually(timeToBecomeConsistentMillis(), 500,
+        () -> {
+          FileStatus status = fs.getFileStatus(file);
+          assertEquals("File length in " + status,
+              size2, status.getLen());
+        });
+
+    verifyContents(file, digest2, size2);
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java
index a50d2e4..6e27964d 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java
@@ -40,4 +40,14 @@ public class TestLocalFSContractMultipartUploader
   protected int partSizeInBytes() {
     return 1024;
   }
+
+  @Override
+  protected boolean finalizeConsumesUploadIdImmediately() {
+    return true;
+  }
+
+  @Override
+  protected boolean supportsConcurrentUploadsToSamePath() {
+    return true;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java
index f3a5265..54f4ed2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
@@ -32,6 +34,9 @@ import org.apache.hadoop.fs.contract.AbstractFSContract;
 public class TestHDFSContractMultipartUploader extends
     AbstractContractMultipartUploaderTest {
 
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(TestHDFSContractMultipartUploader.class);
+
   @BeforeClass
   public static void createCluster() throws IOException {
     HDFSContract.createCluster();
@@ -55,4 +60,14 @@ public class TestHDFSContractMultipartUploader extends
   protected int partSizeInBytes() {
     return 1024;
   }
+
+  @Override
+  protected boolean finalizeConsumesUploadIdImmediately() {
+    return true;
+  }
+
+  @Override
+  protected boolean supportsConcurrentUploadsToSamePath() {
+    return true;
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java
index cab4e2a..cf58751 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java
@@ -25,7 +25,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
@@ -68,10 +70,8 @@ public class S3AMultipartUploader extends MultipartUploader {
   public static final String HEADER = "S3A-part01";
 
   public S3AMultipartUploader(FileSystem fs, Configuration conf) {
-    if (!(fs instanceof S3AFileSystem)) {
-      throw new IllegalArgumentException(
-          "S3A MultipartUploads must use S3AFileSystem");
-    }
+    Preconditions.checkArgument(fs instanceof S3AFileSystem,
+        "Wrong filesystem: expected S3A but got %s", fs);
     s3a = (S3AFileSystem) fs;
   }
 
@@ -88,6 +88,8 @@ public class S3AMultipartUploader extends MultipartUploader {
   public PartHandle putPart(Path filePath, InputStream inputStream,
       int partNumber, UploadHandle uploadId, long lengthInBytes)
       throws IOException {
+    checkPutArguments(filePath, inputStream, partNumber, uploadId,
+        lengthInBytes);
     byte[] uploadIdBytes = uploadId.toByteArray();
     checkUploadId(uploadIdBytes);
     String key = s3a.pathToKey(filePath);
@@ -105,14 +107,16 @@ public class S3AMultipartUploader extends MultipartUploader {
 
   @Override
   public PathHandle complete(Path filePath,
-      List<Pair<Integer, PartHandle>> handles, UploadHandle uploadId)
+      Map<Integer, PartHandle> handleMap,
+      UploadHandle uploadId)
       throws IOException {
     byte[] uploadIdBytes = uploadId.toByteArray();
     checkUploadId(uploadIdBytes);
-    if (handles.isEmpty()) {
-      throw new IOException("Empty upload");
-    }
 
+    checkPartHandles(handleMap);
+    List<Map.Entry<Integer, PartHandle>> handles =
+        new ArrayList<>(handleMap.entrySet());
+    handles.sort(Comparator.comparingInt(Map.Entry::getKey));
     final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
     String key = s3a.pathToKey(filePath);
 
@@ -121,11 +125,11 @@ public class S3AMultipartUploader extends MultipartUploader {
     ArrayList<PartETag> eTags = new ArrayList<>();
     eTags.ensureCapacity(handles.size());
     long totalLength = 0;
-    for (Pair<Integer, PartHandle> handle : handles) {
-      byte[] payload = handle.getRight().toByteArray();
+    for (Map.Entry<Integer, PartHandle> handle : handles) {
+      byte[] payload = handle.getValue().toByteArray();
       Pair<Long, String> result = parsePartHandlePayload(payload);
       totalLength += result.getLeft();
-      eTags.add(new PartETag(handle.getLeft(), result.getRight()));
+      eTags.add(new PartETag(handle.getKey(), result.getRight()));
     }
     AtomicInteger errorCount = new AtomicInteger(0);
     CompleteMultipartUploadResult result = writeHelper.completeMPUwithRetries(
@@ -172,7 +176,7 @@ public class S3AMultipartUploader extends MultipartUploader {
       throws IOException {
     Preconditions.checkArgument(StringUtils.isNotEmpty(eTag),
         "Empty etag");
-    Preconditions.checkArgument(len > 0,
+    Preconditions.checkArgument(len >= 0,
         "Invalid length");
 
     ByteArrayOutputStream bytes = new ByteArrayOutputStream();
@@ -190,6 +194,7 @@ public class S3AMultipartUploader extends MultipartUploader {
    * @return the length and etag
    * @throws IOException error reading the payload
    */
+  @VisibleForTesting
   static Pair<Long, String> parsePartHandlePayload(byte[] data)
       throws IOException {
 
@@ -201,7 +206,7 @@ public class S3AMultipartUploader extends MultipartUploader {
       }
       final long len = input.readLong();
       final String etag = input.readUTF();
-      if (len <= 0) {
+      if (len < 0) {
         throw new IOException("Negative length");
       }
       return Pair.of(len, etag);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
index 6514ea3..059312a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.fs.contract.s3a;
 
-import java.io.IOException;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -26,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.WriteOperationHelper;
 
@@ -35,6 +34,9 @@ import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_H
 
 /**
  * Test MultipartUploader with S3A.
+ * Although not an S3A Scale test subclass, it uses the -Dscale option
+ * to enable it, and the partition size option to control the size of
+ * parts uploaded.
  */
 public class ITestS3AContractMultipartUploader extends
     AbstractContractMultipartUploaderTest {
@@ -79,6 +81,35 @@ public class ITestS3AContractMultipartUploader extends
     return new S3AContract(conf);
   }
 
+  /**
+   * Bigger test: use the scale timeout.
+   * @return the timeout for scale tests.
+   */
+  @Override
+  protected int getTestTimeoutMillis() {
+    return SCALE_TEST_TIMEOUT_MILLIS;
+  }
+
+
+  @Override
+  protected boolean supportsConcurrentUploadsToSamePath() {
+    return true;
+  }
+
+  /**
+   * Provide a pessimistic time to become consistent.
+   * @return a time in milliseconds
+   */
+  @Override
+  protected int timeToBecomeConsistentMillis() {
+    return 30 * 1000;
+  }
+
+  @Override
+  protected boolean finalizeConsumesUploadIdImmediately() {
+    return false;
+  }
+
   @Override
   public void setup() throws Exception {
     super.setup();
@@ -103,19 +134,29 @@ public class ITestS3AContractMultipartUploader extends
   public void teardown() throws Exception {
     Path teardown = path("teardown").getParent();
     S3AFileSystem fs = getFileSystem();
-    WriteOperationHelper helper = fs.getWriteOperationHelper();
-    try {
-      LOG.info("Teardown: aborting outstanding uploads under {}", teardown);
-      int count = helper.abortMultipartUploadsUnderPath(fs.pathToKey(teardown));
-      LOG.info("Found {} incomplete uploads", count);
-    } catch (IOException e) {
-      LOG.warn("IOE in teardown", e);
+    if (fs != null) {
+      WriteOperationHelper helper = fs.getWriteOperationHelper();
+      try {
+        LOG.info("Teardown: aborting outstanding uploads under {}", teardown);
+        int count = helper.abortMultipartUploadsUnderPath(
+            fs.pathToKey(teardown));
+        LOG.info("Found {} incomplete uploads", count);
+      } catch (Exception e) {
+        LOG.warn("Exeception in teardown", e);
+      }
     }
     super.teardown();
   }
 
+  /**
+   * S3 has no concept of directories, so this test does not apply.
+   */
+  public void testDirectoryInTheWay() throws Exception {
+    // no-op
+  }
+
   @Override
-  public void testMultipartUploadEmptyPart() throws Exception {
-    // ignore the test in the base class.
+  public void testMultipartUploadReverseOrder() throws Exception {
+    ContractTestUtils.skip("skipped for speed");
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java
index 35d0460..4825d26 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java
@@ -60,7 +60,7 @@ public class TestS3AMultipartUploaderSupport extends HadoopTestBase {
   @Test
   public void testNoLen() throws Throwable {
     intercept(IllegalArgumentException.class,
-        () -> buildPartHandlePayload("tag", 0));
+        () -> buildPartHandlePayload("tag", -1));
   }
 
   @Test

