You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2020/04/30 07:28:46 UTC

[james-project] 10/11: JAMES-3140: cache when read with the same rule with save, add test

This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4c456e28b15ad26fad9ebdaaae5982e3f1098e39
Author: ducnv <du...@gmail.com>
AuthorDate: Wed Apr 22 18:22:58 2020 +0700

    JAMES-3140: cache when read with the same rule with save, add test
---
 .../blob/cassandra/cache/CachedBlobStore.java      | 101 +++++++++++++----
 .../blob/cassandra/cache/CachedBlobStoreTest.java  | 124 +++++++++++++++++----
 2 files changed, 179 insertions(+), 46 deletions(-)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
index 3d4abad..86aead0 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
@@ -57,10 +57,12 @@ public class CachedBlobStore implements BlobStore {
     @Override
     public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException {
         return Mono.just(bucketName)
-            .filter(backend.getDefaultBucketName()::equals)
-            .flatMap(ignored -> Mono.from(cache.read(blobId))
-                    .<InputStream>flatMap(bytes -> Mono.fromCallable(() -> new ByteArrayInputStream(bytes))))
-            .switchIfEmpty(Mono.fromCallable(() -> backend.read(bucketName, blobId)))
+            .filter(getDefaultBucketName()::equals)
+            .flatMap(ignored -> readFromCache(blobId)
+                .flatMap(this::toInputStream))
+            .switchIfEmpty(readFromBackend(bucketName, blobId)
+                .map(this::toPushbackStream)
+                .flatMap(pushbackInputStream -> saveInCache(pushbackInputStream, blobId, bucketName)))
             .blockOptional()
             .orElseThrow(() -> new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId)));
     }
@@ -68,9 +70,12 @@ public class CachedBlobStore implements BlobStore {
     @Override
     public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
         return Mono.just(bucketName)
-            .filter(backend.getDefaultBucketName()::equals)
-            .flatMap(ignored -> Mono.from(cache.read(blobId)))
-            .switchIfEmpty(Mono.from(backend.readBytes(bucketName, blobId)));
+            .filter(getDefaultBucketName()::equals) // only the default bucket is ever served from / written to the cache
+            .flatMap(ignored -> readFromCache(blobId)
+                .switchIfEmpty(readBytesFromBackend(bucketName, blobId)
+                    // Oversized blobs stay in the pipeline (uncached); filtering them out would make the outer fallback read the backend a second time
+                    .flatMap(bytes -> isAbleToCache(bytes) ? saveInCache(blobId, bytes).thenReturn(bytes) : Mono.just(bytes))))
+            .switchIfEmpty(readBytesFromBackend(bucketName, blobId)); // non-default bucket: straight to the backend
     }
 
     @Override
@@ -78,7 +83,7 @@ public class CachedBlobStore implements BlobStore {
         return Mono.from(backend.save(bucketName, bytes, storagePolicy))
             .flatMap(blobId -> {
                 if (isAbleToCache(bucketName, bytes, storagePolicy)) {
-                    return Mono.from(cache.cache(blobId, bytes)).thenReturn(blobId);
+                    return saveInCache(blobId, bytes).thenReturn(blobId);
                 }
                 return Mono.just(blobId);
             });
@@ -89,24 +94,13 @@ public class CachedBlobStore implements BlobStore {
         Preconditions.checkNotNull(inputStream, "InputStream must not be null");
 
         if (isAbleToCache(bucketName, storagePolicy)) {
-            return saveInCache(bucketName, inputStream, storagePolicy);
+            return Mono.fromCallable(() -> toPushbackStream(inputStream))
+                .flatMap(pushbackInputStream -> saveInCache(bucketName, pushbackInputStream, storagePolicy));
         }
 
         return backend.save(bucketName, inputStream, storagePolicy);
     }
 
-    private Publisher<BlobId> saveInCache(BucketName bucketName, InputStream inputStream, StoragePolicy storagePolicy) {
-        PushbackInputStream pushbackInputStream = new PushbackInputStream(inputStream, sizeThresholdInBytes);
-
-        return Mono.fromCallable(() -> fullyReadSmallStream(pushbackInputStream))
-            .flatMap(Mono::justOrEmpty)
-            .filter(bytes -> isAbleToCache(bucketName, bytes, storagePolicy))
-            .flatMap(bytes -> Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy))
-                .flatMap(blobId -> Mono.from(cache.cache(blobId, bytes))
-                    .thenReturn(blobId)))
-            .switchIfEmpty(Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy)));
-    }
-
     @Override
     public BucketName getDefaultBucketName() {
         return backend.getDefaultBucketName();
@@ -148,11 +142,74 @@ public class CachedBlobStore implements BlobStore {
         }
     }
 
