You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/01/13 04:47:06 UTC

[james-project] branch master updated (4f1fbeb -> f2d7aba)

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from 4f1fbeb  JAMES-3020 provide guice binding for AdditionalInformationUpdated event
     new c17eb02  JAMES-2721 DockerCassandraExtension is supposed to track number of tests run and allow container restart
     new a977b57  [Refactoring] AWS BlobPutter Optimizing constants usages
     new 3dd749a  [Refactoring] AWS putDirectly can upload InputStream directly
     new 9a35da7  [Refactoring] Shutdown TransferManager when uploading is done
     new f2d7aba  [Refactoring] AWSS3BlobPutter reuse S3Client and TransferManager

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../cassandra/DockerCassandraExtension.java        |   8 +-
 .../james/blob/objectstorage/BlobPutter.java       |   3 +-
 .../blob/objectstorage/ObjectStorageBlobStore.java |   3 +-
 .../objectstorage/StreamCompatibleBlobPutter.java  |   6 ++
 .../blob/objectstorage/aws/AwsS3ObjectStorage.java | 109 ++++++++++++---------
 .../ObjectStorageBlobStoreAWSCryptoTest.java       |   4 +-
 .../ObjectStorageBlobStoreAWSNamespaceTest.java    |   4 +-
 ...tStorageBlobStoreAWSPrefixAndNamespaceTest.java |   4 +-
 .../ObjectStorageBlobStoreAWSPrefixTest.java       |   4 +-
 .../ObjectStorageBlobStoreAWSTest.java             |   4 +-
 .../objectstorage/ObjectStorageBlobStoreTest.java  |   2 +-
 11 files changed, 95 insertions(+), 56 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 01/05: JAMES-2721 DockerCassandraExtension is supposed to track number of tests run and allow container restart

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c17eb02a0a5db5364ceb2abe6127e261f467b0c0
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Mon Jan 6 16:03:05 2020 +0100

    JAMES-2721 DockerCassandraExtension is supposed to track number of tests run and allow container restart
---
 .../apache/james/backends/cassandra/DockerCassandraExtension.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/DockerCassandraExtension.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/DockerCassandraExtension.java
index 8045c3a..fdf585c 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/DockerCassandraExtension.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/DockerCassandraExtension.java
@@ -22,12 +22,13 @@ package org.apache.james.backends.cassandra;
 import org.apache.james.util.Host;
 import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.api.extension.ParameterContext;
 import org.junit.jupiter.api.extension.ParameterResolutionException;
 import org.junit.jupiter.api.extension.ParameterResolver;
 
