You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2020/04/30 07:28:47 UTC
[james-project] 11/11: JAMES-3140: Hiding technical details into
class for CachedBlobStore
This is an automated email from the ASF dual-hosted git repository.
matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 22d8d93ee74e4052f6b191c12330a43cbc76445f
Author: ducnv <du...@gmail.com>
AuthorDate: Tue Apr 28 17:17:37 2020 +0700
JAMES-3140: Hiding technical details into class for CachedBlobStore
---
.../blob/cassandra/cache/CachedBlobStore.java | 141 ++++++++++++---------
1 file changed, 84 insertions(+), 57 deletions(-)
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
index 86aead0..04857e4 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
@@ -42,6 +42,62 @@ import com.google.common.base.Preconditions;
import reactor.core.publisher.Mono;
public class CachedBlobStore implements BlobStore {
+
+ private static class ReadAheadInputStream {
+
+ @FunctionalInterface
+ interface RequireStream {
+ RequireLength of(InputStream in);
+ }
+
+ interface RequireLength {
+ ReadAheadInputStream length(int length) throws IOException;
+ }
+
+
+ static RequireStream eager() {
+ return in -> length -> {
+ //+1 is to evaluate hasMore
+ var stream = new PushbackInputStream(in, length + 1);
+ var bytes = new byte[length];
+ int readByteCount = IOUtils.read(stream, bytes);
+ Optional<byte[]> firstBytes;
+ boolean hasMore;
+ if (readByteCount < 0) {
+ firstBytes = Optional.empty();
+ hasMore = false;
+ } else {
+ byte[] readBytes = Arrays.copyOf(bytes, readByteCount);
+ hasMore = hasMore(stream);
+ stream.unread(readBytes);
+ firstBytes = Optional.of(readBytes);
+ }
+ return new ReadAheadInputStream(stream, firstBytes, hasMore);
+ };
+ }
+
+ private static boolean hasMore(PushbackInputStream stream) throws IOException {
+ int nextByte = stream.read();
+ if (nextByte >= 0) {
+ stream.unread(nextByte);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ final PushbackInputStream in;
+ final Optional<byte[]> firstBytes;
+ final boolean hasMore;
+
+ private ReadAheadInputStream(PushbackInputStream in, Optional<byte[]> firstBytes, boolean hasMore) {
+ this.in = in;
+ this.firstBytes = firstBytes;
+ this.hasMore = hasMore;
+ }
+
+ }
+
private final BlobStoreCache cache;
private final BlobStore backend;
private final Integer sizeThresholdInBytes;
@@ -61,8 +117,10 @@ public class CachedBlobStore implements BlobStore {
.flatMap(ignored -> readFromCache(blobId)
.flatMap(this::toInputStream))
.switchIfEmpty(readFromBackend(bucketName, blobId)
- .map(this::toPushbackStream)
- .flatMap(pushbackInputStream -> saveInCache(pushbackInputStream, blobId, bucketName)))
+ .flatMap(inputStream ->
+ Mono.fromCallable(() -> ReadAheadInputStream.eager().of(inputStream).length(sizeThresholdInBytes))
+ .flatMap(readAheadInputStream -> putInCacheIfNeeded(bucketName, readAheadInputStream, blobId)
+ .thenReturn(readAheadInputStream.in))))
.blockOptional()
.orElseThrow(() -> new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId)));
}
@@ -94,8 +152,7 @@ public class CachedBlobStore implements BlobStore {
Preconditions.checkNotNull(inputStream, "InputStream must not be null");
if (isAbleToCache(bucketName, storagePolicy)) {
- return Mono.fromCallable(() -> toPushbackStream(inputStream))
- .flatMap(pushbackInputStream -> saveInCache(bucketName, pushbackInputStream, storagePolicy));
+ return saveInCache(bucketName, inputStream, storagePolicy);
}
return backend.save(bucketName, inputStream, storagePolicy);
@@ -120,64 +177,47 @@ public class CachedBlobStore implements BlobStore {
return Mono.from(backend.deleteBucket(bucketName));
}
- private Optional<byte[]> fullyReadSmallStream(PushbackInputStream pushbackInputStream) throws IOException {
- byte[] bytes = new byte[sizeThresholdInBytes];
- int readByteCount = IOUtils.read(pushbackInputStream, bytes);
- int extraByte = pushbackInputStream.read();
- try {
- if (extraByte >= 0) {
- return Optional.empty();
- }
- if (readByteCount < 0) {
- return Optional.of(new byte[] {});
- }
- return Optional.of(Arrays.copyOf(bytes, readByteCount));
- } finally {
- if (extraByte >= 0) {
- pushbackInputStream.unread(extraByte);
- }
- if (readByteCount > 0) {
- pushbackInputStream.unread(bytes, 0, readByteCount);
- }
- }
+ private Mono<BlobId> saveInCache(BucketName bucketName, InputStream inputStream, StoragePolicy storagePolicy) {
+ return Mono.fromCallable(() -> ReadAheadInputStream.eager().of(inputStream).length(sizeThresholdInBytes))
+ .flatMap(readAhead -> saveToBackend(bucketName, storagePolicy, readAhead)
+ .flatMap(blobId -> putInCacheIfNeeded(bucketName, storagePolicy, readAhead, blobId)
+ .thenReturn(blobId)));
}
- private Mono<BlobId> saveInCache(BucketName bucketName, PushbackInputStream pushbackInputStream, StoragePolicy storagePolicy) {
- return Mono.fromCallable(() -> fullyReadSmallStream(pushbackInputStream))
- .flatMap(Mono::justOrEmpty)
- .filter(bytes -> isAbleToCache(bucketName, bytes, storagePolicy))
- .flatMap(bytes -> saveInBackend(bucketName, pushbackInputStream, storagePolicy)
- .flatMap(blobId -> saveInCache(blobId, bytes).thenReturn(blobId)))
- .switchIfEmpty(saveInBackend(bucketName, pushbackInputStream, storagePolicy));
+ private Mono<BlobId> saveToBackend(BucketName bucketName, StoragePolicy storagePolicy, ReadAheadInputStream readAhead) {
+ return Mono.from(backend.save(bucketName, readAhead.in, storagePolicy));
}
- private Mono<BlobId> saveInBackend(BucketName bucketName, PushbackInputStream pushbackInputStream, StoragePolicy storagePolicy) {
- return Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy));
+ private Mono<Void> putInCacheIfNeeded(BucketName bucketName, StoragePolicy storagePolicy, ReadAheadInputStream readAhead, BlobId blobId) {
+ return Mono.justOrEmpty(readAhead.firstBytes)
+ .filter(bytes -> isAbleToCache(bucketName, readAhead, storagePolicy))
+ .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes)));
}
- private Mono<Void> saveInCache(BlobId blobId, byte[] bytes) {
- return Mono.from(cache.cache(blobId, bytes));
+ private Mono<Void> putInCacheIfNeeded(BucketName bucketName, ReadAheadInputStream readAhead, BlobId blobId) {
+ return Mono.justOrEmpty(readAhead.firstBytes)
+ .filter(bytes -> isAbleToCache(readAhead, bucketName))
+ .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes)));
}
- private Mono<InputStream> saveInCache(PushbackInputStream pushbackInputStream, BlobId blobId, BucketName bucketName) {
- return Mono.fromCallable(() -> fullyReadSmallStream(pushbackInputStream))
- .flatMap(Mono::justOrEmpty)
- .filter(bytes -> isAbleToCache(bytes, bucketName))
- .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes))
- .map(ignore -> pushbackBytesArrayRead(pushbackInputStream, bytes)))
- .then(Mono.just(pushbackInputStream));
+ private Mono<Void> saveInCache(BlobId blobId, byte[] bytes) {
+ return Mono.from(cache.cache(blobId, bytes));
}
private boolean isAbleToCache(BucketName bucketName, byte[] bytes, StoragePolicy storagePolicy) {
return isAbleToCache(bucketName, storagePolicy) && isAbleToCache(bytes);
}
+ private boolean isAbleToCache(BucketName bucketName, ReadAheadInputStream readAhead, StoragePolicy storagePolicy) {
+ return isAbleToCache(bucketName, storagePolicy) && !readAhead.hasMore;
+ }
+
private boolean isAbleToCache(BucketName bucketName, StoragePolicy storagePolicy) {
return backend.getDefaultBucketName().equals(bucketName) && !storagePolicy.equals(LOW_COST);
}
- private boolean isAbleToCache(byte[] bytes, BucketName bucketName) {
- return isAbleToCache(bytes) && backend.getDefaultBucketName().equals(bucketName);
+ private boolean isAbleToCache(ReadAheadInputStream readAhead, BucketName bucketName) {
+ return !readAhead.hasMore && backend.getDefaultBucketName().equals(bucketName);
}
private boolean isAbleToCache(byte[] bytes) {
@@ -192,10 +232,6 @@ public class CachedBlobStore implements BlobStore {
return Mono.fromCallable(() -> backend.read(bucketName, blobId));
}
- private PushbackInputStream toPushbackStream(InputStream inputStream) {
- return new PushbackInputStream(inputStream, sizeThresholdInBytes);
- }
-
private Mono<byte[]> readFromCache(BlobId blobId) {
return Mono.from(cache.read(blobId));
}
@@ -203,13 +239,4 @@ public class CachedBlobStore implements BlobStore {
private Mono<byte[]> readBytesFromBackend(BucketName bucketName, BlobId blobId) {
return Mono.from(backend.readBytes(bucketName, blobId));
}
-
- private Mono<Void> pushbackBytesArrayRead(PushbackInputStream pushbackInputStream, byte[] bytes) {
- try {
- pushbackInputStream.unread(bytes);
- } catch (IOException e) {
- // Ignore
- }
- return Mono.empty();
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org