You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/03/22 16:18:46 UTC
[james-project] 02/04: JAMES-2671 Add content length in save BlobStore API
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b028cd499b20bce86b174dcb4962797bef460452
Author: Antoine Duprat <ad...@linagora.com>
AuthorDate: Mon Mar 4 14:21:58 2019 +0100
JAMES-2671 Add content length in save BlobStore API
---
.../java/org/apache/james/blob/api/BlobStore.java | 2 +-
.../apache/james/blob/api/MetricableBlobStore.java | 4 +-
.../main/java/org/apache/james/blob/api/Store.java | 28 ++++++++--
.../apache/james/blob/api/BlobStoreContract.java | 6 +--
.../james/blob/api/FixedLengthInputStreamTest.java | 63 ++++++++++++++++++++++
.../blob/api/MetricableBlobStoreContract.java | 6 +--
.../james/blob/cassandra/CassandraBlobsDAO.java | 2 +-
.../blob/cassandra/CassandraBlobsDAOTest.java | 2 +-
.../apache/james/blob/memory/MemoryBlobStore.java | 2 +-
.../blob/objectstorage/ObjectStorageBlobsDAO.java | 16 +++---
.../objectstorage/ObjectStorageBlobsDAOTest.java | 20 ++++---
.../apache/james/blob/union/UnionBlobStore.java | 8 +--
.../james/blob/union/UnionBlobStoreTest.java | 14 ++---
.../apache/james/blob/mail/MimeMessageStore.java | 10 ++--
14 files changed, 141 insertions(+), 42 deletions(-)
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
index 8b68d93..3d28ab5 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
@@ -26,7 +26,7 @@ public interface BlobStore {
Mono<BlobId> save(byte[] data);
- Mono<BlobId> save(InputStream data);
+ Mono<BlobId> save(InputStream data, long contentLength);
Mono<byte[]> readBytes(BlobId blobId);
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
index 4ed7b17..b51e37b 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
@@ -54,9 +54,9 @@ public class MetricableBlobStore implements BlobStore {
}
@Override
- public Mono<BlobId> save(InputStream data) {
+ public Mono<BlobId> save(InputStream data, long contentLength) {
return metricFactory
- .runPublishingTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, blobStoreImpl.save(data));
+ .runPublishingTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, blobStoreImpl.save(data, contentLength));
}
@Override
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
index 07a5611..3cd4afa 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
@@ -26,6 +26,7 @@ import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
+import com.google.common.base.Preconditions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
@@ -66,7 +67,7 @@ public interface Store<T, I> {
class Impl<T, I extends BlobPartsId> implements Store<T, I> {
public interface Encoder<T> {
- Stream<Pair<BlobType, InputStream>> encode(T t);
+ Stream<Pair<BlobType, FixedLengthInputStream>> encode(T t);
}
public interface Decoder<T> {
@@ -93,9 +94,9 @@ public interface Store<T, I> {
.map(idFactory::generate);
}
- private Mono<Tuple2<BlobType, BlobId>> saveEntry(Pair<BlobType, InputStream> entry) {
+ private Mono<Tuple2<BlobType, BlobId>> saveEntry(Pair<BlobType, FixedLengthInputStream> entry) {
return Mono.just(entry.getLeft())
- .zipWith(blobStore.save(entry.getRight()));
+ .zipWith(blobStore.save(entry.getRight().getInputStream(), entry.getRight().getContentLength()));
}
@Override
@@ -110,4 +111,25 @@ public interface Store<T, I> {
.map(decoder::decode);
}
}
+
+ class FixedLengthInputStream {
+
+ private final InputStream inputStream;
+ private final long contentLength;
+
+ public FixedLengthInputStream(InputStream inputStream, long contentLength) {
+ Preconditions.checkNotNull(inputStream, "'inputStream' is mandatory");
+ Preconditions.checkArgument(contentLength >= 0, "'contentLength' should be greater than or equal to 0");
+ this.inputStream = inputStream;
+ this.contentLength = contentLength;
+ }
+
+ public InputStream getInputStream() {
+ return inputStream;
+ }
+
+ public long getContentLength() {
+ return contentLength;
+ }
+ }
}
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java
index 2164c74..6983485 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java
@@ -49,7 +49,7 @@ public interface BlobStoreContract {
@Test
default void saveShouldThrowWhenNullInputStream() {
- assertThatThrownBy(() -> testee().save((InputStream) null).block())
+ assertThatThrownBy(() -> testee().save((InputStream) null, 0).block())
.isInstanceOf(NullPointerException.class);
}
@@ -64,7 +64,7 @@ public interface BlobStoreContract {
@Test
default void saveShouldSaveEmptyInputStream() {
- BlobId blobId = testee().save(new ByteArrayInputStream(EMPTY_BYTEARRAY)).block();
+ BlobId blobId = testee().save(new ByteArrayInputStream(EMPTY_BYTEARRAY), EMPTY_BYTEARRAY.length).block();
byte[] bytes = testee().readBytes(blobId).block();
@@ -81,7 +81,7 @@ public interface BlobStoreContract {
@Test
default void saveShouldReturnBlobIdOfInputStream() {
BlobId blobId =
- testee().save(new ByteArrayInputStream(SHORT_BYTEARRAY)).block();
+ testee().save(new ByteArrayInputStream(SHORT_BYTEARRAY), SHORT_BYTEARRAY.length).block();
assertThat(blobId).isEqualTo(blobIdFactory().from("31f7a65e315586ac198bd798b6629ce4903d0899476d5741a9f32e2e521b6a66"));
}
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/FixedLengthInputStreamTest.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/FixedLengthInputStreamTest.java
new file mode 100644
index 0000000..c442b0c
--- /dev/null
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/FixedLengthInputStreamTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.james.blob.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.jupiter.api.Test;
+
+class FixedLengthInputStreamTest {
+
+ @Test
+ void fixedLengthInputStreamShouldThrowWhenInputStreamIsNull() {
+ assertThatThrownBy(() -> new Store.FixedLengthInputStream(null, 0))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("'inputStream' is mandatory");
+ }
+
+ @Test
+ void fixedLengthInputStreamShouldThrowWhenContentLengthIsNegative() {
+ assertThatThrownBy(() -> new Store.FixedLengthInputStream(new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8)), -1))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("'contentLength' should be greater than or equal to 0");
+ }
+
+ @Test
+ void lengthShouldBeStored() {
+ int contentLength = 1;
+
+ Store.FixedLengthInputStream testee = new Store.FixedLengthInputStream(new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8)), contentLength);
+
+ assertThat(testee.getContentLength()).isEqualTo(contentLength);
+ }
+
+ @Test
+ void inputStreamShouldBeStored() {
+ ByteArrayInputStream inputStream = new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8));
+
+ Store.FixedLengthInputStream testee = new Store.FixedLengthInputStream(inputStream, 1);
+
+ assertThat(testee.getInputStream()).hasSameContentAs(inputStream);
+ }
+}
\ No newline at end of file
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
index c568764..ec495ed 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
@@ -86,9 +86,9 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
@Test
default void saveInputStreamShouldPublishSaveInputStreamTimerMetrics() {
- testee().save(new ByteArrayInputStream(BYTES_CONTENT)).block();
- testee().save(new ByteArrayInputStream(BYTES_CONTENT)).block();
- testee().save(new ByteArrayInputStream(BYTES_CONTENT)).block();
+ testee().save(new ByteArrayInputStream(BYTES_CONTENT), BYTES_CONTENT.length).block();
+ testee().save(new ByteArrayInputStream(BYTES_CONTENT), BYTES_CONTENT.length).block();
+ testee().save(new ByteArrayInputStream(BYTES_CONTENT), BYTES_CONTENT.length).block();
verify(metricsTestExtension.saveInputStreamTimeMetric, times(3)).stopAndPublish();
}
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
index cd77a88..3a9d88d 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
@@ -211,7 +211,7 @@ public class CassandraBlobsDAO implements BlobStore {
}
@Override
- public Mono<BlobId> save(InputStream data) {
+ public Mono<BlobId> save(InputStream data, long contentLength) {
Preconditions.checkNotNull(data);
return Mono.fromCallable(() -> IOUtils.toByteArray(data))
.flatMap(this::saveAsMono);
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
index 51a9933..3e570ee 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
@@ -81,7 +81,7 @@ public class CassandraBlobsDAOTest implements MetricableBlobStoreContract {
@Test
void blobStoreShouldSupport100MBBlob() {
- BlobId blobId = testee.save(new ZeroedInputStream(100_000_000)).block();
+ BlobId blobId = testee.save(new ZeroedInputStream(100_000_000), 100_000_000).block();
InputStream bytes = testee.read(blobId);
assertThat(bytes).hasSameContentAs(new ZeroedInputStream(100_000_000));
}
diff --git a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java
index 9f72fc4..2e203ef 100644
--- a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java
+++ b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java
@@ -53,7 +53,7 @@ public class MemoryBlobStore implements BlobStore {
}
@Override
- public Mono<BlobId> save(InputStream data) {
+ public Mono<BlobId> save(InputStream data, long contentLength) {
Preconditions.checkNotNull(data);
try {
byte[] bytes = IOUtils.toByteArray(data);
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
index 7b02932..d441e68 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
@@ -22,6 +22,7 @@ package org.apache.james.blob.objectstorage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.james.blob.api.BlobId;
@@ -82,15 +83,15 @@ public class ObjectStorageBlobsDAO implements BlobStore {
@Override
public Mono<BlobId> save(byte[] data) {
- return save(new ByteArrayInputStream(data));
+ return save(new ByteArrayInputStream(data), data.length);
}
@Override
- public Mono<BlobId> save(InputStream data) {
+ public Mono<BlobId> save(InputStream data, long contentLength) {
Preconditions.checkNotNull(data);
BlobId tmpId = blobIdFactory.randomId();
- return save(data, tmpId)
+ return save(data, contentLength, tmpId)
.flatMap(id -> updateBlobId(tmpId, id));
}
@@ -102,11 +103,14 @@ public class ObjectStorageBlobsDAO implements BlobStore {
.thenReturn(to);
}
- private Mono<BlobId> save(InputStream data, BlobId id) {
+ private Mono<BlobId> save(InputStream data, long contentLength, BlobId id) {
String containerName = this.containerName.value();
HashingInputStream hashingInputStream = new HashingInputStream(Hashing.sha256(), data);
Payload payload = payloadCodec.write(hashingInputStream);
- Blob blob = blobStore.blobBuilder(id.asString()).payload(payload).build();
+ Blob blob = blobStore.blobBuilder(id.asString())
+ .payload(payload.getPayload())
+ .contentLength(payload.getLength().orElse(contentLength))
+ .build();
return Mono.fromCallable(() -> blobStore.putBlob(containerName, blob))
.then(Mono.fromCallable(() -> blobIdFactory.from(hashingInputStream.hash().toString())));
@@ -123,7 +127,7 @@ public class ObjectStorageBlobsDAO implements BlobStore {
try {
if (blob != null) {
- return payloadCodec.read(blob.getPayload());
+ return payloadCodec.read(new Payload(blob.getPayload(), Optional.empty()));
} else {
throw new ObjectStoreException("fail to load blob with id " + blobId);
}
diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java
index 95c1fec..63ebb7d 100644
--- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java
+++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAOTest.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@@ -46,6 +47,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -123,18 +125,21 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract {
}
@Test
- void supportsEncryptionWithCustomPayloadCodec() {
+ void supportsEncryptionWithCustomPayloadCodec() throws IOException {
ObjectStorageBlobsDAO encryptedDao = ObjectStorageBlobsDAO
.builder(testConfig)
.container(containerName)
.blobIdFactory(blobIdFactory())
.payloadCodec(new AESPayloadCodec(CRYPTO_CONFIG))
.build();
- byte[] bytes = "James is the best!".getBytes(StandardCharsets.UTF_8);
+ String content = "James is the best!";
+ byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
BlobId blobId = encryptedDao.save(bytes).block();
InputStream read = encryptedDao.read(blobId);
- assertThat(read).hasSameContentAs(new ByteArrayInputStream(bytes));
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
+ String expectedContent = IOUtils.toString(inputStream, Charsets.UTF_8);
+ assertThat(content).isEqualTo(expectedContent);
}
@Test
@@ -145,7 +150,8 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract {
.blobIdFactory(blobIdFactory())
.payloadCodec(new AESPayloadCodec(CRYPTO_CONFIG))
.build();
- byte[] bytes = "James is the best!".getBytes(StandardCharsets.UTF_8);
+ String content = "James is the best!";
+ byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
BlobId blobId = encryptedDao.save(bytes).block();
InputStream encryptedIs = testee.read(blobId);
@@ -154,7 +160,9 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract {
assertThat(encryptedBytes).isNotEqualTo(bytes);
InputStream clearTextIs = encryptedDao.read(blobId);
- assertThat(clearTextIs).hasSameContentAs(new ByteArrayInputStream(bytes));
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
+ String expectedContent = IOUtils.toString(inputStream, Charsets.UTF_8);
+ assertThat(content).isEqualTo(expectedContent);
}
@Test
@@ -176,7 +184,7 @@ public class ObjectStorageBlobsDAOTest implements MetricableBlobStoreContract {
@Test
void saveInputStreamShouldNotCompleteWhenDoesNotAwait() {
Mono<BlobId> blobIdFuture = testee
- .save(new ByteArrayInputStream(BIG_STRING.getBytes(StandardCharsets.UTF_8)))
+ .save(new ByteArrayInputStream(BIG_STRING.getBytes(StandardCharsets.UTF_8)), BIG_STRING.length())
.subscribeOn(Schedulers.elastic());
assertThat(blobIdFuture.toFuture()).isNotCompleted();
}
diff --git a/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java
index 950fcf7..ee02c8d 100644
--- a/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java
+++ b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java
@@ -92,14 +92,14 @@ public class UnionBlobStore implements BlobStore {
}
@Override
- public Mono<BlobId> save(InputStream data) {
+ public Mono<BlobId> save(InputStream data, long contentLength) {
try {
return saveToCurrentFallbackIfFails(
- Mono.defer(() -> currentBlobStore.save(data)),
- () -> Mono.defer(() -> legacyBlobStore.save(data)));
+ Mono.defer(() -> currentBlobStore.save(data, contentLength)),
+ () -> Mono.defer(() -> legacyBlobStore.save(data, contentLength)));
} catch (Exception e) {
LOGGER.error("exception directly happens while saving InputStream data, fall back to legacy blob store", e);
- return legacyBlobStore.save(data);
+ return legacyBlobStore.save(data, contentLength);
}
}
diff --git a/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java b/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java
index 2bed094..563f3cd 100644
--- a/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java
+++ b/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java
@@ -60,7 +60,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
}
@Override
- public Mono<BlobId> save(InputStream data) {
+ public Mono<BlobId> save(InputStream data, long contentLength) {
return Mono.error(new RuntimeException("broken everywhere"));
}
@@ -89,7 +89,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
}
@Override
- public Mono<BlobId> save(InputStream data) {
+ public Mono<BlobId> save(InputStream data, long contentLength) {
throw new RuntimeException("broken everywhere");
}
@@ -164,7 +164,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
.current(new ThrowingBlobStore())
.legacy(legacyBlobStore)
.build();
- BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block();
+ BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT), BLOB_CONTENT.length).block();
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(unionBlobStore.read(blobId))
@@ -202,7 +202,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
.current(new FailingBlobStore())
.legacy(legacyBlobStore)
.build();
- BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block();
+ BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT), BLOB_CONTENT.length).block();
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(unionBlobStore.read(blobId))
@@ -284,7 +284,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
Stream<Function<UnionBlobStore, Mono<?>>> blobStoreOperationsReturnFutures() {
return Stream.of(
blobStore -> blobStore.save(BLOB_CONTENT),
- blobStore -> blobStore.save(new ByteArrayInputStream(BLOB_CONTENT)),
+ blobStore -> blobStore.save(new ByteArrayInputStream(BLOB_CONTENT), BLOB_CONTENT.length),
blobStore -> blobStore.readBytes(BLOB_ID_FACTORY.randomId()));
}
@@ -394,7 +394,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
@Test
void saveInputStreamShouldWriteToCurrent() {
- BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block();
+ BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT), BLOB_CONTENT.length).block();
assertThat(currentBlobStore.readBytes(blobId).block())
.isEqualTo(BLOB_CONTENT);
@@ -402,7 +402,7 @@ class UnionBlobStoreTest implements BlobStoreContract {
@Test
void saveInputStreamShouldNotWriteToLegacy() {
- BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).block();
+ BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT), BLOB_CONTENT.length).block();
assertThatThrownBy(() -> legacyBlobStore.readBytes(blobId).block())
.isInstanceOf(ObjectStoreException.class);
diff --git a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
index 71905de..506b895 100644
--- a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
+++ b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
@@ -69,18 +69,20 @@ public class MimeMessageStore {
static class MimeMessageEncoder implements Store.Impl.Encoder<MimeMessage> {
@Override
- public Stream<Pair<BlobType, InputStream>> encode(MimeMessage message) {
+ public Stream<Pair<BlobType, Store.FixedLengthInputStream>> encode(MimeMessage message) {
try {
byte[] messageAsArray = messageToArray(message);
int bodyStartOctet = computeBodyStartOctet(messageAsArray);
+ byte[] headerBytes = getHeaderBytes(messageAsArray, bodyStartOctet);
+ byte[] bodyBytes = getBodyBytes(messageAsArray, bodyStartOctet);
return Stream.of(
- Pair.of(HEADER_BLOB_TYPE, new ByteArrayInputStream(getHeaderBytes(messageAsArray, bodyStartOctet))),
- Pair.of(BODY_BLOB_TYPE, new ByteArrayInputStream(getBodyBytes(messageAsArray, bodyStartOctet))));
+ Pair.of(HEADER_BLOB_TYPE, new Store.FixedLengthInputStream(new ByteArrayInputStream(headerBytes), headerBytes.length)),
+ Pair.of(BODY_BLOB_TYPE, new Store.FixedLengthInputStream(new ByteArrayInputStream(bodyBytes), bodyBytes.length)));
} catch (MessagingException | IOException e) {
throw new RuntimeException(e);
}
}
-
+
private static byte[] messageToArray(MimeMessage message) throws IOException, MessagingException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
message.writeTo(byteArrayOutputStream);
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org