You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/03/18 11:30:06 UTC

[camel] 01/05: CAMEL-14663 - Camel-AWS2 S3: Add support for multipart upload

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2846fc08880c1316acc20e8b3df9f2e1019bd54d
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Mar 18 12:13:41 2020 +0100

    CAMEL-14663 - Camel-AWS2 S3: Add support for multipart upload
---
 .../camel/component/aws2/s3/AWS2S3Producer.java    | 114 ++++++++++++++++++++-
 .../src/test/resources/log4j2.properties           |   2 +-
 2 files changed, 114 insertions(+), 2 deletions(-)

diff --git a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java
index a0d1fa2..71d2640 100644
--- a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java
+++ b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java
@@ -22,7 +22,9 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.Endpoint;
@@ -36,14 +38,22 @@ import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.core.sync.RequestBody;
 import software.amazon.awssdk.core.sync.ResponseTransformer;
 import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
 import software.amazon.awssdk.services.s3.model.BucketCannedACL;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
 import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
 import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
 import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
 import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
@@ -55,6 +65,7 @@ import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
 import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
 
 /**
  * A Producer which sends messages to the Amazon Web Service Simple Storage
@@ -74,7 +85,11 @@ public class AWS2S3Producer extends DefaultProducer {
     public void process(final Exchange exchange) throws Exception {
         AWS2S3Operations operation = determineOperation(exchange);
         if (ObjectHelper.isEmpty(operation)) {
-            processSingleOp(exchange);
+            if (getConfiguration().isMultiPartUpload()) {
+                processMultiPart(exchange);
+            } else {
+                processSingleOp(exchange);
+            }
         } else {
             switch (operation) {
                 case copyObject:
@@ -103,6 +118,103 @@ public class AWS2S3Producer extends DefaultProducer {
             }
         }
     }
+    
+    /**
+     * Uploads the exchange body, a {@link File} or a {@link WrappedFile} around
+     * one, to S3 as a multipart upload, each part read from its own file offset.
+     * The multipart upload is aborted if any step after its creation fails.
+     *
+     * @param exchange the exchange providing the file body and S3 headers
+     * @throws Exception if the body is not a file, the part size is not positive or S3 fails
+     */
+    public void processMultiPart(final Exchange exchange) throws Exception {
+        Object obj = exchange.getIn().getMandatoryBody();
+        // Need to check if the message body is WrappedFile
+        if (obj instanceof WrappedFile) {
+            obj = ((WrappedFile<?>)obj).getFile();
+        }
+        if (!(obj instanceof File)) {
+            throw new IllegalArgumentException("aws-s3: MultiPart upload requires a File input.");
+        }
+        final File filePayload = (File)obj;
+        // Fall back to the file size when no usable Content-Length was supplied
+        Map<String, String> objectMetadata = determineMetadata(exchange);
+        String declaredLength = objectMetadata.get("Content-Length");
+        if (declaredLength == null || declaredLength.equalsIgnoreCase("0")) {
+            objectMetadata.put("Content-Length", String.valueOf(filePayload.length()));
+        }
+        // Validate before creating the upload so a bad value leaves nothing to abort
+        final long contentLength = Long.parseLong(objectMetadata.get("Content-Length"));
+        final long maxPartSize = getConfiguration().getPartSize();
+        if (maxPartSize <= 0) {
+            throw new IllegalArgumentException("aws-s3: MultiPart upload requires a positive part size.");
+        }
+
+        final String keyName = determineKey(exchange);
+        CreateMultipartUploadRequest.Builder createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
+            .bucket(getConfiguration().getBucketName()).key(keyName);
+
+        String storageClass = determineStorageClass(exchange);
+        if (storageClass != null) {
+            createMultipartUploadRequest.storageClass(storageClass);
+        }
+
+        String cannedAcl = exchange.getIn().getHeader(AWS2S3Constants.CANNED_ACL, String.class);
+        if (cannedAcl != null) {
+            ObjectCannedACL objectAcl = ObjectCannedACL.valueOf(cannedAcl);
+            createMultipartUploadRequest.acl(objectAcl);
+        }
+
+        BucketCannedACL acl = exchange.getIn().getHeader(AWS2S3Constants.ACL, BucketCannedACL.class);
+        if (acl != null) {
+            // note: if cannedacl and acl are both specified the last one will
+            // be used. refer to
+            // PutObjectRequest#setAccessControlList for more details
+            createMultipartUploadRequest.acl(acl.toString());
+        }
+
+        if (getConfiguration().isUseAwsKMS()) {
+            createMultipartUploadRequest.ssekmsKeyId(getConfiguration().getAwsKMSKeyId());
+        }
+        LOG.trace("Initiating multipart upload [{}] from exchange [{}]...", createMultipartUploadRequest, exchange);
+        CreateMultipartUploadResponse initResponse = getEndpoint().getS3Client().createMultipartUpload(createMultipartUploadRequest.build());
+        List<CompletedPart> completedParts = new ArrayList<>();
+        CompleteMultipartUploadResponse uploadResult = null;
+        long filePosition = 0;
+        try {
+            for (int part = 1; filePosition < contentLength; part++) {
+                long partSize = Math.min(maxPartSize, contentLength - filePosition);
+                UploadPartRequest uploadRequest = UploadPartRequest.builder().bucket(getConfiguration().getBucketName()).key(keyName)
+                        .uploadId(initResponse.uploadId()).partNumber(part).build();
+                LOG.trace("Uploading part [{}] for {}", part, keyName);
+                // Stream only this part's byte range; fromFile would resend the whole file as every part
+                try (InputStream partStream = new FileInputStream(filePayload)) {
+                    if (partStream.skip(filePosition) != filePosition) {
+                        throw new IOException("aws-s3: unable to seek to offset " + filePosition + " of " + filePayload);
+                    }
+                    String etag = getEndpoint().getS3Client().uploadPart(uploadRequest, RequestBody.fromInputStream(partStream, partSize)).eTag();
+                    completedParts.add(CompletedPart.builder().partNumber(part).eTag(etag).build());
+                }
+                filePosition += partSize;
+            }
+            CompletedMultipartUpload completeMultipartUpload = CompletedMultipartUpload.builder().parts(completedParts).build();
+            CompleteMultipartUploadRequest compRequest = CompleteMultipartUploadRequest.builder().multipartUpload(completeMultipartUpload).bucket(getConfiguration().getBucketName()).key(keyName).uploadId(initResponse.uploadId()).build();
+            uploadResult = getEndpoint().getS3Client().completeMultipartUpload(compRequest);
+        } catch (Exception e) {
+            getEndpoint().getS3Client().abortMultipartUpload(AbortMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName()).key(keyName).uploadId(initResponse.uploadId()).build());
+            throw e;
+        }
+
+        Message message = getMessageForResponse(exchange);
+        message.setHeader(AWS2S3Constants.E_TAG, uploadResult.eTag());
+        if (uploadResult.versionId() != null) {
+            message.setHeader(AWS2S3Constants.VERSION_ID, uploadResult.versionId());
+        }
+
+        if (getConfiguration().isDeleteAfterWrite()) {
+            FileUtil.deleteFile(filePayload);
+        }
+    }
 
     public void processSingleOp(final Exchange exchange) throws Exception {
 
diff --git a/components/camel-aws2-s3/src/test/resources/log4j2.properties b/components/camel-aws2-s3/src/test/resources/log4j2.properties
index a287c66..e33abee 100644
--- a/components/camel-aws2-s3/src/test/resources/log4j2.properties
+++ b/components/camel-aws2-s3/src/test/resources/log4j2.properties
@@ -24,5 +24,5 @@ appender.out.type = Console
 appender.out.name = out
 appender.out.layout.type = PatternLayout
 appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
-rootLogger.level = INFO
+rootLogger.level = DEBUG
 rootLogger.appenderRef.file.ref = file