You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jclouds.apache.org by ga...@apache.org on 2019/01/04 23:43:40 UTC
jclouds git commit: JCLOUDS-1366: JCLOUDS-1472: Fix InputStream MPU
Repository: jclouds
Updated Branches:
refs/heads/master a36c9dcef -> 2393c7920
JCLOUDS-1366: JCLOUDS-1472: Fix InputStream MPU
Previously jclouds attempted to slice non-repeatable InputStream
Payloads in order to upload the parts sequentially. This never worked
because slicing mutates the single underlying stream via skip and close.
Also fix the existing test, which spuriously succeeded because it did
not ensure that multiple parts were created.
Project: http://git-wip-us.apache.org/repos/asf/jclouds/repo
Commit: http://git-wip-us.apache.org/repos/asf/jclouds/commit/2393c792
Tree: http://git-wip-us.apache.org/repos/asf/jclouds/tree/2393c792
Diff: http://git-wip-us.apache.org/repos/asf/jclouds/diff/2393c792
Branch: refs/heads/master
Commit: 2393c7920b65be5f42fae3376b34810c7e96815f
Parents: a36c9dc
Author: Andrew Gaul <ga...@apache.org>
Authored: Fri Jan 4 14:56:29 2019 -0800
Committer: Andrew Gaul <ga...@apache.org>
Committed: Fri Jan 4 15:42:17 2019 -0800
----------------------------------------------------------------------
.../blobstore/internal/BaseBlobStore.java | 37 +++++++++++++++-----
.../internal/BaseBlobIntegrationTest.java | 8 ++++-
2 files changed, 35 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jclouds/blob/2393c792/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java
----------------------------------------------------------------------
diff --git a/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java
index eed9ab1..1c0ba0c 100644
--- a/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java
+++ b/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java
@@ -23,6 +23,7 @@ import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursi
import static org.jclouds.util.Predicates2.retry;
import java.io.File;
+import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -57,6 +58,7 @@ import org.jclouds.http.HttpResponse;
import org.jclouds.http.HttpResponseException;
import org.jclouds.io.ContentMetadata;
import org.jclouds.io.Payload;
+import org.jclouds.io.Payloads;
import org.jclouds.io.PayloadSlicer;
import org.jclouds.util.Closeables2;
@@ -352,6 +354,26 @@ public abstract class BaseBlobStore implements BlobStore {
protected String putMultipartBlob(String container, Blob blob, PutOptions overrides, ListeningExecutorService executor) {
ArrayList<ListenableFuture<MultipartPart>> parts = new ArrayList<ListenableFuture<MultipartPart>>();
MultipartUpload mpu = initiateMultipartUpload(container, blob.getMetadata(), overrides);
+ // Cannot slice InputStream Payload since slice and close mutate the
+ // underlying stream. Also issue synchronous uploads to avoid buffering
+ // arbitrary amounts of data in-memory.
+ Payload payload = blob.getPayload();
+ boolean repeatable = blob.getPayload().isRepeatable();
+ if (!repeatable) {
+ payload = Payloads.newInputStreamPayload(new FilterInputStream((InputStream) payload.getRawContent()) {
+ @Override
+ public long skip(long offset) throws IOException {
+ // intentionally not implemented
+ return offset;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // intentionally not implemented
+ }
+ });
+ }
+
try {
long contentLength = blob.getMetadata().getContentMetadata().getContentLength();
// TODO: inject MultipartUploadSlicingAlgorithm to override default part size
@@ -359,19 +381,16 @@ public abstract class BaseBlobStore implements BlobStore {
getMinimumMultipartPartSize(), getMaximumMultipartPartSize(), getMaximumNumberOfParts());
long partSize = algorithm.calculateChunkSize(contentLength);
int partNumber = 1;
- // TODO: for InputStream payloads, this buffers all parts in-memory!
while (partNumber <= algorithm.getParts()) {
- Payload payload = slicer.slice(blob.getPayload(), algorithm.getCopied(), partSize);
- BlobUploader b =
- new BlobUploader(mpu, partNumber++, payload);
- parts.add(executor.submit(b));
+ Payload slice = slicer.slice(payload, algorithm.getCopied(), partSize);
+ BlobUploader b = new BlobUploader(mpu, partNumber++, slice);
+ parts.add(repeatable ? executor.submit(b) : Futures.immediateFuture(b.call()));
algorithm.addCopied(partSize);
}
if (algorithm.getRemaining() != 0) {
- Payload payload = slicer.slice(blob.getPayload(), algorithm.getCopied(), algorithm.getRemaining());
- BlobUploader b =
- new BlobUploader(mpu, partNumber, payload);
- parts.add(executor.submit(b));
+ Payload slice = slicer.slice(payload, algorithm.getCopied(), algorithm.getRemaining());
+ BlobUploader b = new BlobUploader(mpu, partNumber, slice);
+ parts.add(repeatable ? executor.submit(b) : Futures.immediateFuture(b.call()));
}
return completeMultipartUpload(mpu, Futures.getUnchecked(Futures.allAsList(parts)));
} catch (RuntimeException re) {
http://git-wip-us.apache.org/repos/asf/jclouds/blob/2393c792/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java
----------------------------------------------------------------------
diff --git a/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java b/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java
index 35c43ac..f250a60 100644
--- a/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java
+++ b/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java
@@ -679,7 +679,13 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest {
@Test(groups = { "integration", "live" })
public void testPutMultipartInputStream() throws Exception {
- long length = getMinimumMultipartBlobSize();
+ long length = Math.max(getMinimumMultipartBlobSize(), MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE + 1);
+ BlobStore blobStore = view.getBlobStore();
+ MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(
+ blobStore.getMinimumMultipartPartSize(), blobStore.getMaximumMultipartPartSize(),
+ blobStore.getMaximumNumberOfParts());
+ // make sure that we are creating multiple parts
+ assertThat(algorithm.calculateChunkSize(length)).isLessThan(length);
ByteSource byteSource = TestUtils.randomByteSource().slice(0, length);
Payload payload = new InputStreamPayload(byteSource.openStream());
testPut(payload, null, new ByteSourcePayload(byteSource), length, new PutOptions().multipart(true));