You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/27 02:19:43 UTC

[james-project] 07/14: JAMES-3140 Correct usage of push back input stream

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 3066696d675258f3970f85f15b36f64f6aefbd34
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Apr 21 17:28:21 2020 +0700

    JAMES-3140 Correct usage of push back input stream
---
 .../james/blob/cassandra/cache/CacheBlobStore.java | 67 ++++++++++++----------
 .../cassandra/cache/BlobStoreCacheContract.java    | 12 ++++
 .../blob/cassandra/cache/CacheBlobStoreTest.java   | 42 +++++++++++++-
 3 files changed, 88 insertions(+), 33 deletions(-)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
index 227ddca..fdddc45 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
@@ -24,11 +24,12 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PushbackInputStream;
+import java.util.Arrays;
+import java.util.Optional;
 
 import javax.inject.Inject;
 import javax.inject.Named;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.api.BucketName;
@@ -67,8 +68,7 @@ public class CacheBlobStore implements BlobStore {
             .filter(defaultBucket::equals)
             .flatMap(ignored ->
                 Mono.from(cache.read(blobId))
-                    .<InputStream>flatMap(bytes -> Mono.fromCallable(() -> new ByteArrayInputStream(bytes)))
-            )
+                    .<InputStream>flatMap(bytes -> Mono.fromCallable(() -> new ByteArrayInputStream(bytes))))
             .switchIfEmpty(Mono.fromCallable(() -> backend.read(bucketName, blobId)))
             .blockOptional()
             .orElseThrow(() -> new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId)));
@@ -97,18 +97,23 @@ public class CacheBlobStore implements BlobStore {
     public Publisher<BlobId> save(BucketName bucketName, InputStream inputStream, StoragePolicy storagePolicy) {
         Preconditions.checkNotNull(inputStream, "InputStream must not be null");
 
+        if (isAbleToCache(bucketName, storagePolicy)) {
+            return saveInCache(bucketName, inputStream, storagePolicy);
+        }
+
+        return backend.save(bucketName, inputStream, storagePolicy);
+    }
+
+    private Publisher<BlobId> saveInCache(BucketName bucketName, InputStream inputStream, StoragePolicy storagePolicy) {
         PushbackInputStream pushbackInputStream = new PushbackInputStream(inputStream, sizeThresholdInBytes + 1);
-        return Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy))
-            .flatMap(blobId ->
-                Mono.fromCallable(() -> isALargeStream(pushbackInputStream))
-                    .flatMap(largeStream -> {
-                        if (!largeStream) {
-                            return Mono.from(saveInCache(bucketName, blobId, pushbackInputStream, storagePolicy))
-                                .thenReturn(blobId);
-                        }
-                        return Mono.just(blobId);
-                    })
-            );
+        // Peek at the stream: payloads that fit the threshold and pass isAbleToCache are saved to the backend, then cached.
+        // The fallback save is wrapped in Mono.defer so backend.save is only invoked when the cacheable path yields nothing.
+        return Mono.fromCallable(() -> fullyReadSmallStream(pushbackInputStream))
+            .flatMap(Mono::justOrEmpty)
+            .filter(bytes -> isAbleToCache(bucketName, bytes, storagePolicy))
+            .flatMap(bytes -> Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy))
+                .flatMap(blobId -> Mono.from(cache.cache(blobId, bytes)).thenReturn(blobId)))
+            .switchIfEmpty(Mono.defer(() -> Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy))));
     }
 
     @Override
@@ -130,23 +135,23 @@ public class CacheBlobStore implements BlobStore {
         return Mono.from(backend.deleteBucket(bucketName));
     }
 
