You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/12/18 11:23:16 UTC
[james-project] 01/13: JAMES-3008 Reactive BlobPutter
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 9f3ed027972034b7823ce0ba15783cf388d39ba0
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Thu Dec 12 15:37:12 2019 +0700
JAMES-3008 Reactive BlobPutter
Avoids two threads waiting for the same Blob operation
---
.../james/blob/objectstorage/BlobPutter.java | 6 ++--
.../blob/objectstorage/ObjectStorageBlobStore.java | 38 +++++++++++---------
.../objectstorage/StreamCompatibleBlobPutter.java | 20 +++++------
.../blob/objectstorage/aws/AwsS3ObjectStorage.java | 40 ++++++++++------------
4 files changed, 53 insertions(+), 51 deletions(-)
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/BlobPutter.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/BlobPutter.java
index 4a229bf..8fe1a2b 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/BlobPutter.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/BlobPutter.java
@@ -24,6 +24,8 @@ import java.util.function.Supplier;
import org.apache.james.blob.api.BlobId;
import org.jclouds.blobstore.domain.Blob;
+import reactor.core.publisher.Mono;
+
/**
* Implementations may have specific behaviour when uploading a blob,
* such cases are not well handled by jClouds.
@@ -36,7 +38,7 @@ import org.jclouds.blobstore.domain.Blob;
public interface BlobPutter {
- void putDirectly(ObjectStorageBucketName bucketName, Blob blob);
+ Mono<Void> putDirectly(ObjectStorageBucketName bucketName, Blob blob);
- BlobId putAndComputeId(ObjectStorageBucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier);
+ Mono<BlobId> putAndComputeId(ObjectStorageBucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier);
}
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java
index 8a0628e..fbe82ed 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java
@@ -100,16 +100,18 @@ public class ObjectStorageBlobStore implements BlobStore {
Preconditions.checkNotNull(data);
ObjectStorageBucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
- BlobId blobId = blobIdFactory.forPayload(data);
- Payload payload = payloadCodec.write(data);
+ return Mono.fromCallable(() -> blobIdFactory.forPayload(data))
+ .flatMap(blobId -> {
+ Payload payload = payloadCodec.write(data);
- Blob blob = blobStore.blobBuilder(blobId.asString())
- .payload(payload.getPayload())
- .contentLength(payload.getLength().orElse(Long.valueOf(data.length)))
- .build();
+ Blob blob = blobStore.blobBuilder(blobId.asString())
+ .payload(payload.getPayload())
+ .contentLength(payload.getLength().orElse(Long.valueOf(data.length)))
+ .build();
- return Mono.fromRunnable(() -> blobPutter.putDirectly(resolvedBucketName, blob))
- .thenReturn(blobId);
+ return blobPutter.putDirectly(resolvedBucketName, blob)
+ .thenReturn(blobId);
+ });
}
@Override
@@ -143,15 +145,17 @@ public class ObjectStorageBlobStore implements BlobStore {
private Mono<BlobId> saveBigStream(BucketName bucketName, InputStream data) {
ObjectStorageBucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
- BlobId tmpId = blobIdFactory.randomId();
- HashingInputStream hashingInputStream = new HashingInputStream(Hashing.sha256(), data);
- Payload payload = payloadCodec.write(hashingInputStream);
- Blob blob = blobStore.blobBuilder(tmpId.asString())
- .payload(payload.getPayload())
- .build();
-
- Supplier<BlobId> blobIdSupplier = () -> blobIdFactory.from(hashingInputStream.hash().toString());
- return Mono.fromRunnable(() -> blobPutter.putAndComputeId(resolvedBucketName, blob, blobIdSupplier));
+ return Mono.fromCallable(blobIdFactory::randomId)
+ .flatMap(tmpId -> {
+ HashingInputStream hashingInputStream = new HashingInputStream(Hashing.sha256(), data);
+ Payload payload = payloadCodec.write(hashingInputStream);
+ Blob blob = blobStore.blobBuilder(tmpId.asString())
+ .payload(payload.getPayload())
+ .build();
+
+ Supplier<BlobId> blobIdSupplier = () -> blobIdFactory.from(hashingInputStream.hash().toString());
+ return blobPutter.putAndComputeId(resolvedBucketName, blob, blobIdSupplier);
+ });
}
@Override
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java
index 30ad89c..269a8a4 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java
@@ -50,8 +50,8 @@ public class StreamCompatibleBlobPutter implements BlobPutter {
}
@Override
- public void putDirectly(ObjectStorageBucketName bucketName, Blob blob) {
- Mono.fromRunnable(() -> blobStore.putBlob(bucketName.asString(), blob))
+ public Mono<Void> putDirectly(ObjectStorageBucketName bucketName, Blob blob) {
+ return Mono.fromRunnable(() -> blobStore.putBlob(bucketName.asString(), blob))
.publishOn(Schedulers.elastic())
.retryWhen(Retry.onlyIf(retryContext -> needToCreateBucket(retryContext.exception(), bucketName))
.exponentialBackoff(FIRST_BACK_OFF, FOREVER)
@@ -62,21 +62,21 @@ public class StreamCompatibleBlobPutter implements BlobPutter {
.withBackoffScheduler(Schedulers.elastic())
.exponentialBackoff(FIRST_BACK_OFF, FOREVER)
.retryMax(RETRY_ONE_LAST_TIME_ON_CONCURRENT_SAVING))
- .block();
+ .then();
}
@Override
- public BlobId putAndComputeId(ObjectStorageBucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier) {
- putDirectly(bucketName, initialBlob);
- BlobId finalId = blobIdSupplier.get();
- updateBlobId(bucketName, initialBlob.getMetadata().getName(), finalId.asString());
- return finalId;
+ public Mono<BlobId> putAndComputeId(ObjectStorageBucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier) {
+ return putDirectly(bucketName, initialBlob)
+ .then(Mono.fromCallable(blobIdSupplier::get))
+ .map(blobId -> updateBlobId(bucketName, initialBlob.getMetadata().getName(), blobId));
}
- private void updateBlobId(ObjectStorageBucketName bucketName, String from, String to) {
+ private BlobId updateBlobId(ObjectStorageBucketName bucketName, String from, BlobId to) {
String bucketNameAsString = bucketName.asString();
- blobStore.copyBlob(bucketNameAsString, from, bucketNameAsString, to, CopyOptions.NONE);
+ blobStore.copyBlob(bucketNameAsString, from, bucketNameAsString, to.asString(), CopyOptions.NONE);
blobStore.removeBlob(bucketNameAsString, from);
+ return to;
}
private boolean needToCreateBucket(Throwable throwable, ObjectStorageBucketName bucketName) {
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
index db7cd67..e02577a 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
@@ -20,14 +20,13 @@
package org.apache.james.blob.objectstorage.aws;
import java.io.File;
-import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.PreDestroy;
@@ -146,33 +145,30 @@ public class AwsS3ObjectStorage {
}
@Override
- public void putDirectly(ObjectStorageBucketName bucketName, Blob blob) {
- writeFileAndAct(blob, (file) -> putWithRetry(bucketName, configuration, blob, file, FIRST_TRY).block());
+ public Mono<Void> putDirectly(ObjectStorageBucketName bucketName, Blob blob) {
+ return writeFileAndAct(blob, file -> putWithRetry(bucketName, configuration, blob, file, FIRST_TRY));
}
@Override
- public BlobId putAndComputeId(ObjectStorageBucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier) {
- Consumer<File> putChangedBlob = (file) -> {
+ public Mono<BlobId> putAndComputeId(ObjectStorageBucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier) {
+ Function<File, Mono<Void>> putChangedBlob = file -> {
initialBlob.getMetadata().setName(blobIdSupplier.get().asString());
- putWithRetry(bucketName, configuration, initialBlob, file, FIRST_TRY).block();
+ return putWithRetry(bucketName, configuration, initialBlob, file, FIRST_TRY);
};
- writeFileAndAct(initialBlob, putChangedBlob);
- return blobIdSupplier.get();
+ return writeFileAndAct(initialBlob, putChangedBlob)
+ .then(Mono.fromCallable(blobIdSupplier::get));
}
- private void writeFileAndAct(Blob blob, Consumer<File> putFile) {
- File file = null;
- try {
- file = File.createTempFile(UUID.randomUUID().toString(), ".tmp");
- FileUtils.copyToFile(blob.getPayload().openStream(), file);
- putFile.accept(file);
- } catch (IOException e) {
- throw new RuntimeException(e);
- } finally {
- if (file != null) {
- FileUtils.deleteQuietly(file);
- }
- }
+ private Mono<Void> writeFileAndAct(Blob blob, Function<File, Mono<Void>> putFile) {
+ return Mono.using(
+ () -> {
+ File file = File.createTempFile(UUID.randomUUID().toString(), ".tmp");
+ FileUtils.copyToFile(blob.getPayload().openStream(), file);
+ return file;
+ },
+ putFile::apply,
+ FileUtils::deleteQuietly
+ );
}
private Mono<Void> putWithRetry(ObjectStorageBucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file, int tried) {
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org