You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2020/04/30 07:28:47 UTC

[james-project] 11/11: JAMES-3140: Hiding technical details into class for CachedBlobStore

This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 22d8d93ee74e4052f6b191c12330a43cbc76445f
Author: ducnv <du...@gmail.com>
AuthorDate: Tue Apr 28 17:17:37 2020 +0700

    JAMES-3140: Hiding technical details into class for CachedBlobStore
---
 .../blob/cassandra/cache/CachedBlobStore.java      | 141 ++++++++++++---------
 1 file changed, 84 insertions(+), 57 deletions(-)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
index 86aead0..04857e4 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
@@ -42,6 +42,62 @@ import com.google.common.base.Preconditions;
 import reactor.core.publisher.Mono;
 
 public class CachedBlobStore implements BlobStore {
+
+    private static class ReadAheadInputStream {
+
+        @FunctionalInterface
+        interface RequireStream {
+            RequireLength of(InputStream in);
+        }
+
+        interface RequireLength {
+            ReadAheadInputStream length(int length) throws IOException;
+        }
+
+
+        static RequireStream eager() {
+            return in -> length -> {
+                //+1 is to evaluate hasMore
+                var stream = new PushbackInputStream(in, length + 1);
+                var bytes = new byte[length];
+                int readByteCount = IOUtils.read(stream, bytes);
+                Optional<byte[]> firstBytes;
+                boolean hasMore;
+                if (readByteCount < 0) {
+                    firstBytes = Optional.empty();
+                    hasMore = false;
+                } else {
+                    byte[] readBytes = Arrays.copyOf(bytes, readByteCount);
+                    hasMore = hasMore(stream);
+                    stream.unread(readBytes);
+                    firstBytes = Optional.of(readBytes);
+                }
+                return new ReadAheadInputStream(stream, firstBytes, hasMore);
+            };
+        }
+
+        private static boolean hasMore(PushbackInputStream stream) throws IOException {
+            int nextByte = stream.read();
+            if (nextByte >= 0) {
+                stream.unread(nextByte);
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        final PushbackInputStream in;
+        final Optional<byte[]> firstBytes;
+        final boolean hasMore;
+
+        private ReadAheadInputStream(PushbackInputStream in, Optional<byte[]> firstBytes, boolean hasMore) {
+            this.in = in;
+            this.firstBytes = firstBytes;
+            this.hasMore = hasMore;
+        }
+
+    }
+
     private final BlobStoreCache cache;
     private final BlobStore backend;
     private final Integer sizeThresholdInBytes;
@@ -61,8 +117,10 @@ public class CachedBlobStore implements BlobStore {
             .flatMap(ignored -> readFromCache(blobId)
                 .flatMap(this::toInputStream))
             .switchIfEmpty(readFromBackend(bucketName, blobId)
-                .map(this::toPushbackStream)
-                .flatMap(pushbackInputStream -> saveInCache(pushbackInputStream, blobId, bucketName)))
+                .flatMap(inputStream ->
+                    Mono.fromCallable(() -> ReadAheadInputStream.eager().of(inputStream).length(sizeThresholdInBytes))
+                        .flatMap(readAheadInputStream -> putInCacheIfNeeded(bucketName, readAheadInputStream, blobId)
+                            .thenReturn(readAheadInputStream.in))))
             .blockOptional()
             .orElseThrow(() -> new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId)));
     }