+    private Mono<BlobId> saveInCache(BucketName bucketName, PushbackInputStream pushbackInputStream, StoragePolicy storagePolicy) {
+        return Mono.fromCallable(() -> fullyReadSmallStream(pushbackInputStream)) // evaluated lazily, on subscription
+            .flatMap(Mono::justOrEmpty) // no bytes returned -> empty Mono -> falls through to switchIfEmpty below
+            .filter(bytes -> isAbleToCache(bucketName, bytes, storagePolicy))
+            .flatMap(bytes -> saveInBackend(bucketName, pushbackInputStream, storagePolicy) // NOTE(review): the same stream is handed to the backend, so fullyReadSmallStream presumably unreads what it consumed — confirm
+                .flatMap(blobId -> saveInCache(blobId, bytes).thenReturn(blobId))) // cache write happens after the backend has produced the blob id
+            .switchIfEmpty(saveInBackend(bucketName, pushbackInputStream, storagePolicy)); // not cacheable: store in the backend only
+    }
+
+    private Mono<BlobId> saveInBackend(BucketName bucketName, PushbackInputStream pushbackInputStream, StoragePolicy storagePolicy) {
+        return Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy)); // adapt the backend's save publisher to a Mono of the blob id
+    }
+
+    private Mono<Void> saveInCache(BlobId blobId, byte[] bytes) {
+        return Mono.from(cache.cache(blobId, bytes)); // Mono<Void>: signals completion only, never emits a value
+    }
+
+    private Mono<InputStream> saveInCache(PushbackInputStream pushbackInputStream, BlobId blobId, BucketName bucketName) {
+        return Mono.fromCallable(() -> fullyReadSmallStream(pushbackInputStream))
+            .flatMap(Mono::justOrEmpty)
+            .filter(bytes -> isAbleToCache(bytes, bucketName))
+            // The cache write is a Mono<Void> and never emits, so a map() chained on it could never run; just await the write via the shared helper
+            .flatMap(bytes -> saveInCache(blobId, bytes))
+            .then(Mono.just(pushbackInputStream)); // NOTE(review): presumably fullyReadSmallStream already unread what it consumed, leaving the stream readable from the start — confirm
+    }
+
     private boolean isAbleToCache(BucketName bucketName, byte[] bytes, StoragePolicy storagePolicy) {
-        return isAbleToCache(bucketName, storagePolicy) && bytes.length <= sizeThresholdInBytes;
+        return isAbleToCache(bucketName, storagePolicy) && isAbleToCache(bytes);
     }
 
     private boolean isAbleToCache(BucketName bucketName, StoragePolicy storagePolicy) {
         return backend.getDefaultBucketName().equals(bucketName) && !storagePolicy.equals(LOW_COST);
     }
