Posted to notifications@james.apache.org by GitBox <gi...@apache.org> on 2022/11/25 05:08:51 UTC

[GitHub] [james-project] chibenwa opened a new pull request, #1334: [PERF] Reduce memory allocation upon S3BlobStoreDAO::save(InputStream)

chibenwa opened a new pull request, #1334:
URL: https://github.com/apache/james-project/pull/1334

    - Retrieve the content size once instead of recomputing it
    - Reduce the chunk size from 1MB to 100KB
    - Do not allocate more than the stream size
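The three bullets above can be illustrated with a minimal, JDK-only sketch. It is not the PR's code: the real change streams `ByteBuffer`s through a Reactor `Flux` into `AsyncRequestBody.fromPublisher`, whereas this sketch simply collects them into a `List`. The class and method names (`ChunkingSketch`, `chunkSizeFor`, `chunk`) are hypothetical, `CHUNK_SIZE` is assumed to be the 100KB from the description, and sizing the last buffer by the remaining byte count is an extra assumption of the sketch. The content length is read once and caps every allocation, so an empty blob allocates nothing, which is presumably the case the `chunkSize == 0` guard in the diff handles.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class ChunkingSketch {
    // Assumed value, taken from the "100KB" in the PR description.
    public static final int CHUNK_SIZE = 100 * 1024;

    // Clamp in long arithmetic before narrowing, so the result stays in
    // [0, CHUNK_SIZE] and never exceeds the content length.
    public static int chunkSizeFor(long contentLength) {
        return (int) Math.min(contentLength, CHUNK_SIZE);
    }

    // Splits the stream into buffers of at most chunkSizeFor(contentLength)
    // bytes. The last buffer is sized to the remaining bytes, so the total
    // allocation matches the announced content length.
    public static List<ByteBuffer> chunk(InputStream stream, long contentLength) throws IOException {
        int chunkSize = chunkSizeFor(contentLength);
        List<ByteBuffer> chunks = new ArrayList<>();
        long remaining = contentLength;
        while (remaining > 0) {
            int size = (int) Math.min(remaining, chunkSize);
            byte[] buffer = new byte[size];
            // readNBytes blocks until `size` bytes are read or the stream ends.
            int filled = stream.readNBytes(buffer, 0, size);
            if (filled == 0) {
                break; // stream ended earlier than the announced length
            }
            chunks.add(ByteBuffer.wrap(buffer, 0, filled));
            remaining -= filled;
            if (filled < size) {
                break; // short read: the stream is exhausted
            }
        }
        return chunks;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[256_000];
        for (ByteBuffer chunk : chunk(new ByteArrayInputStream(data), data.length)) {
            System.out.println(chunk.remaining());
        }
    }
}
```

Running `main` prints 102400, 102400 and 51200: a 256,000-byte blob becomes two full chunks plus one smaller final chunk, and no buffer is larger than the bytes it has to hold.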


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[GitHub] [james-project] vttranlina commented on a diff in pull request #1334: [PERF] Reduce memory allocation upon S3BlobStoreDAO::save(InputStream)

Posted by GitBox <gi...@apache.org>.
vttranlina commented on code in PR #1334:
URL: https://github.com/apache/james-project/pull/1334#discussion_r1033203581


##########
server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java:
##########
@@ -326,6 +326,13 @@ public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content)
         }
     }
 
+    private Flux<ByteBuffer> chunkStream(int chunkSize, InputStream stream) {
+        if (chunkSize == 0L) {

Review Comment:
   chunkSize was converted from long to int, so why still compare against `0L` here?
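The `0L` comparison compiles because Java widens the `int` operand to `long` before comparing (binary numeric promotion), so `chunkSize == 0L` and `chunkSize == 0` are equivalent and the suffix is only noise. The long-to-int conversion itself is worth a second look: the `save` hunk quoted later in this thread computes `Math.min((int) contentLength, CHUNK_SIZE)`, which narrows before clamping, and a narrowing cast keeps only the low 32 bits of the long. The sketch below contrasts the two orders; the class and method names are hypothetical and `CHUNK_SIZE` is assumed to be the 100KB from the PR description.

```java
public class NarrowingSketch {
    // Assumed value, taken from the "100KB" in the PR description.
    public static final int CHUNK_SIZE = 100 * 1024;

    // Narrow, then clamp: the order used on the diff's chunk-size line.
    // A long above Integer.MAX_VALUE keeps only its low 32 bits.
    public static int narrowThenClamp(long contentLength) {
        return Math.min((int) contentLength, CHUNK_SIZE);
    }

    // Clamp, then narrow: the long is reduced to at most CHUNK_SIZE first,
    // so the cast can never wrap.
    public static int clampThenNarrow(long contentLength) {
        return (int) Math.min(contentLength, CHUNK_SIZE);
    }

    public static void main(String[] args) {
        int chunkSize = 0;
        // The int operand is widened to long, so both tests are equivalent.
        System.out.println((chunkSize == 0L) + " " + (chunkSize == 0)); // true true

        long fourGiB = 4L * 1024 * 1024 * 1024;  // 2^32: low 32 bits are all zero
        long threeGiB = 3L * 1024 * 1024 * 1024; // 0xC0000000: negative as an int
        System.out.println(narrowThenClamp(fourGiB));  // 0
        System.out.println(clampThenNarrow(fourGiB));  // 102400
        System.out.println(narrowThenClamp(threeGiB)); // -1073741824
        System.out.println(clampThenNarrow(threeGiB)); // 102400
    }
}
```

With the narrow-then-clamp order, a 4 GiB blob gets a chunk size of 0 and a 3 GiB blob a negative one; clamping in `long` arithmetic first keeps the result within `[0, CHUNK_SIZE]` for any non-negative length.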
   




[GitHub] [james-project] vttranlina commented on a diff in pull request #1334: [PERF] Reduce memory allocation upon S3BlobStoreDAO::save(InputStream)

Posted by GitBox <gi...@apache.org>.
vttranlina commented on code in PR #1334:
URL: https://github.com/apache/james-project/pull/1334#discussion_r1033215986


##########
server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java:
##########
@@ -276,22 +302,35 @@ private Mono<Void> uploadUsingFile(BucketName bucketName, BlobId blobId, InputSt
     @Override
     public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) {
         BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
+        try {
+            long contentLength = content.size();
+            int chunkSize = Math.min((int) contentLength, CHUNK_SIZE);
 
-        return Mono.using(content::openStream,
-            stream -> Mono.fromFuture(() ->
+            return Mono.using(content::openStream,
+                stream -> Mono.fromFuture(() ->
                     client.putObject(
                         Throwing.<PutObjectRequest.Builder>consumer(
-                            builder -> builder.bucket(resolvedBucketName.asString()).contentLength(content.size()).key(blobId.asString()))
-                        .sneakyThrow(),
+                            builder -> builder.bucket(resolvedBucketName.asString()).contentLength(contentLength).key(blobId.asString()))
+                            .sneakyThrow(),
                         AsyncRequestBody.fromPublisher(
-                            DataChunker.chunkStream(stream, CHUNK_SIZE)))),
-            Throwing.consumer(InputStream::close),
-            LAZY)
-            .retryWhen(createBucketOnRetry(resolvedBucketName))
-            .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Error saving blob", e))
-            .onErrorMap(SdkClientException.class, e -> new ObjectStoreIOException("Error saving blob", e))
-            .publishOn(Schedulers.parallel())
-            .then();
+                            chunkStream(chunkSize, stream)))),
+                Throwing.consumer(InputStream::close),
+                LAZY)
+                .retryWhen(createBucketOnRetry(resolvedBucketName))
+                .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Error saving blob", e))
+                .onErrorMap(SdkClientException.class, e -> new ObjectStoreIOException("Error saving blob", e))
+                .publishOn(Schedulers.parallel())
+                .then();
+        } catch (IOException e) {
+            return Mono.error(new ObjectStoreIOException("Error saving blob", e));
+        }
+    }

Review Comment:
   ```suggestion
   return Mono.fromCallable(content::size)
               .map(contentLength -> Pair.of(contentLength, (int) Math.min(contentLength, CHUNK_SIZE)))
               .flatMap(contentLengthAndChunkSize -> Mono.using(content::openStream,
                   stream -> Mono.fromFuture(() ->
                       client.putObject(
                           Throwing.<PutObjectRequest.Builder>consumer(
                               builder -> builder.bucket(resolvedBucketName.asString()).contentLength(contentLengthAndChunkSize.getLeft()).key(blobId.asString()))
                               .sneakyThrow(),
                           AsyncRequestBody.fromPublisher(
                               chunkStream(contentLengthAndChunkSize.getRight(), stream)))),
                   Throwing.consumer(InputStream::close),
                   LAZY))
               .retryWhen(createBucketOnRetry(resolvedBucketName))
               .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Error saving blob", e))
               .onErrorMap(SdkClientException.class, e -> new ObjectStoreIOException("Error saving blob", e))
               .publishOn(Schedulers.parallel())
               .then();
   ```
   This would be more readable without the try-catch block.




[GitHub] [james-project] Arsnael merged pull request #1334: [PERF] Reduce memory allocation upon S3BlobStoreDAO::save(InputStream)

Posted by GitBox <gi...@apache.org>.
Arsnael merged PR #1334:
URL: https://github.com/apache/james-project/pull/1334



[GitHub] [james-project] vttranlina commented on a diff in pull request #1334: [PERF] Reduce memory allocation upon S3BlobStoreDAO::save(InputStream)

Posted by GitBox <gi...@apache.org>.
vttranlina commented on code in PR #1334:
URL: https://github.com/apache/james-project/pull/1334#discussion_r1033210306


##########
server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java:
##########
@@ -326,6 +326,13 @@ public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content)
         }
     }
 
+    private Flux<ByteBuffer> chunkStream(int chunkSize, InputStream stream) {
+        if (chunkSize == 0L) {

Review Comment:
   => `if (chunkSize == 0)`


