You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/12/18 11:23:16 UTC

[james-project] 01/13: JAMES-3008 Reactive BlobPutter

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9f3ed027972034b7823ce0ba15783cf388d39ba0
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Thu Dec 12 15:37:12 2019 +0700

    JAMES-3008 Reactive BlobPutter
    
    Make BlobPutter return a Mono instead of blocking internally, which
    avoids two threads waiting for the same Blob operation
---
 .../james/blob/objectstorage/BlobPutter.java       |  6 ++--
 .../blob/objectstorage/ObjectStorageBlobStore.java | 38 +++++++++++---------
 .../objectstorage/StreamCompatibleBlobPutter.java  | 20 +++++------
 .../blob/objectstorage/aws/AwsS3ObjectStorage.java | 40 ++++++++++------------
 4 files changed, 53 insertions(+), 51 deletions(-)

diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/BlobPutter.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/BlobPutter.java
index 4a229bf..8fe1a2b 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/BlobPutter.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/BlobPutter.java
@@ -24,6 +24,8 @@ import java.util.function.Supplier;
 import org.apache.james.blob.api.BlobId;
 import org.jclouds.blobstore.domain.Blob;
 
+import reactor.core.publisher.Mono;
+
 /**
  * Implementations may have specific behaviour when uploading a blob,
  * such cases are not well handled by jClouds.
@@ -36,7 +38,7 @@ import org.jclouds.blobstore.domain.Blob;
 
 public interface BlobPutter {
 
-    void putDirectly(ObjectStorageBucketName bucketName, Blob blob);
+    Mono<Void> putDirectly(ObjectStorageBucketName bucketName, Blob blob);
 
-    BlobId putAndComputeId(ObjectStorageBucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier);
+    Mono<BlobId> putAndComputeId(ObjectStorageBucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier);
 }
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java
index 8a0628e..fbe82ed 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java
@@ -100,16 +100,18 @@ public class ObjectStorageBlobStore implements BlobStore {
         Preconditions.checkNotNull(data);
         ObjectStorageBucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
 
-        BlobId blobId = blobIdFactory.forPayload(data);
-        Payload payload = payloadCodec.write(data);
+        return Mono.fromCallable(() -> blobIdFactory.forPayload(data))
+            .flatMap(blobId -> {
+                Payload payload = payloadCodec.write(data);
 
-        Blob blob = blobStore.blobBuilder(blobId.asString())
-            .payload(payload.getPayload())
-            .contentLength(payload.getLength().orElse(Long.valueOf(data.length)))
-            .build();
+                Blob blob = blobStore.blobBuilder(blobId.asString())
+                    .payload(payload.getPayload())
+                    .contentLength(payload.getLength().orElse(Long.valueOf(data.length)))
+                    .build();
 
-        return Mono.fromRunnable(() -> blobPutter.putDirectly(resolvedBucketName, blob))
-            .thenReturn(blobId);
+                return blobPutter.putDirectly(resolvedBucketName, blob)
+                    .thenReturn(blobId);
+            });
     }
 
     @Override
@@ -143,15 +145,17 @@ public class ObjectStorageBlobStore implements BlobStore {
     private Mono<BlobId> saveBigStream(BucketName bucketName, InputStream data) {
         ObjectStorageBucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
 
-        BlobId tmpId = blobIdFactory.randomId();
-        HashingInputStream hashingInputStream = new HashingInputStream(Hashing.sha256(), data);
-        Payload payload = payloadCodec.write(hashingInputStream);
-        Blob blob = blobStore.blobBuilder(tmpId.asString())
-                            .payload(payload.getPayload())
-                            .build();
-
-        Supplier<BlobId> blobIdSupplier = () -> blobIdFactory.from(hashingInputStream.hash().toString());
-        return Mono.fromRunnable(() -> blobPutter.putAndComputeId(resolvedBucketName, blob, blobIdSupplier));
+        return Mono.fromCallable(blobIdFactory::randomId)
+            .flatMap(tmpId -> {
+                HashingInputStream hashingInputStream = new HashingInputStream(Hashing.sha256(), data);
+                Payload payload = payloadCodec.write(hashingInputStream);
+                Blob blob = blobStore.blobBuilder(tmpId.asString())
+                    .payload(payload.getPayload())
+                    .build();
+
+                Supplier<BlobId> blobIdSupplier = () -> blobIdFactory.from(hashingInputStream.hash().toString());
+                return blobPutter.putAndComputeId(resolvedBucketName, blob, blobIdSupplier);
+            });
     }
 
     @Override
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java
index 30ad89c..269a8a4 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java
@@ -50,8 +50,8 @@ public class StreamCompatibleBlobPutter implements BlobPutter {
     }
 
     @Override
-    public void putDirectly(ObjectStorageBucketName bucketName, Blob blob) {
-        Mono.fromRunnable(() -> blobStore.putBlob(bucketName.asString(), blob))
+    public Mono<Void> putDirectly(ObjectStorageBucketName bucketName, Blob blob) {
+        return Mono.fromRunnable(() -> blobStore.putBlob(bucketName.asString(), blob))
             .publishOn(Schedulers.elastic())
             .retryWhen(Retry.onlyIf(retryContext -> needToCreateBucket(retryContext.exception(), bucketName))
                 .exponentialBackoff(FIRST_BACK_OFF, FOREVER)
@@ -62,21 +62,21 @@ public class StreamCompatibleBlobPutter implements BlobPutter {
                 .withBackoffScheduler(Schedulers.elastic())
                 .exponentialBackoff(FIRST_BACK_OFF, FOREVER)
                 .retryMax(RETRY_ONE_LAST_TIME_ON_CONCURRENT_SAVING))
-            .block();
+            .then();
     }
 
     @Override
-    public BlobId putAndComputeId(ObjectStorageBucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier) {
-        putDirectly(bucketName, initialBlob);
-        BlobId finalId = blobIdSupplier.get();
-        updateBlobId(bucketName, initialBlob.getMetadata().getName(), finalId.asString());
-        return finalId;
+    public Mono<BlobId> putAndComputeId(ObjectStorageBucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier) {
+        return putDirectly(bucketName, initialBlob)
+            .then(Mono.fromCallable(blobIdSupplier::get))
+            .map(blobId -> updateBlobId(bucketName, initialBlob.getMetadata().getName(), blobId));
     }
 
-    private void updateBlobId(ObjectStorageBucketName bucketName, String from, String to) {
+    private BlobId updateBlobId(ObjectStorageBucketName bucketName, String from, BlobId to) {
         String bucketNameAsString = bucketName.asString();
-        blobStore.copyBlob(bucketNameAsString, from, bucketNameAsString, to, CopyOptions.NONE);
+        blobStore.copyBlob(bucketNameAsString, from, bucketNameAsString, to.asString(), CopyOptions.NONE);
         blobStore.removeBlob(bucketNameAsString, from);
+        return to;
     }
 
     private boolean needToCreateBucket(Throwable throwable, ObjectStorageBucketName bucketName) {
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
index db7cd67..e02577a 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
@@ -20,14 +20,13 @@
 package org.apache.james.blob.objectstorage.aws;
 
 import java.io.File;
-import java.io.IOException;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 import javax.annotation.PreDestroy;
@@ -146,33 +145,30 @@ public class AwsS3ObjectStorage {
         }
 
         @Override
-        public void putDirectly(ObjectStorageBucketName bucketName, Blob blob) {
-            writeFileAndAct(blob, (file) -> putWithRetry(bucketName, configuration, blob, file, FIRST_TRY).block());
+        public Mono<Void> putDirectly(ObjectStorageBucketName bucketName, Blob blob) {
+            return writeFileAndAct(blob, file -> putWithRetry(bucketName, configuration, blob, file, FIRST_TRY));
         }
 
         @Override
-        public BlobId putAndComputeId(ObjectStorageBucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier) {
-            Consumer<File> putChangedBlob = (file) -> {
+        public Mono<BlobId> putAndComputeId(ObjectStorageBucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier) {
+            Function<File, Mono<Void>> putChangedBlob = file -> {
                 initialBlob.getMetadata().setName(blobIdSupplier.get().asString());
-                putWithRetry(bucketName, configuration, initialBlob, file, FIRST_TRY).block();
+                return putWithRetry(bucketName, configuration, initialBlob, file, FIRST_TRY);
             };
-            writeFileAndAct(initialBlob, putChangedBlob);
-            return blobIdSupplier.get();
+            return writeFileAndAct(initialBlob, putChangedBlob)
+                .then(Mono.fromCallable(blobIdSupplier::get));
         }
 
-        private void writeFileAndAct(Blob blob, Consumer<File> putFile) {
-            File file = null;
-            try {
-                file = File.createTempFile(UUID.randomUUID().toString(), ".tmp");
-                FileUtils.copyToFile(blob.getPayload().openStream(), file);
-                putFile.accept(file);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            } finally {
-                if (file != null) {
-                    FileUtils.deleteQuietly(file);
-                }
-            }
+        private Mono<Void> writeFileAndAct(Blob blob, Function<File, Mono<Void>> putFile) {
+            return Mono.using(
+                () -> {
+                    File file = File.createTempFile(UUID.randomUUID().toString(), ".tmp");
+                    FileUtils.copyToFile(blob.getPayload().openStream(), file);
+                    return file;
+                },
+                putFile::apply,
+                FileUtils::deleteQuietly
+            );
         }
 
         private Mono<Void> putWithRetry(ObjectStorageBucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file, int tried) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org