+
+    private boolean isAbleToCache(byte[] bytes, BucketName bucketName) {
+        return isAbleToCache(bytes) && backend.getDefaultBucketName().equals(bucketName); // read-path rule: size within threshold and default bucket (no storage policy is known on read)
+    }
+
+    private boolean isAbleToCache(byte[] bytes) {
+        return bytes.length <= sizeThresholdInBytes; // threshold is inclusive
+    }
+
+    private Mono<InputStream> toInputStream(byte[] bytes) {
+        return Mono.fromCallable(() -> new ByteArrayInputStream(bytes)); // the in-memory stream is created per subscription
+    }
+
+    private Mono<InputStream> readFromBackend(BucketName bucketName, BlobId blobId) {
+        return Mono.fromCallable(() -> backend.read(bucketName, blobId)); // backend.read is a direct call; fromCallable defers it until subscription
+    }
+
+    private PushbackInputStream toPushbackStream(InputStream inputStream) {
+        return new PushbackInputStream(inputStream, sizeThresholdInBytes); // pushback capacity equals the cache size threshold
+    }
+
+    private Mono<byte[]> readFromCache(BlobId blobId) {
+        return Mono.from(cache.read(blobId)); // adapt the cache's read publisher to a Mono; callers treat an empty result as a cache miss
+    }
+
+    private Mono<byte[]> readBytesFromBackend(BucketName bucketName, BlobId blobId) {
+        return Mono.from(backend.readBytes(bucketName, blobId)); // adapt the backend's readBytes publisher to a Mono
+    }
+
+    private Mono<Void> pushbackBytesArrayRead(PushbackInputStream pushbackInputStream, byte[] bytes) {
+        try {
+            pushbackInputStream.unread(bytes); // make the already-consumed bytes readable again
+            return Mono.empty();
+        } catch (IOException e) { // swallowing this would silently hand callers a stream that is missing these bytes
+            throw new java.io.UncheckedIOException("Unable to push " + bytes.length + " bytes back into the stream", e);
+        }
+    }
 }
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
index debea77..8da8e61 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
@@ -29,6 +29,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.ByteArrayInputStream;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
@@ -52,8 +53,8 @@ import reactor.core.publisher.Mono;
 
 public class CachedBlobStoreTest implements BlobStoreContract {
 
-    private static final BucketName DEFAULT_BUCKERNAME = DEFAULT;
-    private static final BucketName TEST_BUCKERNAME = BucketName.of("test");
+    private static final BucketName DEFAULT_BUCKETNAME = DEFAULT;
+    private static final BucketName TEST_BUCKETNAME = BucketName.of("test");
     byte[] APPROXIMATELY_FIVE_KILOBYTES = Strings.repeat("0123456789\n", 500).getBytes(StandardCharsets.UTF_8);
 
     @RegisterExtension
@@ -69,6 +70,7 @@ public class CachedBlobStoreTest implements BlobStoreContract {
         backend = CassandraBlobStore.forTesting(cassandra.getConf());
         CassandraCacheConfiguration cacheConfig = new CassandraCacheConfiguration.Builder()
             .sizeThresholdInBytes(EIGHT_KILOBYTES.length + 1)
+            .timeOut(Duration.ofSeconds(60))
             .build();
         cache = new CassandraBlobStoreCache(cassandra.getConf(), cacheConfig);
         testee = new CachedBlobStore(cache, backend, cacheConfig);
@@ -86,7 +88,7 @@ public class CachedBlobStoreTest implements BlobStoreContract {
 
     @Test
     public void shouldCacheWhenDefaultBucketName() {
-        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
 
         byte[] actual = Mono.from(cache.read(blobId)).block();
         assertThat(actual).containsExactly(EIGHT_KILOBYTES);
@@ -94,108 +96,182 @@ public class CachedBlobStoreTest implements BlobStoreContract {
 
     @Test
     public void shouldNotCacheWhenNotDefaultBucketName() {
-        BlobId blobId = Mono.from(testee().save(TEST_BUCKERNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
+        BlobId blobId = Mono.from(testee().save(TEST_BUCKETNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
 
         SoftAssertions.assertSoftly(ignored -> {
             assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
-            assertThat(Mono.from(backend.readBytes(TEST_BUCKERNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES);
+            assertThat(Mono.from(backend.readBytes(TEST_BUCKETNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES);
         });
     }
 
     @Test
     public void shouldNotCacheWhenDefaultBucketNameAndBigByteDataAndSizeBase() {
-        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, TWELVE_MEGABYTES, SIZE_BASED)).block();
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, TWELVE_MEGABYTES, SIZE_BASED)).block();
 
         SoftAssertions.assertSoftly(ignored -> {
             assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
-            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(TWELVE_MEGABYTES);
+            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block()).containsExactly(TWELVE_MEGABYTES);
         });
     }
 
     @Test
     public void shouldSavedBothInCacheAndBackendWhenSizeBase() {
-        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
 
         SoftAssertions.assertSoftly(soflty -> {
             assertThat(Mono.from(cache.read(blobId)).block()).containsExactly(EIGHT_KILOBYTES);
-            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES);
+            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES);
         });
     }
 
     @Test
     public void shouldSavedBothInCacheAndBackendWhenHighPerformance() {
-        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, HIGH_PERFORMANCE)).block();
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, EIGHT_KILOBYTES, HIGH_PERFORMANCE)).block();
 
         SoftAssertions.assertSoftly(soflty -> {
             assertThat(Mono.from(cache.read(blobId)).block()).containsExactly(EIGHT_KILOBYTES);
-            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES);
+            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES);
         });
     }
 
     @Test
     public void shouldNotCacheWhenLowCost() {
-        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, LOW_COST)).block();
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, EIGHT_KILOBYTES, LOW_COST)).block();
 
         SoftAssertions.assertSoftly(soflty -> {
             assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
-            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES);
+            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES);
         });
     }
 
     @Test
     public void shouldCacheWhenEmptyStream() {
-        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, new ByteArrayInputStream(EMPTY_BYTEARRAY), SIZE_BASED)).block();
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, new ByteArrayInputStream(EMPTY_BYTEARRAY), SIZE_BASED)).block();
 
         SoftAssertions.assertSoftly(soflty -> {
             assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block())).hasSameContentAs(new ByteArrayInputStream(EMPTY_BYTEARRAY));
