You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/05/19 06:33:47 UTC

[james-project] 04/04: [PERF] S3BlobStoreDAO: readBytes copies too much data

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4f3fd88c3cf3db8b5ac091675ed297c2a5b1f0a2
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon May 16 10:50:30 2022 +0700

    [PERF] S3BlobStoreDAO: readBytes copies too much data
    
    CF https://github.com/aws/aws-sdk-java-v2/issues/3193
    
    `AsyncResponseTransformer.toBytes()` relies internally on
    an unsized ByteArrayOutputStream (which will thus expand
    many times); the result is copied when calling
    `toByteArray()`, and another defensive copy is carried
    out when transforming the future.
    
    We thus implement a response transformer variation:
    
     - Remove needless defensive copies inside
    `ByteArrayAsyncResponseTransformer`. The byte array is
    passed to the caller, who then becomes responsible for it,
    and nobody else references the old byte array once the
    publisher completes. This can be an instant win coming at
    a very low price.
     - Rely on `GetObjectResponse::contentLength` to size a byte
    array and copy incoming buffers to it in place. This
    requires knowledge about the response type... Thus this is
    hardly doable in a generic fashion.
    
    On a typical IMAP benchmark S3 getObject toBytes
    transformation takes 1.66% of overall memory allocation.
---
 .../aws/MinimalCopyBytesResponseTransformer.java   | 105 +++++++++++++++++++++
 .../blob/objectstorage/aws/S3BlobStoreDAO.java     |   2 +-
 2 files changed, 106 insertions(+), 1 deletion(-)

diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/MinimalCopyBytesResponseTransformer.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/MinimalCopyBytesResponseTransformer.java
new file mode 100644
index 0000000000..6979fba301
--- /dev/null
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/MinimalCopyBytesResponseTransformer.java
@@ -0,0 +1,105 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.objectstorage.aws;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.core.async.SdkPublisher;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+
+/**
+ * {@link AsyncResponseTransformer} that gathers the body of a {@link GetObjectResponse} into a single byte array
+ * while copying the data as little as possible.
+ *
+ * Class copied from {@link software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer}
+ *
+ * Modified to take advantage of the content length of the get response in order to use a sized array
+ * upon content copy. This avoids the usage of a ByteArrayOutputStream that yields additional copies
+ * (resizing upon copy, copy of the resulting byte array).
+ *
+ * A defensive copy upon returning the result is also removed (responsibility transferred to the caller, no other usages)
+ */
+public class MinimalCopyBytesResponseTransformer implements AsyncResponseTransformer<GetObjectResponse, ResponseBytes<GetObjectResponse>> {
+    // Future completed with the body bytes; replaced by a new instance on every call to prepare()
+    private volatile CompletableFuture<byte[]> cf;
+    // Response metadata stored by onResponse(); read to size the buffer and to build the final result
+    private volatile GetObjectResponse response;
+
+    public MinimalCopyBytesResponseTransformer() {
+
+    }
+
+    /**
+     * Creates a fresh future for the body bytes and derives the future handed back to the caller from it.
+     *
+     * @return a future yielding the stored response together with the byte array filled by the subscriber
+     */
+    @Override
+    public CompletableFuture<ResponseBytes<GetObjectResponse>> prepare() {
+        this.cf = new CompletableFuture<>();
+        // Modification: Remove a defensive copy of the buffer upon completion: the caller is now the sole user of the array
+        return this.cf.thenApply(arr -> ResponseBytes.fromByteArrayUnsafe(response, arr));
+    }
+
+    /**
+     * Stores the response metadata so that its content length can be used once the body stream is available.
+     *
+     * @param response the response associated with the body to come
+     */
+    @Override
+    public void onResponse(GetObjectResponse response) {
+        this.response = response;
+    }
+
+    /**
+     * Subscribes to the body publisher with a subscriber whose buffer is sized from the response content length.
+     *
+     * NOTE(review): assumes onResponse() was called before and that the response carries a content length;
+     * a missing value fails here on unboxing - confirm against the SDK contract.
+     *
+     * @param publisher the publisher emitting the body chunks
+     * @throws ArithmeticException if the declared content length does not fit in an int
+     */
+    @Override
+    public void onStream(SdkPublisher<ByteBuffer> publisher) {
+        // toIntExact fails loudly rather than silently truncating a length above Integer.MAX_VALUE
+        int size = Math.toIntExact(response.contentLength());
+        publisher.subscribe(new BaosSubscriber(this.cf, size));
+    }
+
+    /**
+     * Fails the body future created by the last call to prepare().
+     *
+     * @param throwable the failure to propagate
+     */
+    @Override
+    public void exceptionOccurred(Throwable throwable) {
+        this.cf.completeExceptionally(throwable);
+    }
+
+    /**
+     * Subscriber copying each received chunk into a pre-sized byte array and completing a future with that array.
+     */
+    static class BaosSubscriber implements Subscriber<ByteBuffer> {
+        private final CompletableFuture<byte[]> resultFuture;
+        // Modification: use a byte array instead of the ByteArrayOutputStream and track position
+        private final byte[] buffer;
+        // Index in buffer at which the next chunk will be written
+        private int pos = 0;
+        private Subscription subscription;
+
+        /**
+         * @param resultFuture future to complete with the filled array, or to fail upon error
+         * @param size number of bytes the body is expected to hold
+         */
+        BaosSubscriber(CompletableFuture<byte[]> resultFuture, int size) {
+            this.resultFuture = resultFuture;
+            this.buffer = new byte[size];
+        }
+
+        /**
+         * Keeps the first subscription and requests all elements from it; any later subscription is cancelled.
+         */
+        @Override
+        public void onSubscribe(Subscription s) {
+            if (this.subscription != null) {
+                s.cancel();
+            } else {
+                this.subscription = s;
+                this.subscription.request(Long.MAX_VALUE);
+            }
+        }
+
+        /**
+         * Copies the remaining bytes of the chunk into the result buffer at the current position.
+         * If the chunk does not fit, the subscription is cancelled and the future is failed instead of
+         * letting an exception escape from this callback.
+         */
+        @Override
+        public void onNext(ByteBuffer byteBuffer) {
+            // Modification: copy the response part in place into the result buffer and track position
+            int written = byteBuffer.remaining();
+            if (written > buffer.length - pos) {
+                this.subscription.cancel();
+                this.resultFuture.completeExceptionally(new IllegalStateException(
+                    "Received more bytes than the declared content length of " + buffer.length));
+                return;
+            }
+            byteBuffer.get(buffer, pos, written);
+            pos += written;
+            // Redundant with the Long.MAX_VALUE demand requested in onSubscribe; kept to preserve previous behaviour
+            this.subscription.request(1L);
+        }
+
+        /**
+         * Fails the result future with the error signalled by the publisher.
+         */
+        @Override
+        public void onError(Throwable throwable) {
+            this.resultFuture.completeExceptionally(throwable);
+        }
+
+        /**
+         * Completes the result future with the buffer itself; no copy is made.
+         */
+        @Override
+        public void onComplete() {
+            this.resultFuture.complete(this.buffer);
+        }
+    }
+}
diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index 66097a93fc..9c3392d31e 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -213,7 +213,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
         return Mono.fromFuture(() ->
                 client.getObject(
                     builder -> builder.bucket(resolvedBucketName.asString()).key(blobId.asString()),
-                    AsyncResponseTransformer.toBytes()))
+                    new MinimalCopyBytesResponseTransformer()))
             .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e))
             .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + resolvedBucketName.asString(), e))
             .publishOn(Schedulers.parallel())


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org