@@ -94,8 +152,7 @@ public class CachedBlobStore implements BlobStore {
         Preconditions.checkNotNull(inputStream, "InputStream must not be null");
 
         if (isAbleToCache(bucketName, storagePolicy)) {
-            return Mono.fromCallable(() -> toPushbackStream(inputStream))
-                .flatMap(pushbackInputStream -> saveInCache(bucketName, pushbackInputStream, storagePolicy));
+            return saveInCache(bucketName, inputStream, storagePolicy);
         }
 
         return backend.save(bucketName, inputStream, storagePolicy);
@@ -120,64 +177,47 @@ public class CachedBlobStore implements BlobStore {
         return Mono.from(backend.deleteBucket(bucketName));
     }
 
-    private Optional<byte[]> fullyReadSmallStream(PushbackInputStream pushbackInputStream) throws IOException {
-        byte[] bytes = new byte[sizeThresholdInBytes];
-        int readByteCount = IOUtils.read(pushbackInputStream, bytes);
-        int extraByte = pushbackInputStream.read();
-        try {
-            if (extraByte >= 0) {
-                return Optional.empty();
-            }
-            if (readByteCount < 0) {
-                return Optional.of(new byte[] {});
-            }
-            return Optional.of(Arrays.copyOf(bytes, readByteCount));
-        } finally {
-            if (extraByte >= 0) {
-                pushbackInputStream.unread(extraByte);
-            }
-            if (readByteCount > 0) {
-                pushbackInputStream.unread(bytes, 0, readByteCount);
-            }
-        }
+    private Mono<BlobId> saveInCache(BucketName bucketName, InputStream inputStream, StoragePolicy storagePolicy) {
+        return Mono.fromCallable(() -> ReadAheadInputStream.eager().of(inputStream).length(sizeThresholdInBytes))
+            .flatMap(readAhead -> saveToBackend(bucketName, storagePolicy, readAhead)
+                .flatMap(blobId -> putInCacheIfNeeded(bucketName, storagePolicy, readAhead, blobId)
+                    .thenReturn(blobId)));
     }
 
-    private Mono<BlobId> saveInCache(BucketName bucketName, PushbackInputStream pushbackInputStream, StoragePolicy storagePolicy) {
-        return Mono.fromCallable(() -> fullyReadSmallStream(pushbackInputStream))
-            .flatMap(Mono::justOrEmpty)
-            .filter(bytes -> isAbleToCache(bucketName, bytes, storagePolicy))
-            .flatMap(bytes -> saveInBackend(bucketName, pushbackInputStream, storagePolicy)
-                .flatMap(blobId -> saveInCache(blobId, bytes).thenReturn(blobId)))
-            .switchIfEmpty(saveInBackend(bucketName, pushbackInputStream, storagePolicy));
+    private Mono<BlobId> saveToBackend(BucketName bucketName, StoragePolicy storagePolicy, ReadAheadInputStream readAhead) {
+        return Mono.from(backend.save(bucketName, readAhead.in, storagePolicy));
     }
 
-    private Mono<BlobId> saveInBackend(BucketName bucketName, PushbackInputStream pushbackInputStream, StoragePolicy storagePolicy) {
-        return Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy));
+    private Mono<Void> putInCacheIfNeeded(BucketName bucketName, StoragePolicy storagePolicy, ReadAheadInputStream readAhead, BlobId blobId) {
+        return Mono.justOrEmpty(readAhead.firstBytes)
+            .filter(bytes -> isAbleToCache(bucketName, readAhead, storagePolicy))
+            .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes)));
     }
 
-    private Mono<Void> saveInCache(BlobId blobId, byte[] bytes) {
-        return Mono.from(cache.cache(blobId, bytes));
+    private Mono<Void> putInCacheIfNeeded(BucketName bucketName, ReadAheadInputStream readAhead, BlobId blobId) {
+        return Mono.justOrEmpty(readAhead.firstBytes)
+            .filter(bytes -> isAbleToCache(readAhead, bucketName))
+            .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes)));
     }
 
-    private Mono<InputStream> saveInCache(PushbackInputStream pushbackInputStream, BlobId blobId, BucketName bucketName) {
-        return Mono.fromCallable(() -> fullyReadSmallStream(pushbackInputStream))
-            .flatMap(Mono::justOrEmpty)
-            .filter(bytes -> isAbleToCache(bytes, bucketName))
-            .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes))
-                .map(ignore -> pushbackBytesArrayRead(pushbackInputStream, bytes)))
-            .then(Mono.just(pushbackInputStream));
+    private Mono<Void> saveInCache(BlobId blobId, byte[] bytes) {
+        return Mono.from(cache.cache(blobId, bytes));
     }
 
     private boolean isAbleToCache(BucketName bucketName, byte[] bytes, StoragePolicy storagePolicy) {
         return isAbleToCache(bucketName, storagePolicy) && isAbleToCache(bytes);
     }
 
+    private boolean isAbleToCache(BucketName bucketName, ReadAheadInputStream readAhead, StoragePolicy storagePolicy) {
+        return isAbleToCache(bucketName, storagePolicy) && !readAhead.hasMore;
+    }
+
     private boolean isAbleToCache(BucketName bucketName, StoragePolicy storagePolicy) {
         return backend.getDefaultBucketName().equals(bucketName) && !storagePolicy.equals(LOW_COST);
     }
 
-    private boolean isAbleToCache(byte[] bytes, BucketName bucketName) {
-        return isAbleToCache(bytes) && backend.getDefaultBucketName().equals(bucketName);
+    private boolean isAbleToCache(ReadAheadInputStream readAhead, BucketName bucketName) {
+        return !readAhead.hasMore && backend.getDefaultBucketName().equals(bucketName);
     }
 
     private boolean isAbleToCache(byte[] bytes) {
@@ -192,10 +232,6 @@ public class CachedBlobStore implements BlobStore {
         return Mono.fromCallable(() -> backend.read(bucketName, blobId));
     }
 
-    private PushbackInputStream toPushbackStream(InputStream inputStream) {
-        return new PushbackInputStream(inputStream, sizeThresholdInBytes);
-    }
-
     private Mono<byte[]> readFromCache(BlobId blobId) {
         return Mono.from(cache.read(blobId));
     }
@@ -203,13 +239,4 @@ public class CachedBlobStore implements BlobStore {
     private Mono<byte[]> readBytesFromBackend(BucketName bucketName, BlobId blobId) {
         return Mono.from(backend.readBytes(bucketName, blobId));
     }
-
-    private Mono<Void> pushbackBytesArrayRead(PushbackInputStream pushbackInputStream, byte[] bytes) {
-        try {
-            pushbackInputStream.unread(bytes);
-        } catch (IOException e) {
-            // Ignore
-        }
-        return Mono.empty();
-    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org