-            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EMPTY_BYTEARRAY);
+            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block()).containsExactly(EMPTY_BYTEARRAY);
         });
     }
 
     @Test
     public void shouldNotCacheWhenEmptyByteArray() {
-        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EMPTY_BYTEARRAY, SIZE_BASED)).block();
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, EMPTY_BYTEARRAY, SIZE_BASED)).block();
 
         SoftAssertions.assertSoftly(soflty -> {
             assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block())).hasSameContentAs(new ByteArrayInputStream(EMPTY_BYTEARRAY));
-            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EMPTY_BYTEARRAY);
+            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block()).containsExactly(EMPTY_BYTEARRAY);
         });
     }
 
     @Test
     public void shouldCacheWhenFiveKilobytesSteam() {
-        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES), SIZE_BASED)).block();
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES), SIZE_BASED)).block();
 
         SoftAssertions.assertSoftly(soflty -> {
             assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block()))
                 .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
-            assertThat(new ByteArrayInputStream(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()))
+            assertThat(new ByteArrayInputStream(Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block()))
                 .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
         });
     }
 
     @Test
     public void shouldCacheWhenFiveKilobytesBytes() {
-        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
 
         SoftAssertions.assertSoftly(soflty -> {
             assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block()))
                 .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
-            assertThat(new ByteArrayInputStream(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()))
+            assertThat(new ByteArrayInputStream(Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block()))
                 .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
         });
     }
 
     @Test
     public void shouldRemoveBothInCacheAndBackendWhenDefaultBucketName() {
-        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
 
         SoftAssertions.assertSoftly(ignored -> {
-            assertThatCode(Mono.from(testee().delete(DEFAULT_BUCKERNAME, blobId))::block)
+            assertThatCode(Mono.from(testee().delete(DEFAULT_BUCKETNAME, blobId))::block)
                 .doesNotThrowAnyException();
             assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
-            assertThatThrownBy(() -> Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block())
+            assertThatThrownBy(() -> Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block())
                 .isInstanceOf(ObjectNotFoundException.class);
         });
     }
