Posted to notifications@james.apache.org by rc...@apache.org on 2022/11/29 07:58:45 UTC

[james-project] branch master updated: [PERF] Reduce memory allocation upon S3BlobStoreDAO::save(InputStream) (#1334)

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new a0c0dcbaf5 [PERF] Reduce memory allocation upon S3BlobStoreDAO::save(InputStream) (#1334)
a0c0dcbaf5 is described below

commit a0c0dcbaf5b378a59cfd16a30d905f2204da073c
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Tue Nov 29 14:58:40 2022 +0700

    [PERF] Reduce memory allocation upon S3BlobStoreDAO::save(InputStream) (#1334)
---
 .../blob/objectstorage/aws/S3BlobStoreDAO.java     | 65 +++++++++++++++++-----
 1 file changed, 52 insertions(+), 13 deletions(-)

diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index 970a09e557..6ca3ccd427 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -50,6 +50,7 @@ import org.reactivestreams.Publisher;
 
 import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.ByteSource;
@@ -79,12 +80,37 @@ import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
 import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
 import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
-import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
 public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
+    private static class FileBackedOutputStreamByteSource extends ByteSource {
+        private final FileBackedOutputStream stream;
+        private final long size;
+
+        private FileBackedOutputStreamByteSource(FileBackedOutputStream stream, long size) {
+            Preconditions.checkArgument(size >= 0, "'size' must not be negative");
+            this.stream = stream;
+            this.size = size;
+        }
+
+        @Override
+        public InputStream openStream() throws IOException {
+            return stream.asByteSource().openStream();
+        }
+
+        @Override
+        public Optional<Long> sizeIfKnown() {
+            return Optional.of(size);
+        }
+
+        @Override
+        public long size() {
+            return size;
+        }
+    }
 
-    private static final int CHUNK_SIZE = 1024 * 1024;
+    private static final int CHUNK_SIZE = 1024 * 100;
     private static final int EMPTY_BUCKET_BATCH_SIZE = 1000;
     private static final int FILE_THRESHOLD = 1024 * 100;
     private static final Duration FIRST_BACK_OFF = Duration.ofMillis(100);
@@ -266,7 +292,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
             () -> new FileBackedOutputStream(FILE_THRESHOLD),
             fileBackedOutputStream ->
                 Mono.fromCallable(() -> IOUtils.copy(inputStream, fileBackedOutputStream))
-                    .flatMap(ignore -> save(bucketName, blobId, fileBackedOutputStream.asByteSource())),
+                    .flatMap(size -> save(bucketName, blobId, new FileBackedOutputStreamByteSource(fileBackedOutputStream, size))),
             Throwing.consumer(FileBackedOutputStream::reset),
             LAZY)
             .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Error saving blob", e))
@@ -277,16 +303,12 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
     public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) {
         BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
 
-        return Mono.using(content::openStream,
-            stream -> Mono.fromFuture(() ->
-                    client.putObject(
-                        Throwing.<PutObjectRequest.Builder>consumer(
-                            builder -> builder.bucket(resolvedBucketName.asString()).contentLength(content.size()).key(blobId.asString()))
-                        .sneakyThrow(),
-                        AsyncRequestBody.fromPublisher(
-                            DataChunker.chunkStream(stream, CHUNK_SIZE)))),
-            Throwing.consumer(InputStream::close),
-            LAZY)
+        return Mono.fromCallable(content::size)
+            .flatMap(contentLength ->
+                Mono.using(content::openStream,
+                    stream -> save(resolvedBucketName, blobId, stream, contentLength),
+                    Throwing.consumer(InputStream::close),
+                    LAZY))
             .retryWhen(createBucketOnRetry(resolvedBucketName))
             .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Error saving blob", e))
             .onErrorMap(SdkClientException.class, e -> new ObjectStoreIOException("Error saving blob", e))
@@ -294,6 +316,23 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
             .then();
     }
 
+    private Mono<PutObjectResponse> save(BucketName resolvedBucketName, BlobId blobId, InputStream stream, long contentLength) {
+        int chunkSize = (int) Math.min(contentLength, CHUNK_SIZE); // min on longs first: casting a >2 GiB length straight to int would overflow
+
+        return Mono.fromFuture(() -> client.putObject(builder -> builder
+                .bucket(resolvedBucketName.asString())
+                .contentLength(contentLength)
+                .key(blobId.asString()),
+            AsyncRequestBody.fromPublisher(chunkStream(chunkSize, stream))));
+    }
+
+    private Flux<ByteBuffer> chunkStream(int chunkSize, InputStream stream) {
+        if (chunkSize == 0) {
+            return Flux.empty(); // zero-length blob: publish no chunks
+        }
+        return DataChunker.chunkStream(stream, chunkSize);
+    }
+
     private RetryBackoffSpec createBucketOnRetry(BucketName bucketName) {
         return RetryBackoffSpec.backoff(MAX_RETRIES, FIRST_BACK_OFF)
             .maxAttempts(MAX_RETRIES)

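Why the ByteSource wrapper reduces allocation: Guava's
FileBackedOutputStream.asByteSource() does not advertise a known size, so
ByteSource.size() falls back to opening the buffered data and reading it end
to end just to count the bytes. save(InputStream) already knows the count
from the IOUtils.copy above, so wrapping the buffer in a ByteSource with a
fixed size skips that second full pass and lets the S3 request set its
Content-Length up front. A minimal standalone sketch of the same pattern
(class and method names here are illustrative, not part of the commit):

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.commons.io.IOUtils;

    import com.google.common.base.Optional;
    import com.google.common.io.ByteSource;
    import com.google.common.io.FileBackedOutputStream;

    class SizedBufferSketch {
        // Buffer the input once (in memory below the threshold, spilling to a
        // temp file above it) and remember how many bytes went through.
        static ByteSource bufferWithKnownSize(InputStream input, int fileThreshold) throws IOException {
            FileBackedOutputStream buffered = new FileBackedOutputStream(fileThreshold);
            long size = IOUtils.copyLarge(input, buffered);
            ByteSource raw = buffered.asByteSource();
            return new ByteSource() {
                @Override
                public InputStream openStream() throws IOException {
                    return raw.openStream();
                }

                @Override
                public Optional<Long> sizeIfKnown() {
                    return Optional.of(size); // known without draining the buffer again
                }
            };
        }
    }

The CHUNK_SIZE drop from 1 MiB to 100 KiB, and capping the chunk size at the
actual content length, point the same direction: each chunk handed to
AsyncRequestBody.fromPublisher is a separate buffer, so smaller chunks mean
less transient allocation per upload, especially for the many small blobs a
mail store typically writes.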

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org