You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2020/04/30 07:28:46 UTC
[james-project] 10/11: JAMES-3140: cache when read with the same rule with save, add test
This is an automated email from the ASF dual-hosted git repository.
matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4c456e28b15ad26fad9ebdaaae5982e3f1098e39
Author: ducnv <du...@gmail.com>
AuthorDate: Wed Apr 22 18:22:58 2020 +0700
JAMES-3140: cache when read with the same rule with save, add test
---
.../blob/cassandra/cache/CachedBlobStore.java | 101 +++++++++++++----
.../blob/cassandra/cache/CachedBlobStoreTest.java | 124 +++++++++++++++++----
2 files changed, 179 insertions(+), 46 deletions(-)
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
index 3d4abad..86aead0 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
@@ -57,10 +57,12 @@ public class CachedBlobStore implements BlobStore {
@Override
public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException {
return Mono.just(bucketName)
- .filter(backend.getDefaultBucketName()::equals)
- .flatMap(ignored -> Mono.from(cache.read(blobId))
- .<InputStream>flatMap(bytes -> Mono.fromCallable(() -> new ByteArrayInputStream(bytes))))
- .switchIfEmpty(Mono.fromCallable(() -> backend.read(bucketName, blobId)))
+ .filter(getDefaultBucketName()::equals)
+ .flatMap(ignored -> readFromCache(blobId)
+ .flatMap(this::toInputStream))
+ .switchIfEmpty(readFromBackend(bucketName, blobId)
+ .map(this::toPushbackStream)
+ .flatMap(pushbackInputStream -> saveInCache(pushbackInputStream, blobId, bucketName)))
.blockOptional()
.orElseThrow(() -> new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId)));
}
@@ -68,9 +70,12 @@ public class CachedBlobStore implements BlobStore {
@Override
public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
return Mono.just(bucketName)
- .filter(backend.getDefaultBucketName()::equals)
- .flatMap(ignored -> Mono.from(cache.read(blobId)))
- .switchIfEmpty(Mono.from(backend.readBytes(bucketName, blobId)));
+ .filter(getDefaultBucketName()::equals)
+ .flatMap(ignored -> readFromCache(blobId)
+ .switchIfEmpty(readBytesFromBackend(bucketName, blobId)
+ .filter(this::isAbleToCache)
+ .flatMap(bytes -> saveInCache(blobId, bytes).then(Mono.just(bytes)))))
+ .switchIfEmpty(readBytesFromBackend(bucketName, blobId));
}
@Override
@@ -78,7 +83,7 @@ public class CachedBlobStore implements BlobStore {
return Mono.from(backend.save(bucketName, bytes, storagePolicy))
.flatMap(blobId -> {
if (isAbleToCache(bucketName, bytes, storagePolicy)) {
- return Mono.from(cache.cache(blobId, bytes)).thenReturn(blobId);
+ return saveInCache(blobId, bytes).thenReturn(blobId);
}
return Mono.just(blobId);
});
@@ -89,24 +94,13 @@ public class CachedBlobStore implements BlobStore {
Preconditions.checkNotNull(inputStream, "InputStream must not be null");
if (isAbleToCache(bucketName, storagePolicy)) {
- return saveInCache(bucketName, inputStream, storagePolicy);
+ return Mono.fromCallable(() -> toPushbackStream(inputStream))
+ .flatMap(pushbackInputStream -> saveInCache(bucketName, pushbackInputStream, storagePolicy));
}
return backend.save(bucketName, inputStream, storagePolicy);
}
- private Publisher<BlobId> saveInCache(BucketName bucketName, InputStream inputStream, StoragePolicy storagePolicy) {
- PushbackInputStream pushbackInputStream = new PushbackInputStream(inputStream, sizeThresholdInBytes);
-
- return Mono.fromCallable(() -> fullyReadSmallStream(pushbackInputStream))
- .flatMap(Mono::justOrEmpty)
- .filter(bytes -> isAbleToCache(bucketName, bytes, storagePolicy))
- .flatMap(bytes -> Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy))
- .flatMap(blobId -> Mono.from(cache.cache(blobId, bytes))
- .thenReturn(blobId)))
- .switchIfEmpty(Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy)));
- }
-
@Override
public BucketName getDefaultBucketName() {
return backend.getDefaultBucketName();
@@ -148,11 +142,74 @@ public class CachedBlobStore implements BlobStore {
}
}
+ private Mono<BlobId> saveInCache(BucketName bucketName, PushbackInputStream pushbackInputStream, StoragePolicy storagePolicy) {
+ return Mono.fromCallable(() -> fullyReadSmallStream(pushbackInputStream))
+ .flatMap(Mono::justOrEmpty)
+ .filter(bytes -> isAbleToCache(bucketName, bytes, storagePolicy))
+ .flatMap(bytes -> saveInBackend(bucketName, pushbackInputStream, storagePolicy)
+ .flatMap(blobId -> saveInCache(blobId, bytes).thenReturn(blobId)))
+ .switchIfEmpty(saveInBackend(bucketName, pushbackInputStream, storagePolicy));
+ }
+
+ private Mono<BlobId> saveInBackend(BucketName bucketName, PushbackInputStream pushbackInputStream, StoragePolicy storagePolicy) {
+ return Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy));
+ }
+
+ private Mono<Void> saveInCache(BlobId blobId, byte[] bytes) {
+ return Mono.from(cache.cache(blobId, bytes));
+ }
+
+ private Mono<InputStream> saveInCache(PushbackInputStream pushbackInputStream, BlobId blobId, BucketName bucketName) {
+ return Mono.fromCallable(() -> fullyReadSmallStream(pushbackInputStream))
+ .flatMap(Mono::justOrEmpty)
+ .filter(bytes -> isAbleToCache(bytes, bucketName))
+ .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes))
+ .map(ignore -> pushbackBytesArrayRead(pushbackInputStream, bytes)))
+ .then(Mono.just(pushbackInputStream));
+ }
+
private boolean isAbleToCache(BucketName bucketName, byte[] bytes, StoragePolicy storagePolicy) {
- return isAbleToCache(bucketName, storagePolicy) && bytes.length <= sizeThresholdInBytes;
+ return isAbleToCache(bucketName, storagePolicy) && isAbleToCache(bytes);
}
private boolean isAbleToCache(BucketName bucketName, StoragePolicy storagePolicy) {
return backend.getDefaultBucketName().equals(bucketName) && !storagePolicy.equals(LOW_COST);
}
+
+ private boolean isAbleToCache(byte[] bytes, BucketName bucketName) {
+ return isAbleToCache(bytes) && backend.getDefaultBucketName().equals(bucketName);
+ }
+
+ private boolean isAbleToCache(byte[] bytes) {
+ return bytes.length <= sizeThresholdInBytes;
+ }
+
+ private Mono<InputStream> toInputStream(byte[] bytes) {
+ return Mono.fromCallable(() -> new ByteArrayInputStream(bytes));
+ }
+
+ private Mono<InputStream> readFromBackend(BucketName bucketName, BlobId blobId) {
+ return Mono.fromCallable(() -> backend.read(bucketName, blobId));
+ }
+
+ private PushbackInputStream toPushbackStream(InputStream inputStream) {
+ return new PushbackInputStream(inputStream, sizeThresholdInBytes);
+ }
+
+ private Mono<byte[]> readFromCache(BlobId blobId) {
+ return Mono.from(cache.read(blobId));
+ }
+
+ private Mono<byte[]> readBytesFromBackend(BucketName bucketName, BlobId blobId) {
+ return Mono.from(backend.readBytes(bucketName, blobId));
+ }
+
+ private Mono<Void> pushbackBytesArrayRead(PushbackInputStream pushbackInputStream, byte[] bytes) {
+ try {
+ pushbackInputStream.unread(bytes);
+ } catch (IOException e) {
+ // Ignore
+ }
+ return Mono.empty();
+ }
}
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
index debea77..8da8e61 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
@@ -29,6 +29,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
@@ -52,8 +53,8 @@ import reactor.core.publisher.Mono;
public class CachedBlobStoreTest implements BlobStoreContract {
- private static final BucketName DEFAULT_BUCKERNAME = DEFAULT;
- private static final BucketName TEST_BUCKERNAME = BucketName.of("test");
+ private static final BucketName DEFAULT_BUCKETNAME = DEFAULT;
+ private static final BucketName TEST_BUCKETNAME = BucketName.of("test");
byte[] APPROXIMATELY_FIVE_KILOBYTES = Strings.repeat("0123456789\n", 500).getBytes(StandardCharsets.UTF_8);
@RegisterExtension
@@ -69,6 +70,7 @@ public class CachedBlobStoreTest implements BlobStoreContract {
backend = CassandraBlobStore.forTesting(cassandra.getConf());
CassandraCacheConfiguration cacheConfig = new CassandraCacheConfiguration.Builder()
.sizeThresholdInBytes(EIGHT_KILOBYTES.length + 1)
+ .timeOut(Duration.ofSeconds(60))
.build();
cache = new CassandraBlobStoreCache(cassandra.getConf(), cacheConfig);
testee = new CachedBlobStore(cache, backend, cacheConfig);
@@ -86,7 +88,7 @@ public class CachedBlobStoreTest implements BlobStoreContract {
@Test
public void shouldCacheWhenDefaultBucketName() {
- BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
+ BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
byte[] actual = Mono.from(cache.read(blobId)).block();
assertThat(actual).containsExactly(EIGHT_KILOBYTES);
@@ -94,108 +96,182 @@ public class CachedBlobStoreTest implements BlobStoreContract {
@Test
public void shouldNotCacheWhenNotDefaultBucketName() {
- BlobId blobId = Mono.from(testee().save(TEST_BUCKERNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
+ BlobId blobId = Mono.from(testee().save(TEST_BUCKETNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
SoftAssertions.assertSoftly(ignored -> {
assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
- assertThat(Mono.from(backend.readBytes(TEST_BUCKERNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES);
+ assertThat(Mono.from(backend.readBytes(TEST_BUCKETNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES);
});
}
@Test
public void shouldNotCacheWhenDefaultBucketNameAndBigByteDataAndSizeBase() {
- BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, TWELVE_MEGABYTES, SIZE_BASED)).block();
+ BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, TWELVE_MEGABYTES, SIZE_BASED)).block();
SoftAssertions.assertSoftly(ignored -> {
assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
- assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(TWELVE_MEGABYTES);
+ assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block()).containsExactly(TWELVE_MEGABYTES);
});
}
@Test
public void shouldSavedBothInCacheAndBackendWhenSizeBase() {
- BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
+ BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
SoftAssertions.assertSoftly(soflty -> {
assertThat(Mono.from(cache.read(blobId)).block()).containsExactly(EIGHT_KILOBYTES);
- assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES);
+ assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES);
});
}
@Test
public void shouldSavedBothInCacheAndBackendWhenHighPerformance() {
- BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, HIGH_PERFORMANCE)).block();
+ BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, EIGHT_KILOBYTES, HIGH_PERFORMANCE)).block();
SoftAssertions.assertSoftly(soflty -> {
assertThat(Mono.from(cache.read(blobId)).block()).containsExactly(EIGHT_KILOBYTES);
- assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES);
+ assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES);
});
}
@Test
public void shouldNotCacheWhenLowCost() {
- BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, LOW_COST)).block();
+ BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, EIGHT_KILOBYTES, LOW_COST)).block();
SoftAssertions.assertSoftly(soflty -> {
assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
- assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES);
+ assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES);
});
}
@Test
public void shouldCacheWhenEmptyStream() {
- BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, new ByteArrayInputStream(EMPTY_BYTEARRAY), SIZE_BASED)).block();
+ BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, new ByteArrayInputStream(EMPTY_BYTEARRAY), SIZE_BASED)).block();
SoftAssertions.assertSoftly(soflty -> {
assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block())).hasSameContentAs(new ByteArrayInputStream(EMPTY_BYTEARRAY));
- assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EMPTY_BYTEARRAY);
+ assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block()).containsExactly(EMPTY_BYTEARRAY);
});
}
@Test
public void shouldNotCacheWhenEmptyByteArray() {
- BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EMPTY_BYTEARRAY, SIZE_BASED)).block();
+ BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, EMPTY_BYTEARRAY, SIZE_BASED)).block();
SoftAssertions.assertSoftly(soflty -> {
assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block())).hasSameContentAs(new ByteArrayInputStream(EMPTY_BYTEARRAY));
- assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EMPTY_BYTEARRAY);
+ assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block()).containsExactly(EMPTY_BYTEARRAY);
});
}
@Test
public void shouldCacheWhenFiveKilobytesSteam() {
- BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES), SIZE_BASED)).block();
+ BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES), SIZE_BASED)).block();
SoftAssertions.assertSoftly(soflty -> {
assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block()))
.hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
- assertThat(new ByteArrayInputStream(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()))
+ assertThat(new ByteArrayInputStream(Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block()))
.hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
});
}
@Test
public void shouldCacheWhenFiveKilobytesBytes() {
- BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
+ BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
SoftAssertions.assertSoftly(soflty -> {
assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block()))
.hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
- assertThat(new ByteArrayInputStream(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()))
+ assertThat(new ByteArrayInputStream(Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block()))
.hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
});
}
@Test
public void shouldRemoveBothInCacheAndBackendWhenDefaultBucketName() {
- BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
+ BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKETNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
SoftAssertions.assertSoftly(ignored -> {
- assertThatCode(Mono.from(testee().delete(DEFAULT_BUCKERNAME, blobId))::block)
+ assertThatCode(Mono.from(testee().delete(DEFAULT_BUCKETNAME, blobId))::block)
.doesNotThrowAnyException();
assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
- assertThatThrownBy(() -> Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block())
+ assertThatThrownBy(() -> Mono.from(backend.readBytes(DEFAULT_BUCKETNAME, blobId)).block())
.isInstanceOf(ObjectNotFoundException.class);
});
}
+
+ @Test
+ public void shouldCacheWhenReadBytesWithDefaultBucket() {
+ BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
+
+ SoftAssertions.assertSoftly(soflty -> {
+ assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+ assertThat(new ByteArrayInputStream(Mono.from(testee().readBytes(DEFAULT_BUCKETNAME, blobId)).block()))
+ .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+ assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block()))
+ .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+ });
+ }
+
+ @Test
+ public void shouldCacheWhenReadWithDefaultBucket() {
+ BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
+
+ SoftAssertions.assertSoftly(soflty -> {
+ assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+ assertThat(testee().read(DEFAULT_BUCKETNAME, blobId))
+ .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+ assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block()))
+ .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+ });
+ }
+
+ @Test
+ public void shouldNotCacheWhenReadBytesWithOutDefaultBucket() {
+ BlobId blobId = Mono.from(backend.save(TEST_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
+
+ SoftAssertions.assertSoftly(soflty -> {
+ assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+ assertThat(new ByteArrayInputStream(Mono.from(testee().readBytes(TEST_BUCKETNAME, blobId)).block()))
+ .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+ assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+ });
+ }
+
+ @Test
+ public void shouldNotCacheWhenReadWithOutDefaultBucket() {
+ BlobId blobId = Mono.from(backend.save(TEST_BUCKETNAME, new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES), SIZE_BASED)).block();
+
+ SoftAssertions.assertSoftly(soflty -> {
+ assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+ assertThat(testee().read(TEST_BUCKETNAME, blobId))
+ .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+ assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+ });
+ }
+
+ @Test
+ public void shouldNotCacheWhenReadWithBigByteArray() {
+ BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, new ByteArrayInputStream(TWELVE_MEGABYTES), SIZE_BASED)).block();
+
+ SoftAssertions.assertSoftly(soflty -> {
+ assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+ assertThat(new ByteArrayInputStream(Mono.from(testee().readBytes(DEFAULT_BUCKETNAME, blobId)).block()))
+ .hasSameContentAs(new ByteArrayInputStream(TWELVE_MEGABYTES));
+ assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+ });
+ }
+
+ @Test
+ public void shouldNotCacheWhenReadWithBigStream() {
+ BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, new ByteArrayInputStream(TWELVE_MEGABYTES), SIZE_BASED)).block();
+
+ SoftAssertions.assertSoftly(soflty -> {
+ assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+ assertThat(testee().read(DEFAULT_BUCKETNAME, blobId))
+ .hasSameContentAs(new ByteArrayInputStream(TWELVE_MEGABYTES));
+ assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+ });
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org