You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/03/22 16:18:46 UTC

[james-project] 02/04: JAMES-2671 Add content length in save BlobStore API

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b028cd499b20bce86b174dcb4962797bef460452
Author: Antoine Duprat <ad...@linagora.com>
AuthorDate: Mon Mar 4 14:21:58 2019 +0100

    JAMES-2671 Add content length in save BlobStore API
---
 .../java/org/apache/james/blob/api/BlobStore.java  |  2 +-
 .../apache/james/blob/api/MetricableBlobStore.java |  4 +-
 .../main/java/org/apache/james/blob/api/Store.java | 28 ++++++++--
 .../apache/james/blob/api/BlobStoreContract.java   |  6 +--
 .../james/blob/api/FixedLengthInputStreamTest.java | 63 ++++++++++++++++++++++
 .../blob/api/MetricableBlobStoreContract.java      |  6 +--
 .../james/blob/cassandra/CassandraBlobsDAO.java    |  2 +-
 .../blob/cassandra/CassandraBlobsDAOTest.java      |  2 +-
 .../apache/james/blob/memory/MemoryBlobStore.java  |  2 +-
 .../blob/objectstorage/ObjectStorageBlobsDAO.java  | 16 +++---
 .../objectstorage/ObjectStorageBlobsDAOTest.java   | 20 ++++---
 .../apache/james/blob/union/UnionBlobStore.java    |  8 +--
 .../james/blob/union/UnionBlobStoreTest.java       | 14 ++---
 .../apache/james/blob/mail/MimeMessageStore.java   | 10 ++--
 14 files changed, 141 insertions(+), 42 deletions(-)

diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
index 8b68d93..3d28ab5 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
@@ -26,7 +26,7 @@ public interface BlobStore {
 
     Mono<BlobId> save(byte[] data);
 
-    Mono<BlobId> save(InputStream data);
+    Mono<BlobId> save(InputStream data, long contentLength);
 
     Mono<byte[]> readBytes(BlobId blobId);
 
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
index 4ed7b17..b51e37b 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
@@ -54,9 +54,9 @@ public class MetricableBlobStore implements BlobStore {
     }
 
     @Override
-    public Mono<BlobId> save(InputStream data) {
+    public Mono<BlobId> save(InputStream data, long contentLength) {
         return metricFactory
-            .runPublishingTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, blobStoreImpl.save(data));
+            .runPublishingTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, blobStoreImpl.save(data, contentLength));
     }
 
     @Override
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
index 07a5611..3cd4afa 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
@@ -26,6 +26,7 @@ import java.util.stream.Stream;
 
 import org.apache.commons.lang3.tuple.Pair;
 
