You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/06/14 01:59:35 UTC

[james-project] 03/04: [PERFORMANCE] Simplify reactive flow within CachedBlobStore

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ac3f32fef8a84b88728ab9b3e38b70f17dc6979d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jun 4 21:31:54 2021 +0700

    [PERFORMANCE] Simplify reactive flow within CachedBlobStore
---
 .../blob/cassandra/cache/CachedBlobStore.java      | 39 +++++++++++-----------
 1 file changed, 19 insertions(+), 20 deletions(-)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
index bf4a1a0..9b6d0b9 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
@@ -138,17 +138,21 @@ public class CachedBlobStore implements BlobStore {
         if (storagePolicy == LOW_COST) {
             return backend.read(bucketName, blobId);
         }
-        return Mono.just(bucketName)
-            .filter(getDefaultBucketName()::equals)
-            .flatMap(defaultBucket -> readInDefaultBucket(bucketName, blobId))
-            .switchIfEmpty(readFromBackend(bucketName, blobId))
+        return readInputStream(bucketName, blobId)
             .blockOptional()
             .orElseThrow(() -> new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId.asString())));
     }
 
+    private Mono<InputStream> readInputStream(BucketName bucketName, BlobId blobId) {
+        if (bucketName.equals(getDefaultBucketName())) {
+            return readInDefaultBucket(bucketName, blobId);
+        }
+        return readFromBackend(bucketName, blobId);
+    }
+
     private Mono<InputStream> readInDefaultBucket(BucketName bucketName, BlobId blobId) {
         return readFromCache(blobId)
-            .flatMap(this::toInputStream)
+            .<InputStream>map(ByteArrayInputStream::new)
             .switchIfEmpty(readFromBackend(bucketName, blobId)
                 .flatMap(inputStream ->
                     Mono.fromCallable(() -> ReadAheadInputStream.eager().of(inputStream).length(sizeThresholdInBytes))
@@ -242,8 +246,7 @@ public class CachedBlobStore implements BlobStore {
     public Mono<Boolean> delete(BucketName bucketName, BlobId blobId) {
         return Mono.from(backend.delete(bucketName, blobId))
             .flatMap(deleted -> {
-                if (backend.getDefaultBucketName().equals(bucketName)
-                      && deleted) {
+                if (backend.getDefaultBucketName().equals(bucketName) && deleted) {
                     return Mono.from(cache.remove(blobId)).thenReturn(deleted);
                 }
                 return Mono.just(deleted);
@@ -267,16 +270,18 @@ public class CachedBlobStore implements BlobStore {
     }
 
     private Mono<Void> putInCacheIfNeeded(BucketName bucketName, StoragePolicy storagePolicy, ReadAheadInputStream readAhead, BlobId blobId) {
-        return Mono.justOrEmpty(readAhead.firstBytes)
+        return readAhead.firstBytes
             .filter(bytes -> isAbleToCache(bucketName, readAhead, storagePolicy))
-            .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes)));
+            .map(bytes -> Mono.from(cache.cache(blobId, bytes)))
+            .orElse(Mono.empty());
     }
 
     private Mono<Void> putInCacheIfNeeded(BucketName bucketName, ReadAheadInputStream readAhead, BlobId blobId) {
-        return Mono.justOrEmpty(readAhead.firstBytes)
+        return readAhead.firstBytes
             .filter(bytes -> isAbleToCache(readAhead, bucketName))
-            .doOnNext(any -> metricRetrieveMissCount.increment())
-            .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes)));
+            .map(bytes -> Mono.fromRunnable(metricRetrieveMissCount::increment)
+                .then(Mono.from(cache.cache(blobId, bytes))))
+            .orElse(Mono.empty());
     }
 
     private Mono<Void> saveInCache(BlobId blobId, byte[] bytes) {
@@ -303,10 +308,6 @@ public class CachedBlobStore implements BlobStore {
         return bytes.length <= sizeThresholdInBytes;
     }
 
-    private Mono<InputStream> toInputStream(byte[] bytes) {
-        return Mono.fromCallable(() -> new ByteArrayInputStream(bytes));
-    }
-
     private Mono<byte[]> readFromCache(BlobId blobId) {
         return Mono.from(metricFactory.decoratePublisherWithTimerMetric(BLOBSTORE_CACHED_LATENCY_METRIC_NAME, cache.read(blobId)))
             .doOnNext(any -> metricRetrieveHitCount.increment());
@@ -318,9 +319,7 @@ public class CachedBlobStore implements BlobStore {
     }
 
     private Mono<byte[]> readBytesFromBackend(BucketName bucketName, BlobId blobId) {
-        return Mono.fromCallable(() -> metricFactory.timer(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME))
-            .flatMap(timer -> Mono.from(backend.readBytes(bucketName, blobId))
-                .doOnSuccess(any -> timer.stopAndPublish())
-                .doOnError(ObjectNotFoundException.class, any -> timer.stopAndPublish()));
+        return Mono.from(metricFactory.decoratePublisherWithTimerMetric(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME,
+            backend.readBytes(bucketName, blobId)));
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org