Posted to common-commits@hadoop.apache.org by st...@apache.org on 2019/02/04 17:10:33 UTC
[hadoop] branch branch-3.2 updated: HDFS-13713. Add specification
of Multipart Upload API to FS specification, with contract tests.
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new bdd17be HDFS-13713. Add specification of Multipart Upload API to FS specification, with contract tests.
bdd17be is described below
commit bdd17be9eceb144697b9cd5bb1f785567ec633be
Author: Steve Loughran <st...@apache.org>
AuthorDate: Mon Feb 4 17:10:19 2019 +0000
HDFS-13713. Add specification of Multipart Upload API to FS specification, with contract tests.
Contributed by Ewan Higgs and Steve Loughran.
(cherry picked from commit c1d24f848345f6d34a2ac2d570d49e9787a0df6a)
---
.../hadoop/fs/FileSystemMultipartUploader.java | 36 +-
.../org/apache/hadoop/fs/MultipartUploader.java | 88 +++-
.../apache/hadoop/fs/MultipartUploaderFactory.java | 7 +
.../src/site/markdown/filesystem/index.md | 1 +
.../site/markdown/filesystem/multipartuploader.md | 235 +++++++++
.../AbstractContractMultipartUploaderTest.java | 565 ++++++++++++++++-----
.../TestLocalFSContractMultipartUploader.java | 10 +
.../hdfs/TestHDFSContractMultipartUploader.java | 15 +
.../apache/hadoop/fs/s3a/S3AMultipartUploader.java | 31 +-
.../s3a/ITestS3AContractMultipartUploader.java | 63 ++-
.../fs/s3a/TestS3AMultipartUploaderSupport.java | 2 +-
11 files changed, 873 insertions(+), 180 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java
index 94c7861..b77c244 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java
@@ -19,21 +19,23 @@ package org.apache.hadoop.fs;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import java.util.stream.Collectors;
import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
import org.apache.commons.compress.utils.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import static org.apache.hadoop.fs.Path.mergePaths;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
/**
* A MultipartUploader that uses the basic FileSystem commands.
@@ -70,7 +72,8 @@ public class FileSystemMultipartUploader extends MultipartUploader {
public PartHandle putPart(Path filePath, InputStream inputStream,
int partNumber, UploadHandle uploadId, long lengthInBytes)
throws IOException {
-
+ checkPutArguments(filePath, inputStream, partNumber, uploadId,
+ lengthInBytes);
byte[] uploadIdByteArray = uploadId.toByteArray();
checkUploadId(uploadIdByteArray);
Path collectorPath = new Path(new String(uploadIdByteArray, 0,
@@ -82,16 +85,17 @@ public class FileSystemMultipartUploader extends MultipartUploader {
fs.createFile(partPath).build()) {
IOUtils.copy(inputStream, fsDataOutputStream, 4096);
} finally {
- org.apache.hadoop.io.IOUtils.cleanupWithLogger(LOG, inputStream);
+ cleanupWithLogger(LOG, inputStream);
}
return BBPartHandle.from(ByteBuffer.wrap(
partPath.toString().getBytes(Charsets.UTF_8)));
}
private Path createCollectorPath(Path filePath) {
+ String uuid = UUID.randomUUID().toString();
return mergePaths(filePath.getParent(),
mergePaths(new Path(filePath.getName().split("\\.")[0]),
- mergePaths(new Path("_multipart"),
+ mergePaths(new Path("_multipart_" + uuid),
new Path(Path.SEPARATOR))));
}
@@ -110,21 +114,16 @@ public class FileSystemMultipartUploader extends MultipartUploader {
@Override
@SuppressWarnings("deprecation") // rename w/ OVERWRITE
- public PathHandle complete(Path filePath,
- List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId)
- throws IOException {
+ public PathHandle complete(Path filePath, Map<Integer, PartHandle> handleMap,
+ UploadHandle multipartUploadId) throws IOException {
checkUploadId(multipartUploadId.toByteArray());
- if (handles.isEmpty()) {
- throw new IOException("Empty upload");
- }
- // If destination already exists, we believe we already completed it.
- if (fs.exists(filePath)) {
- return getPathHandle(filePath);
- }
+ checkPartHandles(handleMap);
+ List<Map.Entry<Integer, PartHandle>> handles =
+ new ArrayList<>(handleMap.entrySet());
+ handles.sort(Comparator.comparingInt(Map.Entry::getKey));
- handles.sort(Comparator.comparing(Pair::getKey));
List<Path> partHandles = handles
.stream()
.map(pair -> {
@@ -134,7 +133,10 @@ public class FileSystemMultipartUploader extends MultipartUploader {
})
.collect(Collectors.toList());
- Path collectorPath = createCollectorPath(filePath);
+ byte[] uploadIdByteArray = multipartUploadId.toByteArray();
+ Path collectorPath = new Path(new String(uploadIdByteArray, 0,
+ uploadIdByteArray.length, Charsets.UTF_8));
+
boolean emptyFile = totalPartsLen(partHandles) == 0;
if (emptyFile) {
fs.create(filePath).close();
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
index b56dbf8..7ed987e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
@@ -17,40 +17,45 @@
*/
package org.apache.hadoop.fs;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
-import java.util.List;
+import java.util.Map;
-import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.commons.lang3.tuple.Pair;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import static com.google.common.base.Preconditions.checkArgument;
+
/**
* MultipartUploader is an interface for copying files multipart and across
* multiple nodes. Users should:
* <ol>
- * <li>Initialize an upload</li>
- * <li>Upload parts in any order</li>
+ * <li>Initialize an upload.</li>
+ * <li>Upload parts in any order.</li>
* <li>Complete the upload in order to have it materialize in the destination
- * FS</li>
+ * FS.</li>
* </ol>
- *
- * Implementers should make sure that the complete function should make sure
- * that 'complete' will reorder parts if the destination FS doesn't already
- * do it for them.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public abstract class MultipartUploader {
+public abstract class MultipartUploader implements Closeable {
public static final Logger LOG =
LoggerFactory.getLogger(MultipartUploader.class);
/**
+ * Perform any cleanup.
+ * The upload is not required to support any operations after this.
+ * @throws IOException problems on close.
+ */
+ @Override
+ public void close() throws IOException {
+ }
+
+ /**
* Initialize a multipart upload.
* @param filePath Target path for upload.
* @return unique identifier associating part uploads.
@@ -59,8 +64,8 @@ public abstract class MultipartUploader {
public abstract UploadHandle initialize(Path filePath) throws IOException;
/**
- * Put part as part of a multipart upload. It should be possible to have
- * parts uploaded in any order (or in parallel).
+ * Put part as part of a multipart upload.
+ * It is possible to have parts uploaded in any order (or in parallel).
* @param filePath Target path for upload (same as {@link #initialize(Path)}).
* @param inputStream Data for this part. Implementations MUST close this
* stream after reading in the data.
@@ -77,15 +82,15 @@ public abstract class MultipartUploader {
/**
* Complete a multipart upload.
* @param filePath Target path for upload (same as {@link #initialize(Path)}.
- * @param handles non-empty list of identifiers with associated part numbers
+ * @param handles non-empty map of part number to part handle.
* from {@link #putPart(Path, InputStream, int, UploadHandle, long)}.
- * Depending on the backend, the list order may be significant.
* @param multipartUploadId Identifier from {@link #initialize(Path)}.
* @return unique PathHandle identifier for the uploaded file.
- * @throws IOException IO failure or the handle list is empty.
+ * @throws IOException IO failure
*/
public abstract PathHandle complete(Path filePath,
- List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId)
+ Map<Integer, PartHandle> handles,
+ UploadHandle multipartUploadId)
throws IOException;
/**
@@ -98,13 +103,52 @@ public abstract class MultipartUploader {
throws IOException;
/**
- * Utility method to validate uploadIDs
- * @param uploadId
- * @throws IllegalArgumentException
+ * Utility method to validate uploadIDs.
+ * @param uploadId Upload ID
+ * @throws IllegalArgumentException invalid ID
*/
protected void checkUploadId(byte[] uploadId)
throws IllegalArgumentException {
- Preconditions.checkArgument(uploadId.length > 0,
+ checkArgument(uploadId != null, "null uploadId");
+ checkArgument(uploadId.length > 0,
"Empty UploadId is not valid");
}
+
+ /**
+ * Utility method to validate partHandles.
+ * @param partHandles handles
+ * @throws IllegalArgumentException if the parts are invalid
+ */
+ protected void checkPartHandles(Map<Integer, PartHandle> partHandles) {
+ checkArgument(!partHandles.isEmpty(),
+ "Empty upload");
+ partHandles.keySet()
+ .stream()
+ .forEach(key ->
+ checkArgument(key > 0,
+ "Invalid part handle index %s", key));
+ }
+
+ /**
+ * Check all the arguments to the
+ * {@link #putPart(Path, InputStream, int, UploadHandle, long)} operation.
+ * @param filePath Target path for upload (same as {@link #initialize(Path)}).
+ * @param inputStream Data for this part. Implementations MUST close this
+ * stream after reading in the data.
+ * @param partNumber Index of the part relative to others.
+ * @param uploadId Identifier from {@link #initialize(Path)}.
+ * @param lengthInBytes Target length to read from the stream.
+ * @throws IllegalArgumentException invalid argument
+ */
+ protected void checkPutArguments(Path filePath,
+ InputStream inputStream,
+ int partNumber,
+ UploadHandle uploadId,
+ long lengthInBytes) throws IllegalArgumentException {
+ checkArgument(filePath != null, "null filePath");
+ checkArgument(inputStream != null, "null inputStream");
+ checkArgument(partNumber > 0, "Invalid part number: %s", partNumber);
+ checkArgument(uploadId != null, "null uploadId");
+ checkArgument(lengthInBytes >= 0, "Invalid part length: %s", lengthInBytes);
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java
index 8b35232..e35b6bf 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java
@@ -52,6 +52,13 @@ public abstract class MultipartUploaderFactory {
}
}
+ /**
+ * Get the multipart uploader for a specific filesystem.
+ * @param fs filesystem
+ * @param conf configuration
+ * @return an uploader, or null if none was found.
+ * @throws IOException failure during the creation process.
+ */
public static MultipartUploader get(FileSystem fs, Configuration conf)
throws IOException {
MultipartUploader mpu = null;
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
index 532b6c7..6b4399e 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
@@ -36,3 +36,4 @@ HDFS as these are commonly expected by Hadoop client applications.
1. [FSDataOutputStreamBuilder class](fsdataoutputstreambuilder.html)
2. [Testing with the Filesystem specification](testing.html)
2. [Extending the specification and its tests](extending.html)
+1. [Uploading a file using Multiple Parts](multipartuploader.html)
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/multipartuploader.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/multipartuploader.md
new file mode 100644
index 0000000..629c0c4
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/multipartuploader.md
@@ -0,0 +1,235 @@
+<!---
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+
+<!-- ============================================================= -->
+<!-- CLASS: MultipartUploader -->
+<!-- ============================================================= -->
+
+# class `org.apache.hadoop.fs.MultipartUploader`
+
+<!-- MACRO{toc|fromDepth=1|toDepth=2} -->
+
+The abstract `MultipartUploader` class is the original class to upload a file
+using multiple parts to Hadoop-supported filesystems. The benefit of a
+multipart upload is that the file can be uploaded from multiple clients or
+processes in parallel and the results will not be visible to other clients
+until the `complete` function is called.
+
+When implemented by an object store, uploaded data may incur storage charges,
+even before it is visible in the filesystem. Users of this API must be diligent
+and always perform best-effort attempts to complete or abort the upload.
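+
+To make the lifecycle concrete, here is a minimal Java sketch of a complete
+upload. It is an illustration only, not part of the API: the class and method
+names are invented, error handling is reduced to a best-effort `abort()`, and
+the try-with-resources relies on `MultipartUploader` implementing `Closeable`.
+
+```java
+// Hypothetical example class -- not part of the Hadoop API.
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.MultipartUploader;
+import org.apache.hadoop.fs.MultipartUploaderFactory;
+import org.apache.hadoop.fs.PartHandle;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathHandle;
+import org.apache.hadoop.fs.UploadHandle;
+
+public class MultipartUploadLifecycle {
+
+  public static PathHandle upload(FileSystem fs, Configuration conf,
+      Path dest, byte[][] parts) throws IOException {
+    try (MultipartUploader uploader = MultipartUploaderFactory.get(fs, conf)) {
+      if (uploader == null) {
+        throw new IOException("No multipart uploader for " + fs.getUri());
+      }
+      // 1. Initialize; the returned handle identifies this upload.
+      UploadHandle handle = uploader.initialize(dest);
+      try {
+        // 2. Upload the parts; any order (or in parallel) is permitted.
+        Map<Integer, PartHandle> handles = new HashMap<>();
+        for (int i = 0; i < parts.length; i++) {
+          int partNumber = i + 1;   // part numbers start at 1
+          handles.put(partNumber,
+              uploader.putPart(dest, new ByteArrayInputStream(parts[i]),
+                  partNumber, handle, parts[i].length));
+        }
+        // 3. Complete: only now does the file become visible at dest.
+        return uploader.complete(dest, handles, handle);
+      } catch (IOException e) {
+        // Best effort: avoid leaving part data (and charges) behind.
+        uploader.abort(dest, handle);
+        throw e;
+      }
+    }
+  }
+}
+```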
+
+## Invariants
+
+All the requirements of a valid MultipartUploader are considered implicit
+preconditions and postconditions:
+all operations on a valid MultipartUploader MUST result in a new
+MultipartUploader that is also valid.
+
+The operations of a single multipart upload may take place across different
+instances of a multipart uploader, across different processes and hosts.
+It is therefore a requirement that:
+
+1. All state needed to upload a part, complete an upload or abort an upload
+must be contained within or retrievable from an upload handle.
+
+1. If an upload handle is marshalled to another process, then, if the
+receiving process has the correct permissions, it may participate in the
+upload, by uploading one or more parts, by completing an upload, and/or by
+aborting the upload.
+
+## Concurrency
+
+Multiple processes may upload parts of a multipart upload simultaneously.
+
+If a call is made to `initialize(path)` to a destination where an active
+upload is in progress, implementations MUST perform one of the following two operations:
+
+* Reject the call as a duplicate.
+* Permit both to proceed, with the final output of the file being
+that of _exactly one of the two uploads_.
+
+Which upload succeeds is undefined. Users must not expect consistent
+behavior across filesystems, across filesystem instances, or even
+across different requests.
+
+If a multipart upload is completed or aborted while a part upload is in progress,
+the in-progress upload, if it has not completed, must not be included in
+the final file, in whole or in part. Implementations SHOULD raise an error
+in the `putPart()` operation.
+
+## Model
+
+A File System which supports Multipart Uploads extends the existing model
+`(Directories, Files, Symlinks)` to one of `(Directories, Files, Symlinks, Uploads)`,
+where `Uploads` is of type `Map[UploadHandle -> Map[PartHandle -> UploadPart]]`.
+
+
+The `Uploads` element of the state tuple is a map of all active uploads.
+
+```python
+Uploads: Map[UploadHandle -> Map[PartHandle -> UploadPart]]
+```
+
+An UploadHandle is a non-empty list of bytes.
+```python
+UploadHandle: List[byte]
+len(UploadHandle) > 0
+```
+
+Clients *MUST* treat this as opaque. What is core to this feature's design is that the handle is valid
+across clients: the handle may be serialized on host `hostA`, deserialized on `hostB`, and still used
+to extend or complete the upload.
+
+```python
+UploadPart = (Path: path, parts: Map[PartHandle -> byte[]])
+```
+
+Similarly, the `PartHandle` type is also a non-empty list of opaque bytes, again, marshallable between hosts.
+
+```python
+PartHandle: List[byte]
+```
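+
+As a small, hypothetical sketch of that marshallability, a handle's byte-array
+form can be sent to another host and rebuilt there with the `BBUploadHandle`
+wrapper that the contract tests use; the rebuilt handle may then put parts,
+complete, or abort the upload:
+
+```java
+// Hypothetical example class -- not part of the Hadoop API.
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.BBUploadHandle;
+import org.apache.hadoop.fs.UploadHandle;
+
+public class HandleMarshalling {
+
+  /** On hostA: serialize the opaque handle for transmission. */
+  public static byte[] marshal(UploadHandle handle) {
+    return handle.toByteArray();
+  }
+
+  /** On hostB: rebuild a handle from the received bytes. */
+  public static UploadHandle unmarshal(byte[] wireForm) {
+    return BBUploadHandle.from(ByteBuffer.wrap(wireForm));
+  }
+}
+```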
+
+It is implicit that each `UploadHandle` in `FS.Uploads` is unique.
+Similarly, each `PartHandle` in the map of `[PartHandle -> UploadPart]` must also be unique.
+
+1. There is no requirement that Part Handles are unique across uploads.
+1. There is no requirement that Upload Handles are unique over time.
+However, if Upload Handles are rapidly recycled, there is a risk that the nominally
+idempotent operation `abort(FS, uploadHandle)` could unintentionally cancel a
+successor operation which used the same Upload Handle.
+
+## State Changing Operations
+
+### `UploadHandle initialize(Path path)`
+
+Initializes a Multipart Upload, returning an upload handle for use in
+subsequent operations.
+
+#### Preconditions
+
+```python
+if path == "/" : raise IOException
+
+if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException
+```
+
+If a filesystem does not support concurrent uploads to a destination,
+then the following precondition is added:
+
+```python
+if path in values(FS.Uploads) raise PathExistsException, IOException
+
+```
+
+
+#### Postconditions
+
+The outcome of this operation is that the filesystem state is updated with a new
+active upload under a new handle; this handle is returned to the caller.
+
+```python
+handle' = UploadHandle where not handle' in keys(FS.Uploads)
+FS' = FS where FS'.Uploads(handle') == {}
+result = handle'
+```
+
+### `PartHandle putPart(Path path, InputStream inputStream, int partNumber, UploadHandle uploadHandle, long lengthInBytes)`
+
+Upload a part for the multipart upload.
+
+#### Preconditions
+
+
+```python
+uploadHandle in keys(FS.Uploads)
+partNumber >= 1
+lengthInBytes >= 0
+len(inputStream) >= lengthInBytes
+```
+
+#### Postconditions
+
+```python
+data' = inputStream(0..lengthInBytes)
+partHandle' = byte[] where not partHandle' in keys(FS.uploads(uploadHandle).parts)
+FS' = FS where FS'.uploads(uploadHandle).parts(partHandle') == data'
+result = partHandle'
+```
+
+The data is stored in the filesystem, pending completion.
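+
+Because parts may be uploaded in any order, or in parallel, a client can fan
+`putPart()` calls out across threads. The following is a hedged sketch only
+(invented class name, payloads already in memory, and an uploader instance
+assumed to tolerate concurrent calls, as this specification permits):
+
+```java
+// Hypothetical example class -- not part of the Hadoop API.
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.MultipartUploader;
+import org.apache.hadoop.fs.PartHandle;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UploadHandle;
+
+public class ParallelPartUpload {
+
+  /** Upload all parts concurrently; part numbers start at 1. */
+  public static Map<Integer, PartHandle> putParts(
+      final MultipartUploader uploader, final Path dest,
+      final UploadHandle upload, final byte[][] parts, int threads)
+      throws Exception {
+    ExecutorService pool = Executors.newFixedThreadPool(threads);
+    try {
+      List<Future<PartHandle>> futures = new ArrayList<>();
+      for (int i = 0; i < parts.length; i++) {
+        final int partNumber = i + 1;
+        final byte[] payload = parts[i];
+        futures.add(pool.submit(() ->
+            uploader.putPart(dest, new ByteArrayInputStream(payload),
+                partNumber, upload, payload.length)));
+      }
+      Map<Integer, PartHandle> handles = new HashMap<>();
+      for (int i = 0; i < futures.size(); i++) {
+        // get() rethrows any failure raised in a worker thread
+        handles.put(i + 1, futures.get(i).get());
+      }
+      return handles;
+    } finally {
+      pool.shutdown();
+    }
+  }
+}
+```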
+
+
+### `PathHandle complete(Path path, Map<Integer, PartHandle> parts, UploadHandle multipartUploadId)`
+
+Complete the multipart upload.
+
+A Filesystem may enforce a minimum size of each part, excluding the last part uploaded.
+
+If a part is below this minimum size, an `IOException` MUST be raised.
+
+#### Preconditions
+
+```python
+uploadHandle in keys(FS.Uploads) else raise FileNotFoundException
+FS.Uploads(uploadHandle).path == path
+if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException
+parts.size() > 0
+```
+
+If there are handles in the MPU which aren't included in the map, then the omitted
+parts will not be a part of the resulting file. It is up to the implementation
+of the MultipartUploader to make sure the leftover parts are cleaned up.
+
+In the case of backing stores that support directories (local filesystem, HDFS,
+etc.), if a directory exists at the destination at the time of completion,
+then a `PathIsDirectoryException` or other `IOException` MUST be thrown.
+
+#### Postconditions
+
+```python
+UploadData' == ordered concatenation of all data in the map of parts, ordered by key
+exists(FS', path) and result = PathHandle(path)
+FS' = FS where FS'.Files(path) == UploadData' and not uploadHandle in keys(FS'.Uploads)
+```
+
+The PathHandle is returned by the complete operation so subsequent operations
+will be able to identify that the data has not changed in the meantime.
+
+The order of parts in the uploaded file is that of the natural order of
+parts: part 1 is ahead of part 2, etc.
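+
+For example, in this hypothetical fragment the map is populated out of order,
+yet the resulting file still begins with part 1's bytes, because only the
+integer keys determine the concatenation order:
+
+```java
+// Hypothetical example class -- not part of the Hadoop API.
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.MultipartUploader;
+import org.apache.hadoop.fs.PartHandle;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathHandle;
+import org.apache.hadoop.fs.UploadHandle;
+
+public class CompleteOrdering {
+
+  public static PathHandle completeTwoParts(MultipartUploader uploader,
+      Path dest, UploadHandle upload,
+      PartHandle part1, PartHandle part2) throws IOException {
+    Map<Integer, PartHandle> handles = new HashMap<>();
+    handles.put(2, part2);  // registered first...
+    handles.put(1, part1);  // ...but part 1's bytes still come first
+    return uploader.complete(dest, handles, upload);
+  }
+}
+```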
+
+
+### `void abort(Path path, UploadHandle multipartUploadId)`
+
+Abort a multipart upload. The handle becomes invalid and not subject to reuse.
+
+#### Preconditions
+
+
+```python
+uploadHandle in keys(FS.Uploads) else raise FileNotFoundException
+```
+
+#### Postconditions
+
+The upload handle is no longer known.
+
+```python
+FS' = FS where not uploadHandle in keys(FS'.Uploads)
+```
+A subsequent call to `abort()` with the same handle will fail, unless
+the handle has been recycled.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
index 7cee5a6..7a8f083 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
@@ -23,15 +23,19 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
import com.google.common.base.Charsets;
+import org.junit.Assume;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BBUploadHandle;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -43,11 +47,64 @@ import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+import static org.apache.hadoop.test.LambdaTestUtils.eventually;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
public abstract class AbstractContractMultipartUploaderTest extends
AbstractFSContractTestBase {
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(AbstractContractMultipartUploaderTest.class);
+
+ /**
+ * Size of very small uploads.
+ * Enough to be non-empty, not big enough to cause delays on uploads.
+ */
+ protected static final int SMALL_FILE = 100;
+
+ private MultipartUploader mpu;
+ private MultipartUploader mpu2;
+ private final Random random = new Random();
+ private UploadHandle activeUpload;
+ private Path activeUploadPath;
+
+ protected String getMethodName() {
+ return methodName.getMethodName();
+ }
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ Configuration conf = getContract().getConf();
+ mpu = MultipartUploaderFactory.get(getFileSystem(), conf);
+ mpu2 = MultipartUploaderFactory.get(getFileSystem(), conf);
+ }
+
+ @Override
+ public void teardown() throws Exception {
+ if (mpu != null && activeUpload != null) {
+ try {
+ mpu.abort(activeUploadPath, activeUpload);
+ } catch (FileNotFoundException ignored) {
+ /* this is fine */
+ } catch (Exception e) {
+ LOG.info("in teardown", e);
+ }
+ }
+ cleanupWithLogger(LOG, mpu, mpu2);
+ super.teardown();
+ }
+
+ /**
+ * Get a test path based on the method name.
+ * @return a path to use in the test
+ * @throws IOException failure to build the path name up.
+ */
+ protected Path methodPath() throws IOException {
+ return path(getMethodName());
+ }
+
/**
* The payload is the part number repeated for the length of the part.
* This makes checking the correctness of the upload straightforward.
@@ -55,9 +112,19 @@ public abstract class AbstractContractMultipartUploaderTest extends
* @return the bytes to upload.
*/
private byte[] generatePayload(int partNumber) {
- int sizeInBytes = partSizeInBytes();
- ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes);
- for (int i=0 ; i < sizeInBytes/(Integer.SIZE/Byte.SIZE); ++i) {
+ return generatePayload(partNumber, partSizeInBytes());
+ }
+
+ /**
+ * Generate a payload of a given size; part number is used
+ * for all the values.
+ * @param partNumber part number
+ * @param size size in bytes
+ * @return the bytes to upload.
+ */
+ private byte[] generatePayload(final int partNumber, final int size) {
+ ByteBuffer buffer = ByteBuffer.allocate(size);
+ for (int i=0; i < size /(Integer.SIZE/Byte.SIZE); ++i) {
buffer.putInt(partNumber);
}
return buffer.array();
@@ -70,11 +137,14 @@ public abstract class AbstractContractMultipartUploaderTest extends
* @throws IOException failure to read or digest the file.
*/
protected byte[] digest(Path path) throws IOException {
- FileSystem fs = getFileSystem();
- try (InputStream in = fs.open(path)) {
+ ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+ try (InputStream in = getFileSystem().open(path)) {
byte[] fdData = IOUtils.toByteArray(in);
MessageDigest newDigest = DigestUtils.getMd5Digest();
- return newDigest.digest(fdData);
+ byte[] digest = newDigest.digest(fdData);
+ return digest;
+ } finally {
+ timer.end("Download and digest of path %s", path);
}
}
@@ -93,74 +163,230 @@ public abstract class AbstractContractMultipartUploaderTest extends
}
/**
+ * How long in milliseconds for propagation of
+ * store changes, including update/delete/list
+ * to be everywhere.
+ * If 0: the FS is consistent.
+ * @return a time in milliseconds.
+ */
+ protected int timeToBecomeConsistentMillis() {
+ return 0;
+ }
+
+ /**
+ * Does a call to finalize an upload (either complete or abort) consume the
+ * uploadID immediately or is it reaped at a later point in time?
+ * @return true if the uploadID will be consumed immediately (and no longer
+ * resuable).
+ */
+ protected abstract boolean finalizeConsumesUploadIdImmediately();
+
+ /**
+ * Does the store support concurrent uploads to the same destination path?
+ * @return true if concurrent uploads are supported.
+ */
+ protected abstract boolean supportsConcurrentUploadsToSamePath();
+
+ /**
+ * Pick a multipart uploader from the index value.
+ * @param index index of upload
+ * @return an uploader
+ */
+ protected MultipartUploader mpu(int index) {
+ return (index % 2 == 0) ? mpu : mpu2;
+ }
+
+ /**
+ * Pick a multipart uploader at random.
+ * @return an uploader
+ */
+ protected MultipartUploader randomMpu() {
+ return mpu(random.nextInt(10));
+ }
+
+ /**
* Assert that a multipart upload is successful.
* @throws Exception failure
*/
@Test
public void testSingleUpload() throws Exception {
- FileSystem fs = getFileSystem();
- Path file = path("testSingleUpload");
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
- UploadHandle uploadHandle = mpu.initialize(file);
- List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+ Path file = methodPath();
+ UploadHandle uploadHandle = initializeUpload(file);
+ Map<Integer, PartHandle> partHandles = new HashMap<>();
MessageDigest origDigest = DigestUtils.getMd5Digest();
- byte[] payload = generatePayload(1);
+ int size = SMALL_FILE;
+ byte[] payload = generatePayload(1, size);
origDigest.update(payload);
- InputStream is = new ByteArrayInputStream(payload);
- PartHandle partHandle = mpu.putPart(file, is, 1, uploadHandle,
- payload.length);
- partHandles.add(Pair.of(1, partHandle));
- PathHandle fd = completeUpload(file, mpu, uploadHandle, partHandles,
+ PartHandle partHandle = putPart(file, uploadHandle, 1, payload);
+ partHandles.put(1, partHandle);
+ PathHandle fd = completeUpload(file, uploadHandle, partHandles,
origDigest,
- payload.length);
+ size);
+
+ if (finalizeConsumesUploadIdImmediately()) {
+ intercept(FileNotFoundException.class,
+ () -> mpu.complete(file, partHandles, uploadHandle));
+ } else {
+ PathHandle fd2 = mpu.complete(file, partHandles, uploadHandle);
+ assertArrayEquals("Path handles differ", fd.toByteArray(),
+ fd2.toByteArray());
+ }
+ }
+
+ /**
+ * Initialize an upload.
+ * This saves the path and upload handle as the active
+ * upload, for aborting in teardown
+ * @param dest destination
+ * @return the handle
+ * @throws IOException failure to initialize
+ */
+ protected UploadHandle initializeUpload(final Path dest) throws IOException {
+ activeUploadPath = dest;
+ activeUpload = randomMpu().initialize(dest);
+ return activeUpload;
+ }
- // Complete is idempotent
- PathHandle fd2 = mpu.complete(file, partHandles, uploadHandle);
- assertArrayEquals("Path handles differ", fd.toByteArray(),
- fd2.toByteArray());
+ /**
+ * Generate then upload a part.
+ * @param file destination
+ * @param uploadHandle handle
+ * @param index index of part
+ * @param origDigest digest to build up. May be null
+ * @return the part handle
+ * @throws IOException IO failure.
+ */
+ protected PartHandle buildAndPutPart(
+ final Path file,
+ final UploadHandle uploadHandle,
+ final int index,
+ final MessageDigest origDigest) throws IOException {
+ byte[] payload = generatePayload(index);
+ if (origDigest != null) {
+ origDigest.update(payload);
+ }
+ return putPart(file, uploadHandle, index, payload);
}
+ /**
+ * Put a part.
+ * The entire byte array is uploaded.
+ * @param file destination
+ * @param uploadHandle handle
+ * @param index index of part
+ * @param payload byte array of payload
+ * @return the part handle
+ * @throws IOException IO failure.
+ */
+ protected PartHandle putPart(final Path file,
+ final UploadHandle uploadHandle,
+ final int index,
+ final byte[] payload) throws IOException {
+ ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+ PartHandle partHandle = mpu(index)
+ .putPart(file,
+ new ByteArrayInputStream(payload),
+ index,
+ uploadHandle,
+ payload.length);
+ timer.end("Uploaded part %s", index);
+ LOG.info("Upload bandwidth {} MB/s",
+ timer.bandwidthDescription(payload.length));
+ return partHandle;
+ }
+
+ /**
+ * Complete an upload with the active MPU instance.
+ * @param file destination
+ * @param uploadHandle handle
+ * @param partHandles map of handles
+ * @param origDigest digest of source data (may be null)
+ * @param expectedLength expected length of result.
+ * @return the path handle from the upload.
+ * @throws IOException IO failure
+ */
private PathHandle completeUpload(final Path file,
- final MultipartUploader mpu,
final UploadHandle uploadHandle,
- final List<Pair<Integer, PartHandle>> partHandles,
+ final Map<Integer, PartHandle> partHandles,
final MessageDigest origDigest,
final int expectedLength) throws IOException {
- PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
+ PathHandle fd = complete(file, uploadHandle, partHandles);
FileStatus status = verifyPathExists(getFileSystem(),
"Completed file", file);
assertEquals("length of " + status,
expectedLength, status.getLen());
+ if (origDigest != null) {
+ verifyContents(file, origDigest, expectedLength);
+ }
+ return fd;
+ }
+
+ /**
+ * Verify the contents of a file.
+ * @param file path
+ * @param origDigest digest
+ * @param expectedLength expected length (for logging B/W)
+ * @throws IOException IO failure
+ */
+ protected void verifyContents(final Path file,
+ final MessageDigest origDigest,
+ final int expectedLength) throws IOException {
+ ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer();
assertArrayEquals("digest of source and " + file
+ " differ",
origDigest.digest(), digest(file));
+ timer2.end("Completed digest", file);
+ LOG.info("Download bandwidth {} MB/s",
+ timer2.bandwidthDescription(expectedLength));
+ }
+
+ /**
+ * Perform the inner complete without verification.
+ * @param file destination path
+ * @param uploadHandle upload handle
+ * @param partHandles map of parts
+ * @return the path handle from the upload.
+ * @throws IOException IO failure
+ */
+ private PathHandle complete(final Path file,
+ final UploadHandle uploadHandle,
+ final Map<Integer, PartHandle> partHandles) throws IOException {
+ ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+ PathHandle fd = randomMpu().complete(file, partHandles, uploadHandle);
+ timer.end("Completed upload to %s", file);
return fd;
}
/**
+ * Abort an upload.
+ * @param file path
+ * @param uploadHandle handle
+ * @throws IOException failure
+ */
+ private void abortUpload(final Path file, UploadHandle uploadHandle)
+ throws IOException {
+ randomMpu().abort(file, uploadHandle);
+ }
+
+ /**
* Assert that a multipart upload is successful.
* @throws Exception failure
*/
@Test
public void testMultipartUpload() throws Exception {
- FileSystem fs = getFileSystem();
- Path file = path("testMultipartUpload");
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
- UploadHandle uploadHandle = mpu.initialize(file);
- List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+ Path file = methodPath();
+ UploadHandle uploadHandle = initializeUpload(file);
+ Map<Integer, PartHandle> partHandles = new HashMap<>();
MessageDigest origDigest = DigestUtils.getMd5Digest();
final int payloadCount = getTestPayloadCount();
for (int i = 1; i <= payloadCount; ++i) {
- byte[] payload = generatePayload(i);
- origDigest.update(payload);
- InputStream is = new ByteArrayInputStream(payload);
- PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
- payload.length);
- partHandles.add(Pair.of(i, partHandle));
+ PartHandle partHandle = buildAndPutPart(file, uploadHandle, i,
+ origDigest);
+ partHandles.put(i, partHandle);
}
- completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
+ completeUpload(file, uploadHandle, partHandles, origDigest,
payloadCount * partSizeInBytes());
}
@@ -173,17 +399,33 @@ public abstract class AbstractContractMultipartUploaderTest extends
public void testMultipartUploadEmptyPart() throws Exception {
FileSystem fs = getFileSystem();
Path file = path("testMultipartUpload");
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
- UploadHandle uploadHandle = mpu.initialize(file);
- List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
- MessageDigest origDigest = DigestUtils.getMd5Digest();
- byte[] payload = new byte[0];
- origDigest.update(payload);
- InputStream is = new ByteArrayInputStream(payload);
- PartHandle partHandle = mpu.putPart(file, is, 0, uploadHandle,
- payload.length);
- partHandles.add(Pair.of(0, partHandle));
- completeUpload(file, mpu, uploadHandle, partHandles, origDigest, 0);
+ try (MultipartUploader uploader =
+ MultipartUploaderFactory.get(fs, null)) {
+ UploadHandle uploadHandle = uploader.initialize(file);
+
+ Map<Integer, PartHandle> partHandles = new HashMap<>();
+ MessageDigest origDigest = DigestUtils.getMd5Digest();
+ byte[] payload = new byte[0];
+ origDigest.update(payload);
+ InputStream is = new ByteArrayInputStream(payload);
+ PartHandle partHandle = uploader.putPart(file, is, 1, uploadHandle,
+ payload.length);
+ partHandles.put(1, partHandle);
+ completeUpload(file, uploadHandle, partHandles, origDigest, 0);
+ }
+ }
+
+ /**
+ * Assert that an upload of a single empty part succeeds.
+ * @throws Exception failure
+ */
+ @Test
+ public void testUploadEmptyBlock() throws Exception {
+ Path file = methodPath();
+ UploadHandle uploadHandle = initializeUpload(file);
+ Map<Integer, PartHandle> partHandles = new HashMap<>();
+ partHandles.put(1, putPart(file, uploadHandle, 1, new byte[0]));
+ completeUpload(file, uploadHandle, partHandles, null, 0);
}
/**
@@ -192,11 +434,9 @@ public abstract class AbstractContractMultipartUploaderTest extends
*/
@Test
public void testMultipartUploadReverseOrder() throws Exception {
- FileSystem fs = getFileSystem();
- Path file = path("testMultipartUploadReverseOrder");
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
- UploadHandle uploadHandle = mpu.initialize(file);
- List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+ Path file = methodPath();
+ UploadHandle uploadHandle = initializeUpload(file);
+ Map<Integer, PartHandle> partHandles = new HashMap<>();
MessageDigest origDigest = DigestUtils.getMd5Digest();
final int payloadCount = getTestPayloadCount();
for (int i = 1; i <= payloadCount; ++i) {
@@ -204,13 +444,9 @@ public abstract class AbstractContractMultipartUploaderTest extends
origDigest.update(payload);
}
for (int i = payloadCount; i > 0; --i) {
- byte[] payload = generatePayload(i);
- InputStream is = new ByteArrayInputStream(payload);
- PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
- payload.length);
- partHandles.add(Pair.of(i, partHandle));
+ partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
}
- completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
+ completeUpload(file, uploadHandle, partHandles, origDigest,
payloadCount * partSizeInBytes());
}
@@ -222,25 +458,19 @@ public abstract class AbstractContractMultipartUploaderTest extends
public void testMultipartUploadReverseOrderNonContiguousPartNumbers()
throws Exception {
describe("Upload in reverse order and the part numbers are not contiguous");
- FileSystem fs = getFileSystem();
- Path file = path("testMultipartUploadReverseOrderNonContiguousPartNumbers");
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
- UploadHandle uploadHandle = mpu.initialize(file);
- List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+ Path file = methodPath();
+ UploadHandle uploadHandle = initializeUpload(file);
MessageDigest origDigest = DigestUtils.getMd5Digest();
int payloadCount = 2 * getTestPayloadCount();
for (int i = 2; i <= payloadCount; i += 2) {
byte[] payload = generatePayload(i);
origDigest.update(payload);
}
+ Map<Integer, PartHandle> partHandles = new HashMap<>();
for (int i = payloadCount; i > 0; i -= 2) {
- byte[] payload = generatePayload(i);
- InputStream is = new ByteArrayInputStream(payload);
- PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
- payload.length);
- partHandles.add(Pair.of(i, partHandle));
+ partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
}
- completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
+ completeUpload(file, uploadHandle, partHandles, origDigest,
getTestPayloadCount() * partSizeInBytes());
}
@@ -251,19 +481,14 @@ public abstract class AbstractContractMultipartUploaderTest extends
@Test
public void testMultipartUploadAbort() throws Exception {
describe("Upload and then abort it before completing");
- FileSystem fs = getFileSystem();
- Path file = path("testMultipartUploadAbort");
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
- UploadHandle uploadHandle = mpu.initialize(file);
- List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
- for (int i = 20; i >= 10; --i) {
- byte[] payload = generatePayload(i);
- InputStream is = new ByteArrayInputStream(payload);
- PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
- payload.length);
- partHandles.add(Pair.of(i, partHandle));
+ Path file = methodPath();
+ UploadHandle uploadHandle = initializeUpload(file);
+ int end = 10;
+ Map<Integer, PartHandle> partHandles = new HashMap<>();
+ for (int i = 12; i > 10; i--) {
+ partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
}
- mpu.abort(file, uploadHandle);
+ abortUpload(file, uploadHandle);
String contents = "ThisIsPart49\n";
int len = contents.getBytes(Charsets.UTF_8).length;
@@ -275,6 +500,15 @@ public abstract class AbstractContractMultipartUploaderTest extends
() -> mpu.complete(file, partHandles, uploadHandle));
assertPathDoesNotExist("Uploaded file should not exist", file);
+
+ // A second abort should raise a FileNotFoundException if the UploadHandle
+ // is consumed by finalization operations (complete, abort).
+ if (finalizeConsumesUploadIdImmediately()) {
+ intercept(FileNotFoundException.class,
+ () -> abortUpload(file, uploadHandle));
+ } else {
+ abortUpload(file, uploadHandle);
+ }
}
/**
@@ -282,13 +516,23 @@ public abstract class AbstractContractMultipartUploaderTest extends
*/
@Test
public void testAbortUnknownUpload() throws Exception {
- FileSystem fs = getFileSystem();
- Path file = path("testAbortUnknownUpload");
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+ Path file = methodPath();
ByteBuffer byteBuffer = ByteBuffer.wrap(
"invalid-handle".getBytes(Charsets.UTF_8));
UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer);
- intercept(FileNotFoundException.class, () -> mpu.abort(file, uploadHandle));
+ intercept(FileNotFoundException.class,
+ () -> abortUpload(file, uploadHandle));
+ }
+
+ /**
+ * Aborting an upload before any data has been uploaded must succeed.
+ */
+ @Test
+ public void testAbortEmptyUpload() throws Exception {
+ describe("initialize upload and abort before uploading data");
+ Path file = methodPath();
+ abortUpload(file, initializeUpload(file));
+ assertPathDoesNotExist("Uploaded file should not exist", file);
}
/**
@@ -296,13 +540,10 @@ public abstract class AbstractContractMultipartUploaderTest extends
*/
@Test
public void testAbortEmptyUploadHandle() throws Exception {
- FileSystem fs = getFileSystem();
- Path file = path("testAbortEmptyUpload");
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[0]);
UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer);
intercept(IllegalArgumentException.class,
- () -> mpu.abort(file, uploadHandle));
+ () -> abortUpload(methodPath(), uploadHandle));
}
/**
@@ -311,26 +552,20 @@ public abstract class AbstractContractMultipartUploaderTest extends
@Test
public void testCompleteEmptyUpload() throws Exception {
describe("Expect an empty MPU to fail, but still be abortable");
- FileSystem fs = getFileSystem();
- Path dest = path("testCompleteEmptyUpload");
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
- UploadHandle handle = mpu.initialize(dest);
- intercept(IOException.class,
- () -> mpu.complete(dest, new ArrayList<>(), handle));
- mpu.abort(dest, handle);
+ Path dest = methodPath();
+ UploadHandle handle = initializeUpload(dest);
+ intercept(IllegalArgumentException.class,
+ () -> mpu.complete(dest, new HashMap<>(), handle));
+ abortUpload(dest, handle);
}
/**
* When we pass empty uploadID, putPart throws IllegalArgumentException.
- * @throws Exception
*/
@Test
public void testPutPartEmptyUploadID() throws Exception {
describe("Expect IllegalArgumentException when putPart uploadID is empty");
- FileSystem fs = getFileSystem();
- Path dest = path("testCompleteEmptyUpload");
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
- mpu.initialize(dest);
+ Path dest = methodPath();
UploadHandle emptyHandle =
BBUploadHandle.from(ByteBuffer.wrap(new byte[0]));
byte[] payload = generatePayload(1);
@@ -341,25 +576,123 @@ public abstract class AbstractContractMultipartUploaderTest extends
/**
* When we pass empty uploadID, complete throws IllegalArgumentException.
- * @throws Exception
*/
@Test
public void testCompleteEmptyUploadID() throws Exception {
describe("Expect IllegalArgumentException when complete uploadID is empty");
- FileSystem fs = getFileSystem();
- Path dest = path("testCompleteEmptyUpload");
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
- UploadHandle realHandle = mpu.initialize(dest);
+ Path dest = methodPath();
+ UploadHandle realHandle = initializeUpload(dest);
UploadHandle emptyHandle =
BBUploadHandle.from(ByteBuffer.wrap(new byte[0]));
- List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
- byte[] payload = generatePayload(1);
- InputStream is = new ByteArrayInputStream(payload);
- PartHandle partHandle = mpu.putPart(dest, is, 1, realHandle,
- payload.length);
- partHandles.add(Pair.of(1, partHandle));
+ Map<Integer, PartHandle> partHandles = new HashMap<>();
+ PartHandle partHandle = putPart(dest, realHandle, 1,
+ generatePayload(1, SMALL_FILE));
+ partHandles.put(1, partHandle);
intercept(IllegalArgumentException.class,
() -> mpu.complete(dest, partHandles, emptyHandle));
+
+ // and, while things are setup, attempt to complete with
+ // a part index of 0
+ partHandles.clear();
+ partHandles.put(0, partHandle);
+ intercept(IllegalArgumentException.class,
+ () -> mpu.complete(dest, partHandles, realHandle));
+ }
+
+ /**
+ * Assert that upon completion, a directory in the way of the file will
+ * result in a failure. This test only applies to backing stores with a
+ * concept of directories.
+ * @throws Exception failure
+ */
+ @Test
+ public void testDirectoryInTheWay() throws Exception {
+ FileSystem fs = getFileSystem();
+ Path file = methodPath();
+ UploadHandle uploadHandle = initializeUpload(file);
+ Map<Integer, PartHandle> partHandles = new HashMap<>();
+ int size = SMALL_FILE;
+ PartHandle partHandle = putPart(file, uploadHandle, 1,
+ generatePayload(1, size));
+ partHandles.put(1, partHandle);
+
+ fs.mkdirs(file);
+ intercept(IOException.class,
+ () -> completeUpload(file, uploadHandle, partHandles, null,
+ size));
+ // abort should still work
+ abortUpload(file, uploadHandle);
+ }
+
+ @Test
+ public void testConcurrentUploads() throws Throwable {
+
+ // if the FS doesn't support concurrent uploads, this test is
+ // required to fail during the second initialization.
+ final boolean concurrent = supportsConcurrentUploadsToSamePath();
+
+ describe("testing concurrent uploads, MPU support for this is "
+ + concurrent);
+ final FileSystem fs = getFileSystem();
+ final Path file = methodPath();
+ final int size1 = SMALL_FILE;
+ final int partId1 = 1;
+ final byte[] payload1 = generatePayload(partId1, size1);
+ final MessageDigest digest1 = DigestUtils.getMd5Digest();
+ digest1.update(payload1);
+ final UploadHandle upload1 = initializeUpload(file);
+ final Map<Integer, PartHandle> partHandles1 = new HashMap<>();
+
+ // initiate part 2
+ // by using a different size, it's straightforward to see which
+ // version is visible, before reading/digesting the contents
+ final int size2 = size1 * 2;
+ final int partId2 = 2;
+ final byte[] payload2 = generatePayload(partId1, size2);
+ final MessageDigest digest2 = DigestUtils.getMd5Digest();
+ digest2.update(payload2);
+
+ final UploadHandle upload2;
+ try {
+ upload2 = initializeUpload(file);
+ Assume.assumeTrue(
+ "The Filesystem is unexpectedly supporting concurrent uploads",
+ concurrent);
+ } catch (IOException e) {
+ if (!concurrent) {
+ // this is expected, so end the test
+ LOG.debug("Expected exception raised on concurrent uploads {}", e);
+ return;
+ } else {
+ throw e;
+ }
+ }
+ final Map<Integer, PartHandle> partHandles2 = new HashMap<>();
+
+
+ assertNotEquals("Upload handles match", upload1, upload2);
+
+ // put part 1
+ partHandles1.put(partId1, putPart(file, upload1, partId1, payload1));
+
+ // put part2
+ partHandles2.put(partId2, putPart(file, upload2, partId2, payload2));
+
+ // complete part u1. expect its size and digest to
+ // be as expected.
+ completeUpload(file, upload1, partHandles1, digest1, size1);
+
+ // now upload part 2.
+ complete(file, upload2, partHandles2);
+ // and await the visible length to match
+ eventually(timeToBecomeConsistentMillis(), 500,
+ () -> {
+ FileStatus status = fs.getFileStatus(file);
+ assertEquals("File length in " + status,
+ size2, status.getLen());
+ });
+
+ verifyContents(file, digest2, size2);
}
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java
index a50d2e4..6e27964d 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java
@@ -40,4 +40,14 @@ public class TestLocalFSContractMultipartUploader
protected int partSizeInBytes() {
return 1024;
}
+
+ @Override
+ protected boolean finalizeConsumesUploadIdImmediately() {
+ return true;
+ }
+
+ @Override
+ protected boolean supportsConcurrentUploadsToSamePath() {
+ return true;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java
index f3a5265..54f4ed2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
@@ -32,6 +34,9 @@ import org.apache.hadoop.fs.contract.AbstractFSContract;
public class TestHDFSContractMultipartUploader extends
AbstractContractMultipartUploaderTest {
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(TestHDFSContractMultipartUploader.class);
+
@BeforeClass
public static void createCluster() throws IOException {
HDFSContract.createCluster();
@@ -55,4 +60,14 @@ public class TestHDFSContractMultipartUploader extends
protected int partSizeInBytes() {
return 1024;
}
+
+ @Override
+ protected boolean finalizeConsumesUploadIdImmediately() {
+ return true;
+ }
+
+ @Override
+ protected boolean supportsConcurrentUploadsToSamePath() {
+ return true;
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java
index cab4e2a..cf58751 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java
@@ -25,7 +25,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
@@ -68,10 +70,8 @@ public class S3AMultipartUploader extends MultipartUploader {
public static final String HEADER = "S3A-part01";
public S3AMultipartUploader(FileSystem fs, Configuration conf) {
- if (!(fs instanceof S3AFileSystem)) {
- throw new IllegalArgumentException(
- "S3A MultipartUploads must use S3AFileSystem");
- }
+ Preconditions.checkArgument(fs instanceof S3AFileSystem,
+ "Wrong filesystem: expected S3A but got %s", fs);
s3a = (S3AFileSystem) fs;
}
@@ -88,6 +88,8 @@ public class S3AMultipartUploader extends MultipartUploader {
public PartHandle putPart(Path filePath, InputStream inputStream,
int partNumber, UploadHandle uploadId, long lengthInBytes)
throws IOException {
+ checkPutArguments(filePath, inputStream, partNumber, uploadId,
+ lengthInBytes);
byte[] uploadIdBytes = uploadId.toByteArray();
checkUploadId(uploadIdBytes);
String key = s3a.pathToKey(filePath);
@@ -105,14 +107,16 @@ public class S3AMultipartUploader extends MultipartUploader {
@Override
public PathHandle complete(Path filePath,
- List<Pair<Integer, PartHandle>> handles, UploadHandle uploadId)
+ Map<Integer, PartHandle> handleMap,
+ UploadHandle uploadId)
throws IOException {
byte[] uploadIdBytes = uploadId.toByteArray();
checkUploadId(uploadIdBytes);
- if (handles.isEmpty()) {
- throw new IOException("Empty upload");
- }
+ checkPartHandles(handleMap);
+ List<Map.Entry<Integer, PartHandle>> handles =
+ new ArrayList<>(handleMap.entrySet());
+ handles.sort(Comparator.comparingInt(Map.Entry::getKey));
final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
String key = s3a.pathToKey(filePath);
@@ -121,11 +125,11 @@ public class S3AMultipartUploader extends MultipartUploader {
ArrayList<PartETag> eTags = new ArrayList<>();
eTags.ensureCapacity(handles.size());
long totalLength = 0;
- for (Pair<Integer, PartHandle> handle : handles) {
- byte[] payload = handle.getRight().toByteArray();
+ for (Map.Entry<Integer, PartHandle> handle : handles) {
+ byte[] payload = handle.getValue().toByteArray();
Pair<Long, String> result = parsePartHandlePayload(payload);
totalLength += result.getLeft();
- eTags.add(new PartETag(handle.getLeft(), result.getRight()));
+ eTags.add(new PartETag(handle.getKey(), result.getRight()));
}
AtomicInteger errorCount = new AtomicInteger(0);
CompleteMultipartUploadResult result = writeHelper.completeMPUwithRetries(
@@ -172,7 +176,7 @@ public class S3AMultipartUploader extends MultipartUploader {
throws IOException {
Preconditions.checkArgument(StringUtils.isNotEmpty(eTag),
"Empty etag");
- Preconditions.checkArgument(len > 0,
+ Preconditions.checkArgument(len >= 0,
"Invalid length");
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
@@ -190,6 +194,7 @@ public class S3AMultipartUploader extends MultipartUploader {
* @return the length and etag
* @throws IOException error reading the payload
*/
+ @VisibleForTesting
static Pair<Long, String> parsePartHandlePayload(byte[] data)
throws IOException {
@@ -201,7 +206,7 @@ public class S3AMultipartUploader extends MultipartUploader {
}
final long len = input.readLong();
final String etag = input.readUTF();
- if (len <= 0) {
+ if (len < 0) {
throw new IOException("Negative length");
}
return Pair.of(len, etag);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
index 6514ea3..059312a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.fs.contract.s3a;
-import java.io.IOException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
@@ -35,6 +34,9 @@ import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_H
/**
* Test MultipartUploader with S3A.
+ * Although not an S3A Scale test subclass, it uses the -Dscale option
+ * to enable it, and the partition size option to control the size of
+ * parts uploaded.
*/
public class ITestS3AContractMultipartUploader extends
AbstractContractMultipartUploaderTest {
@@ -79,6 +81,35 @@ public class ITestS3AContractMultipartUploader extends
return new S3AContract(conf);
}
+ /**
+ * Bigger test: use the scale timeout.
+ * @return the timeout for scale tests.
+ */
+ @Override
+ protected int getTestTimeoutMillis() {
+ return SCALE_TEST_TIMEOUT_MILLIS;
+ }
+
+
+ @Override
+ protected boolean supportsConcurrentUploadsToSamePath() {
+ return true;
+ }
+
+ /**
+ * Provide a pessimistic time to become consistent.
+ * @return a time in milliseconds
+ */
+ @Override
+ protected int timeToBecomeConsistentMillis() {
+ return 30 * 1000;
+ }
+
+ @Override
+ protected boolean finalizeConsumesUploadIdImmediately() {
+ return false;
+ }
+
@Override
public void setup() throws Exception {
super.setup();
@@ -103,19 +134,29 @@ public class ITestS3AContractMultipartUploader extends
public void teardown() throws Exception {
Path teardown = path("teardown").getParent();
S3AFileSystem fs = getFileSystem();
- WriteOperationHelper helper = fs.getWriteOperationHelper();
- try {
- LOG.info("Teardown: aborting outstanding uploads under {}", teardown);
- int count = helper.abortMultipartUploadsUnderPath(fs.pathToKey(teardown));
- LOG.info("Found {} incomplete uploads", count);
- } catch (IOException e) {
- LOG.warn("IOE in teardown", e);
+ if (fs != null) {
+ WriteOperationHelper helper = fs.getWriteOperationHelper();
+ try {
+ LOG.info("Teardown: aborting outstanding uploads under {}", teardown);
+ int count = helper.abortMultipartUploadsUnderPath(
+ fs.pathToKey(teardown));
+ LOG.info("Found {} incomplete uploads", count);
+ } catch (Exception e) {
+ LOG.warn("Exeception in teardown", e);
+ }
}
super.teardown();
}
+ /**
+ * S3 has no concept of directories, so this test does not apply.
+ */
+ public void testDirectoryInTheWay() throws Exception {
+ // no-op
+ }
+
@Override
- public void testMultipartUploadEmptyPart() throws Exception {
- // ignore the test in the base class.
+ public void testMultipartUploadReverseOrder() throws Exception {
+ ContractTestUtils.skip("skipped for speed");
}
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java
index 35d0460..4825d26 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java
@@ -60,7 +60,7 @@ public class TestS3AMultipartUploaderSupport extends HadoopTestBase {
@Test
public void testNoLen() throws Throwable {
intercept(IllegalArgumentException.class,
- () -> buildPartHandlePayload("tag", 0));
+ () -> buildPartHandlePayload("tag", -1));
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org