You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jclouds.apache.org by ga...@apache.org on 2019/01/04 23:43:40 UTC

jclouds git commit: JCLOUDS-1366: JCLOUDS-1472: Fix InputStream MPU

Repository: jclouds
Updated Branches:
  refs/heads/master a36c9dcef -> 2393c7920


JCLOUDS-1366: JCLOUDS-1472: Fix InputStream MPU

Previously jclouds attempted to slice non-repeatable InputStream
Payloads in order to upload sequentially.  This never worked due to
mutating the single stream via skip and close.  Also backfill test
which spuriously succeeded.


Project: http://git-wip-us.apache.org/repos/asf/jclouds/repo
Commit: http://git-wip-us.apache.org/repos/asf/jclouds/commit/2393c792
Tree: http://git-wip-us.apache.org/repos/asf/jclouds/tree/2393c792
Diff: http://git-wip-us.apache.org/repos/asf/jclouds/diff/2393c792

Branch: refs/heads/master
Commit: 2393c7920b65be5f42fae3376b34810c7e96815f
Parents: a36c9dc
Author: Andrew Gaul <ga...@apache.org>
Authored: Fri Jan 4 14:56:29 2019 -0800
Committer: Andrew Gaul <ga...@apache.org>
Committed: Fri Jan 4 15:42:17 2019 -0800

----------------------------------------------------------------------
 .../blobstore/internal/BaseBlobStore.java       | 37 +++++++++++++++-----
 .../internal/BaseBlobIntegrationTest.java       |  8 ++++-
 2 files changed, 35 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jclouds/blob/2393c792/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java
----------------------------------------------------------------------
diff --git a/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java
index eed9ab1..1c0ba0c 100644
--- a/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java
+++ b/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java
@@ -23,6 +23,7 @@ import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursi
 import static org.jclouds.util.Predicates2.retry;
 
 import java.io.File;
+import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -57,6 +58,7 @@ import org.jclouds.http.HttpResponse;
 import org.jclouds.http.HttpResponseException;
 import org.jclouds.io.ContentMetadata;
 import org.jclouds.io.Payload;
+import org.jclouds.io.Payloads;
 import org.jclouds.io.PayloadSlicer;
 import org.jclouds.util.Closeables2;
 
@@ -352,6 +354,26 @@ public abstract class BaseBlobStore implements BlobStore {
    protected String putMultipartBlob(String container, Blob blob, PutOptions overrides, ListeningExecutorService executor) {
       ArrayList<ListenableFuture<MultipartPart>> parts = new ArrayList<ListenableFuture<MultipartPart>>();
       MultipartUpload mpu = initiateMultipartUpload(container, blob.getMetadata(), overrides);
+      // Cannot slice InputStream Payload since slice and close mutate the
+      // underlying stream.  Also issue synchronous uploads to avoid buffering
+      // arbitrary amounts of data in-memory.
+      Payload payload = blob.getPayload();
+      boolean repeatable = blob.getPayload().isRepeatable();
+      if (!repeatable) {
+         payload = Payloads.newInputStreamPayload(new FilterInputStream((InputStream) payload.getRawContent()) {
+            @Override
+            public long skip(long offset) throws IOException {
+               // intentionally not implemented
+               return offset;
+            }
+
+            @Override
+            public void close() throws IOException {
+               // intentionally not implemented
+            }
+         });
+      }
+
       try {
          long contentLength = blob.getMetadata().getContentMetadata().getContentLength();
          // TODO: inject MultipartUploadSlicingAlgorithm to override default part size
@@ -359,19 +381,16 @@ public abstract class BaseBlobStore implements BlobStore {
                getMinimumMultipartPartSize(), getMaximumMultipartPartSize(), getMaximumNumberOfParts());
          long partSize = algorithm.calculateChunkSize(contentLength);
          int partNumber = 1;
-         // TODO: for InputStream payloads, this buffers all parts in-memory!
          while (partNumber <= algorithm.getParts()) {
-            Payload payload = slicer.slice(blob.getPayload(), algorithm.getCopied(), partSize);
-            BlobUploader b =
-                  new BlobUploader(mpu, partNumber++, payload);
-            parts.add(executor.submit(b));
+            Payload slice = slicer.slice(payload, algorithm.getCopied(), partSize);
+            BlobUploader b = new BlobUploader(mpu, partNumber++, slice);
+            parts.add(repeatable ? executor.submit(b) : Futures.immediateFuture(b.call()));
             algorithm.addCopied(partSize);
          }
          if (algorithm.getRemaining() != 0) {
-            Payload payload = slicer.slice(blob.getPayload(), algorithm.getCopied(), algorithm.getRemaining());
-            BlobUploader b =
-                  new BlobUploader(mpu, partNumber, payload);
-            parts.add(executor.submit(b));
+            Payload slice = slicer.slice(payload, algorithm.getCopied(), algorithm.getRemaining());
+            BlobUploader b = new BlobUploader(mpu, partNumber, slice);
+            parts.add(repeatable ? executor.submit(b) : Futures.immediateFuture(b.call()));
          }
          return completeMultipartUpload(mpu, Futures.getUnchecked(Futures.allAsList(parts)));
       } catch (RuntimeException re) {

http://git-wip-us.apache.org/repos/asf/jclouds/blob/2393c792/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java
----------------------------------------------------------------------
diff --git a/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java b/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java
index 35c43ac..f250a60 100644
--- a/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java
+++ b/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java
@@ -679,7 +679,13 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest {
 
    @Test(groups = { "integration", "live" })
    public void testPutMultipartInputStream() throws Exception {
-      long length = getMinimumMultipartBlobSize();
+      long length = Math.max(getMinimumMultipartBlobSize(), MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE + 1);
+      BlobStore blobStore = view.getBlobStore();
+      MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(
+              blobStore.getMinimumMultipartPartSize(), blobStore.getMaximumMultipartPartSize(),
+              blobStore.getMaximumNumberOfParts());
+      // make sure that we are creating multiple parts
+      assertThat(algorithm.calculateChunkSize(length)).isLessThan(length);
       ByteSource byteSource = TestUtils.randomByteSource().slice(0, length);
       Payload payload = new InputStreamPayload(byteSource.openStream());
       testPut(payload, null, new ByteSourcePayload(byteSource), length, new PutOptions().multipart(true));