You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/27 02:19:43 UTC
[james-project] 07/14: JAMES-3140 Correct usage of push back input stream
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 3066696d675258f3970f85f15b36f64f6aefbd34
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Apr 21 17:28:21 2020 +0700
JAMES-3140 Correct usage of push back input stream
---
.../james/blob/cassandra/cache/CacheBlobStore.java | 67 ++++++++++++----------
.../cassandra/cache/BlobStoreCacheContract.java | 12 ++++
.../blob/cassandra/cache/CacheBlobStoreTest.java | 42 +++++++++++++-
3 files changed, 88 insertions(+), 33 deletions(-)
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
index 227ddca..fdddc45 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
@@ -24,11 +24,12 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
+import java.util.Arrays;
+import java.util.Optional;
import javax.inject.Inject;
import javax.inject.Named;
-import org.apache.commons.io.IOUtils;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BucketName;
@@ -67,8 +68,7 @@ public class CacheBlobStore implements BlobStore {
.filter(defaultBucket::equals)
.flatMap(ignored ->
Mono.from(cache.read(blobId))
- .<InputStream>flatMap(bytes -> Mono.fromCallable(() -> new ByteArrayInputStream(bytes)))
- )
+ .<InputStream>flatMap(bytes -> Mono.fromCallable(() -> new ByteArrayInputStream(bytes))))
.switchIfEmpty(Mono.fromCallable(() -> backend.read(bucketName, blobId)))
.blockOptional()
.orElseThrow(() -> new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId)));
@@ -97,18 +97,23 @@ public class CacheBlobStore implements BlobStore {
public Publisher<BlobId> save(BucketName bucketName, InputStream inputStream, StoragePolicy storagePolicy) {
Preconditions.checkNotNull(inputStream, "InputStream must not be null");
+ if (isAbleToCache(bucketName, storagePolicy)) {
+ return saveInCache(bucketName, inputStream, storagePolicy);
+ }
+
+ return backend.save(bucketName, inputStream, storagePolicy);
+ }
+
+ private Publisher<BlobId> saveInCache(BucketName bucketName, InputStream inputStream, StoragePolicy storagePolicy) {
PushbackInputStream pushbackInputStream = new PushbackInputStream(inputStream, sizeThresholdInBytes + 1);
- return Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy))
- .flatMap(blobId ->
- Mono.fromCallable(() -> isALargeStream(pushbackInputStream))
- .flatMap(largeStream -> {
- if (!largeStream) {
- return Mono.from(saveInCache(bucketName, blobId, pushbackInputStream, storagePolicy))
- .thenReturn(blobId);
- }
- return Mono.just(blobId);
- })
- );
+
+ return Mono.fromCallable(() -> fullyReadSmallStream(pushbackInputStream))
+ .flatMap(Mono::justOrEmpty)
+ .filter(bytes -> isAbleToCache(bucketName, bytes, storagePolicy))
+ .flatMap(bytes -> Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy))
+ .flatMap(blobId -> Mono.from(cache.cache(blobId, bytes))
+ .thenReturn(blobId)))
+ .switchIfEmpty(Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy)));
}
@Override
@@ -130,23 +135,23 @@ public class CacheBlobStore implements BlobStore {
return Mono.from(backend.deleteBucket(bucketName));
}
- private Mono<Void> saveInCache(BucketName bucketName, BlobId blobId, PushbackInputStream pushbackInputStream, StoragePolicy storagePolicy) {
- return Mono.fromCallable(() -> copyBytesFromStream(pushbackInputStream))
- .filter(bytes -> isAbleToCache(bucketName, bytes, storagePolicy))
- .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes)));
- }
-
- private byte[] copyBytesFromStream(PushbackInputStream pushbackInputStream) throws IOException {
- byte[] bytes = new byte[pushbackInputStream.available()];
- int read = IOUtils.read(pushbackInputStream, bytes);
- pushbackInputStream.unread(read);
- return bytes;
- }
-
- private boolean isALargeStream(PushbackInputStream pushbackInputStream) throws IOException {
- long skip = pushbackInputStream.skip(sizeThresholdInBytes + 1);
- pushbackInputStream.unread(Math.toIntExact(skip));
- return skip >= sizeThresholdInBytes;
+ private Optional<byte[]> fullyReadSmallStream(PushbackInputStream pushbackInputStream) throws IOException {
+ int sizeToRead = sizeThresholdInBytes + 1;
+ byte[] bytes = new byte[sizeToRead];
+ int readByteCount = pushbackInputStream.read(bytes, 0, sizeToRead);
+ try {
+ if (readByteCount > sizeThresholdInBytes) {
+ return Optional.empty();
+ }
+ if (readByteCount < 0) {
+ return Optional.of(new byte[] {});
+ }
+ return Optional.of(Arrays.copyOf(bytes, readByteCount));
+ } finally {
+ if (readByteCount > 0) {
+ pushbackInputStream.unread(bytes, 0, readByteCount);
+ }
+ }
}
/**
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
index 6819aca..8d22d3f 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
@@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.awaitility.Awaitility.await;
+import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
@@ -107,4 +108,15 @@ public interface BlobStoreCacheContract {
await().atMost(Duration.TWO_SECONDS.plus(Duration.FIVE_HUNDRED_MILLISECONDS)).await().untilAsserted(()
-> assertThat(Mono.from(testee().read(blobId)).blockOptional()).isEmpty());
}
+
+ @Test
+ default void readShouldReturnEmptyCachedByteArray() {
+ BlobId blobId = blobIdFactory().randomId();
+ byte[] emptyByteArray = new byte[] {};
+
+ Mono.from(testee().cache(blobId, emptyByteArray)).block();
+
+ assertThat(new ByteArrayInputStream(Mono.from(testee().read(blobId)).block()))
+ .hasSameContentAs(new ByteArrayInputStream(emptyByteArray));
+ }
}
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java
index 1c5ff09..a7808ab 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java
@@ -28,6 +28,7 @@ import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
@@ -45,12 +46,15 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import com.google.common.base.Strings;
+
import reactor.core.publisher.Mono;
public class CacheBlobStoreTest implements BlobStoreContract {
private static final BucketName DEFAULT_BUCKERNAME = DEFAULT;
private static final BucketName TEST_BUCKERNAME = BucketName.of("test");
+ byte[] APPROXIMATELY_FIVE_KILOBYTES = Strings.repeat("0123456789\n", 500).getBytes(StandardCharsets.UTF_8);
@RegisterExtension
static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(
@@ -139,16 +143,50 @@ public class CacheBlobStoreTest implements BlobStoreContract {
}
@Test
- public void shouldNotCacheWhenEmptyStream() {
+ public void shouldCacheWhenEmptyStream() {
BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, new ByteArrayInputStream(EMPTY_BYTEARRAY), SIZE_BASED)).block();
SoftAssertions.assertSoftly(soflty -> {
- assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+ assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block())).hasSameContentAs(new ByteArrayInputStream(EMPTY_BYTEARRAY));
assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EMPTY_BYTEARRAY);
});
}
@Test
+ public void shouldNotCacheWhenEmptyByteArray() {
+ BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EMPTY_BYTEARRAY, SIZE_BASED)).block();
+
+ SoftAssertions.assertSoftly(soflty -> {
+ assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block())).hasSameContentAs(new ByteArrayInputStream(EMPTY_BYTEARRAY));
+ assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EMPTY_BYTEARRAY);
+ });
+ }
+
+ @Test
+ public void shouldCacheWhenFiveKilobytesSteam() {
+ BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES), SIZE_BASED)).block();
+
+ SoftAssertions.assertSoftly(soflty -> {
+ assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block()))
+ .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+ assertThat(new ByteArrayInputStream(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()))
+ .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+ });
+ }
+
+ @Test
+ public void shouldCacheWhenFiveKilobytesBytes() {
+ BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
+
+ SoftAssertions.assertSoftly(soflty -> {
+ assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block()))
+ .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+ assertThat(new ByteArrayInputStream(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()))
+ .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+ });
+ }
+
+ @Test
public void shouldRemoveBothInCacheAndBackendWhenDefaultBucketName() {
BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org