-    private Mono<Void> saveInCache(BucketName bucketName, BlobId blobId, PushbackInputStream pushbackInputStream, StoragePolicy storagePolicy) {
-        return Mono.fromCallable(() -> copyBytesFromStream(pushbackInputStream))
-            .filter(bytes -> isAbleToCache(bucketName, bytes, storagePolicy))
-            .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes)));
-    }
-
-    private byte[] copyBytesFromStream(PushbackInputStream pushbackInputStream) throws IOException {
-        byte[] bytes = new byte[pushbackInputStream.available()];
-        int read = IOUtils.read(pushbackInputStream, bytes);
-        pushbackInputStream.unread(read);
-        return bytes;
-    }
-
-    private boolean isALargeStream(PushbackInputStream pushbackInputStream) throws IOException {
-        long skip = pushbackInputStream.skip(sizeThresholdInBytes + 1);
-        pushbackInputStream.unread(Math.toIntExact(skip));
-        return skip >= sizeThresholdInBytes;
+    // Reads up to threshold + 1 bytes, pushes them back, and returns them only when the whole stream fits the threshold.
+    private Optional<byte[]> fullyReadSmallStream(PushbackInputStream pushbackInputStream) throws IOException {
+        int sizeToRead = sizeThresholdInBytes + 1;
+        byte[] bytes = new byte[sizeToRead];
+        int total = 0;
+        int read;
+        // A single read() may return fewer bytes than requested before end of stream, so loop until full or EOF.
+        while (total < sizeToRead && (read = pushbackInputStream.read(bytes, total, sizeToRead - total)) >= 0) {
+            total += read;
+        }
+        if (total > 0) {
+            pushbackInputStream.unread(bytes, 0, total);
+        }
+        if (total > sizeThresholdInBytes) {
+            return Optional.empty();
+        }
+        return Optional.of(Arrays.copyOf(bytes, total));
     }
 
     /**
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
index 6819aca..8d22d3f 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
@@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.awaitility.Awaitility.await;
 
+import java.io.ByteArrayInputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
 
@@ -107,4 +108,15 @@ public interface BlobStoreCacheContract {
         await().atMost(Duration.TWO_SECONDS.plus(Duration.FIVE_HUNDRED_MILLISECONDS)).await().untilAsserted(()
             -> assertThat(Mono.from(testee().read(blobId)).blockOptional()).isEmpty());
     }
+
+    @Test
+    default void readShouldReturnEmptyCachedByteArray() {
+        // An empty payload stored in the cache must be read back as empty content.
+        byte[] noBytes = new byte[] {};
+        BlobId id = blobIdFactory().randomId();
+        Mono.from(testee().cache(id, noBytes)).block();
+
+        byte[] cached = Mono.from(testee().read(id)).block();
+        assertThat(new ByteArrayInputStream(cached)).hasSameContentAs(new ByteArrayInputStream(noBytes));
+    }
 }
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java
index 1c5ff09..a7808ab 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java
@@ -28,6 +28,7 @@ import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
@@ -45,12 +46,15 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import com.google.common.base.Strings;
+
 import reactor.core.publisher.Mono;
 
 public class CacheBlobStoreTest implements BlobStoreContract {
 
     private static final BucketName DEFAULT_BUCKERNAME = DEFAULT;
     private static final BucketName TEST_BUCKERNAME = BucketName.of("test");
+    byte[] APPROXIMATELY_FIVE_KILOBYTES = Strings.repeat("0123456789\n", 500).getBytes(StandardCharsets.UTF_8);
 
     @RegisterExtension
     static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(
@@ -139,16 +143,50 @@ public class CacheBlobStoreTest implements BlobStoreContract {
     }
 
     @Test
-    public void shouldNotCacheWhenEmptyStream() {
+    public void shouldCacheWhenEmptyStream() {
         BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, new ByteArrayInputStream(EMPTY_BYTEARRAY), SIZE_BASED)).block();
 
         SoftAssertions.assertSoftly(soflty -> {
-            assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+            assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block())).hasSameContentAs(new ByteArrayInputStream(EMPTY_BYTEARRAY));
             assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EMPTY_BYTEARRAY);
         });
     }
 
     @Test
+    public void shouldNotCacheWhenEmptyByteArray() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EMPTY_BYTEARRAY, SIZE_BASED)).block();
+
+        SoftAssertions.assertSoftly(soflty -> {
+            assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block())).hasSameContentAs(new ByteArrayInputStream(EMPTY_BYTEARRAY));
+            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EMPTY_BYTEARRAY);
+        });
+    }
+
+    @Test
+    public void shouldCacheWhenFiveKilobytesSteam() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES), SIZE_BASED)).block();
+        // Assert through the soft-assertion instance so a cache mismatch does not mask a backend mismatch.
+        SoftAssertions.assertSoftly(softly -> {
+            softly.assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block()))
+                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+            softly.assertThat(new ByteArrayInputStream(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()))
+                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+        });
+    }
+
+    @Test
+    public void shouldCacheWhenFiveKilobytesBytes() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
+        // Assert through the soft-assertion instance so a cache mismatch does not mask a backend mismatch.
+        SoftAssertions.assertSoftly(softly -> {
+            softly.assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block()))
+                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+            softly.assertThat(new ByteArrayInputStream(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()))
+                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+        });
+    }
+
+    @Test
     public void shouldRemoveBothInCacheAndBackendWhenDefaultBucketName() {
         BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org