You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/05/13 12:38:10 UTC

[james-project] 01/07: JAMES-2725 Remove length from Blob API. This means that the S3 implementation should implement its own way to put blobs without knowing the length.

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9e2c0cfef679a6e92e0bd92abaf0607de7948ef3
Author: Antoine Duprat <ad...@linagora.com>
AuthorDate: Fri Apr 5 11:37:07 2019 +0200

    JAMES-2725 Remove length from Blob API

    This means that the S3 implementation should implement its own way to put blobs without knowing the length.
---
 .../java/org/apache/james/blob/api/BlobStore.java  |  2 +-
 .../apache/james/blob/api/MetricableBlobStore.java |  4 +-
 .../main/java/org/apache/james/blob/api/Store.java | 28 +------
 .../apache/james/blob/api/BlobStoreContract.java   |  6 +-
 .../james/blob/api/FixedLengthInputStreamTest.java | 63 ----------------
 .../blob/api/MetricableBlobStoreContract.java      |  4 +-
 .../james/blob/cassandra/CassandraBlobsDAO.java    |  2 +-
 .../blob/cassandra/CassandraBlobsDAOTest.java      |  2 +-
 .../apache/james/blob/memory/MemoryBlobStore.java  |  2 +-
 server/blob/blob-objectstorage/pom.xml             |  5 ++
 .../blob/objectstorage/ObjectStorageBlobsDAO.java  | 17 +++--
 .../ObjectStorageBlobsDAOBuilder.java              | 19 ++++-
 .../james/blob/objectstorage/PutBlobFunction.java  | 37 ++++++++++
 .../blob/objectstorage/aws/AwsS3ObjectStorage.java | 85 +++++++++++++++++++++-
 .../objectstorage/ObjectStorageBlobsDAOTest.java   |  2 +-
 .../aws/AwsS3ObjectStorageBlobsDAOBuilderTest.java |  4 +-
 .../apache/james/blob/union/UnionBlobStore.java    |  8 +-
 .../james/blob/union/UnionBlobStoreTest.java       | 14 ++--
 .../apache/james/blob/mail/MimeMessageStore.java   |  6 +-
 .../ObjectStorageDependenciesModule.java           | 14 ++++
 .../james/CassandraRabbitMQJamesServerTest.java    | 10 +--
 .../james/webadmin/vault/routes/ExportService.java |  2 +-
 22 files changed, 205 insertions(+), 131 deletions(-)

diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
index d22be29..762d916 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
@@ -27,7 +27,7 @@ public interface BlobStore {
 
     Mono<BlobId> save(byte[] data);
 
-    Mono<BlobId> save(InputStream data, long contentLength);
+    Mono<BlobId> save(InputStream data);
 
     Mono<byte[]> readBytes(BlobId blobId);
 
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
index b51e37b..4ed7b17 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
@@ -54,9 +54,9 @@ public class MetricableBlobStore implements BlobStore {
     }
 
     @Override
-    public Mono<BlobId> save(InputStream data, long contentLength) {
+    public Mono<BlobId> save(InputStream data) {
         return metricFactory
-            .runPublishingTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, blobStoreImpl.save(data, contentLength));
+            .runPublishingTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, blobStoreImpl.save(data));
     }
 
     @Override
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
index 3cd4afa..07a5611 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
@@ -26,7 +26,6 @@ import java.util.stream.Stream;
 
 import org.apache.commons.lang3.tuple.Pair;
 
