You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/03/18 11:30:06 UTC
[camel] 01/05: CAMEL-14663 - Camel-AWS2 S3: Add support for
multipart upload
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 2846fc08880c1316acc20e8b3df9f2e1019bd54d
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Mar 18 12:13:41 2020 +0100
CAMEL-14663 - Camel-AWS2 S3: Add support for multipart upload
---
.../camel/component/aws2/s3/AWS2S3Producer.java | 114 ++++++++++++++++++++-
.../src/test/resources/log4j2.properties | 2 +-
2 files changed, 114 insertions(+), 2 deletions(-)
diff --git a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java
index a0d1fa2..71d2640 100644
--- a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java
+++ b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java
@@ -22,7 +22,9 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.camel.Endpoint;
@@ -36,14 +38,22 @@ import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.BucketCannedACL;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
@@ -55,6 +65,7 @@ import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
/**
* A Producer which sends messages to the Amazon Web Service Simple Storage
@@ -74,7 +85,11 @@ public class AWS2S3Producer extends DefaultProducer {
public void process(final Exchange exchange) throws Exception {
AWS2S3Operations operation = determineOperation(exchange);
if (ObjectHelper.isEmpty(operation)) {
- processSingleOp(exchange);
+ if (getConfiguration().isMultiPartUpload()) {
+ processMultiPart(exchange);
+ } else {
+ processSingleOp(exchange);
+ }
} else {
switch (operation) {
case copyObject:
@@ -103,6 +118,103 @@ public class AWS2S3Producer extends DefaultProducer {
}
}
}
+
+ public void processMultiPart(final Exchange exchange) throws Exception {
+ File filePayload = null;
+ Object obj = exchange.getIn().getMandatoryBody();
+ // Need to check if the message body is WrappedFile
+ if (obj instanceof WrappedFile) {
+ obj = ((WrappedFile<?>)obj).getFile();
+ }
+ if (obj instanceof File) {
+ filePayload = (File)obj;
+ } else {
+ throw new IllegalArgumentException("aws-s3: MultiPart upload requires a File input.");
+ }
+
+ Map<String, String> objectMetadata = determineMetadata(exchange);
+ if (objectMetadata.containsKey("Content-Length")) {
+ if (objectMetadata.get("Content-Length").equalsIgnoreCase("0")) {
+ objectMetadata.put("Content-Length", String.valueOf(filePayload.length()));
+ }
+ } else {
+ objectMetadata.put("Content-Length", String.valueOf(filePayload.length()));
+ }
+
+ final String keyName = determineKey(exchange);
+ CreateMultipartUploadRequest.Builder createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
+ .bucket(getConfiguration().getBucketName()).key(keyName);
+
+ String storageClass = determineStorageClass(exchange);
+ if (storageClass != null) {
+ createMultipartUploadRequest.storageClass(storageClass);
+ }
+
+ String cannedAcl = exchange.getIn().getHeader(AWS2S3Constants.CANNED_ACL, String.class);
+ if (cannedAcl != null) {
+ ObjectCannedACL objectAcl = ObjectCannedACL.valueOf(cannedAcl);
+ createMultipartUploadRequest.acl(objectAcl);
+ }
+
+ BucketCannedACL acl = exchange.getIn().getHeader(AWS2S3Constants.ACL, BucketCannedACL.class);
+ if (acl != null) {
+ // note: if cannedacl and acl are both specified the last one will
+ // be used. refer to
+ // PutObjectRequest#setAccessControlList for more details
+ createMultipartUploadRequest.acl(acl.toString());
+ }
+
+ if (getConfiguration().isUseAwsKMS()) {
+ createMultipartUploadRequest.ssekmsKeyId(getConfiguration().getAwsKMSKeyId());
+ }
+
+ LOG.trace("Initiating multipart upload [{}] from exchange [{}]...", createMultipartUploadRequest, exchange);
+
+ CreateMultipartUploadResponse initResponse = getEndpoint().getS3Client().createMultipartUpload(createMultipartUploadRequest.build());
+ final long contentLength = Long.valueOf(objectMetadata.get("Content-Length"));
+ final List<String> partETags = new ArrayList<>();
+ List<CompletedPart> completedParts = new ArrayList<CompletedPart>();
+ long partSize = getConfiguration().getPartSize();
+ CompleteMultipartUploadResponse uploadResult = null;
+
+ long filePosition = 0;
+
+ try {
+ for (int part = 1; filePosition < contentLength; part++) {
+ System.err.println("PART! " + part);
+ partSize = Math.min(partSize, contentLength - filePosition);
+
+ UploadPartRequest uploadRequest = UploadPartRequest.builder().bucket(getConfiguration().getBucketName()).key(keyName)
+ .uploadId(initResponse.uploadId()).partNumber(part).build();
+
+ LOG.trace("Uploading part [{}] for {}", part, keyName);
+ String etag = getEndpoint().getS3Client().uploadPart(uploadRequest, RequestBody.fromFile(filePayload)).eTag();
+ partETags.add(etag);
+ CompletedPart partUpload = CompletedPart.builder().partNumber(part).eTag(etag).build();
+ completedParts.add(partUpload);
+ filePosition += partSize;
+ System.err.println(filePosition);
+ }
+ CompletedMultipartUpload completeMultipartUpload = CompletedMultipartUpload.builder().parts(completedParts).build();
+ CompleteMultipartUploadRequest compRequest = CompleteMultipartUploadRequest.builder().multipartUpload(completeMultipartUpload).bucket(getConfiguration().getBucketName()).key(keyName).uploadId(initResponse.uploadId()).build();
+
+ uploadResult = getEndpoint().getS3Client().completeMultipartUpload(compRequest);
+
+ } catch (Exception e) {
+ getEndpoint().getS3Client().abortMultipartUpload(AbortMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName()).key(keyName).uploadId(initResponse.uploadId()).build());
+ throw e;
+ }
+
+ Message message = getMessageForResponse(exchange);
+ message.setHeader(AWS2S3Constants.E_TAG, uploadResult.eTag());
+ if (uploadResult.versionId() != null) {
+ message.setHeader(AWS2S3Constants.VERSION_ID, uploadResult.versionId());
+ }
+
+ if (getConfiguration().isDeleteAfterWrite()) {
+ FileUtil.deleteFile(filePayload);
+ }
+ }
public void processSingleOp(final Exchange exchange) throws Exception {
diff --git a/components/camel-aws2-s3/src/test/resources/log4j2.properties b/components/camel-aws2-s3/src/test/resources/log4j2.properties
index a287c66..e33abee 100644
--- a/components/camel-aws2-s3/src/test/resources/log4j2.properties
+++ b/components/camel-aws2-s3/src/test/resources/log4j2.properties
@@ -24,5 +24,5 @@ appender.out.type = Console
appender.out.name = out
appender.out.layout.type = PatternLayout
appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
-rootLogger.level = INFO
+rootLogger.level = DEBUG
rootLogger.appenderRef.file.ref = file