+
+    @Test
+    public void shouldCacheWhenReadBytesWithDefaultBucket() {
+        BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block(); // saved through the backend only, bypassing the cache
+
+        SoftAssertions.assertSoftly(softly -> { // assert through `softly` so every failure is collected and reported
+            softly.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+            softly.assertThat(Mono.from(testee().readBytes(DEFAULT_BUCKETNAME, blobId)).block())
+                .containsExactly(APPROXIMATELY_FIVE_KILOBYTES);
+            softly.assertThat(Mono.from(cache.read(blobId)).block()) // a null result fails as an assertion rather than an NPE
+                .containsExactly(APPROXIMATELY_FIVE_KILOBYTES);
+        });
+    }
+
+    @Test
+    public void shouldCacheWhenReadWithDefaultBucket() {
+        BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block(); // saved through the backend only, bypassing the cache
+
+        SoftAssertions.assertSoftly(softly -> { // assert through `softly` so every failure is collected and reported
+            softly.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+            softly.assertThat(testee().read(DEFAULT_BUCKETNAME, blobId))
+                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+            softly.assertThat(Mono.from(cache.read(blobId)).block()) // a null result fails as an assertion rather than an NPE
+                .containsExactly(APPROXIMATELY_FIVE_KILOBYTES);
+        });
+    }
+
+    @Test
+    public void shouldNotCacheWhenReadBytesWithOutDefaultBucket() {
+        BlobId blobId = Mono.from(backend.save(TEST_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block(); // saved through the backend only, in a non-default bucket
+
+        SoftAssertions.assertSoftly(softly -> { // assert through `softly` so every failure is collected and reported
+            softly.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+            softly.assertThat(Mono.from(testee().readBytes(TEST_BUCKETNAME, blobId)).block())
+                .containsExactly(APPROXIMATELY_FIVE_KILOBYTES);
+            softly.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty(); // the read must not have populated the cache
+        });
+    }
+
+    @Test
+    public void shouldNotCacheWhenReadWithOutDefaultBucket() {
+        BlobId blobId = Mono.from(backend.save(TEST_BUCKETNAME, new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES), SIZE_BASED)).block();
+
+        SoftAssertions.assertSoftly(softly -> { // assert through `softly` so every failure is collected and reported
+            softly.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+            softly.assertThat(testee().read(TEST_BUCKETNAME, blobId))
+                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+            softly.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty(); // the read must not have populated the cache
+        });
+    }
+
+    @Test
+    public void shouldNotCacheWhenReadWithBigByteArray() {
+        BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, new ByteArrayInputStream(TWELVE_MEGABYTES), SIZE_BASED)).block();
+
+        SoftAssertions.assertSoftly(softly -> { // assert through `softly` so every failure is collected and reported
+            softly.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+            softly.assertThat(Mono.from(testee().readBytes(DEFAULT_BUCKETNAME, blobId)).block())
+                .containsExactly(TWELVE_MEGABYTES);
+            softly.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty(); // content above the size threshold must stay out of the cache
+        });
+    }
+
+    @Test
+    public void shouldNotCacheWhenReadWithBigStream() {
+        BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, new ByteArrayInputStream(TWELVE_MEGABYTES), SIZE_BASED)).block();
+
+        SoftAssertions.assertSoftly(softly -> { // assert through `softly` so every failure is collected and reported
+            softly.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+            softly.assertThat(testee().read(DEFAULT_BUCKETNAME, blobId))
+                .hasSameContentAs(new ByteArrayInputStream(TWELVE_MEGABYTES));
+            softly.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty(); // content above the size threshold must stay out of the cache
+        });
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org