+import com.google.common.base.Preconditions;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
@@ -66,7 +67,7 @@ public interface Store<T, I> {
     class Impl<T, I extends BlobPartsId> implements Store<T, I> {
 
         public interface Encoder<T> {
-            Stream<Pair<BlobType, InputStream>> encode(T t);
+            Stream<Pair<BlobType, FixedLengthInputStream>> encode(T t);
         }
 
         public interface Decoder<T> {
@@ -93,9 +94,9 @@ public interface Store<T, I> {
                 .map(idFactory::generate);
         }
 
-        private Mono<Tuple2<BlobType, BlobId>> saveEntry(Pair<BlobType, InputStream> entry) {
+        private Mono<Tuple2<BlobType, BlobId>> saveEntry(Pair<BlobType, FixedLengthInputStream> entry) {
             return Mono.just(entry.getLeft())
-                .zipWith(blobStore.save(entry.getRight()));
+                .zipWith(blobStore.save(entry.getRight().getInputStream(), entry.getRight().getContentLength()));
         }
 
         @Override
@@ -110,4 +111,25 @@ public interface Store<T, I> {
                 .map(decoder::decode);
         }
     }
+
+    class FixedLengthInputStream {
+
+        private final InputStream inputStream;
+        private final long contentLength;
+
+        public FixedLengthInputStream(InputStream inputStream, long contentLength) {
+            Preconditions.checkNotNull(inputStream, "'inputStream' is mandatory");
+            Preconditions.checkArgument(contentLength >= 0, "'contentLength' should be greater than or equal to 0");
+            this.inputStream = inputStream;
+            this.contentLength = contentLength;
+        }
+
+        public InputStream getInputStream() {
+            return inputStream;
+        }
+
+        public long getContentLength() {
+            return contentLength;
+        }
+    }
 }
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java
index 2164c74..6983485 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java
@@ -49,7 +49,7 @@ public interface BlobStoreContract {
 
     @Test
     default void saveShouldThrowWhenNullInputStream() {
-        assertThatThrownBy(() -> testee().save((InputStream) null).block())
+        assertThatThrownBy(() -> testee().save((InputStream) null, 0).block())
             .isInstanceOf(NullPointerException.class);
     }
 
@@ -64,7 +64,7 @@ public interface BlobStoreContract {
 
     @Test
     default void saveShouldSaveEmptyInputStream() {
-        BlobId blobId = testee().save(new ByteArrayInputStream(EMPTY_BYTEARRAY)).block();
+        BlobId blobId = testee().save(new ByteArrayInputStream(EMPTY_BYTEARRAY), EMPTY_BYTEARRAY.length).block();
 
         byte[] bytes = testee().readBytes(blobId).block();
 
@@ -81,7 +81,7 @@ public interface BlobStoreContract {
     @Test
     default void saveShouldReturnBlobIdOfInputStream() {
         BlobId blobId =
-            testee().save(new ByteArrayInputStream(SHORT_BYTEARRAY)).block();
+            testee().save(new ByteArrayInputStream(SHORT_BYTEARRAY), SHORT_BYTEARRAY.length).block();
 
         assertThat(blobId).isEqualTo(blobIdFactory().from("31f7a65e315586ac198bd798b6629ce4903d0899476d5741a9f32e2e521b6a66"));
     }
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/FixedLengthInputStreamTest.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/FixedLengthInputStreamTest.java
new file mode 100644
index 0000000..c442b0c
--- /dev/null
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/FixedLengthInputStreamTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.james.blob.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.jupiter.api.Test;
+
+class FixedLengthInputStreamTest {
+
+    @Test
+    void fixedLengthInputStreamShouldThrowWhenInputStreamIsNull() {
+        assertThatThrownBy(() -> new Store.FixedLengthInputStream(null, 0))
+            .isInstanceOf(NullPointerException.class)
+            .hasMessage("'inputStream' is mandatory");
+    }
+
+    @Test
+    void fixedLengthInputStreamShouldThrowWhenContentLengthIsNegative() {
+        assertThatThrownBy(() -> new Store.FixedLengthInputStream(new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8)), -1))
+            .isInstanceOf(IllegalArgumentException.class)
+            .hasMessage("'contentLength' should be greater than or equal to 0");
+    }
+
+    @Test
+    void lengthShouldBeStored() {
+        int contentLength = 1;
+
+        Store.FixedLengthInputStream testee = new Store.FixedLengthInputStream(new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8)), contentLength);
+
+        assertThat(testee.getContentLength()).isEqualTo(contentLength);
+    }
+
+    @Test
+    void inputStreamShouldBeStored() {
+        ByteArrayInputStream inputStream = new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8));
+
+        Store.FixedLengthInputStream testee = new Store.FixedLengthInputStream(inputStream, 1);
+
+        assertThat(testee.getInputStream()).hasSameContentAs(inputStream);
+    }
+}
\ No newline at end of file
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
index c568764..ec495ed 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
@@ -86,9 +86,9 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
 
     @Test
     default void saveInputStreamShouldPublishSaveInputStreamTimerMetrics() {
-        testee().save(new ByteArrayInputStream(BYTES_CONTENT)).block();
-        testee().save(new ByteArrayInputStream(BYTES_CONTENT)).block();
-        testee().save(new ByteArrayInputStream(BYTES_CONTENT)).block();
+        testee().save(new ByteArrayInputStream(BYTES_CONTENT), BYTES_CONTENT.length).block();
+        testee().save(new ByteArrayInputStream(BYTES_CONTENT), BYTES_CONTENT.length).block();
+        testee().save(new ByteArrayInputStream(BYTES_CONTENT), BYTES_CONTENT.length).block();
         verify(metricsTestExtension.saveInputStreamTimeMetric, times(3)).stopAndPublish();
     }
 
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
index cd77a88..3a9d88d 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
@@ -211,7 +211,7 @@ public class CassandraBlobsDAO implements BlobStore {
     }
 
     @Override
