You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jclouds.apache.org by za...@apache.org on 2016/07/01 17:28:31 UTC

jclouds git commit: Changes the upload behavior to parallel, a TODO

Repository: jclouds
Updated Branches:
  refs/heads/master 984b6ae8f -> 6bff97b6d


Changes the upload behavior to parallel, a TODO


Project: http://git-wip-us.apache.org/repos/asf/jclouds/repo
Commit: http://git-wip-us.apache.org/repos/asf/jclouds/commit/6bff97b6
Tree: http://git-wip-us.apache.org/repos/asf/jclouds/tree/6bff97b6
Diff: http://git-wip-us.apache.org/repos/asf/jclouds/diff/6bff97b6

Branch: refs/heads/master
Commit: 6bff97b6d3f05aaf7e24d303d917c478f533313d
Parents: 984b6ae
Author: Zack Shoylev <za...@rackspace.com>
Authored: Mon Jun 20 15:36:47 2016 -0500
Committer: Zack Shoylev <za...@rackspace.com>
Committed: Fri Jul 1 11:26:56 2016 -0500

----------------------------------------------------------------------
 .../blobstore/RegionScopedSwiftBlobStore.java   | 63 +++++++++++++--
 .../jclouds/blobstore/options/PutOptions.java   | 82 +++++++++++++++++++-
 2 files changed, 134 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jclouds/blob/6bff97b6/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java
----------------------------------------------------------------------
diff --git a/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java b/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java
index 20bdaef..63f4315 100644
--- a/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java
+++ b/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java
@@ -20,15 +20,19 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Iterables.tryFind;
 import static com.google.common.collect.Lists.transform;
+import static org.jclouds.Constants.PROPERTY_USER_THREADS;
 import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
 import static org.jclouds.location.predicates.LocationPredicates.idEquals;
 import static org.jclouds.openstack.swift.v1.options.PutOptions.Builder.metadata;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
 
 import javax.inject.Inject;
+import javax.inject.Named;
 
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.BlobStoreContext;
@@ -76,6 +80,7 @@ import org.jclouds.openstack.swift.v1.options.UpdateContainerOptions;
 import org.jclouds.openstack.swift.v1.reference.SwiftHeaders;
 
 import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
@@ -89,10 +94,12 @@ import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.ByteSource;
 import com.google.common.net.HttpHeaders;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.inject.AbstractModule;
 import com.google.inject.Injector;
 import com.google.inject.assistedinject.Assisted;
@@ -579,10 +586,33 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
       throw new UnsupportedOperationException();
    }
 
-   // copied from BaseBlobStore
+   @com.google.inject.Inject
+   @Named(PROPERTY_USER_THREADS)
+   @VisibleForTesting
+   ListeningExecutorService userExecutor;
+
+   /**
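+    * Uploads the blob in parts, using the executor set in the options or, if none was set, the jclouds userExecutor.
+    *
+    * @param container the container to upload the blob to
+    * @param blob the blob to upload; its content length must be set
+    * @param overrides the put options, which may carry a custom executor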
+    * @return the multipart blob etag
+    */
    @Beta
    protected String putMultipartBlob(String container, Blob blob, PutOptions overrides) {
-      List<MultipartPart> parts = Lists.newArrayList();
+      if (overrides.getUseCustomExecutor()) {
+         return putMultipartBlob(container, blob, overrides, overrides.getCustomExecutor());
+      } else {
+         return putMultipartBlob(container, blob, overrides, userExecutor);
+      }
+   }
+
+   // copied from BaseBlobStore
+   @Beta
+   protected String putMultipartBlob(String container, Blob blob, PutOptions overrides, ListeningExecutorService executor) {
+      ArrayList<ListenableFuture<MultipartPart>> parts = new ArrayList<ListenableFuture<MultipartPart>>();
+
       long contentLength = checkNotNull(blob.getMetadata().getContentMetadata().getContentLength(),
             "must provide content-length to use multi-part upload");
       MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(
@@ -590,11 +620,30 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
       long partSize = algorithm.calculateChunkSize(contentLength);
       MultipartUpload mpu = initiateMultipartUpload(container, blob.getMetadata(), partSize, overrides);
       int partNumber = 1;
+
       for (Payload payload : slicer.slice(blob.getPayload(), partSize)) {
-         MultipartPart part = uploadMultipartPart(mpu, partNumber, payload);
-         parts.add(part);
-         ++partNumber;
+         BlobUploader b =
+               new BlobUploader(mpu, partNumber++, payload);
+         parts.add(executor.submit(b));
+      }
+
+      return completeMultipartUpload(mpu, Futures.getUnchecked(Futures.allAsList(parts)));
+   }
+
+   private final class BlobUploader implements Callable<MultipartPart> {
+      private final MultipartUpload mpu;
+      private final int partNumber;
+      private final Payload payload;
+
+      BlobUploader(MultipartUpload mpu, int partNumber, Payload payload) {
+         this.mpu = mpu;
+         this.partNumber = partNumber;
+         this.payload = payload;
+      }
+
+      @Override
+      public MultipartPart call() {
+         return uploadMultipartPart(mpu, partNumber, payload);
       }
-      return completeMultipartUpload(mpu, parts);
    }
 }

http://git-wip-us.apache.org/repos/asf/jclouds/blob/6bff97b6/blobstore/src/main/java/org/jclouds/blobstore/options/PutOptions.java
----------------------------------------------------------------------
diff --git a/blobstore/src/main/java/org/jclouds/blobstore/options/PutOptions.java b/blobstore/src/main/java/org/jclouds/blobstore/options/PutOptions.java
index 4cb8ca2..d80ab86 100644
--- a/blobstore/src/main/java/org/jclouds/blobstore/options/PutOptions.java
+++ b/blobstore/src/main/java/org/jclouds/blobstore/options/PutOptions.java
@@ -20,6 +20,10 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.jclouds.blobstore.domain.BlobAccess;
 
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
 /**
  * Contains options supported in the put blob operation. <h2>
  * Usage</h2> The recommended way to instantiate a PutOptions object is to statically import
@@ -36,6 +40,10 @@ public class PutOptions implements Cloneable {
 
    private BlobAccess blobAccess = BlobAccess.PRIVATE;
    private boolean multipart = false;
+   private boolean useCustomExecutor = false;
+
+   // TODO: This exposes ListeningExecutorService to the user, instead of a regular ExecutorService
+   private ListeningExecutorService customExecutor = MoreExecutors.sameThreadExecutor();
 
    public PutOptions() {
    }
@@ -44,6 +52,25 @@ public class PutOptions implements Cloneable {
       this.multipart = multipart;
    }
 
+   /**
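+    * Used by clone() to copy the multipart flag, the useCustomExecutor flag and the executor.
+    * @param multipart whether multipart upload is enabled
+    * @param customExecutor the executor to copy; must not be null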
+    */
+   protected PutOptions(boolean multipart, boolean useCustomExecutor,  ListeningExecutorService customExecutor) {
+      Preconditions.checkNotNull(customExecutor);
+      this.multipart = multipart;
+      this.useCustomExecutor = useCustomExecutor;
+      this.customExecutor = customExecutor;
+   }
+
+   public PutOptions(ListeningExecutorService customExecutor) {
+      Preconditions.checkNotNull(customExecutor);
+      this.multipart = true;
+      this.useCustomExecutor = true;
+      this.customExecutor = customExecutor;
+   }
+
    public static class ImmutablePutOptions extends PutOptions {
       private final PutOptions delegate;
 
@@ -52,6 +79,16 @@ public class PutOptions implements Cloneable {
       }
 
       @Override
+      public ListeningExecutorService getCustomExecutor() {
+         return delegate.getCustomExecutor();
+      }
+
+      @Override
+      public PutOptions setCustomExecutor(ListeningExecutorService customExecutor) {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
       public BlobAccess getBlobAccess() {
          return delegate.getBlobAccess();
       }
@@ -87,6 +124,22 @@ public class PutOptions implements Cloneable {
       return blobAccess;
    }
 
+   public boolean getUseCustomExecutor() {
+      return useCustomExecutor;
+   }
+
+   public ListeningExecutorService getCustomExecutor() {
+      return customExecutor;
+   }
+
+   public PutOptions setCustomExecutor(ListeningExecutorService customExecutor) {
+      Preconditions.checkNotNull(customExecutor);
+      this.multipart = true;
+      this.useCustomExecutor = true;
+      this.customExecutor = customExecutor;
+      return this;
+   }
+
    public PutOptions setBlobAccess(BlobAccess blobAccess) {
       this.blobAccess = checkNotNull(blobAccess);
       return this;
@@ -98,7 +151,7 @@ public class PutOptions implements Cloneable {
 
    /**
     * split large blobs into pieces, if supported by the provider.
-    * 
+    *
     * Equivalent to <code>multipart(true)</code>
     */
    public PutOptions multipart() {
@@ -113,12 +166,25 @@ public class PutOptions implements Cloneable {
       return this;
    }
 
+   /**
+    * Enables multipart upload (splitting large blobs into pieces, if supported by the provider) and sets a custom executor for it.
+    *
+    * @param customExecutor User-provided ListeningExecutorService; must not be null
+    */
+   public PutOptions multipart(ListeningExecutorService customExecutor) {
+      Preconditions.checkNotNull(customExecutor);
+      this.multipart = true;
+      this.useCustomExecutor = true;
+      this.customExecutor = customExecutor;
+      return this;
+   }
+
    public static class Builder {
 
       public static PutOptions fromPutOptions(PutOptions putOptions) {
          return multipart(putOptions.multipart);
       }
-      
+
       /**
        * @see PutOptions#multipart()
        */
@@ -130,15 +196,23 @@ public class PutOptions implements Cloneable {
          PutOptions options = new PutOptions();
          return options.multipart(val);
       }
+
+      public static PutOptions multipart(ListeningExecutorService customExecutor) {
+         PutOptions options = new PutOptions();
+         return options.multipart(customExecutor);
+      }
    }
 
    @Override
    public PutOptions clone() {
-      return new PutOptions(multipart);
+      return new PutOptions(multipart, useCustomExecutor, customExecutor);
    }
 
    @Override
    public String toString() {
-      return "[multipart=" + multipart + ", blobAccess=" + blobAccess + "]";
+      return "[multipart=" + multipart +
+            ", blobAccess=" + blobAccess +
+            ", useCustomExecutor=" + useCustomExecutor +
+            ", customExecutor=" + customExecutor + "]";
    }
 }