-import com.google.common.base.Preconditions;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
@@ -67,7 +66,7 @@ public interface Store<T, I> {
     class Impl<T, I extends BlobPartsId> implements Store<T, I> {
 
         public interface Encoder<T> {
-            Stream<Pair<BlobType, FixedLengthInputStream>> encode(T t);
+            Stream<Pair<BlobType, InputStream>> encode(T t);
         }
 
         public interface Decoder<T> {
@@ -94,9 +93,9 @@ public interface Store<T, I> {
                 .map(idFactory::generate);
         }
 
-        private Mono<Tuple2<BlobType, BlobId>> saveEntry(Pair<BlobType, FixedLengthInputStream> entry) {
+        private Mono<Tuple2<BlobType, BlobId>> saveEntry(Pair<BlobType, InputStream> entry) {
             return Mono.just(entry.getLeft())
-                .zipWith(blobStore.save(entry.getRight().getInputStream(), entry.getRight().getContentLength()));
+                .zipWith(blobStore.save(entry.getRight()));
         }
 
         @Override
@@ -111,25 +110,4 @@ public interface Store<T, I> {
                 .map(decoder::decode);
         }
     }
-
-    class FixedLengthInputStream {
-
-        private final InputStream inputStream;
-        private final long contentLength;
-
-        public FixedLengthInputStream(InputStream inputStream, long contentLength) {
-            Preconditions.checkNotNull(inputStream, "'inputStream' is mandatory");
-            Preconditions.checkArgument(contentLength >= 0, "'contentLength' should be greater than or equal to 0");
-            this.inputStream = inputStream;
-            this.contentLength = contentLength;
-        }
-
-        public InputStream getInputStream() {
-            return inputStream;
-        }
-
-        public long getContentLength() {
-            return contentLength;
-        }
-    }
 }
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java
index dbb9863..5cb31be 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java
@@ -56,7 +56,7 @@ public interface BlobStoreContract {
 
     @Test
     default void saveShouldThrowWhenNullInputStream() {
-        assertThatThrownBy(() -> testee().save((InputStream) null, 0).block())
+        assertThatThrownBy(() -> testee().save((InputStream) null).block())
             .isInstanceOf(NullPointerException.class);
     }
 
@@ -80,7 +80,7 @@ public interface BlobStoreContract {
 
     @Test
     default void saveShouldSaveEmptyInputStream() {
-        BlobId blobId = testee().save(new ByteArrayInputStream(EMPTY_BYTEARRAY), EMPTY_BYTEARRAY.length).block();
+        BlobId blobId = testee().save(new ByteArrayInputStream(EMPTY_BYTEARRAY)).block();
 
         byte[] bytes = testee().readBytes(blobId).block();
 
@@ -104,7 +104,7 @@ public interface BlobStoreContract {
     @Test
     default void saveShouldReturnBlobIdOfInputStream() {
         BlobId blobId =
-            testee().save(new ByteArrayInputStream(SHORT_BYTEARRAY), SHORT_BYTEARRAY.length).block();
+            testee().save(new ByteArrayInputStream(SHORT_BYTEARRAY)).block();
 
         assertThat(blobId).isEqualTo(blobIdFactory().from("31f7a65e315586ac198bd798b6629ce4903d0899476d5741a9f32e2e521b6a66"));
     }
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/FixedLengthInputStreamTest.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/FixedLengthInputStreamTest.java
deleted file mode 100644
index c442b0c..0000000
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/FixedLengthInputStreamTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.james.blob.api;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-import java.io.ByteArrayInputStream;
-import java.nio.charset.StandardCharsets;
-
-import org.junit.jupiter.api.Test;
-
-class FixedLengthInputStreamTest {
-
-    @Test
-    void fixedLengthInputStreamShouldThrowWhenInputStreamIsNull() {
-        assertThatThrownBy(() -> new Store.FixedLengthInputStream(null, 0))
-            .isInstanceOf(NullPointerException.class)
-            .hasMessage("'inputStream' is mandatory");
-    }
-
-    @Test
-    void fixedLengthInputStreamShouldThrowWhenContentLengthIsNegative() {
-        assertThatThrownBy(() -> new Store.FixedLengthInputStream(new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8)), -1))
-            .isInstanceOf(IllegalArgumentException.class)
-            .hasMessage("'contentLength' should be greater than or equal to 0");
-    }
-
-    @Test
-    void lengthShouldBeStored() {
-        int contentLength = 1;
-
-        Store.FixedLengthInputStream testee = new Store.FixedLengthInputStream(new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8)), contentLength);
-
-        assertThat(testee.getContentLength()).isEqualTo(contentLength);
-    }
-
-    @Test
-    void inputStreamShouldBeStored() {
-        ByteArrayInputStream inputStream = new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8));
-
-        Store.FixedLengthInputStream testee = new Store.FixedLengthInputStream(inputStream, 1);
-
-        assertThat(testee.getInputStream()).hasSameContentAs(inputStream);
-    }
-}
\ No newline at end of file
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
index 7ed9b7e..ce57b53 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
@@ -75,8 +75,8 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
 
     @Test
     default void saveInputStreamShouldPublishSaveInputStreamTimerMetrics() {
-        testee().save(new ByteArrayInputStream(BYTES_CONTENT), BYTES_CONTENT.length).block();
-        testee().save(new ByteArrayInputStream(BYTES_CONTENT), BYTES_CONTENT.length).block();
+        testee().save(new ByteArrayInputStream(BYTES_CONTENT)).block();
+        testee().save(new ByteArrayInputStream(BYTES_CONTENT)).block();
 
         assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(SAVE_INPUT_STREAM_TIMER_NAME))
             .hasSize(2);
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
index b637601..444c55a 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
@@ -212,7 +212,7 @@ public class CassandraBlobsDAO implements BlobStore {
     }
 
     @Override
