You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/06/14 01:59:35 UTC
[james-project] 03/04: [PERFORMANCE] Simplify reactive flow within CachedBlobStore
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit ac3f32fef8a84b88728ab9b3e38b70f17dc6979d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jun 4 21:31:54 2021 +0700
[PERFORMANCE] Simplify reactive flow within CachedBlobStore
---
.../blob/cassandra/cache/CachedBlobStore.java | 39 +++++++++++-----------
1 file changed, 19 insertions(+), 20 deletions(-)
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
index bf4a1a0..9b6d0b9 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
@@ -138,17 +138,21 @@ public class CachedBlobStore implements BlobStore {
if (storagePolicy == LOW_COST) {
return backend.read(bucketName, blobId);
}
- return Mono.just(bucketName)
- .filter(getDefaultBucketName()::equals)
- .flatMap(defaultBucket -> readInDefaultBucket(bucketName, blobId))
- .switchIfEmpty(readFromBackend(bucketName, blobId))
+ return readInputStream(bucketName, blobId)
.blockOptional()
.orElseThrow(() -> new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId.asString())));
}
+ private Mono<InputStream> readInputStream(BucketName bucketName, BlobId blobId) {
+ if (bucketName.equals(getDefaultBucketName())) {
+ return readInDefaultBucket(bucketName, blobId);
+ }
+ return readFromBackend(bucketName, blobId);
+ }
+
private Mono<InputStream> readInDefaultBucket(BucketName bucketName, BlobId blobId) {
return readFromCache(blobId)
- .flatMap(this::toInputStream)
+ .<InputStream>map(ByteArrayInputStream::new)
.switchIfEmpty(readFromBackend(bucketName, blobId)
.flatMap(inputStream ->
Mono.fromCallable(() -> ReadAheadInputStream.eager().of(inputStream).length(sizeThresholdInBytes))
@@ -242,8 +246,7 @@ public class CachedBlobStore implements BlobStore {
public Mono<Boolean> delete(BucketName bucketName, BlobId blobId) {
return Mono.from(backend.delete(bucketName, blobId))
.flatMap(deleted -> {
- if (backend.getDefaultBucketName().equals(bucketName)
- && deleted) {
+ if (backend.getDefaultBucketName().equals(bucketName) && deleted) {
return Mono.from(cache.remove(blobId)).thenReturn(deleted);
}
return Mono.just(deleted);
@@ -267,16 +270,18 @@ public class CachedBlobStore implements BlobStore {
}
private Mono<Void> putInCacheIfNeeded(BucketName bucketName, StoragePolicy storagePolicy, ReadAheadInputStream readAhead, BlobId blobId) {
- return Mono.justOrEmpty(readAhead.firstBytes)
+ return readAhead.firstBytes
.filter(bytes -> isAbleToCache(bucketName, readAhead, storagePolicy))
- .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes)));
+ .map(bytes -> Mono.from(cache.cache(blobId, bytes)))
+ .orElse(Mono.empty());
}
private Mono<Void> putInCacheIfNeeded(BucketName bucketName, ReadAheadInputStream readAhead, BlobId blobId) {
- return Mono.justOrEmpty(readAhead.firstBytes)
+ return readAhead.firstBytes
.filter(bytes -> isAbleToCache(readAhead, bucketName))
- .doOnNext(any -> metricRetrieveMissCount.increment())
- .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes)));
+ .map(bytes -> Mono.fromRunnable(metricRetrieveMissCount::increment)
+ .then(Mono.from(cache.cache(blobId, bytes))))
+ .orElse(Mono.empty());
}
private Mono<Void> saveInCache(BlobId blobId, byte[] bytes) {
@@ -303,10 +308,6 @@ public class CachedBlobStore implements BlobStore {
return bytes.length <= sizeThresholdInBytes;
}
- private Mono<InputStream> toInputStream(byte[] bytes) {
- return Mono.fromCallable(() -> new ByteArrayInputStream(bytes));
- }
-
private Mono<byte[]> readFromCache(BlobId blobId) {
return Mono.from(metricFactory.decoratePublisherWithTimerMetric(BLOBSTORE_CACHED_LATENCY_METRIC_NAME, cache.read(blobId)))
.doOnNext(any -> metricRetrieveHitCount.increment());
@@ -318,9 +319,7 @@ public class CachedBlobStore implements BlobStore {
}
private Mono<byte[]> readBytesFromBackend(BucketName bucketName, BlobId blobId) {
- return Mono.fromCallable(() -> metricFactory.timer(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME))
- .flatMap(timer -> Mono.from(backend.readBytes(bucketName, blobId))
- .doOnSuccess(any -> timer.stopAndPublish())
- .doOnError(ObjectNotFoundException.class, any -> timer.stopAndPublish()));
+ return Mono.from(metricFactory.decoratePublisherWithTimerMetric(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME,
+ backend.readBytes(bucketName, blobId)));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org