You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jclouds.apache.org by na...@apache.org on 2016/08/10 22:09:48 UTC

[04/12] jclouds git commit: Parallel upload for BaseBlobStore

Parallel upload for BaseBlobStore


Project: http://git-wip-us.apache.org/repos/asf/jclouds/repo
Commit: http://git-wip-us.apache.org/repos/asf/jclouds/commit/42079e13
Tree: http://git-wip-us.apache.org/repos/asf/jclouds/tree/42079e13
Diff: http://git-wip-us.apache.org/repos/asf/jclouds/diff/42079e13

Branch: refs/heads/gsoc2016-ivan
Commit: 42079e1392fb5b2b792f518812689854c375445f
Parents: a515ce2
Author: Zack Shoylev <za...@rackspace.com>
Authored: Thu Jul 7 16:39:04 2016 -0500
Committer: Zack Shoylev <za...@rackspace.com>
Committed: Fri Jul 15 04:15:17 2016 -0500

----------------------------------------------------------------------
 .../blobstore/RegionScopedSwiftBlobStore.java   |  1 -
 .../blobstore/internal/BaseBlobStore.java       | 80 +++++++++++++++-----
 2 files changed, 62 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jclouds/blob/42079e13/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java
----------------------------------------------------------------------
diff --git a/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java b/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java
index 63f4315..0b29f1f 100644
--- a/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java
+++ b/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java
@@ -608,7 +608,6 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
       }
    }
 
-   // copied from BaseBlobStore
    @Beta
    protected String putMultipartBlob(String container, Blob blob, PutOptions overrides, ListeningExecutorService executor) {
       ArrayList<ListenableFuture<MultipartPart>> parts = new ArrayList<ListenableFuture<MultipartPart>>();

http://git-wip-us.apache.org/repos/asf/jclouds/blob/42079e13/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java
----------------------------------------------------------------------
diff --git a/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java
index 818a154..21ae2ff 100644
--- a/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java
+++ b/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java
@@ -18,17 +18,20 @@ package org.jclouds.blobstore.internal;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
+import static org.jclouds.Constants.PROPERTY_USER_THREADS;
 import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
 import static org.jclouds.util.Predicates2.retry;
 
 import java.io.InputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Date;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
 
 import javax.inject.Inject;
+import javax.inject.Named;
 
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.BlobStoreContext;
@@ -57,10 +60,13 @@ import org.jclouds.io.PayloadSlicer;
 import org.jclouds.util.Closeables2;
 
 import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 
 public abstract class BaseBlobStore implements BlobStore {
 
@@ -96,7 +102,7 @@ public abstract class BaseBlobStore implements BlobStore {
    /**
     * This implementation invokes
     * {@link #list(String,org.jclouds.blobstore.options.ListContainerOptions)}
-    * 
+    *
     * @param container
     *           container name
     */
@@ -107,7 +113,7 @@ public abstract class BaseBlobStore implements BlobStore {
 
    /**
     * This implementation invokes {@link BlobUtilsImpl#directoryExists}
-    * 
+    *
     * @param container
     *           container name
     * @param directory
@@ -120,7 +126,7 @@ public abstract class BaseBlobStore implements BlobStore {
 
    /**
     * This implementation invokes {@link BlobUtilsImpl#createDirectory}
-    * 
+    *
     * @param container
     *           container name
     * @param directory
@@ -141,7 +147,7 @@ public abstract class BaseBlobStore implements BlobStore {
    /**
     * This implementation invokes {@link #countBlobs} with the
     * {@link ListContainerOptions#recursive} option.
-    * 
+    *
     * @param container
     *           container name
     */
@@ -152,7 +158,7 @@ public abstract class BaseBlobStore implements BlobStore {
 
    /**
     * This implementation invokes {@link BlobUtilsImpl#countBlobs}
-    * 
+    *
     * @param container
     *           container name
     */
@@ -164,7 +170,7 @@ public abstract class BaseBlobStore implements BlobStore {
    /**
     * This implementation invokes {@link #clearContainer} with the
     * {@link ListContainerOptions#recursive} option.
-    * 
+    *
     * @param container
     *           container name
     */
@@ -175,7 +181,7 @@ public abstract class BaseBlobStore implements BlobStore {
 
    /**
     * This implementation invokes {@link BlobUtilsImpl#clearContainer}
-    * 
+    *
     * @param container
     *           container name
     */
@@ -186,7 +192,7 @@ public abstract class BaseBlobStore implements BlobStore {
 
    /**
     * This implementation invokes {@link BlobUtilsImpl#deleteDirectory}.
-    * 
+    *
     * @param container
     *           container name
     */
@@ -198,7 +204,7 @@ public abstract class BaseBlobStore implements BlobStore {
    /**
     * This implementation invokes
     * {@link #getBlob(String,String,org.jclouds.blobstore.options.GetOptions)}
-    * 
+    *
     * @param container
     *           container name
     * @param key
@@ -211,7 +217,7 @@ public abstract class BaseBlobStore implements BlobStore {
 
    /**
     * This implementation invokes {@link #deleteAndEnsurePathGone}
-    * 
+    *
     * @param container
     *           bucket name
     */
@@ -320,29 +326,67 @@ public abstract class BaseBlobStore implements BlobStore {
       }
    }
 
-   // TODO: parallel uploads
+   @com.google.inject.Inject
+   @Named(PROPERTY_USER_THREADS)
+   @VisibleForTesting
+   ListeningExecutorService userExecutor;
+
+   /**
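+    * Uploads the blob in parts using the custom executor from {@code overrides} when one is set, or the jclouds userExecutor otherwise.
+    *
+    * @param container container name
+    * @param blob blob whose payload is sliced into parts and uploaded
+    * @param overrides put options; may select a custom executor for the part uploads
+    * @return the multipart blob etag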
+    */
    @Beta
    protected String putMultipartBlob(String container, Blob blob, PutOptions overrides) {
+      if (overrides.getUseCustomExecutor()) {
+         return putMultipartBlob(container, blob, overrides, overrides.getCustomExecutor());
+      } else {
+         return putMultipartBlob(container, blob, overrides, userExecutor);
+      }
+   }
+
+   @Beta
+   protected String putMultipartBlob(String container, Blob blob, PutOptions overrides, ListeningExecutorService executor) {
+      ArrayList<ListenableFuture<MultipartPart>> parts = new ArrayList<ListenableFuture<MultipartPart>>();
       MultipartUpload mpu = initiateMultipartUpload(container, blob.getMetadata(), overrides);
       try {
-         List<MultipartPart> parts = Lists.newArrayList();
          long contentLength = blob.getMetadata().getContentMetadata().getContentLength();
          MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(
                getMinimumMultipartPartSize(), getMaximumMultipartPartSize(), getMaximumNumberOfParts());
          long partSize = algorithm.calculateChunkSize(contentLength);
          int partNumber = 1;
          for (Payload payload : slicer.slice(blob.getPayload(), partSize)) {
-            MultipartPart part = uploadMultipartPart(mpu, partNumber, payload);
-            parts.add(part);
-            ++partNumber;
+            BlobUploader b =
+                  new BlobUploader(mpu, partNumber++, payload);
+            parts.add(executor.submit(b));
          }
-         return completeMultipartUpload(mpu, parts);
+         return completeMultipartUpload(mpu, Futures.getUnchecked(Futures.allAsList(parts)));
       } catch (RuntimeException re) {
          abortMultipartUpload(mpu);
          throw re;
       }
    }
 
+   private final class BlobUploader implements Callable<MultipartPart> {
+      private final MultipartUpload mpu;
+      private final int partNumber;
+      private final Payload payload;
+
+      BlobUploader(MultipartUpload mpu, int partNumber, Payload payload) {
+         this.mpu = mpu;
+         this.partNumber = partNumber;
+         this.payload = payload;
+      }
+
+      @Override
+      public MultipartPart call() {
+         return uploadMultipartPart(mpu, partNumber, payload);
+      }
+   }
+
    private static HttpResponseException returnResponseException(int code) {
       HttpResponse response = HttpResponse.builder().statusCode(code).build();
       // TODO: bogus endpoint