-    public Mono<BlobId> save(InputStream data, long contentLength) {
+    public Mono<BlobId> save(InputStream data) {
         Preconditions.checkNotNull(data);
         return Mono.fromCallable(() -> IOUtils.toByteArray(data))
             .flatMap(this::saveAsMono);
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
index 9831bed..a372a35 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
@@ -81,7 +81,7 @@ public class CassandraBlobsDAOTest implements MetricableBlobStoreContract {
 
     @Test
     void blobStoreShouldSupport100MBBlob() {
-        BlobId blobId = testee.save(new ZeroedInputStream(100_000_000), 100_000_000).block();
+        BlobId blobId = testee.save(new ZeroedInputStream(100_000_000)).block();
         InputStream bytes = testee.read(blobId);
         assertThat(bytes).hasSameContentAs(new ZeroedInputStream(100_000_000));
     }
diff --git a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java
index c47d83f..21f47ef 100644
--- a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java
+++ b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java
@@ -57,7 +57,7 @@ public class MemoryBlobStore implements BlobStore {
     }
 
     @Override
-    public Mono<BlobId> save(InputStream data, long contentLength) {
+    public Mono<BlobId> save(InputStream data) {
         Preconditions.checkNotNull(data);
         try {
             byte[] bytes = IOUtils.toByteArray(data);
diff --git a/server/blob/blob-objectstorage/pom.xml b/server/blob/blob-objectstorage/pom.xml
index fe8481d..c1bbf81 100644
--- a/server/blob/blob-objectstorage/pom.xml
+++ b/server/blob/blob-objectstorage/pom.xml
@@ -67,6 +67,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk</artifactId>
+            <version>1.11.532</version>
+        </dependency>
+        <dependency>
             <groupId>com.google.crypto.tink</groupId>
             <artifactId>tink</artifactId>
             <version>1.2.0</version>
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
index 6000680..113d6b4 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
@@ -54,13 +54,17 @@ public class ObjectStorageBlobsDAO implements BlobStore {
 
     private final ContainerName containerName;
     private final org.jclouds.blobstore.BlobStore blobStore;
+    private final PutBlobFunction putBlobFunction;
     private final PayloadCodec payloadCodec;
 
     ObjectStorageBlobsDAO(ContainerName containerName, BlobId.Factory blobIdFactory,
-                          org.jclouds.blobstore.BlobStore blobStore, PayloadCodec payloadCodec) {
+                          org.jclouds.blobstore.BlobStore blobStore,
+                          PutBlobFunction putBlobFunction,
+                          PayloadCodec payloadCodec) {
         this.blobIdFactory = blobIdFactory;
         this.containerName = containerName;
         this.blobStore = blobStore;
+        this.putBlobFunction = putBlobFunction;
         this.payloadCodec = payloadCodec;
     }
 
@@ -89,15 +93,15 @@ public class ObjectStorageBlobsDAO implements BlobStore {
 
     @Override
     public Mono<BlobId> save(byte[] data) {
-        return save(new ByteArrayInputStream(data), data.length);
+        return save(new ByteArrayInputStream(data));
     }
 
     @Override
-    public Mono<BlobId> save(InputStream data, long contentLength) {
+    public Mono<BlobId> save(InputStream data) {
         Preconditions.checkNotNull(data);
 
         BlobId tmpId = blobIdFactory.randomId();
-        return save(data, contentLength, tmpId)
+        return save(data, tmpId)
             .flatMap(id -> updateBlobId(tmpId, id));
     }
 
@@ -109,16 +113,15 @@ public class ObjectStorageBlobsDAO implements BlobStore {
             .thenReturn(to);
     }
 
-    private Mono<BlobId> save(InputStream data, long contentLength, BlobId id) {
+    private Mono<BlobId> save(InputStream data, BlobId id) {
         String containerName = this.containerName.value();
         HashingInputStream hashingInputStream = new HashingInputStream(Hashing.sha256(), data);
         Payload payload = payloadCodec.write(hashingInputStream);
         Blob blob = blobStore.blobBuilder(id.asString())
                             .payload(payload.getPayload())
-                            .contentLength(payload.getLength().orElse(contentLength))
                             .build();
 
-        return Mono.fromCallable(() -> blobStore.putBlob(containerName, blob))
+        return Mono.fromRunnable(() -> putBlobFunction.putBlob(blob))
             .then(Mono.fromCallable(() -> blobIdFactory.from(hashingInputStream.hash().toString())));
     }
 
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOBuilder.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOBuilder.java
index 4e5f941..f6aa7ff 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOBuilder.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOBuilder.java
@@ -50,12 +50,14 @@ public class ObjectStorageBlobsDAOBuilder {
         private final ContainerName containerName;
         private final BlobId.Factory blobIdFactory;
         private Optional<PayloadCodec> payloadCodec;
+        private Optional<PutBlobFunction> putBlob;
 
         public ReadyToBuild(Supplier<BlobStore> supplier, BlobId.Factory blobIdFactory, ContainerName containerName) {
             this.blobIdFactory = blobIdFactory;
             this.containerName = containerName;
             this.payloadCodec = Optional.empty();
             this.supplier = supplier;
+            this.putBlob = Optional.empty();
         }
 
         public ReadyToBuild payloadCodec(PayloadCodec payloadCodec) {
@@ -68,11 +70,26 @@ public class ObjectStorageBlobsDAOBuilder {
             return this;
         }
 
+        public ReadyToBuild putBlob(Optional<PutBlobFunction> putBlob) {
+            this.putBlob = putBlob;
+            return this;
+        }
+
         public ObjectStorageBlobsDAO build() {
             Preconditions.checkState(containerName != null);
             Preconditions.checkState(blobIdFactory != null);
 
-            return new ObjectStorageBlobsDAO(containerName, blobIdFactory, supplier.get(), payloadCodec.orElse(PayloadCodec.DEFAULT_CODEC));
+            BlobStore blobStore = supplier.get();
+
+            return new ObjectStorageBlobsDAO(containerName,
+                blobIdFactory,
+                blobStore,
+                putBlob.orElse(defaultPutBlob(blobStore)),
+                payloadCodec.orElse(PayloadCodec.DEFAULT_CODEC));
+        }
+
+        private PutBlobFunction defaultPutBlob(BlobStore blobStore) {
+            return (blob) -> blobStore.putBlob(containerName.value(), blob);
         }
 
         @VisibleForTesting
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/PutBlobFunction.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/PutBlobFunction.java
new file mode 100644
index 0000000..ac58aef
--- /dev/null
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/PutBlobFunction.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.james.blob.objectstorage;
+
+import org.jclouds.blobstore.domain.Blob;
+
+/**
+ * Implementations may have specific behaviour when uploading a blob;
+ * such cases are not well handled by jClouds.
+ *
+ * For example:
+ * AWS S3 needs a length when uploading with jClouds,
+ * whereas you don't need one when using the S3 client.
+ *
+ */
+@FunctionalInterface
+public interface PutBlobFunction {
+
+    void putBlob(Blob blob);
+}
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
index be5eac5..7940ef0 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
@@ -19,27 +19,108 @@
 
 package org.apache.james.blob.objectstorage.aws;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.function.Supplier;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.objectstorage.ContainerName;
 import org.apache.james.blob.objectstorage.ObjectStorageBlobsDAOBuilder;
+import org.apache.james.blob.objectstorage.PutBlobFunction;
+import org.apache.james.util.Size;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.jclouds.ContextBuilder;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.BlobStoreContext;
+import org.jclouds.blobstore.domain.Blob;
 import org.jclouds.logging.slf4j.config.SLF4JLoggingModule;
 
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Module;
 
 public class AwsS3ObjectStorage {
 
-    private static final Iterable<Module> JCLOUDS_MODULES =
-        ImmutableSet.of(new SLF4JLoggingModule());
+    private static final Iterable<Module> JCLOUDS_MODULES = ImmutableSet.of(new SLF4JLoggingModule());
+    public static final int MAX_THREADS = 5;
+    private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(MAX_THREADS, NamedThreadFactory.withClassName(AwsS3ObjectStorage.class));
+    private static final boolean DO_NOT_SHUTDOWN_THREAD_POOL = false;
+    private static Size MULTIPART_UPLOAD_THRESHOLD;
+
+    static {
+        try {
+            MULTIPART_UPLOAD_THRESHOLD = Size.parse("5M");
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
 
     public static ObjectStorageBlobsDAOBuilder.RequireContainerName daoBuilder(AwsS3AuthConfiguration configuration) {
         return ObjectStorageBlobsDAOBuilder.forBlobStore(new BlobStoreBuilder(configuration));
     }
 
+    public static Optional<PutBlobFunction> putBlob(BlobId.Factory blobIdFactory, ContainerName containerName, AwsS3AuthConfiguration configuration) {
+        return Optional.of((blob) -> {
+            File file = null;
+            try {
+                file = File.createTempFile(UUID.randomUUID().toString(), ".tmp");
+                FileUtils.copyToFile(blob.getPayload().openStream(), file);
+
+                put(blobIdFactory, containerName, configuration, blob, file);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            } finally {
+                if (file != null) {
+                    FileUtils.deleteQuietly(file);
+                }
+            }
+        });
+    }
+
+    private static void put(BlobId.Factory blobIdFactory, ContainerName containerName, AwsS3AuthConfiguration configuration, Blob blob, File file) {
+        try {
+            PutObjectRequest request = new PutObjectRequest(containerName.value(),
+                blob.getMetadata().getName(),
+                file);
+
+            getTransferManager(configuration)
+                .upload(request)
+                .waitForUploadResult();
+        } catch (AmazonClientException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static TransferManager getTransferManager(AwsS3AuthConfiguration configuration) {
+        AmazonS3 amazonS3 = AmazonS3ClientBuilder
+            .standard()
+            .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(configuration.getAccessKeyId(), configuration.getSecretKey())))
+            .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(configuration.getEndpoint(), null))
+            .build();
+
+        return TransferManagerBuilder
+            .standard()
+            .withS3Client(amazonS3)
+            .withMultipartUploadThreshold(MULTIPART_UPLOAD_THRESHOLD.getValue())
+            .withExecutorFactory(() -> EXECUTOR_SERVICE)
+            .withShutDownThreadPools(DO_NOT_SHUTDOWN_THREAD_POOL)
+            .build();
+    }
+
     private static class BlobStoreBuilder implements Supplier<BlobStore> {
         private final AwsS3AuthConfiguration configuration;
 
diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java
index b1ec0c4..caabb42 100644
--- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java
+++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java
@@ -189,7 +189,7 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract {
     @Test
     void saveInputStreamShouldNotCompleteWhenDoesNotAwait() {
         Mono<BlobId> blobIdFuture = testee
-            .save(new ByteArrayInputStream(BIG_STRING.getBytes(StandardCharsets.UTF_8)), BIG_STRING.length())
+            .save(new ByteArrayInputStream(BIG_STRING.getBytes(StandardCharsets.UTF_8)))
             .subscribeOn(Schedulers.elastic());
         assertThat(blobIdFuture.toFuture()).isNotCompleted();
     }
diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorageBlobsDAOBuilderTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorageBlobsDAOBuilderTest.java
index 147c130..9ecda9f 100644
--- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorageBlobsDAOBuilderTest.java
+++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorageBlobsDAOBuilderTest.java
@@ -24,6 +24,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import java.util.UUID;
 
 import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.TestBlobId;
 import org.apache.james.blob.objectstorage.ContainerName;
 import org.apache.james.blob.objectstorage.ObjectStorageBlobsDAO;
 import org.apache.james.blob.objectstorage.ObjectStorageBlobsDAOBuilder;
@@ -78,7 +79,8 @@ class AwsS3ObjectStorageBlobsDAOBuilderTest implements ObjectStorageBlobsDAOCont
         ObjectStorageBlobsDAOBuilder.ReadyToBuild builder = ObjectStorageBlobsDAO
             .builder(configuration)
             .container(containerName)
-            .blobIdFactory(new HashBlobId.Factory());
+            .blobIdFactory(new HashBlobId.Factory())
+            .putBlob(AwsS3ObjectStorage.putBlob(new TestBlobId.Factory(), containerName, configuration));
 
         assertBlobsDAOCanStoreAndRetrieve(builder);
     }
diff --git a/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java
index 07d0830..1991196 100644
--- a/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java
+++ b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java
@@ -105,14 +105,14 @@ public class UnionBlobStore implements BlobStore {
     }
 
     @Override
-    public Mono<BlobId> save(InputStream data, long contentLength) {
+    public Mono<BlobId> save(InputStream data) {
         try {
             return saveToCurrentFallbackIfFails(
-                Mono.defer(() -> currentBlobStore.save(data, contentLength)),
-                () -> Mono.defer(() -> legacyBlobStore.save(data, contentLength)));
+                Mono.defer(() -> currentBlobStore.save(data)),
+                () -> Mono.defer(() -> legacyBlobStore.save(data)));
         } catch (Exception e) {
             LOGGER.error("exception directly happens while saving InputStream data, fall back to legacy blob store", e);
-            return legacyBlobStore.save(data, contentLength);
+            return legacyBlobStore.save(data);
         }
     }
 
diff --git a/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java b/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java
index d567a94..1956350 100644
--- a/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java
+++ b/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java
@@ -66,7 +66,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
         }
 
         @Override
-        public Mono<BlobId> save(InputStream data, long contentLength) {
+        public Mono<BlobId> save(InputStream data) {
             return Mono.error(new RuntimeException("broken everywhere"));
         }
 
@@ -100,7 +100,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
         }
 
         @Override
-        public Mono<BlobId> save(InputStream data, long contentLength) {
+        public Mono<BlobId> save(InputStream data) {
             throw new RuntimeException("broken everywhere");
         }
 
@@ -176,7 +176,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
                 .current(new ThrowingBlobStore())
                 .legacy(legacyBlobStore)
                 .build();
-            BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT), BLOB_CONTENT.length).block();
+            BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block();
 
             SoftAssertions.assertSoftly(softly -> {
                 softly.assertThat(unionBlobStore.read(blobId))
@@ -214,7 +214,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
                 .current(new FailingBlobStore())
                 .legacy(legacyBlobStore)
                 .build();
-            BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT), BLOB_CONTENT.length).block();
+            BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block();
 
             SoftAssertions.assertSoftly(softly -> {
                 softly.assertThat(unionBlobStore.read(blobId))
@@ -297,7 +297,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
             return Stream.of(
                 blobStore -> blobStore.save(BLOB_CONTENT),
                 blobStore -> blobStore.save(STRING_CONTENT),
-                blobStore -> blobStore.save(new ByteArrayInputStream(BLOB_CONTENT), BLOB_CONTENT.length),
+                blobStore -> blobStore.save(new ByteArrayInputStream(BLOB_CONTENT)),
                 blobStore -> blobStore.readBytes(BLOB_ID_FACTORY.randomId()));
         }
 
@@ -423,7 +423,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
 
     @Test
     void saveInputStreamShouldWriteToCurrent() {
-        BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT), BLOB_CONTENT.length).block();
+        BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block();
 
         assertThat(currentBlobStore.readBytes(blobId).block())
             .isEqualTo(BLOB_CONTENT);
@@ -431,7 +431,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
 
     @Test
     void saveInputStreamShouldNotWriteToLegacy() {
-        BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT), BLOB_CONTENT.length).block();
+        BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block();
 
         assertThatThrownBy(() -> legacyBlobStore.readBytes(blobId).block())
             .isInstanceOf(ObjectStoreException.class);
diff --git a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
index 506b895..12683f9 100644
--- a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
+++ b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
@@ -69,15 +69,15 @@ public class MimeMessageStore {
 
     static class MimeMessageEncoder implements Store.Impl.Encoder<MimeMessage> {
         @Override
-        public Stream<Pair<BlobType, Store.FixedLengthInputStream>> encode(MimeMessage message) {
+        public Stream<Pair<BlobType, InputStream>> encode(MimeMessage message) {
             try {
                 byte[] messageAsArray = messageToArray(message);
                 int bodyStartOctet = computeBodyStartOctet(messageAsArray);
                 byte[] headerBytes = getHeaderBytes(messageAsArray, bodyStartOctet);
                 byte[] bodyBytes = getBodyBytes(messageAsArray, bodyStartOctet);
                 return Stream.of(
-                    Pair.of(HEADER_BLOB_TYPE, new Store.FixedLengthInputStream(new ByteArrayInputStream(headerBytes), headerBytes.length)),
-                    Pair.of(BODY_BLOB_TYPE, new Store.FixedLengthInputStream(new ByteArrayInputStream(bodyBytes), bodyBytes.length)));
+                    Pair.of(HEADER_BLOB_TYPE, new ByteArrayInputStream(headerBytes)),
+                    Pair.of(BODY_BLOB_TYPE, new ByteArrayInputStream(bodyBytes)));
             } catch (MessagingException | IOException e) {
                 throw new RuntimeException(e);
             }
diff --git a/server/container/guice/blob-objectstorage-guice/src/main/java/org/apache/james/modules/objectstorage/ObjectStorageDependenciesModule.java b/server/container/guice/blob-objectstorage-guice/src/main/java/org/apache/james/modules/objectstorage/ObjectStorageDependenciesModule.java
index 0d725ed..23c40b9 100644
--- a/server/container/guice/blob-objectstorage-guice/src/main/java/org/apache/james/modules/objectstorage/ObjectStorageDependenciesModule.java
+++ b/server/container/guice/blob-objectstorage-guice/src/main/java/org/apache/james/modules/objectstorage/ObjectStorageDependenciesModule.java
@@ -21,6 +21,7 @@ package org.apache.james.modules.objectstorage;
 
 import java.io.FileNotFoundException;
 import java.time.Duration;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
@@ -32,6 +33,7 @@ import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.objectstorage.ObjectStorageBlobsDAO;
 import org.apache.james.blob.objectstorage.ObjectStorageBlobsDAOBuilder;
+import org.apache.james.blob.objectstorage.PutBlobFunction;
 import org.apache.james.blob.objectstorage.aws.AwsS3AuthConfiguration;
 import org.apache.james.blob.objectstorage.aws.AwsS3ObjectStorage;
 import org.apache.james.modules.mailbox.ConfigurationComponent;
@@ -67,6 +69,7 @@ public class ObjectStorageDependenciesModule extends AbstractModule {
             .container(configuration.getNamespace())
             .blobIdFactory(blobIdFactory)
             .payloadCodec(configuration.getPayloadCodec())
+            .putBlob(putBlob(blobIdFactory, configuration))
             .build();
         dao.createContainer(configuration.getNamespace()).block(Duration.ofMinutes(1));
         return dao;
@@ -82,4 +85,15 @@ public class ObjectStorageDependenciesModule extends AbstractModule {
         throw new IllegalArgumentException("unknown provider " + configuration.getProvider());
     }
 
+    private Optional<PutBlobFunction> putBlob(BlobId.Factory blobIdFactory, ObjectStorageBlobConfiguration configuration) {
+        switch (configuration.getProvider()) {
+            case SWIFT:
+                return Optional.empty();
+            case AWSS3:
+                return AwsS3ObjectStorage.putBlob(blobIdFactory, configuration.getNamespace(), (AwsS3AuthConfiguration) configuration.getSpecificAuthConfiguration());
+        }
+        throw new IllegalArgumentException("unknown provider " + configuration.getProvider());
+
+    }
+
 }
diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQJamesServerTest.java b/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQJamesServerTest.java
index 4c3c8a0..631c93a 100644
--- a/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQJamesServerTest.java
+++ b/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQJamesServerTest.java
@@ -94,7 +94,7 @@ class CassandraRabbitMQJamesServerTest {
             .overrideWith(JmapJamesServerContract.DOMAIN_LIST_CONFIGURATION_MODULE);
 
     @Nested
-    @TestInstance(Lifecycle.PER_METHOD)
+    @TestInstance(Lifecycle.PER_CLASS)
     class WithEncryptedSwift implements ContractSuite {
         @RegisterExtension
         JamesServerExtension testExtension = baseExtensionBuilder()
@@ -113,7 +113,7 @@ class CassandraRabbitMQJamesServerTest {
     }
 
     @Nested
-    @TestInstance(Lifecycle.PER_METHOD)
+    @TestInstance(Lifecycle.PER_CLASS)
     class WithDefaultSwift implements ContractSuite {
         @RegisterExtension
         JamesServerExtension testExtension = baseExtensionBuilder()
@@ -131,14 +131,14 @@ class CassandraRabbitMQJamesServerTest {
     }
 
     @Nested
-    @TestInstance(Lifecycle.PER_METHOD)
+    @TestInstance(Lifecycle.PER_CLASS)
     class WithoutSwiftOrAwsS3 implements ContractSuite {
         @RegisterExtension
         JamesServerExtension testExtension = baseExtensionBuilder().build();
     }
 
     @Nested
-    @TestInstance(Lifecycle.PER_METHOD)
+    @TestInstance(Lifecycle.PER_CLASS)
     class WithEncryptedAwsS3 implements ContractSuite {
         @RegisterExtension
         JamesServerExtension testExtension = baseExtensionBuilder()
@@ -157,7 +157,7 @@ class CassandraRabbitMQJamesServerTest {
     }
 
     @Nested
-    @TestInstance(Lifecycle.PER_METHOD)
+    @TestInstance(Lifecycle.PER_CLASS)
     class WithDefaultAwsS3 implements ContractSuite {
         @RegisterExtension
         JamesServerExtension testExtension = baseExtensionBuilder()
diff --git a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/ExportService.java b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/ExportService.java
index a3ca2ce..14c100c 100644
--- a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/ExportService.java
+++ b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/ExportService.java
@@ -74,7 +74,7 @@ class ExportService {
         try (FileBackedOutputStream fileOutputStream = new FileBackedOutputStream(FileUtils.ONE_MB_BI.intValue())) {
             zipper.zip(contentLoader(user), messages.toStream(), fileOutputStream);
             ByteSource byteSource = fileOutputStream.asByteSource();
-            return blobStore.save(byteSource.openStream(), byteSource.size()).block();
+            return blobStore.save(byteSource.openStream()).block();
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org