You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2020/10/26 02:55:13 UTC
[james-project] 10/11: JAMES-3433 BlobStore reads should have a
StoragePolicy associated
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 2c2f2ea125ffe6d300487352aa0d34c7e9fe7213
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Oct 22 08:45:00 2020 +0700
JAMES-3433 BlobStore reads should have a StoragePolicy associated
Cached blob store only attempts cache reads when HIGH_PERFORMANCE in being used.
---
.../cassandra/mail/CassandraAttachmentMapper.java | 2 +-
.../cassandra/mail/CassandraMessageDAO.java | 12 ++--
.../cassandra/mail/CassandraMessageDAOV3.java | 12 ++--
.../vault/blob/BlobStoreDeletedMessageVault.java | 2 +-
.../java/org/apache/james/blob/api/BlobStore.java | 8 +++
.../java/org/apache/james/blob/api/BlobType.java | 13 +++-
.../blob/cassandra/cache/CachedBlobStore.java | 20 +++++-
.../blob/cassandra/cache/CachedBlobStoreTest.java | 71 +++++++++++-----------
.../main/java/org/apache/james/blob/api/Store.java | 2 +-
.../export/file/LocalFileBlobExportMechanism.java | 4 +-
.../apache/james/blob/mail/MimeMessagePartsId.java | 7 ++-
.../linshare/LinshareBlobExportMechanism.java | 4 +-
12 files changed, 97 insertions(+), 60 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
index dd6edec..9db614a 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
@@ -96,7 +96,7 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
@Override
public InputStream loadAttachmentContent(AttachmentId attachmentId) throws AttachmentNotFoundException, IOException {
return attachmentDAOV2.getAttachment(attachmentId)
- .map(daoAttachment -> blobStore.read(blobStore.getDefaultBucketName(), daoAttachment.getBlobId()))
+ .map(daoAttachment -> blobStore.read(blobStore.getDefaultBucketName(), daoAttachment.getBlobId(), LOW_COST))
.blockOptional()
.orElseThrow(() -> new AttachmentNotFoundException(attachmentId.toString()));
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index 8d97ff5..98b22d6 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -319,9 +319,9 @@ public class CassandraMessageDAO {
case Full:
return getFullContent(headerId, bodyId);
case Headers:
- return getContent(headerId);
+ return getContent(headerId, SIZE_BASED);
case Body:
- return getContent(bodyId)
+ return getContent(bodyId, LOW_COST)
.map(data -> Bytes.concat(new byte[bodyStartOctet], data));
case Metadata:
return Mono.just(EMPTY_BYTE_ARRAY);
@@ -331,12 +331,12 @@ public class CassandraMessageDAO {
}
private Mono<byte[]> getFullContent(BlobId headerId, BlobId bodyId) {
- return getContent(headerId)
- .zipWith(getContent(bodyId), Bytes::concat);
+ return getContent(headerId, SIZE_BASED)
+ .zipWith(getContent(bodyId, LOW_COST), Bytes::concat);
}
- private Mono<byte[]> getContent(BlobId blobId) {
- return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobId));
+ private Mono<byte[]> getContent(BlobId blobId, BlobStore.StoragePolicy storagePolicy) {
+ return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobId, storagePolicy));
}
private BlobId retrieveBlobId(String field, Row row) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
index 8f349da..c665a85 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
@@ -346,9 +346,9 @@ public class CassandraMessageDAOV3 {
case Full:
return getFullContent(headerId, bodyId);
case Headers:
- return getContent(headerId);
+ return getContent(headerId, SIZE_BASED);
case Body:
- return getContent(bodyId)
+ return getContent(bodyId, LOW_COST)
.map(data -> Bytes.concat(new byte[bodyStartOctet], data));
case Metadata:
return Mono.just(EMPTY_BYTE_ARRAY);
@@ -358,12 +358,12 @@ public class CassandraMessageDAOV3 {
}
private Mono<byte[]> getFullContent(BlobId headerId, BlobId bodyId) {
- return getContent(headerId)
- .zipWith(getContent(bodyId), Bytes::concat);
+ return getContent(headerId, SIZE_BASED)
+ .zipWith(getContent(bodyId, LOW_COST), Bytes::concat);
}
- private Mono<byte[]> getContent(BlobId blobId) {
- return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobId));
+ private Mono<byte[]> getContent(BlobId blobId, BlobStore.StoragePolicy storagePolicy) {
+ return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobId, storagePolicy));
}
private BlobId retrieveBlobId(String field, Row row) {
diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java
index d61247b..e10cd2b 100644
--- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java
+++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java
@@ -122,7 +122,7 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault {
}
private Mono<InputStream> loadMimeMessage(StorageInformation storageInformation, Username username, MessageId messageId) {
- return Mono.fromSupplier(() -> blobStore.read(storageInformation.getBucketName(), storageInformation.getBlobId()))
+ return Mono.fromSupplier(() -> blobStore.read(storageInformation.getBucketName(), storageInformation.getBlobId(), LOW_COST))
.onErrorResume(
ObjectNotFoundException.class,
ex -> Mono.error(new DeletedMessageContentNotFoundException(username, messageId)));
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
index a887a32..dc4abef 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
@@ -43,6 +43,14 @@ public interface BlobStore {
InputStream read(BucketName bucketName, BlobId blobId);
+ default Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId, StoragePolicy storagePolicy) {
+ return readBytes(bucketName, blobId);
+ }
+
+ default InputStream read(BucketName bucketName, BlobId blobId, StoragePolicy storagePolicy) {
+ return read(bucketName, blobId);
+ }
+
BucketName getDefaultBucketName();
Publisher<Void> deleteBucket(BucketName bucketName);
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobType.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobType.java
index e068c68..ecc0d61 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobType.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobType.java
@@ -23,27 +23,34 @@ import java.util.Objects;
public class BlobType {
private final String name;
+ private final BlobStore.StoragePolicy storagePolicy;
- public BlobType(String name) {
+ public BlobType(String name, BlobStore.StoragePolicy storagePolicy) {
this.name = name;
+ this.storagePolicy = storagePolicy;
}
public String getName() {
return name;
}
+ public BlobStore.StoragePolicy getStoragePolicy() {
+ return storagePolicy;
+ }
+
@Override
public final boolean equals(Object o) {
if (o instanceof BlobType) {
BlobType blobType = (BlobType) o;
- return Objects.equals(this.name, blobType.name);
+ return Objects.equals(this.name, blobType.name)
+ && Objects.equals(this.storagePolicy, blobType.storagePolicy);
}
return false;
}
@Override
public final int hashCode() {
- return Objects.hash(name);
+ return Objects.hash(name, storagePolicy);
}
}
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
index 0e998c0..87fdfa5 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
@@ -132,7 +132,10 @@ public class CachedBlobStore implements BlobStore {
}
@Override
- public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException {
+ public InputStream read(BucketName bucketName, BlobId blobId, StoragePolicy storagePolicy) throws ObjectStoreIOException, ObjectNotFoundException {
+ if (storagePolicy == LOW_COST) {
+ return backend.read(bucketName, blobId);
+ }
return Mono.just(bucketName)
.filter(getDefaultBucketName()::equals)
.flatMap(defaultBucket -> readInDefaultBucket(bucketName, blobId))
@@ -152,13 +155,26 @@ public class CachedBlobStore implements BlobStore {
}
@Override
- public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
+ public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId, StoragePolicy storagePolicy) {
+ if (storagePolicy == LOW_COST) {
+ return readBytesFromBackend(bucketName, blobId);
+ }
if (getDefaultBucketName().equals(bucketName)) {
return readBytesInDefaultBucket(bucketName, blobId);
}
return readBytesFromBackend(bucketName, blobId);
}
+ @Override
+ public Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
+ return readBytes(bucketName, blobId, LOW_COST);
+ }
+
+ @Override
+ public InputStream read(BucketName bucketName, BlobId blobId) {
+ return read(bucketName, blobId, LOW_COST);
+ }
+
private Mono<byte[]> readBytesInDefaultBucket(BucketName bucketName, BlobId blobId) {
return readFromCache(blobId).switchIfEmpty(
readBytesFromBackend(bucketName, blobId)
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
index bf0a405..580f910 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
@@ -216,7 +216,7 @@ public class CachedBlobStoreTest implements BlobStoreContract {
SoftAssertions.assertSoftly(soflty -> {
soflty.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
- soflty.assertThat(new ByteArrayInputStream(Mono.from(testee().readBytes(DEFAULT_BUCKETNAME, blobId)).block()))
+ soflty.assertThat(new ByteArrayInputStream(Mono.from(testee().readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block()))
.hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
soflty.assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block()))
.hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
@@ -229,7 +229,7 @@ public class CachedBlobStoreTest implements BlobStoreContract {
SoftAssertions.assertSoftly(soflty -> {
soflty.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
- soflty.assertThat(testee().read(DEFAULT_BUCKETNAME, blobId))
+ soflty.assertThat(testee().read(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE))
.hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
soflty.assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block()))
.hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
@@ -242,7 +242,7 @@ public class CachedBlobStoreTest implements BlobStoreContract {
SoftAssertions.assertSoftly(soflty -> {
soflty.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
- soflty.assertThat(new ByteArrayInputStream(Mono.from(testee().readBytes(TEST_BUCKETNAME, blobId)).block()))
+ soflty.assertThat(new ByteArrayInputStream(Mono.from(testee().readBytes(TEST_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block()))
.hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
soflty.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
});
@@ -254,7 +254,7 @@ public class CachedBlobStoreTest implements BlobStoreContract {
SoftAssertions.assertSoftly(soflty -> {
soflty.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
- soflty.assertThat(testee().read(TEST_BUCKETNAME, blobId))
+ soflty.assertThat(testee().read(TEST_BUCKETNAME, blobId, HIGH_PERFORMANCE))
.hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
soflty.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
});
@@ -266,7 +266,7 @@ public class CachedBlobStoreTest implements BlobStoreContract {
SoftAssertions.assertSoftly(soflty -> {
soflty.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
- soflty.assertThat(new ByteArrayInputStream(Mono.from(testee().readBytes(DEFAULT_BUCKETNAME, blobId)).block()))
+ soflty.assertThat(new ByteArrayInputStream(Mono.from(testee().readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block()))
.hasSameContentAs(new ByteArrayInputStream(TWELVE_MEGABYTES));
soflty.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
});
@@ -278,7 +278,7 @@ public class CachedBlobStoreTest implements BlobStoreContract {
SoftAssertions.assertSoftly(soflty -> {
soflty.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
- soflty.assertThat(testee().read(DEFAULT_BUCKETNAME, blobId))
+ soflty.assertThat(testee().read(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE))
.hasSameContentAs(new ByteArrayInputStream(TWELVE_MEGABYTES));
soflty.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
});
@@ -294,7 +294,7 @@ public class CachedBlobStoreTest implements BlobStoreContract {
Mono.from(cache.read(blobId)).block();
- assertThat(testee().read(DEFAULT_BUCKETNAME, blobId))
+ assertThat(testee().read(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE))
.hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
}
@@ -304,8 +304,8 @@ public class CachedBlobStoreTest implements BlobStoreContract {
void readBlobStoreCacheWithNoneDefaultBucketNameShouldNotImpact() {
BlobId blobId = Mono.from(testee.save(TEST_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
- testee.read(TEST_BUCKETNAME, blobId);
- testee.read(TEST_BUCKETNAME, blobId);
+ testee.read(TEST_BUCKETNAME, blobId, HIGH_PERFORMANCE);
+ testee.read(TEST_BUCKETNAME, blobId, HIGH_PERFORMANCE);
SoftAssertions.assertSoftly(soflty -> {
soflty.assertThat(metricFactory.countFor(BLOBSTORE_CACHED_MISS_COUNT_METRIC_NAME))
@@ -324,8 +324,8 @@ public class CachedBlobStoreTest implements BlobStoreContract {
void readBlobStoreWithNoneDefaultBucketNameShouldRecordByBackendLatency() {
BlobId blobId = Mono.from(testee.save(TEST_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
- testee.read(TEST_BUCKETNAME, blobId);
- testee.read(TEST_BUCKETNAME, blobId);
+ testee.read(TEST_BUCKETNAME, blobId, HIGH_PERFORMANCE);
+ testee.read(TEST_BUCKETNAME, blobId, HIGH_PERFORMANCE);
SoftAssertions.assertSoftly(soflty ->
soflty.assertThat(metricFactory.executionTimesFor(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME))
@@ -337,9 +337,8 @@ public class CachedBlobStoreTest implements BlobStoreContract {
void readBytesWithNoneDefaultBucketNameShouldNotImpact() {
BlobId blobId = Mono.from(testee.save(TEST_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
- Mono.from(testee.readBytes(TEST_BUCKETNAME, blobId)).block();
- Mono.from(testee.readBytes(TEST_BUCKETNAME, blobId)).block();
-
+ Mono.from(testee.readBytes(TEST_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block();
+ Mono.from(testee.readBytes(TEST_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block();
SoftAssertions.assertSoftly(soflty -> {
assertThat(metricFactory.countFor(BLOBSTORE_CACHED_MISS_COUNT_METRIC_NAME))
@@ -361,8 +360,8 @@ public class CachedBlobStoreTest implements BlobStoreContract {
void readBytesWithNoneDefaultBucketNameShouldPublishBackendTimerMetrics() {
BlobId blobId = Mono.from(testee.save(TEST_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
- Mono.from(testee.readBytes(TEST_BUCKETNAME, blobId)).block();
- Mono.from(testee.readBytes(TEST_BUCKETNAME, blobId)).block();
+ Mono.from(testee.readBytes(TEST_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block();
+ Mono.from(testee.readBytes(TEST_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block();
SoftAssertions.assertSoftly(soflty ->
soflty.assertThat(metricFactory.executionTimesFor(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME))
@@ -374,8 +373,8 @@ public class CachedBlobStoreTest implements BlobStoreContract {
void readBlobStoreCacheShouldPublishTimerMetrics() {
BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
- testee.read(DEFAULT_BUCKETNAME, blobId);
- testee.read(DEFAULT_BUCKETNAME, blobId);
+ testee.read(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE);
+ testee.read(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE);
SoftAssertions.assertSoftly(soflty -> {
soflty.assertThat(metricFactory.executionTimesFor(BLOBSTORE_CACHED_LATENCY_METRIC_NAME))
@@ -388,8 +387,8 @@ public class CachedBlobStoreTest implements BlobStoreContract {
void readBytesCacheShouldPublishTimerMetrics() {
BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
- Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)).block();
- Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)).block();
+ Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block();
+ Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block();
SoftAssertions.assertSoftly(soflty -> {
soflty.assertThat(metricFactory.executionTimesFor(BLOBSTORE_CACHED_LATENCY_METRIC_NAME))
@@ -405,8 +404,8 @@ public class CachedBlobStoreTest implements BlobStoreContract {
void readBytesShouldPublishBackendTimerMetricsForBigBlobs() {
BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, ELEVEN_KILOBYTES, SIZE_BASED)).block();
- Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)).block();
- Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)).block();
+ Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block();
+ Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block();
SoftAssertions.assertSoftly(soflty ->
soflty.assertThat(metricFactory.executionTimesFor(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME))
@@ -418,8 +417,8 @@ public class CachedBlobStoreTest implements BlobStoreContract {
void readInputStreamShouldPublishBackendTimerForBigBlobs() {
BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, ELEVEN_KILOBYTES, SIZE_BASED)).block();
- testee.read(DEFAULT_BUCKETNAME, blobId);
- testee.read(DEFAULT_BUCKETNAME, blobId);
+ testee.read(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE);
+ testee.read(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE);
SoftAssertions.assertSoftly(soflty ->
soflty.assertThat(metricFactory.executionTimesFor(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME))
@@ -431,8 +430,8 @@ public class CachedBlobStoreTest implements BlobStoreContract {
void readBytesShouldNotIncreaseCacheCounterForBigBlobs() {
BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, ELEVEN_KILOBYTES, SIZE_BASED)).block();
- Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)).block();
- Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)).block();
+ Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block();
+ Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block();
SoftAssertions.assertSoftly(soflty -> {
soflty.assertThat(metricFactory.countFor(BLOBSTORE_CACHED_MISS_COUNT_METRIC_NAME))
@@ -466,7 +465,7 @@ public class CachedBlobStoreTest implements BlobStoreContract {
BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, ELEVEN_KILOBYTES, SIZE_BASED)).block();
Duration delay = Duration.ofMillis(500);
- Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId))
+ Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE))
.then(Mono.delay(delay))
.repeat(2)
.blockLast();
@@ -481,7 +480,7 @@ public class CachedBlobStoreTest implements BlobStoreContract {
BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
Duration delay = Duration.ofMillis(500);
- Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId))
+ Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE))
.then(Mono.delay(delay))
.repeat(2)
.blockLast();
@@ -495,8 +494,8 @@ public class CachedBlobStoreTest implements BlobStoreContract {
void readBlobStoreCacheShouldCountWhenHit() {
BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
- testee.read(DEFAULT_BUCKETNAME, blobId);
- testee.read(DEFAULT_BUCKETNAME, blobId);
+ testee.read(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE);
+ testee.read(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE);
assertThat(metricFactory.countFor(BLOBSTORE_CACHED_HIT_COUNT_METRIC_NAME)).isEqualTo(2);
}
@@ -505,8 +504,8 @@ public class CachedBlobStoreTest implements BlobStoreContract {
void readBytesCacheShouldCountWhenHit() {
BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
- Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)).block();
- Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)).block();
+ Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block();
+ Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block();
assertThat(metricFactory.countFor(BLOBSTORE_CACHED_HIT_COUNT_METRIC_NAME)).isEqualTo(2);
}
@@ -517,7 +516,7 @@ public class CachedBlobStoreTest implements BlobStoreContract {
BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
Mono.from(cache.remove(blobId)).block();
- testee.read(DEFAULT_BUCKETNAME, blobId);
+ testee.read(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE);
assertThat(metricFactory.countFor(BLOBSTORE_CACHED_MISS_COUNT_METRIC_NAME)).isEqualTo(1);
}
@@ -527,7 +526,7 @@ public class CachedBlobStoreTest implements BlobStoreContract {
BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
Mono.from(cache.remove(blobId)).block();
- Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)).block();
+ Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block();
assertThat(metricFactory.countFor(BLOBSTORE_CACHED_MISS_COUNT_METRIC_NAME)).isEqualTo(1);
}
@@ -535,7 +534,7 @@ public class CachedBlobStoreTest implements BlobStoreContract {
@Test
void metricsShouldNotWorkExceptLatencyWhenReadNonExistingBlob() {
SoftAssertions.assertSoftly(soflty -> {
- soflty.assertThatThrownBy(() -> testee.read(DEFAULT_BUCKETNAME, new TestBlobId.Factory().randomId()))
+ soflty.assertThatThrownBy(() -> testee.read(DEFAULT_BUCKETNAME, new TestBlobId.Factory().randomId(), HIGH_PERFORMANCE))
.isInstanceOf(ObjectNotFoundException.class);
soflty.assertThat(metricFactory.countFor(BLOBSTORE_CACHED_MISS_COUNT_METRIC_NAME))
@@ -556,7 +555,7 @@ public class CachedBlobStoreTest implements BlobStoreContract {
@Test
void metricsShouldNotWorkExceptLatencyWhenReadNonExistingBlobAsBytes() {
SoftAssertions.assertSoftly(soflty -> {
- soflty.assertThatThrownBy(() -> Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, new TestBlobId.Factory().randomId())).blockOptional())
+ soflty.assertThatThrownBy(() -> Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, new TestBlobId.Factory().randomId(), HIGH_PERFORMANCE)).blockOptional())
.isInstanceOf(ObjectNotFoundException.class);
soflty.assertThat(metricFactory.countFor(BLOBSTORE_CACHED_MISS_COUNT_METRIC_NAME))
diff --git a/server/blob/blob-common/src/main/java/org/apache/james/blob/api/Store.java b/server/blob/blob-common/src/main/java/org/apache/james/blob/api/Store.java
index 8deba51..9c20cba 100644
--- a/server/blob/blob-common/src/main/java/org/apache/james/blob/api/Store.java
+++ b/server/blob/blob-common/src/main/java/org/apache/james/blob/api/Store.java
@@ -97,7 +97,7 @@ public interface Store<T, I> {
return Flux.fromIterable(blobIds.asMap().entrySet())
.publishOn(Schedulers.elastic())
.flatMapSequential(
- entry -> Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), entry.getValue()))
+ entry -> Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), entry.getValue(), entry.getKey().getStoragePolicy()))
.zipWith(Mono.just(entry.getKey())))
.map(entry -> Pair.of(entry.getT2(), entry.getT1()))
.collectList()
diff --git a/server/blob/blob-export-file/src/main/java/org/apache/james/blob/export/file/LocalFileBlobExportMechanism.java b/server/blob/blob-export-file/src/main/java/org/apache/james/blob/export/file/LocalFileBlobExportMechanism.java
index a2968ea..e6aec2b 100644
--- a/server/blob/blob-export-file/src/main/java/org/apache/james/blob/export/file/LocalFileBlobExportMechanism.java
+++ b/server/blob/blob-export-file/src/main/java/org/apache/james/blob/export/file/LocalFileBlobExportMechanism.java
@@ -19,6 +19,8 @@
package org.apache.james.blob.export.file;
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+
import java.io.File;
import java.io.IOException;
import java.net.UnknownHostException;
@@ -121,7 +123,7 @@ public class LocalFileBlobExportMechanism implements BlobExportMechanism {
String fileName = ExportedFileNamesGenerator.generateFileName(fileCustomPrefix, blobId, fileExtension);
String fileURL = configuration.exportDirectory + "/" + fileName;
File file = fileSystem.getFile(fileURL);
- FileUtils.copyToFile(blobStore.read(blobStore.getDefaultBucketName(), blobId), file);
+ FileUtils.copyToFile(blobStore.read(blobStore.getDefaultBucketName(), blobId, LOW_COST), file);
return file.getAbsolutePath();
} catch (IOException e) {
diff --git a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessagePartsId.java b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessagePartsId.java
index 936efe5..935a9b7 100644
--- a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessagePartsId.java
+++ b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessagePartsId.java
@@ -19,6 +19,9 @@
package org.apache.james.blob.mail;
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.SIZE_BASED;
+
import java.util.Map;
import java.util.Objects;
@@ -75,8 +78,8 @@ public class MimeMessagePartsId implements BlobPartsId {
}
}
- static final BlobType HEADER_BLOB_TYPE = new BlobType("mailHeader");
- static final BlobType BODY_BLOB_TYPE = new BlobType("mailBody");
+ static final BlobType HEADER_BLOB_TYPE = new BlobType("mailHeader", SIZE_BASED);
+ static final BlobType BODY_BLOB_TYPE = new BlobType("mailBody", LOW_COST);
private final BlobId headerBlobId;
private final BlobId bodyBlobId;
diff --git a/third-party/linshare/src/main/java/org/apache/james/linshare/LinshareBlobExportMechanism.java b/third-party/linshare/src/main/java/org/apache/james/linshare/LinshareBlobExportMechanism.java
index 92c280a..7f39129 100644
--- a/third-party/linshare/src/main/java/org/apache/james/linshare/LinshareBlobExportMechanism.java
+++ b/third-party/linshare/src/main/java/org/apache/james/linshare/LinshareBlobExportMechanism.java
@@ -19,6 +19,8 @@
package org.apache.james.linshare;
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+
import java.io.File;
import java.io.IOException;
import java.util.Optional;
@@ -67,7 +69,7 @@ public class LinshareBlobExportMechanism implements BlobExportMechanism {
String fileName = ExportedFileNamesGenerator.generateFileName(fileCustomPrefix, blobId, fileExtension);
File tempFile = new File(tempDir, fileName);
try {
- FileUtils.copyInputStreamToFile(blobStore.read(blobStore.getDefaultBucketName(), blobId), tempFile);
+ FileUtils.copyInputStreamToFile(blobStore.read(blobStore.getDefaultBucketName(), blobId, LOW_COST), tempFile);
uploadDocumentToTargetMail(mailAddress, tempFile);
} finally {
FileUtils.forceDelete(tempFile);
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org