-public class DockerCassandraExtension implements BeforeAllCallback, AfterAllCallback, ParameterResolver {
+public class DockerCassandraExtension implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, ParameterResolver {
 
     private final DockerCassandraRule cassandraContainer;
     private DockerCassandra dockerCassandra;
@@ -48,6 +49,11 @@ public class DockerCassandraExtension implements BeforeAllCallback, AfterAllCall
     }
 
     @Override
+    public void beforeEach(ExtensionContext context) throws Exception {
+      cassandraContainer.before();
+    }
+
+    @Override
     public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
         return (parameterContext.getParameter().getType() == DockerCassandra.class);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 02/05: [Refactoring] AWS BlobPutter Optimizing constants usages

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a977b57bf2644b0d0cde1d801d85600ec4ccbe67
Author: Tran Tien Duc <dt...@linagora.com>
AuthorDate: Wed Dec 25 10:57:24 2019 +0700

    [Refactoring] AWS BlobPutter Optimizing constants usages
---
 .../org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java  | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
index f1e8f77..cced72b 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
@@ -68,12 +68,11 @@ import reactor.retry.Retry;
 public class AwsS3ObjectStorage {
 
     private static final Iterable<Module> JCLOUDS_MODULES = ImmutableSet.of(new SLF4JLoggingModule());
-    public  static final int MAX_THREADS = 5;
+    private static final int MAX_THREADS = 5;
     private static final boolean DO_NOT_SHUTDOWN_THREAD_POOL = false;
     private static final int MAX_ERROR_RETRY = 5;
-    private static final int FIRST_TRY = 0;
     private static final int MAX_RETRY_ON_EXCEPTION = 3;
-    public static Size MULTIPART_UPLOAD_THRESHOLD;
+    private static Size MULTIPART_UPLOAD_THRESHOLD;
 
     static {
         try {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 03/05: [Refactoring] AWS putDirectly can upload InputStream directly

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 3dd749a8ac07955c78f960005d2e0d02e9fab993
Author: Tran Tien Duc <dt...@linagora.com>
AuthorDate: Wed Dec 25 11:12:59 2019 +0700

    [Refactoring] AWS putDirectly can upload InputStream directly
    
    This avoids the intermediate step of copying the payload to a temporary
    file, while keeping the behavior of putAndComputeId unchanged.
---
 .../blob/objectstorage/aws/AwsS3ObjectStorage.java | 35 +++++++++++++++-------
 1 file changed, 25 insertions(+), 10 deletions(-)

diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
index cced72b..fa5384e 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
@@ -20,6 +20,8 @@
 package org.apache.james.blob.objectstorage.aws;
 
 import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.Properties;
@@ -53,10 +55,11 @@ import com.amazonaws.retry.PredefinedRetryPolicies;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.transfer.TransferManager;
 import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
-import com.github.fge.lambdas.Throwing;
+import com.github.fge.lambdas.runnable.ThrowingRunnable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Module;
@@ -145,14 +148,14 @@ public class AwsS3ObjectStorage {
 
         @Override
         public Mono<Void> putDirectly(ObjectStorageBucketName bucketName, Blob blob) {
-            return writeFileAndAct(blob, file -> putWithRetry(bucketName, configuration, blob, file));
+            return putWithRetry(bucketName, configuration, () -> uploadByBlob(bucketName, blob));
         }
 
         @Override
         public Mono<BlobId> putAndComputeId(ObjectStorageBucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier) {
             Function<File, Mono<Void>> putChangedBlob = file -> {
-                initialBlob.getMetadata().setName(blobIdSupplier.get().asString());
-                return putWithRetry(bucketName, configuration, initialBlob, file);
+                BlobId blobId = blobIdSupplier.get();
+                return putWithRetry(bucketName, configuration, () -> uploadByFile(bucketName, blobId, file));
             };
             return writeFileAndAct(initialBlob, putChangedBlob)
                 .then(Mono.fromCallable(blobIdSupplier::get));
@@ -170,8 +173,8 @@ public class AwsS3ObjectStorage {
             );
         }
 
-        private Mono<Void> putWithRetry(ObjectStorageBucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file) {
-            return Mono.<Void>fromRunnable(Throwing.runnable(() -> put(bucketName, configuration, blob, file)).sneakyThrow())
+        private Mono<Void> putWithRetry(ObjectStorageBucketName bucketName, AwsS3AuthConfiguration configuration, ThrowingRunnable puttingAttempt) {
+            return Mono.<Void>fromRunnable(puttingAttempt)
                 .publishOn(Schedulers.elastic())
                 .retryWhen(Retry
                     .<Void>onlyIf(retryContext -> needToCreateBucket(retryContext.exception()))
@@ -181,11 +184,23 @@ public class AwsS3ObjectStorage {
                     .doOnRetry(retryContext -> createBucket(bucketName, configuration)));
         }
 
-        private void put(ObjectStorageBucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file) throws InterruptedException {
-            PutObjectRequest request = new PutObjectRequest(bucketName.asString(),
-                blob.getMetadata().getName(),
-                file);
+        private void uploadByFile(ObjectStorageBucketName bucketName, BlobId blobId, File file) throws InterruptedException {
+            PutObjectRequest request = new PutObjectRequest(bucketName.asString(), blobId.asString(), file);
+            upload(request);
+        }
+
+        private void uploadByBlob(ObjectStorageBucketName bucketName, Blob blob) throws InterruptedException, IOException {
+            try (InputStream payload = blob.getPayload().openStream()) {
+                PutObjectRequest request = new PutObjectRequest(bucketName.asString(),
+                    blob.getMetadata().getName(),
+                    payload,
+                    new ObjectMetadata());
+
+                upload(request);
+            }
+        }
 
+        private void upload(PutObjectRequest request) throws InterruptedException {
             getTransferManager(configuration)
                 .upload(request)
                 .waitForUploadResult();


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 05/05: [Refactoring] AWSS3BlobPutter reuse S3Client and TransferManager

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f2d7abae2fdbcdaf4c192b1e3d6d4e283417cefd
Author: Tran Tien Duc <dt...@linagora.com>
AuthorDate: Mon Jan 6 18:09:12 2020 +0700

    [Refactoring] AWSS3BlobPutter reuse S3Client and TransferManager
    
    Both are documented as thread-safe.
    
    Reusing the TransferManager is also encouraged; see its Javadoc.
---
 .../james/blob/objectstorage/BlobPutter.java       |  3 +-
 .../blob/objectstorage/ObjectStorageBlobStore.java |  3 +-
 .../objectstorage/StreamCompatibleBlobPutter.java  |  6 ++++
 .../blob/objectstorage/aws/AwsS3ObjectStorage.java | 32 ++++++++++------------
 .../ObjectStorageBlobStoreAWSCryptoTest.java       |  4 ++-
 .../ObjectStorageBlobStoreAWSNamespaceTest.java    |  4 ++-
 ...tStorageBlobStoreAWSPrefixAndNamespaceTest.java |  4 ++-
 .../ObjectStorageBlobStoreAWSPrefixTest.java       |  4 ++-
 .../ObjectStorageBlobStoreAWSTest.java             |  4 ++-
 .../objectstorage/ObjectStorageBlobStoreTest.java  |  2 +-
 10 files changed, 40 insertions(+), 26 deletions(-)

diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/BlobPutter.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/BlobPutter.java
index 8fe1a2b..e521db5 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/BlobPutter.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/BlobPutter.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.blob.objectstorage;
 
+import java.io.Closeable;
 import java.util.function.Supplier;
 
 import org.apache.james.blob.api.BlobId;
@@ -36,7 +37,7 @@ import reactor.core.publisher.Mono;
  *
  */
 
-public interface BlobPutter {
+public interface BlobPutter extends Closeable {
 
     Mono<Void> putDirectly(ObjectStorageBucketName bucketName, Blob blob);
 
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java
index fbe82ed..fba4fa1 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java
@@ -91,8 +91,9 @@ public class ObjectStorageBlobStore implements BlobStore {
     }
 
     @PreDestroy
-    public void close() {
+    public void close() throws IOException {
         blobStore.getContext().close();
+        blobPutter.close();
     }
 
     @Override
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java
index 269a8a4..46d1d2e 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.blob.objectstorage;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.function.Supplier;
@@ -112,4 +113,9 @@ public class StreamCompatibleBlobPutter implements BlobPutter {
 
         return Optional.empty();
     }
+
+    @Override
+    public void close() throws IOException {
+        // No resource to clean up
+    }
 }
\ No newline at end of file
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
index f4a35bb..fa58add 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
@@ -137,12 +137,12 @@ public class AwsS3ObjectStorage {
         private static final Duration FIRST_BACK_OFF = Duration.ofMillis(100);
         private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
 
-        private final AwsS3AuthConfiguration configuration;
-        private final ExecutorService executorService;
+        private final AmazonS3 s3Client;
+        private final TransferManager transferManager;
 
-        AwsS3BlobPutter(AwsS3AuthConfiguration configuration, ExecutorService executorService) {
-            this.configuration = configuration;
-            this.executorService = executorService;
+        AwsS3BlobPutter(AwsS3AuthConfiguration authConfiguration, ExecutorService executorService) {
+            this.s3Client = getS3Client(authConfiguration, getClientConfiguration());
+            this.transferManager = getTransferManager(s3Client, executorService);
         }
 
         @Override
@@ -184,7 +184,7 @@ public class AwsS3ObjectStorage {
                     .exponentialBackoff(FIRST_BACK_OFF, FOREVER)
                     .withBackoffScheduler(Schedulers.elastic())
                     .retryMax(MAX_RETRY_ON_EXCEPTION)
-                    .doOnRetry(retryContext -> createBucket(bucketName, configuration)));
+                    .doOnRetry(retryContext -> s3Client.createBucket(bucketName.asString())));
         }
 
         private void uploadByFile(ObjectStorageBucketName bucketName, BlobId blobId, File file) throws InterruptedException {
@@ -204,16 +204,9 @@ public class AwsS3ObjectStorage {
         }
 
         private void upload(PutObjectRequest request) throws InterruptedException {
-            TransferManager transferManager = getTransferManager();
             transferManager
                 .upload(request)
                 .waitForUploadResult();
-            transferManager.shutdownNow();
-        }
-
-        private void createBucket(ObjectStorageBucketName bucketName, AwsS3AuthConfiguration configuration) {
-            getS3Client(configuration, getClientConfiguration())
-                .createBucket(bucketName.asString());
         }
 
         private boolean needToCreateBucket(Throwable th) {
@@ -226,13 +219,10 @@ public class AwsS3ObjectStorage {
             return false;
         }
 
-        private TransferManager getTransferManager() {
-            ClientConfiguration clientConfiguration = getClientConfiguration();
-            AmazonS3 amazonS3 = getS3Client(configuration, clientConfiguration);
-
+        private static TransferManager getTransferManager(AmazonS3 s3Client, ExecutorService executorService) {
             return TransferManagerBuilder
                     .standard()
-                    .withS3Client(amazonS3)
+                    .withS3Client(s3Client)
                     .withMultipartUploadThreshold(MULTIPART_UPLOAD_THRESHOLD.getValue())
                     .withExecutorFactory(() -> executorService)
                     .withShutDownThreadPools(DO_NOT_SHUTDOWN_THREAD_POOL)
@@ -253,5 +243,11 @@ public class AwsS3ObjectStorage {
             clientConfiguration.setRetryPolicy(PredefinedRetryPolicies.getDefaultRetryPolicyWithCustomMaxRetries(MAX_ERROR_RETRY));
             return clientConfiguration;
         }
+
+        @Override
+        public void close() throws IOException {
+            transferManager.shutdownNow();
+            s3Client.shutdown();
+        }
     }
 }
diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreAWSCryptoTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreAWSCryptoTest.java
index d120c24..aba7d13 100644
--- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreAWSCryptoTest.java
+++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreAWSCryptoTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.blob.objectstorage;
 
+import java.io.IOException;
+
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.api.HashBlobId;
@@ -69,7 +71,7 @@ public class ObjectStorageBlobStoreAWSCryptoTest implements MetricableBlobStoreC
     }
 
     @AfterEach
-    void tearDown() {
+    void tearDown() throws IOException {
         objectStorageBlobStore.deleteAllBuckets().block();
         objectStorageBlobStore.close();
         awsS3ObjectStorage.tearDown();
diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreAWSNamespaceTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreAWSNamespaceTest.java
index 8fcc3a2..fdf697b 100644
--- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreAWSNamespaceTest.java
+++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreAWSNamespaceTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.blob.objectstorage;
 
+import java.io.IOException;
+
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.api.BucketName;
@@ -62,7 +64,7 @@ public class ObjectStorageBlobStoreAWSNamespaceTest implements MetricableBlobSto
     }
 
     @AfterEach
-    void tearDown() {
+    void tearDown() throws IOException {
         objectStorageBlobStore.deleteAllBuckets().block();
         objectStorageBlobStore.close();
         awsS3ObjectStorage.tearDown();
diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreAWSPrefixAndNamespaceTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreAWSPrefixAndNamespaceTest.java
index 55b6af6..aea96a0 100644
--- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreAWSPrefixAndNamespaceTest.java
+++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreAWSPrefixAndNamespaceTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.blob.objectstorage;
 
+import java.io.IOException;
+
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.api.BucketName;
@@ -63,7 +65,7 @@ public class ObjectStorageBlobStoreAWSPrefixAndNamespaceTest implements Metricab
     }
 
     @AfterEach
-    void tearDown() {
+    void tearDown() throws IOException {
         objectStorageBlobStore.deleteAllBuckets().block();
         objectStorageBlobStore.close();
         awsS3ObjectStorage.tearDown();
diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreAWSPrefixTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreAWSPrefixTest.java
index 8f3ce20..f82ed8f 100644
--- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreAWSPrefixTest.java
+++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreAWSPrefixTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.blob.objectstorage;
 
+import java.io.IOException;
+
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.api.HashBlobId;
@@ -61,7 +63,7 @@ public class ObjectStorageBlobStoreAWSPrefixTest implements MetricableBlobStoreC
     }
 
     @AfterEach
-    void tearDown() {
+    void tearDown() throws IOException {
         objectStorageBlobStore.deleteAllBuckets().block();
         objectStorageBlobStore.close();
         awsS3ObjectStorage.tearDown();
diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreAWSTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreAWSTest.java
index d076dae..6af3fd3 100644
--- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreAWSTest.java
+++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreAWSTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.blob.objectstorage;
 
+import java.io.IOException;
+
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.api.HashBlobId;
@@ -61,7 +63,7 @@ public class ObjectStorageBlobStoreAWSTest implements MetricableBlobStoreContrac
     }
 
     @AfterEach
-    void tearDown() {
+    void tearDown() throws IOException {
         objectStorageBlobStore.deleteAllBuckets().block();
         objectStorageBlobStore.close();
         awsS3ObjectStorage.tearDown();
diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreTest.java
index 5b05311..e342525 100644
--- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreTest.java
+++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreTest.java
@@ -95,7 +95,7 @@ public class ObjectStorageBlobStoreTest implements MetricableBlobStoreContract {
     }
 
     @AfterEach
-    void tearDown() {
+    void tearDown() throws IOException {
         objectStorageBlobStore.deleteAllBuckets().block();
         objectStorageBlobStore.close();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 04/05: [Refactoring] Shutdown TransferManager when uploading is done

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9a35da785ee10ca143a761f29cb1cb88e12f94b5
Author: Tran Tien Duc <dt...@linagora.com>
AuthorDate: Wed Dec 25 11:30:47 2019 +0700

    [Refactoring] Shutdown TransferManager when uploading is done
    
    From its documentation
    
     * // After the upload is complete, call shutdownNow to release the resources.
     * tx.shutdownNow();
---
 .../blob/objectstorage/aws/AwsS3ObjectStorage.java | 47 ++++++++++++----------
 1 file changed, 26 insertions(+), 21 deletions(-)

diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
index fa5384e..f4a35bb 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
@@ -28,7 +28,6 @@ import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.function.Function;
 import java.util.function.Supplier;
 
 import javax.annotation.PreDestroy;
@@ -148,32 +147,36 @@ public class AwsS3ObjectStorage {
 
         @Override
         public Mono<Void> putDirectly(ObjectStorageBucketName bucketName, Blob blob) {
-            return putWithRetry(bucketName, configuration, () -> uploadByBlob(bucketName, blob));
+            return putWithRetry(bucketName, () -> uploadByBlob(bucketName, blob));
         }
 
         @Override
         public Mono<BlobId> putAndComputeId(ObjectStorageBucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier) {
-            Function<File, Mono<Void>> putChangedBlob = file -> {
-                BlobId blobId = blobIdSupplier.get();
-                return putWithRetry(bucketName, configuration, () -> uploadByFile(bucketName, blobId, file));
-            };
-            return writeFileAndAct(initialBlob, putChangedBlob)
-                .then(Mono.fromCallable(blobIdSupplier::get));
+            return Mono.using(
+                () -> copyToTempFile(initialBlob),
+                file -> putByFile(bucketName, blobIdSupplier, file),
+                this::deleteFileAsync);
         }
 
-        private Mono<Void> writeFileAndAct(Blob blob, Function<File, Mono<Void>> putFile) {
-            return Mono.using(
-                () -> {
-                    File file = File.createTempFile(UUID.randomUUID().toString(), ".tmp");
-                    FileUtils.copyToFile(blob.getPayload().openStream(), file);
-                    return file;
-                },
-                putFile::apply,
-                FileUtils::deleteQuietly
-            );
+        private Mono<BlobId> putByFile(ObjectStorageBucketName bucketName, Supplier<BlobId> blobIdSupplier, File file) {
+            return Mono.fromSupplier(blobIdSupplier)
+                .flatMap(blobId -> putWithRetry(bucketName, () -> uploadByFile(bucketName, blobId, file))
+                    .then(Mono.just(blobId)));
+        }
+
+        private File copyToTempFile(Blob blob) throws IOException {
+            File file = File.createTempFile(UUID.randomUUID().toString(), ".tmp");
+            FileUtils.copyToFile(blob.getPayload().openStream(), file);
+            return file;
+        }
+
+        private void deleteFileAsync(File file) {
+            Mono.fromRunnable(() -> FileUtils.deleteQuietly(file))
+                .subscribeOn(Schedulers.elastic())
+                .subscribe();
         }
 
-        private Mono<Void> putWithRetry(ObjectStorageBucketName bucketName, AwsS3AuthConfiguration configuration, ThrowingRunnable puttingAttempt) {
+        private Mono<Void> putWithRetry(ObjectStorageBucketName bucketName, ThrowingRunnable puttingAttempt) {
             return Mono.<Void>fromRunnable(puttingAttempt)
                 .publishOn(Schedulers.elastic())
                 .retryWhen(Retry
@@ -201,9 +204,11 @@ public class AwsS3ObjectStorage {
         }
 
         private void upload(PutObjectRequest request) throws InterruptedException {
-            getTransferManager(configuration)
+            TransferManager transferManager = getTransferManager();
+            transferManager
                 .upload(request)
                 .waitForUploadResult();
+            transferManager.shutdownNow();
         }
 
         private void createBucket(ObjectStorageBucketName bucketName, AwsS3AuthConfiguration configuration) {
@@ -221,7 +226,7 @@ public class AwsS3ObjectStorage {
             return false;
         }
 
-        private TransferManager getTransferManager(AwsS3AuthConfiguration configuration) {
+        private TransferManager getTransferManager() {
             ClientConfiguration clientConfiguration = getClientConfiguration();
             AmazonS3 amazonS3 = getS3Client(configuration, clientConfiguration);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org