You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/01/13 04:47:10 UTC
[james-project] 04/05: [Refactoring] Shutdown TransferManager when uploading is done
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 9a35da785ee10ca143a761f29cb1cb88e12f94b5
Author: Tran Tien Duc <dt...@linagora.com>
AuthorDate: Wed Dec 25 11:30:47 2019 +0700
[Refactoring] Shutdown TransferManager when uploading is done
From the TransferManager documentation:
* // After the upload is complete, call shutdownNow to release the resources.
* tx.shutdownNow();
---
.../blob/objectstorage/aws/AwsS3ObjectStorage.java | 47 ++++++++++++----------
1 file changed, 26 insertions(+), 21 deletions(-)
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
index fa5384e..f4a35bb 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
@@ -28,7 +28,6 @@ import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.PreDestroy;
@@ -148,32 +147,36 @@ public class AwsS3ObjectStorage {
@Override
public Mono<Void> putDirectly(ObjectStorageBucketName bucketName, Blob blob) {
- return putWithRetry(bucketName, configuration, () -> uploadByBlob(bucketName, blob));
+ return putWithRetry(bucketName, () -> uploadByBlob(bucketName, blob));
}
@Override
public Mono<BlobId> putAndComputeId(ObjectStorageBucketName bucketName, Blob initialBlob, Supplier<BlobId> blobIdSupplier) {
- Function<File, Mono<Void>> putChangedBlob = file -> {
- BlobId blobId = blobIdSupplier.get();
- return putWithRetry(bucketName, configuration, () -> uploadByFile(bucketName, blobId, file));
- };
- return writeFileAndAct(initialBlob, putChangedBlob)
- .then(Mono.fromCallable(blobIdSupplier::get));
+ return Mono.using(
+ () -> copyToTempFile(initialBlob),
+ file -> putByFile(bucketName, blobIdSupplier, file),
+ this::deleteFileAsync);
}
- private Mono<Void> writeFileAndAct(Blob blob, Function<File, Mono<Void>> putFile) {
- return Mono.using(
- () -> {
- File file = File.createTempFile(UUID.randomUUID().toString(), ".tmp");
- FileUtils.copyToFile(blob.getPayload().openStream(), file);
- return file;
- },
- putFile::apply,
- FileUtils::deleteQuietly
- );
+ private Mono<BlobId> putByFile(ObjectStorageBucketName bucketName, Supplier<BlobId> blobIdSupplier, File file) {
+ return Mono.fromSupplier(blobIdSupplier)
+ .flatMap(blobId -> putWithRetry(bucketName, () -> uploadByFile(bucketName, blobId, file))
+ .then(Mono.just(blobId)));
+ }
+
+ private File copyToTempFile(Blob blob) throws IOException {
+ File file = File.createTempFile(UUID.randomUUID().toString(), ".tmp");
+ FileUtils.copyToFile(blob.getPayload().openStream(), file);
+ return file;
+ }
+
+ private void deleteFileAsync(File file) {
+ Mono.fromRunnable(() -> FileUtils.deleteQuietly(file))
+ .subscribeOn(Schedulers.elastic())
+ .subscribe();
}
- private Mono<Void> putWithRetry(ObjectStorageBucketName bucketName, AwsS3AuthConfiguration configuration, ThrowingRunnable puttingAttempt) {
+ private Mono<Void> putWithRetry(ObjectStorageBucketName bucketName, ThrowingRunnable puttingAttempt) {
return Mono.<Void>fromRunnable(puttingAttempt)
.publishOn(Schedulers.elastic())
.retryWhen(Retry
@@ -201,9 +204,11 @@ public class AwsS3ObjectStorage {
}
private void upload(PutObjectRequest request) throws InterruptedException {
- getTransferManager(configuration)
+ TransferManager transferManager = getTransferManager();
+ transferManager
.upload(request)
.waitForUploadResult();
+ transferManager.shutdownNow();
}
private void createBucket(ObjectStorageBucketName bucketName, AwsS3AuthConfiguration configuration) {
@@ -221,7 +226,7 @@ public class AwsS3ObjectStorage {
return false;
}
- private TransferManager getTransferManager(AwsS3AuthConfiguration configuration) {
+ private TransferManager getTransferManager() {
ClientConfiguration clientConfiguration = getClientConfiguration();
AmazonS3 amazonS3 = getS3Client(configuration, clientConfiguration);
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org