-    public Mono<BlobId> save(InputStream data) {
+    public Mono<BlobId> save(InputStream data, long contentLength) {
         Preconditions.checkNotNull(data);
         return Mono.fromCallable(() -> IOUtils.toByteArray(data))
             .flatMap(this::saveAsMono);
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
index 51a9933..3e570ee 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
@@ -81,7 +81,7 @@ public class CassandraBlobsDAOTest implements MetricableBlobStoreContract {
 
     @Test
     void blobStoreShouldSupport100MBBlob() {
-        BlobId blobId = testee.save(new ZeroedInputStream(100_000_000)).block();
+        BlobId blobId = testee.save(new ZeroedInputStream(100_000_000), 100_000_000).block();
         InputStream bytes = testee.read(blobId);
         assertThat(bytes).hasSameContentAs(new ZeroedInputStream(100_000_000));
     }
diff --git a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java
index 9f72fc4..2e203ef 100644
--- a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java
+++ b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java
@@ -53,7 +53,7 @@ public class MemoryBlobStore implements BlobStore {
     }
 
     @Override
-    public Mono<BlobId> save(InputStream data) {
+    public Mono<BlobId> save(InputStream data, long contentLength) {
         Preconditions.checkNotNull(data);
         try {
             byte[] bytes = IOUtils.toByteArray(data);
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
index 7b02932..d441e68 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
@@ -22,6 +22,7 @@ package org.apache.james.blob.objectstorage;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.james.blob.api.BlobId;
@@ -82,15 +83,15 @@ public class ObjectStorageBlobsDAO implements BlobStore {
 
     @Override
     public Mono<BlobId> save(byte[] data) {
-        return save(new ByteArrayInputStream(data));
+        return save(new ByteArrayInputStream(data), data.length);
     }
 
     @Override
-    public Mono<BlobId> save(InputStream data) {
+    public Mono<BlobId> save(InputStream data, long contentLength) {
         Preconditions.checkNotNull(data);
 
         BlobId tmpId = blobIdFactory.randomId();
-        return save(data, tmpId)
+        return save(data, contentLength, tmpId)
             .flatMap(id -> updateBlobId(tmpId, id));
     }
 
@@ -102,11 +103,14 @@ public class ObjectStorageBlobsDAO implements BlobStore {
             .thenReturn(to);
     }
 
-    private Mono<BlobId> save(InputStream data, BlobId id) {
+    private Mono<BlobId> save(InputStream data, long contentLength, BlobId id) {
         String containerName = this.containerName.value();
         HashingInputStream hashingInputStream = new HashingInputStream(Hashing.sha256(), data);
         Payload payload = payloadCodec.write(hashingInputStream);
-        Blob blob = blobStore.blobBuilder(id.asString()).payload(payload).build();
+        Blob blob = blobStore.blobBuilder(id.asString())
+                            .payload(payload.getPayload())
+                            .contentLength(payload.getLength().orElse(contentLength))
+                            .build();
 
         return Mono.fromCallable(() -> blobStore.putBlob(containerName, blob))
             .then(Mono.fromCallable(() -> blobIdFactory.from(hashingInputStream.hash().toString())));
@@ -123,7 +127,7 @@ public class ObjectStorageBlobsDAO implements BlobStore {
 
         try {
             if (blob != null) {
-                return payloadCodec.read(blob.getPayload());
+                return payloadCodec.read(new Payload(blob.getPayload(), Optional.empty()));
             } else {
                 throw new ObjectStoreException("fail to load blob with id " + blobId);
             }
diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java
index 95c1fec..63ebb7d 100644
--- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java
+++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.UUID;
@@ -46,6 +47,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import com.google.common.base.Charsets;
 import com.google.common.base.Strings;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -123,18 +125,21 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract {
     }
 
     @Test
-    void supportsEncryptionWithCustomPayloadCodec() {
+    void supportsEncryptionWithCustomPayloadCodec() throws IOException {
         ObjectStorageBlobsDAO encryptedDao = ObjectStorageBlobsDAO
             .builder(testConfig)
             .container(containerName)
             .blobIdFactory(blobIdFactory())
             .payloadCodec(new AESPayloadCodec(CRYPTO_CONFIG))
             .build();
-        byte[] bytes = "James is the best!".getBytes(StandardCharsets.UTF_8);
+        String content = "James is the best!";
+        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
         BlobId blobId = encryptedDao.save(bytes).block();
 
         InputStream read = encryptedDao.read(blobId);
-        assertThat(read).hasSameContentAs(new ByteArrayInputStream(bytes));
+        ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
+        String expectedContent = IOUtils.toString(inputStream, Charsets.UTF_8);
+        assertThat(content).isEqualTo(expectedContent);
     }
 
     @Test
@@ -145,7 +150,8 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract {
             .blobIdFactory(blobIdFactory())
             .payloadCodec(new AESPayloadCodec(CRYPTO_CONFIG))
             .build();
-        byte[] bytes = "James is the best!".getBytes(StandardCharsets.UTF_8);
+        String content = "James is the best!";
+        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
         BlobId blobId = encryptedDao.save(bytes).block();
 
         InputStream encryptedIs = testee.read(blobId);
@@ -154,7 +160,9 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract {
         assertThat(encryptedBytes).isNotEqualTo(bytes);
 
         InputStream clearTextIs = encryptedDao.read(blobId);
-        assertThat(clearTextIs).hasSameContentAs(new ByteArrayInputStream(bytes));
+        ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
+        String expectedContent = IOUtils.toString(inputStream, Charsets.UTF_8);
+        assertThat(content).isEqualTo(expectedContent);
     }
 
     @Test
@@ -176,7 +184,7 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract {
     @Test
     void saveInputStreamShouldNotCompleteWhenDoesNotAwait() {
         Mono<BlobId> blobIdFuture = testee
-            .save(new ByteArrayInputStream(BIG_STRING.getBytes(StandardCharsets.UTF_8)))
+            .save(new ByteArrayInputStream(BIG_STRING.getBytes(StandardCharsets.UTF_8)), BIG_STRING.length())
             .subscribeOn(Schedulers.elastic());
         assertThat(blobIdFuture.toFuture()).isNotCompleted();
     }
diff --git a/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java
index 950fcf7..ee02c8d 100644
--- a/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java
+++ b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java
@@ -92,14 +92,14 @@ public class UnionBlobStore implements BlobStore {
     }
 
     @Override
-    public Mono<BlobId> save(InputStream data) {
+    public Mono<BlobId> save(InputStream data, long contentLength) {
         try {
             return saveToCurrentFallbackIfFails(
-                Mono.defer(() -> currentBlobStore.save(data)),
-                () -> Mono.defer(() -> legacyBlobStore.save(data)));
+                Mono.defer(() -> currentBlobStore.save(data, contentLength)),
+                () -> Mono.defer(() -> legacyBlobStore.save(data, contentLength)));
         } catch (Exception e) {
             LOGGER.error("exception directly happens while saving InputStream data, fall back to legacy blob store", e);
-            return legacyBlobStore.save(data);
+            return legacyBlobStore.save(data, contentLength);
         }
     }
 
diff --git a/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java b/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java
index 2bed094..563f3cd 100644
--- a/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java
+++ b/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java
@@ -60,7 +60,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
         }
 
         @Override
-        public Mono<BlobId> save(InputStream data) {
+        public Mono<BlobId> save(InputStream data, long contentLength) {
             return Mono.error(new RuntimeException("broken everywhere"));
         }
 
@@ -89,7 +89,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
         }
 
         @Override
-        public Mono<BlobId> save(InputStream data) {
+        public Mono<BlobId> save(InputStream data, long contentLength) {
             throw new RuntimeException("broken everywhere");
         }
 
@@ -164,7 +164,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
                 .current(new ThrowingBlobStore())
                 .legacy(legacyBlobStore)
                 .build();
-            BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block();
+            BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT), BLOB_CONTENT.length).block();
 
             SoftAssertions.assertSoftly(softly -> {
                 softly.assertThat(unionBlobStore.read(blobId))
@@ -202,7 +202,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
                 .current(new FailingBlobStore())
                 .legacy(legacyBlobStore)
                 .build();
-            BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block();
+            BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT), BLOB_CONTENT.length).block();
 
             SoftAssertions.assertSoftly(softly -> {
                 softly.assertThat(unionBlobStore.read(blobId))
@@ -284,7 +284,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
         Stream<Function<UnionBlobStore, Mono<?>>> blobStoreOperationsReturnFutures() {
             return Stream.of(
                 blobStore -> blobStore.save(BLOB_CONTENT),
-                blobStore -> blobStore.save(new ByteArrayInputStream(BLOB_CONTENT)),
+                blobStore -> blobStore.save(new ByteArrayInputStream(BLOB_CONTENT), BLOB_CONTENT.length),
                 blobStore -> blobStore.readBytes(BLOB_ID_FACTORY.randomId()));
         }
 
@@ -394,7 +394,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
 
     @Test
     void saveInputStreamShouldWriteToCurrent() {
-        BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block();
+        BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT), BLOB_CONTENT.length).block();
 
         assertThat(currentBlobStore.readBytes(blobId).block())
             .isEqualTo(BLOB_CONTENT);
@@ -402,7 +402,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
 
     @Test
     void saveInputStreamShouldNotWriteToLegacy() {
-        BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block();
+        BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT), BLOB_CONTENT.length).block();
 
         assertThatThrownBy(() -> legacyBlobStore.readBytes(blobId).block())
             .isInstanceOf(ObjectStoreException.class);
diff --git a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
index 71905de..506b895 100644
--- a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
+++ b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
@@ -69,18 +69,20 @@ public class MimeMessageStore {
 
     static class MimeMessageEncoder implements Store.Impl.Encoder<MimeMessage> {
         @Override
-        public Stream<Pair<BlobType, InputStream>> encode(MimeMessage message) {
+        public Stream<Pair<BlobType, Store.FixedLengthInputStream>> encode(MimeMessage message) {
             try {
                 byte[] messageAsArray = messageToArray(message);
                 int bodyStartOctet = computeBodyStartOctet(messageAsArray);
+                byte[] headerBytes = getHeaderBytes(messageAsArray, bodyStartOctet);
+                byte[] bodyBytes = getBodyBytes(messageAsArray, bodyStartOctet);
                 return Stream.of(
-                    Pair.of(HEADER_BLOB_TYPE, new ByteArrayInputStream(getHeaderBytes(messageAsArray, bodyStartOctet))),
-                    Pair.of(BODY_BLOB_TYPE, new ByteArrayInputStream(getBodyBytes(messageAsArray, bodyStartOctet))));
+                    Pair.of(HEADER_BLOB_TYPE, new Store.FixedLengthInputStream(new ByteArrayInputStream(headerBytes), headerBytes.length)),
+                    Pair.of(BODY_BLOB_TYPE, new Store.FixedLengthInputStream(new ByteArrayInputStream(bodyBytes), bodyBytes.length)));
             } catch (MessagingException | IOException e) {
                 throw new RuntimeException(e);
             }
         }
-
+        
         private static byte[] messageToArray(MimeMessage message) throws IOException, MessagingException {
             ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
             message.writeTo(byteArrayOutputStream);


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org