You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jclouds.apache.org by ga...@apache.org on 2023/01/29 08:56:19 UTC

[jclouds] branch master updated: Lazily open parts during LocalBlobStore complete MPU

This is an automated email from the ASF dual-hosted git repository.

gaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/jclouds.git


The following commit(s) were added to refs/heads/master by this push:
     new b7f28f1e6a Lazily open parts during LocalBlobStore complete MPU
b7f28f1e6a is described below

commit b7f28f1e6a2632fce579360c7900b6dc548ff25f
Author: Andrew Gaul <ga...@apache.org>
AuthorDate: Sun Jan 22 13:20:59 2023 +0900

    Lazily open parts during LocalBlobStore complete MPU
    
    This removes a previous workaround for opening too many
    FileInputStream and exhausting rlimits.
---
 .../jclouds/blobstore/config/LocalBlobStore.java   | 43 ++++++++++------------
 1 file changed, 20 insertions(+), 23 deletions(-)

diff --git a/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java
index c50a405817..abb7bd0d43 100644
--- a/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java
+++ b/blobstore/src/main/java/org/jclouds/blobstore/config/LocalBlobStore.java
@@ -28,7 +28,6 @@ import static com.google.common.collect.Sets.newTreeSet;
 import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -731,14 +730,8 @@ public final class LocalBlobStore implements BlobStore {
       Payload payload;
       try {
          InputStream is = blob.getPayload().openStream();
-         if (is instanceof FileInputStream) {
-            // except for FileInputStream since large MPU can open too many fds
-            is.close();
-            payload = blob.getPayload();
-         } else {
-            blob.resetPayload(/*release=*/ false);
-            payload = new InputStreamPayload(is);
-         }
+         blob.resetPayload(/*release=*/ false);
+         payload = new InputStreamPayload(is);
       } catch (IOException ioe) {
          throw new RuntimeException(ioe);
       }
@@ -825,16 +818,16 @@ public final class LocalBlobStore implements BlobStore {
 
    @Override
    public String completeMultipartUpload(MultipartUpload mpu, List<MultipartPart> parts) {
-      ImmutableList.Builder<Blob> blobs = ImmutableList.builder();
+      ImmutableList.Builder<BlobMetadata> metas = ImmutableList.builder();
       long contentLength = 0;
       Hasher md5Hasher = Hashing.md5().newHasher();
 
       for (MultipartPart part : parts) {
-         Blob blobPart = getBlob(mpu.containerName(), MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-" + part.partNumber());
-         contentLength += blobPart.getMetadata().getContentMetadata().getContentLength();
-         blobs.add(blobPart);
-         if (blobPart.getMetadata().getETag() != null) {
-            md5Hasher.putBytes(BaseEncoding.base16().lowerCase().decode(blobPart.getMetadata().getETag()));
+         BlobMetadata meta = blobMetadata(mpu.containerName(), MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-" + part.partNumber());
+         contentLength += meta.getContentMetadata().getContentLength();
+         metas.add(meta);
+         if (meta.getETag() != null) {
+            md5Hasher.putBytes(BaseEncoding.base16().lowerCase().decode(meta.getETag()));
          }
       }
       String mpuETag = new StringBuilder("\"")
@@ -845,7 +838,7 @@ public final class LocalBlobStore implements BlobStore {
          .toString();
       PayloadBlobBuilder blobBuilder = blobBuilder(mpu.blobName())
             .userMetadata(mpu.blobMetadata().getUserMetadata())
-            .payload(new MultiBlobInputStream(blobs.build()))
+            .payload(new MultiBlobInputStream(this, metas.build()))
             .contentLength(contentLength)
             .eTag(mpuETag);
       String cacheControl = mpu.blobMetadata().getContentMetadata().getCacheControl();
@@ -995,21 +988,24 @@ public final class LocalBlobStore implements BlobStore {
    }
 
    private static final class MultiBlobInputStream extends InputStream {
-      private final Iterator<Blob> blobs;
+      private final BlobStore blobStore;
+      private final Iterator<BlobMetadata> metas;
       private InputStream current;
 
-      MultiBlobInputStream(List<Blob> blobs) {
-         this.blobs = blobs.iterator();
+      MultiBlobInputStream(BlobStore blobStore, List<BlobMetadata> metas) {
+         this.blobStore = blobStore;
+         this.metas = metas.iterator();
       }
 
       @Override
       public int read() throws IOException {
          while (true) {
             if (current == null) {
-               if (!blobs.hasNext()) {
+               if (!metas.hasNext()) {
                   return -1;
                }
-               current = blobs.next().getPayload().openStream();
+               BlobMetadata meta = metas.next();
+               current = blobStore.getBlob(meta.getContainer(), meta.getName()).getPayload().openStream();
             }
             int result = current.read();
             if (result == -1) {
@@ -1025,10 +1021,11 @@ public final class LocalBlobStore implements BlobStore {
       public int read(byte[] b, int off, int len) throws IOException {
          while (true) {
             if (current == null) {
-               if (!blobs.hasNext()) {
+               if (!metas.hasNext()) {
                   return -1;
                }
-               current = blobs.next().getPayload().openStream();
+               BlobMetadata meta = metas.next();
+               current = blobStore.getBlob(meta.getContainer(), meta.getName()).getPayload().openStream();
             }
             int result = current.read(b, off, len);
             if (result == -1) {