You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/04/01 12:39:50 UTC
[camel] 01/05: CAMEL-16185 - AWS S3: improve multipart support -
streaming upload
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit a47bbc3ec2b3766be516cffd801313492db3655f
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Apr 1 07:10:30 2021 +0200
CAMEL-16185 - AWS S3: improve multipart support - streaming upload
---
.../camel/catalog/docs/aws2-s3-component.adoc | 6 +-
.../aws2/s3/AWS2S3ComponentConfigurer.java | 6 +
.../aws2/s3/AWS2S3EndpointConfigurer.java | 6 +
.../aws2/s3/AWS2S3EndpointUriFactory.java | 3 +-
.../apache/camel/component/aws2/s3/aws2-s3.json | 2 +
.../src/main/docs/aws2-s3-component.adoc | 6 +-
.../component/aws2/s3/AWS2S3Configuration.java | 13 +
.../camel/component/aws2/s3/AWS2S3Endpoint.java | 6 +-
.../aws2/s3/AWS2S3StreamUploadProducer.java | 289 +++++++++++++++++++++
.../S3StreamUploadOperationIntegrationTest.java | 76 ++++++
.../dsl/Aws2S3ComponentBuilderFactory.java | 16 ++
.../endpoint/dsl/AWS2S3EndpointBuilderFactory.java | 93 +++++++
.../modules/ROOT/pages/aws2-s3-component.adoc | 6 +-
13 files changed, 520 insertions(+), 8 deletions(-)
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/aws2-s3-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/aws2-s3-component.adoc
index 78bcbae..e497bde 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/aws2-s3-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/aws2-s3-component.adoc
@@ -47,7 +47,7 @@ from("aws2-s3://helloBucket?accessKey=yourAccessKey&secretKey=yourSecretKey&pref
// component options: START
-The AWS 2 S3 Storage Service component supports 43 options, which are listed below.
+The AWS 2 S3 Storage Service component supports 44 options, which are listed below.
@@ -65,6 +65,7 @@ The AWS 2 S3 Storage Service component supports 43 options, which are listed bel
| *proxyPort* (common) | Specify a proxy port to be used inside the client definition. | | Integer
| *proxyProtocol* (common) | To define a proxy protocol when instantiating the S3 client. There are 2 enums and the value can be one of: HTTP, HTTPS | HTTPS | Protocol
| *region* (common) | The region in which S3 client needs to work. When using this parameter, the configuration will expect the lowercase name of the region (for example ap-east-1) You'll need to use the name Region.EU_WEST_1.id() | | String
+| *streamMode* (common) | If StreaMode is true a different way of uploading will be provided | false | boolean
| *trustAllCertificates* (common) | If we want to trust all certificates in case of overriding the endpoint | false | boolean
| *uriEndpointOverride* (common) | Set the overriding uri endpoint. This option needs to be used in combination with overrideEndpoint option | | String
| *useDefaultCredentialsProvider* (common) | Set whether the S3 client should expect to load credentials through a default credentials provider or to expect static credentials to be passed in. | false | boolean
@@ -129,7 +130,7 @@ with the following path and query parameters:
|===
-=== Query Parameters (61 parameters):
+=== Query Parameters (62 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -145,6 +146,7 @@ with the following path and query parameters:
| *proxyPort* (common) | Specify a proxy port to be used inside the client definition. | | Integer
| *proxyProtocol* (common) | To define a proxy protocol when instantiating the S3 client. There are 2 enums and the value can be one of: HTTP, HTTPS | HTTPS | Protocol
| *region* (common) | The region in which S3 client needs to work. When using this parameter, the configuration will expect the lowercase name of the region (for example ap-east-1) You'll need to use the name Region.EU_WEST_1.id() | | String
+| *streamMode* (common) | If StreaMode is true a different way of uploading will be provided | false | boolean
| *trustAllCertificates* (common) | If we want to trust all certificates in case of overriding the endpoint | false | boolean
| *uriEndpointOverride* (common) | Set the overriding uri endpoint. This option needs to be used in combination with overrideEndpoint option | | String
| *useDefaultCredentialsProvider* (common) | Set whether the S3 client should expect to load credentials through a default credentials provider or to expect static credentials to be passed in. | false | boolean
diff --git a/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3ComponentConfigurer.java b/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3ComponentConfigurer.java
index 77bb7d0..6f0e69b 100644
--- a/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3ComponentConfigurer.java
+++ b/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3ComponentConfigurer.java
@@ -98,6 +98,8 @@ public class AWS2S3ComponentConfigurer extends PropertyConfigurerSupport impleme
case "secretKey": getOrCreateConfiguration(target).setSecretKey(property(camelContext, java.lang.String.class, value)); return true;
case "storageclass":
case "storageClass": getOrCreateConfiguration(target).setStorageClass(property(camelContext, java.lang.String.class, value)); return true;
+ case "streammode":
+ case "streamMode": getOrCreateConfiguration(target).setStreamMode(property(camelContext, boolean.class, value)); return true;
case "trustallcertificates":
case "trustAllCertificates": getOrCreateConfiguration(target).setTrustAllCertificates(property(camelContext, boolean.class, value)); return true;
case "uriendpointoverride":
@@ -190,6 +192,8 @@ public class AWS2S3ComponentConfigurer extends PropertyConfigurerSupport impleme
case "secretKey": return java.lang.String.class;
case "storageclass":
case "storageClass": return java.lang.String.class;
+ case "streammode":
+ case "streamMode": return boolean.class;
case "trustallcertificates":
case "trustAllCertificates": return boolean.class;
case "uriendpointoverride":
@@ -278,6 +282,8 @@ public class AWS2S3ComponentConfigurer extends PropertyConfigurerSupport impleme
case "secretKey": return getOrCreateConfiguration(target).getSecretKey();
case "storageclass":
case "storageClass": return getOrCreateConfiguration(target).getStorageClass();
+ case "streammode":
+ case "streamMode": return getOrCreateConfiguration(target).isStreamMode();
case "trustallcertificates":
case "trustAllCertificates": return getOrCreateConfiguration(target).isTrustAllCertificates();
case "uriendpointoverride":
diff --git a/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java b/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java
index 30d87c0..255f2bb 100644
--- a/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java
+++ b/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java
@@ -121,6 +121,8 @@ public class AWS2S3EndpointConfigurer extends PropertyConfigurerSupport implemen
case "startScheduler": target.setStartScheduler(property(camelContext, boolean.class, value)); return true;
case "storageclass":
case "storageClass": target.getConfiguration().setStorageClass(property(camelContext, java.lang.String.class, value)); return true;
+ case "streammode":
+ case "streamMode": target.getConfiguration().setStreamMode(property(camelContext, boolean.class, value)); return true;
case "timeunit":
case "timeUnit": target.setTimeUnit(property(camelContext, java.util.concurrent.TimeUnit.class, value)); return true;
case "trustallcertificates":
@@ -247,6 +249,8 @@ public class AWS2S3EndpointConfigurer extends PropertyConfigurerSupport implemen
case "startScheduler": return boolean.class;
case "storageclass":
case "storageClass": return java.lang.String.class;
+ case "streammode":
+ case "streamMode": return boolean.class;
case "timeunit":
case "timeUnit": return java.util.concurrent.TimeUnit.class;
case "trustallcertificates":
@@ -369,6 +373,8 @@ public class AWS2S3EndpointConfigurer extends PropertyConfigurerSupport implemen
case "startScheduler": return target.isStartScheduler();
case "storageclass":
case "storageClass": return target.getConfiguration().getStorageClass();
+ case "streammode":
+ case "streamMode": return target.getConfiguration().isStreamMode();
case "timeunit":
case "timeUnit": return target.getTimeUnit();
case "trustallcertificates":
diff --git a/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointUriFactory.java b/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointUriFactory.java
index f88438a..63752dd 100644
--- a/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointUriFactory.java
+++ b/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointUriFactory.java
@@ -20,10 +20,11 @@ public class AWS2S3EndpointUriFactory extends org.apache.camel.support.component
private static final Set<String> PROPERTY_NAMES;
private static final Set<String> SECRET_PROPERTY_NAMES;
static {
- Set<String> props = new HashSet<>(62);
+ Set<String> props = new HashSet<>(63);
props.add("customerAlgorithm");
props.add("fileName");
props.add("useCustomerKey");
+ props.add("streamMode");
props.add("bucketNameOrArn");
props.add("customerKeyId");
props.add("prefix");
diff --git a/components/camel-aws/camel-aws2-s3/src/generated/resources/org/apache/camel/component/aws2/s3/aws2-s3.json b/components/camel-aws/camel-aws2-s3/src/generated/resources/org/apache/camel/component/aws2/s3/aws2-s3.json
index d2cbe25..4aab19d 100644
--- a/components/camel-aws/camel-aws2-s3/src/generated/resources/org/apache/camel/component/aws2/s3/aws2-s3.json
+++ b/components/camel-aws/camel-aws2-s3/src/generated/resources/org/apache/camel/component/aws2/s3/aws2-s3.json
@@ -33,6 +33,7 @@
"proxyPort": { "kind": "property", "displayName": "Proxy Port", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Specify a proxy port to be used inside the client definition." },
"proxyProtocol": { "kind": "property", "displayName": "Proxy Protocol", "group": "common", "label": "", "required": false, "type": "object", "javaType": "software.amazon.awssdk.core.Protocol", "enum": [ "HTTP", "HTTPS" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "HTTPS", "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "To define a proxy protocol when instantiating th [...]
"region": { "kind": "property", "displayName": "Region", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "The region in which S3 client needs to work. When using this parameter, the configuration will expect the lowercase name of the region (f [...]
+ "streamMode": { "kind": "property", "displayName": "Stream Mode", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "If StreaMode is true a different way of uploading will be provided" },
"trustAllCertificates": { "kind": "property", "displayName": "Trust All Certificates", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "If we want to trust all certificates in case of overriding the endpoint" },
"uriEndpointOverride": { "kind": "property", "displayName": "Uri Endpoint Override", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Set the overriding uri endpoint. This option needs to be used in combination with overrideEndpoint option" },
"useDefaultCredentialsProvider": { "kind": "property", "displayName": "Use Default Credentials Provider", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Set whether the S3 client should expect to load credentials through a def [...]
@@ -78,6 +79,7 @@
"proxyPort": { "kind": "parameter", "displayName": "Proxy Port", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Specify a proxy port to be used inside the client definition." },
"proxyProtocol": { "kind": "parameter", "displayName": "Proxy Protocol", "group": "common", "label": "", "required": false, "type": "object", "javaType": "software.amazon.awssdk.core.Protocol", "enum": [ "HTTP", "HTTPS" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "HTTPS", "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "To define a proxy protocol when instantiating t [...]
"region": { "kind": "parameter", "displayName": "Region", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "The region in which S3 client needs to work. When using this parameter, the configuration will expect the lowercase name of the region ( [...]
+ "streamMode": { "kind": "parameter", "displayName": "Stream Mode", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "If StreaMode is true a different way of uploading will be provided" },
"trustAllCertificates": { "kind": "parameter", "displayName": "Trust All Certificates", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "If we want to trust all certificates in case of overriding the endpoint" },
"uriEndpointOverride": { "kind": "parameter", "displayName": "Uri Endpoint Override", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Set the overriding uri endpoint. This option needs to be used in combination with overrideEndpoint option" },
"useDefaultCredentialsProvider": { "kind": "parameter", "displayName": "Use Default Credentials Provider", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Set whether the S3 client should expect to load credentials through a de [...]
diff --git a/components/camel-aws/camel-aws2-s3/src/main/docs/aws2-s3-component.adoc b/components/camel-aws/camel-aws2-s3/src/main/docs/aws2-s3-component.adoc
index 78bcbae..e497bde 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/docs/aws2-s3-component.adoc
+++ b/components/camel-aws/camel-aws2-s3/src/main/docs/aws2-s3-component.adoc
@@ -47,7 +47,7 @@ from("aws2-s3://helloBucket?accessKey=yourAccessKey&secretKey=yourSecretKey&pref
// component options: START
-The AWS 2 S3 Storage Service component supports 43 options, which are listed below.
+The AWS 2 S3 Storage Service component supports 44 options, which are listed below.
@@ -65,6 +65,7 @@ The AWS 2 S3 Storage Service component supports 43 options, which are listed bel
| *proxyPort* (common) | Specify a proxy port to be used inside the client definition. | | Integer
| *proxyProtocol* (common) | To define a proxy protocol when instantiating the S3 client. There are 2 enums and the value can be one of: HTTP, HTTPS | HTTPS | Protocol
| *region* (common) | The region in which S3 client needs to work. When using this parameter, the configuration will expect the lowercase name of the region (for example ap-east-1) You'll need to use the name Region.EU_WEST_1.id() | | String
+| *streamMode* (common) | If StreaMode is true a different way of uploading will be provided | false | boolean
| *trustAllCertificates* (common) | If we want to trust all certificates in case of overriding the endpoint | false | boolean
| *uriEndpointOverride* (common) | Set the overriding uri endpoint. This option needs to be used in combination with overrideEndpoint option | | String
| *useDefaultCredentialsProvider* (common) | Set whether the S3 client should expect to load credentials through a default credentials provider or to expect static credentials to be passed in. | false | boolean
@@ -129,7 +130,7 @@ with the following path and query parameters:
|===
-=== Query Parameters (61 parameters):
+=== Query Parameters (62 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -145,6 +146,7 @@ with the following path and query parameters:
| *proxyPort* (common) | Specify a proxy port to be used inside the client definition. | | Integer
| *proxyProtocol* (common) | To define a proxy protocol when instantiating the S3 client. There are 2 enums and the value can be one of: HTTP, HTTPS | HTTPS | Protocol
| *region* (common) | The region in which S3 client needs to work. When using this parameter, the configuration will expect the lowercase name of the region (for example ap-east-1) You'll need to use the name Region.EU_WEST_1.id() | | String
+| *streamMode* (common) | If StreaMode is true a different way of uploading will be provided | false | boolean
| *trustAllCertificates* (common) | If we want to trust all certificates in case of overriding the endpoint | false | boolean
| *uriEndpointOverride* (common) | Set the overriding uri endpoint. This option needs to be used in combination with overrideEndpoint option | | String
| *useDefaultCredentialsProvider* (common) | Set whether the S3 client should expect to load credentials through a default credentials provider or to expect static credentials to be passed in. | false | boolean
diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java
index 439aa56..aec9bf7 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java
@@ -110,6 +110,8 @@ public class AWS2S3Configuration implements Cloneable {
private String uriEndpointOverride;
@UriParam(defaultValue = "false")
private boolean pojoRequest;
+ @UriParam(defaultValue = "false")
+ private boolean streamMode;
public long getPartSize() {
return partSize;
@@ -575,6 +577,17 @@ public class AWS2S3Configuration implements Cloneable {
this.amazonS3Presigner = amazonS3Presigner;
}
+ public boolean isStreamMode() {
+ return streamMode;
+ }
+
+ /**
+ * If StreaMode is true a different way of uploading will be provided
+ */
+ public void setStreamMode(boolean streamMode) {
+ this.streamMode = streamMode;
+ }
+
public AWS2S3Configuration copy() {
try {
return (AWS2S3Configuration) super.clone();
diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java
index 9cad4ee..f8aa480 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java
@@ -72,7 +72,11 @@ public class AWS2S3Endpoint extends ScheduledPollEndpoint {
@Override
public Producer createProducer() throws Exception {
- return new AWS2S3Producer(this);
+ if (!configuration.isStreamMode()) {
+ return new AWS2S3Producer(this);
+ } else {
+ return new AWS2S3StreamUploadProducer(this);
+ }
}
@Override
diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3StreamUploadProducer.java
new file mode 100644
index 0000000..63843d3
--- /dev/null
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3StreamUploadProducer.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.s3;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.s3.model.*;
+import software.amazon.awssdk.utils.IoUtils;
+
+/**
+ * A Producer which sends messages to the Amazon Web Service Simple Storage Service
+ * <a href="http://aws.amazon.com/s3/">AWS S3</a>
+ */
+public class AWS2S3StreamUploadProducer extends DefaultProducer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AWS2S3StreamUploadProducer.class);
+
+ private transient String s3ProducerToString;
+
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ CreateMultipartUploadResponse initResponse;
+ int index = 1;
+ List<CompletedPart> completedParts = new ArrayList<CompletedPart>();
+ int part = 0;
+
+ public AWS2S3StreamUploadProducer(final Endpoint endpoint) {
+ super(endpoint);
+ }
+
+ @Override
+ public void process(final Exchange exchange) throws Exception {
+ streamUpload(exchange);
+ }
+
+ public void streamUpload(final Exchange exchange) throws Exception {
+ File filePayload = null;
+ InputStream is = exchange.getIn().getMandatoryBody(InputStream.class);
+
+ buffer.write(IoUtils.toByteArray(is));
+ final String keyName = determineKey(exchange);
+ String dynamicKeyName;
+ if (part > 0) {
+ dynamicKeyName = keyName + "-" + part;
+ } else {
+ dynamicKeyName = keyName;
+ }
+ CreateMultipartUploadRequest.Builder createMultipartUploadRequest
+ = CreateMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName()).key(dynamicKeyName);
+
+ String storageClass = determineStorageClass(exchange);
+ if (storageClass != null) {
+ createMultipartUploadRequest.storageClass(storageClass);
+ }
+
+ String cannedAcl = exchange.getIn().getHeader(AWS2S3Constants.CANNED_ACL, String.class);
+ if (cannedAcl != null) {
+ ObjectCannedACL objectAcl = ObjectCannedACL.valueOf(cannedAcl);
+ createMultipartUploadRequest.acl(objectAcl);
+ }
+
+ BucketCannedACL acl = exchange.getIn().getHeader(AWS2S3Constants.ACL, BucketCannedACL.class);
+ if (acl != null) {
+ // note: if cannedacl and acl are both specified the last one will
+ // be used. refer to
+ // PutObjectRequest#setAccessControlList for more details
+ createMultipartUploadRequest.acl(acl.toString());
+ }
+
+ if (getConfiguration().isUseAwsKMS()) {
+ createMultipartUploadRequest.ssekmsKeyId(getConfiguration().getAwsKMSKeyId());
+ createMultipartUploadRequest.serverSideEncryption(ServerSideEncryption.AWS_KMS);
+ }
+
+ if (getConfiguration().isUseCustomerKey()) {
+ if (ObjectHelper.isNotEmpty(getConfiguration().getCustomerKeyId())) {
+ createMultipartUploadRequest.sseCustomerKey(getConfiguration().getCustomerKeyId());
+ }
+ if (ObjectHelper.isNotEmpty(getConfiguration().getCustomerKeyMD5())) {
+ createMultipartUploadRequest.sseCustomerKeyMD5(getConfiguration().getCustomerKeyMD5());
+ }
+ if (ObjectHelper.isNotEmpty(getConfiguration().getCustomerAlgorithm())) {
+ createMultipartUploadRequest.sseCustomerAlgorithm(getConfiguration().getCustomerAlgorithm());
+ }
+ }
+
+ LOG.trace("Initiating multipart upload [{}] from exchange [{}]...", createMultipartUploadRequest, exchange);
+ CompleteMultipartUploadResponse uploadResult = null;
+ if (index == 1) {
+ initResponse
+ = getEndpoint().getS3Client().createMultipartUpload(createMultipartUploadRequest.build());
+ //final long contentLength = Long.valueOf(objectMetadata.get("Content-Length"));
+ completedParts = new ArrayList<CompletedPart>();
+ long partSize = getConfiguration().getPartSize();
+ }
+
+ long filePosition = 0;
+
+ try {
+ if (buffer.size() >= 5000000 || index == 100) {
+
+ UploadPartRequest uploadRequest = UploadPartRequest.builder().bucket(getConfiguration().getBucketName())
+ .key(dynamicKeyName).uploadId(initResponse.uploadId())
+ .partNumber(index).build();
+
+ LOG.trace("Uploading part [{}] for {}", index, keyName);
+
+ String etag = getEndpoint().getS3Client()
+ .uploadPart(uploadRequest, RequestBody.fromBytes(buffer.toByteArray())).eTag();
+ CompletedPart partUpload = CompletedPart.builder().partNumber(index).eTag(etag).build();
+ completedParts.add(partUpload);
+ buffer.reset();
+ part++;
+ }
+
+ if (index == 100) {
+ CompletedMultipartUpload completeMultipartUpload
+ = CompletedMultipartUpload.builder().parts(completedParts).build();
+ CompleteMultipartUploadRequest compRequest
+ = CompleteMultipartUploadRequest.builder().multipartUpload(completeMultipartUpload)
+ .bucket(getConfiguration().getBucketName()).key(dynamicKeyName)
+ .uploadId(initResponse.uploadId())
+ .build();
+
+ uploadResult = getEndpoint().getS3Client().completeMultipartUpload(compRequest);
+ index = 0;
+ }
+
+ } catch (Exception e) {
+ getEndpoint().getS3Client()
+ .abortMultipartUpload(AbortMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName())
+ .key(dynamicKeyName).uploadId(initResponse.uploadId()).build());
+ throw e;
+ }
+
+ index++;
+
+ Message message = getMessageForResponse(exchange);
+
+ }
+
+ private AWS2S3Operations determineOperation(Exchange exchange) {
+ AWS2S3Operations operation = exchange.getIn().getHeader(AWS2S3Constants.S3_OPERATION, AWS2S3Operations.class);
+ if (operation == null) {
+ operation = getConfiguration().getOperation();
+ }
+ return operation;
+ }
+
+ private Map<String, String> determineMetadata(final Exchange exchange) {
+ Map<String, String> objectMetadata = new HashMap<String, String>();
+
+ Long contentLength = exchange.getIn().getHeader(AWS2S3Constants.CONTENT_LENGTH, Long.class);
+ if (contentLength != null) {
+ objectMetadata.put("Content-Length", String.valueOf(contentLength));
+ }
+
+ String contentType = exchange.getIn().getHeader(AWS2S3Constants.CONTENT_TYPE, String.class);
+ if (contentType != null) {
+ objectMetadata.put("Content-Type", String.valueOf(contentType));
+ }
+
+ String cacheControl = exchange.getIn().getHeader(AWS2S3Constants.CACHE_CONTROL, String.class);
+ if (cacheControl != null) {
+ objectMetadata.put("Cache-Control", String.valueOf(cacheControl));
+ }
+
+ String contentDisposition = exchange.getIn().getHeader(AWS2S3Constants.CONTENT_DISPOSITION, String.class);
+ if (contentDisposition != null) {
+ objectMetadata.put("Content-Disposition", String.valueOf(contentDisposition));
+ }
+
+ String contentEncoding = exchange.getIn().getHeader(AWS2S3Constants.CONTENT_ENCODING, String.class);
+ if (contentEncoding != null) {
+ objectMetadata.put("Content-Encoding", String.valueOf(contentEncoding));
+ }
+
+ String contentMD5 = exchange.getIn().getHeader(AWS2S3Constants.CONTENT_MD5, String.class);
+ if (contentMD5 != null) {
+ objectMetadata.put("Content-Md5", String.valueOf(contentMD5));
+ }
+
+ return objectMetadata;
+ }
+
+ /**
+ * Reads the bucket name from the header of the given exchange. If not provided, it's read from the endpoint
+ * configuration.
+ *
+ * @param exchange The exchange to read the header from.
+ * @return The bucket name.
+ * @throws IllegalArgumentException if the header could not be determined.
+ */
+ private String determineBucketName(final Exchange exchange) {
+ String bucketName = exchange.getIn().getHeader(AWS2S3Constants.BUCKET_NAME, String.class);
+
+ if (ObjectHelper.isEmpty(bucketName)) {
+ bucketName = getConfiguration().getBucketName();
+ LOG.trace("AWS S3 Bucket name header is missing, using default one [{}]", bucketName);
+ }
+
+ if (bucketName == null) {
+ throw new IllegalArgumentException("AWS S3 Bucket name header is missing or not configured.");
+ }
+
+ return bucketName;
+ }
+
+ private String determineKey(final Exchange exchange) {
+ String key = exchange.getIn().getHeader(AWS2S3Constants.KEY, String.class);
+ if (ObjectHelper.isEmpty(key)) {
+ key = getConfiguration().getKeyName();
+ }
+ if (key == null) {
+ throw new IllegalArgumentException("AWS S3 Key header missing.");
+ }
+ return key;
+ }
+
+ private String determineStorageClass(final Exchange exchange) {
+ String storageClass = exchange.getIn().getHeader(AWS2S3Constants.STORAGE_CLASS, String.class);
+ if (storageClass == null) {
+ storageClass = getConfiguration().getStorageClass();
+ }
+
+ return storageClass;
+ }
+
+ private ByteArrayOutputStream determineLengthInputStream(InputStream is) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ byte[] bytes = new byte[1024];
+ int count;
+ while ((count = is.read(bytes)) > 0) {
+ out.write(bytes, 0, count);
+ }
+ return out;
+ }
+
+ protected AWS2S3Configuration getConfiguration() {
+ return getEndpoint().getConfiguration();
+ }
+
+ @Override
+ public String toString() {
+ if (s3ProducerToString == null) {
+ s3ProducerToString = "S3Producer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
+ }
+ return s3ProducerToString;
+ }
+
+ @Override
+ public AWS2S3Endpoint getEndpoint() {
+ return (AWS2S3Endpoint) super.getEndpoint();
+ }
+
+ public static Message getMessageForResponse(final Exchange exchange) {
+ return exchange.getMessage();
+ }
+
+}
diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadOperationIntegrationTest.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadOperationIntegrationTest.java
new file mode 100644
index 0000000..e567caf
--- /dev/null
+++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadOperationIntegrationTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.s3.integration;
+
+import org.apache.camel.*;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+
+@Disabled("Must be manually tested. Provide your own accessKey and secretKey!")
+public class S3StreamUploadOperationIntegrationTest extends CamelTestSupport {
+
+ @BindToRegistry("amazonS3Client")
+ S3Client client
+ = S3Client.builder()
+ .credentialsProvider(StaticCredentialsProvider.create(
+ AwsBasicCredentials.create("xxxx", "yyyy")))
+ .region(Region.EU_WEST_1).build();
+
+ @EndpointInject
+ private ProducerTemplate template;
+
+ @EndpointInject("mock:result")
+ private MockEndpoint result;
+
+ @Test
+ public void sendIn() throws Exception {
+ result.expectedMessageCount(1);
+
+ for (int i = 0; i < 1000; i++) {
+ template.sendBody("direct:stream1", "Andrea\n");
+ }
+ for (int i = 0; i < 1000; i++) {
+ template.sendBody("direct:stream2", "Luca\n");
+ }
+
+ Thread.sleep(30000);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ String awsEndpoint1
+ = "aws2-s3://mycamel-1?autoCreateBucket=true&streamMode=true&keyName=fileTest";
+ String awsEndpoint2
+ = "aws2-s3://mycamel-1?autoCreateBucket=true&streamMode=true&keyName=fileTestParallel";
+
+ from("direct:stream1").toD(awsEndpoint1).to("mock:result");
+ from("direct:stream2").toD(awsEndpoint2).to("mock:result");
+ }
+ };
+ }
+
+}
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2S3ComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2S3ComponentBuilderFactory.java
index 4cfa061..c5e6b6e 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2S3ComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2S3ComponentBuilderFactory.java
@@ -228,6 +228,21 @@ public interface Aws2S3ComponentBuilderFactory {
return this;
}
/**
+ * If StreaMode is true a different way of uploading will be provided.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: common
+ *
+ * @param streamMode the value to set
+ * @return the dsl builder
+ */
+ default Aws2S3ComponentBuilder streamMode(boolean streamMode) {
+ doSetProperty("streamMode", streamMode);
+ return this;
+ }
+ /**
* If we want to trust all certificates in case of overriding the
* endpoint.
*
@@ -801,6 +816,7 @@ public interface Aws2S3ComponentBuilderFactory {
case "proxyPort": getOrCreateConfiguration((AWS2S3Component) component).setProxyPort((java.lang.Integer) value); return true;
case "proxyProtocol": getOrCreateConfiguration((AWS2S3Component) component).setProxyProtocol((software.amazon.awssdk.core.Protocol) value); return true;
case "region": getOrCreateConfiguration((AWS2S3Component) component).setRegion((java.lang.String) value); return true;
+ case "streamMode": getOrCreateConfiguration((AWS2S3Component) component).setStreamMode((boolean) value); return true;
case "trustAllCertificates": getOrCreateConfiguration((AWS2S3Component) component).setTrustAllCertificates((boolean) value); return true;
case "uriEndpointOverride": getOrCreateConfiguration((AWS2S3Component) component).setUriEndpointOverride((java.lang.String) value); return true;
case "useDefaultCredentialsProvider": getOrCreateConfiguration((AWS2S3Component) component).setUseDefaultCredentialsProvider((boolean) value); return true;
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/AWS2S3EndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/AWS2S3EndpointBuilderFactory.java
index 16396386..0ae939c 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/AWS2S3EndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/AWS2S3EndpointBuilderFactory.java
@@ -328,6 +328,37 @@ public interface AWS2S3EndpointBuilderFactory {
return this;
}
/**
+ * If StreaMode is true a different way of uploading will be provided.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: common
+ *
+ * @param streamMode the value to set
+ * @return the dsl builder
+ */
+ default AWS2S3EndpointConsumerBuilder streamMode(boolean streamMode) {
+ doSetProperty("streamMode", streamMode);
+ return this;
+ }
+ /**
+ * If StreaMode is true a different way of uploading will be provided.
+ *
+ * The option will be converted to a <code>boolean</code>
+ * type.
+ *
+ * Default: false
+ * Group: common
+ *
+ * @param streamMode the value to set
+ * @return the dsl builder
+ */
+ default AWS2S3EndpointConsumerBuilder streamMode(String streamMode) {
+ doSetProperty("streamMode", streamMode);
+ return this;
+ }
+ /**
* If we want to trust all certificates in case of overriding the
* endpoint.
*
@@ -1849,6 +1880,37 @@ public interface AWS2S3EndpointBuilderFactory {
return this;
}
/**
+ * If StreaMode is true a different way of uploading will be provided.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: common
+ *
+ * @param streamMode the value to set
+ * @return the dsl builder
+ */
+ default AWS2S3EndpointProducerBuilder streamMode(boolean streamMode) {
+ doSetProperty("streamMode", streamMode);
+ return this;
+ }
+ /**
+ * If StreaMode is true a different way of uploading will be provided.
+ *
+ * The option will be converted to a <code>boolean</code>
+ * type.
+ *
+ * Default: false
+ * Group: common
+ *
+ * @param streamMode the value to set
+ * @return the dsl builder
+ */
+ default AWS2S3EndpointProducerBuilder streamMode(String streamMode) {
+ doSetProperty("streamMode", streamMode);
+ return this;
+ }
+ /**
* If we want to trust all certificates in case of overriding the
* endpoint.
*
@@ -2594,6 +2656,37 @@ public interface AWS2S3EndpointBuilderFactory {
return this;
}
/**
+ * If StreaMode is true a different way of uploading will be provided.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: common
+ *
+ * @param streamMode the value to set
+ * @return the dsl builder
+ */
+ default AWS2S3EndpointBuilder streamMode(boolean streamMode) {
+ doSetProperty("streamMode", streamMode);
+ return this;
+ }
+ /**
+ * If StreaMode is true a different way of uploading will be provided.
+ *
+ * The option will be converted to a <code>boolean</code>
+ * type.
+ *
+ * Default: false
+ * Group: common
+ *
+ * @param streamMode the value to set
+ * @return the dsl builder
+ */
+ default AWS2S3EndpointBuilder streamMode(String streamMode) {
+ doSetProperty("streamMode", streamMode);
+ return this;
+ }
+ /**
* If we want to trust all certificates in case of overriding the
* endpoint.
*
diff --git a/docs/components/modules/ROOT/pages/aws2-s3-component.adoc b/docs/components/modules/ROOT/pages/aws2-s3-component.adoc
index d3d355f..579bf3f 100644
--- a/docs/components/modules/ROOT/pages/aws2-s3-component.adoc
+++ b/docs/components/modules/ROOT/pages/aws2-s3-component.adoc
@@ -49,7 +49,7 @@ from("aws2-s3://helloBucket?accessKey=yourAccessKey&secretKey=yourSecretKey&pref
// component options: START
-The AWS 2 S3 Storage Service component supports 43 options, which are listed below.
+The AWS 2 S3 Storage Service component supports 44 options, which are listed below.
@@ -67,6 +67,7 @@ The AWS 2 S3 Storage Service component supports 43 options, which are listed bel
| *proxyPort* (common) | Specify a proxy port to be used inside the client definition. | | Integer
| *proxyProtocol* (common) | To define a proxy protocol when instantiating the S3 client. There are 2 enums and the value can be one of: HTTP, HTTPS | HTTPS | Protocol
| *region* (common) | The region in which S3 client needs to work. When using this parameter, the configuration will expect the lowercase name of the region (for example ap-east-1) You'll need to use the name Region.EU_WEST_1.id() | | String
+| *streamMode* (common) | If StreaMode is true a different way of uploading will be provided | false | boolean
| *trustAllCertificates* (common) | If we want to trust all certificates in case of overriding the endpoint | false | boolean
| *uriEndpointOverride* (common) | Set the overriding uri endpoint. This option needs to be used in combination with overrideEndpoint option | | String
| *useDefaultCredentialsProvider* (common) | Set whether the S3 client should expect to load credentials through a default credentials provider or to expect static credentials to be passed in. | false | boolean
@@ -131,7 +132,7 @@ with the following path and query parameters:
|===
-=== Query Parameters (61 parameters):
+=== Query Parameters (62 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -147,6 +148,7 @@ with the following path and query parameters:
| *proxyPort* (common) | Specify a proxy port to be used inside the client definition. | | Integer
| *proxyProtocol* (common) | To define a proxy protocol when instantiating the S3 client. There are 2 enums and the value can be one of: HTTP, HTTPS | HTTPS | Protocol
| *region* (common) | The region in which S3 client needs to work. When using this parameter, the configuration will expect the lowercase name of the region (for example ap-east-1) You'll need to use the name Region.EU_WEST_1.id() | | String
+| *streamMode* (common) | If StreaMode is true a different way of uploading will be provided | false | boolean
| *trustAllCertificates* (common) | If we want to trust all certificates in case of overriding the endpoint | false | boolean
| *uriEndpointOverride* (common) | Set the overriding uri endpoint. This option needs to be used in combination with overrideEndpoint option | | String
| *useDefaultCredentialsProvider* (common) | Set whether the S3 client should expect to load credentials through a default credentials provider or to expect static credentials to be passed in. | false | boolean