You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/01/13 04:47:10 UTC

[james-project] 04/05: [Refactoring] Shutdown TransferManager when uploading is done

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9a35da785ee10ca143a761f29cb1cb88e12f94b5
Author: Tran Tien Duc <dt...@linagora.com>
AuthorDate: Wed Dec 25 11:30:47 2019 +0700

    [Refactoring] Shutdown TransferManager when uploading is done
    
    From its documentation
    
     * // After the upload is complete, call shutdownNow to release the resources.
     * tx.shutdownNow();
---
 .../blob/objectstorage/aws/AwsS3ObjectStorage.java | 47 ++++++++++++----------
 1 file changed, 26 insertions(+), 21 deletions(-)

diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
index fa5384e..f4a35bb 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
@@ -28,7 +28,6 @@ import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.function.Function;
 import java.util.function.Supplier;
 
 import javax.annotation.PreDestroy;
@@ -148,32 +147,36 @@ public class AwsS3ObjectStorage {
 
         @Override
         public Mono<Void> putDirectly(ObjectStorageBucketName bucketName, Blob blob) {
-            return putWithRetry(bucketName, configuration, () -> uploadByBlob(bucketName, blob));
+            return putWithRetry(bucketName, () -> uploadByBlob(bucketName, blob));
         }
 
         @Override
         public Mono<BlobId> putAndComputeId(ObjectStorageBucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier) {
-            Function<File, Mono<Void>> putChangedBlob = file -> {
-                BlobId blobId = blobIdSupplier.get();
-                return putWithRetry(bucketName, configuration, () -> uploadByFile(bucketName, blobId, file));
-            };
-            return writeFileAndAct(initialBlob, putChangedBlob)
-                .then(Mono.fromCallable(blobIdSupplier::get));
+            return Mono.using(
+                () -> copyToTempFile(initialBlob),
+                file -> putByFile(bucketName, blobIdSupplier, file),
+                this::deleteFileAsync);
         }
 
-        private Mono<Void> writeFileAndAct(Blob blob, Function<File, Mono<Void>> putFile) {
-            return Mono.using(
-                () -> {
-                    File file = File.createTempFile(UUID.randomUUID().toString(), ".tmp");
-                    FileUtils.copyToFile(blob.getPayload().openStream(), file);
-                    return file;
-                },
-                putFile::apply,
-                FileUtils::deleteQuietly
-            );
+        private Mono<BlobId> putByFile(ObjectStorageBucketName bucketName, Supplier<BlobId> blobIdSupplier, File file) {
+            return Mono.fromSupplier(blobIdSupplier)
+                .flatMap(blobId -> putWithRetry(bucketName, () -> uploadByFile(bucketName, blobId, file))
+                    .then(Mono.just(blobId)));
+        }
+
+        private File copyToTempFile(Blob blob) throws IOException {
+            File file = File.createTempFile(UUID.randomUUID().toString(), ".tmp");
+            FileUtils.copyToFile(blob.getPayload().openStream(), file);
+            return file;
+        }
+
+        private void deleteFileAsync(File file) {
+            Mono.fromRunnable(() -> FileUtils.deleteQuietly(file))
+                .subscribeOn(Schedulers.elastic())
+                .subscribe();
         }
 
-        private Mono<Void> putWithRetry(ObjectStorageBucketName bucketName, AwsS3AuthConfiguration configuration, ThrowingRunnable puttingAttempt) {
+        private Mono<Void> putWithRetry(ObjectStorageBucketName bucketName, ThrowingRunnable puttingAttempt) {
             return Mono.<Void>fromRunnable(puttingAttempt)
                 .publishOn(Schedulers.elastic())
                 .retryWhen(Retry
@@ -201,9 +204,11 @@ public class AwsS3ObjectStorage {
         }
 
         private void upload(PutObjectRequest request) throws InterruptedException {
-            getTransferManager(configuration)
+            TransferManager transferManager = getTransferManager();
+            transferManager
                 .upload(request)
                 .waitForUploadResult();
+            transferManager.shutdownNow();
         }
 
         private void createBucket(ObjectStorageBucketName bucketName, AwsS3AuthConfiguration configuration) {
@@ -221,7 +226,7 @@ public class AwsS3ObjectStorage {
             return false;
         }
 
-        private TransferManager getTransferManager(AwsS3AuthConfiguration configuration) {
+        private TransferManager getTransferManager() {
             ClientConfiguration clientConfiguration = getClientConfiguration();
             AmazonS3 amazonS3 = getS3Client(configuration, clientConfiguration);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org