You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/04/01 12:39:50 UTC

[camel] 01/05: CAMEL-16185 - AWS S3: improve multipart support - streaming upload

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a47bbc3ec2b3766be516cffd801313492db3655f
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Apr 1 07:10:30 2021 +0200

    CAMEL-16185 - AWS S3: improve multipart support - streaming upload
---
 .../camel/catalog/docs/aws2-s3-component.adoc      |   6 +-
 .../aws2/s3/AWS2S3ComponentConfigurer.java         |   6 +
 .../aws2/s3/AWS2S3EndpointConfigurer.java          |   6 +
 .../aws2/s3/AWS2S3EndpointUriFactory.java          |   3 +-
 .../apache/camel/component/aws2/s3/aws2-s3.json    |   2 +
 .../src/main/docs/aws2-s3-component.adoc           |   6 +-
 .../component/aws2/s3/AWS2S3Configuration.java     |  13 +
 .../camel/component/aws2/s3/AWS2S3Endpoint.java    |   6 +-
 .../aws2/s3/AWS2S3StreamUploadProducer.java        | 289 +++++++++++++++++++++
 .../S3StreamUploadOperationIntegrationTest.java    |  76 ++++++
 .../dsl/Aws2S3ComponentBuilderFactory.java         |  16 ++
 .../endpoint/dsl/AWS2S3EndpointBuilderFactory.java |  93 +++++++
 .../modules/ROOT/pages/aws2-s3-component.adoc      |   6 +-
 13 files changed, 520 insertions(+), 8 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/aws2-s3-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/aws2-s3-component.adoc
index 78bcbae..e497bde 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/aws2-s3-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/aws2-s3-component.adoc
@@ -47,7 +47,7 @@ from("aws2-s3://helloBucket?accessKey=yourAccessKey&secretKey=yourSecretKey&pref
 
 
 // component options: START
-The AWS 2 S3 Storage Service component supports 43 options, which are listed below.
+The AWS 2 S3 Storage Service component supports 44 options, which are listed below.
 
 
 
@@ -65,6 +65,7 @@ The AWS 2 S3 Storage Service component supports 43 options, which are listed bel
 | *proxyPort* (common) | Specify a proxy port to be used inside the client definition. |  | Integer
 | *proxyProtocol* (common) | To define a proxy protocol when instantiating the S3 client. There are 2 enums and the value can be one of: HTTP, HTTPS | HTTPS | Protocol
 | *region* (common) | The region in which S3 client needs to work. When using this parameter, the configuration will expect the lowercase name of the region (for example ap-east-1) You'll need to use the name Region.EU_WEST_1.id() |  | String
+| *streamMode* (common) | If StreaMode is true a different way of uploading will be provided | false | boolean
 | *trustAllCertificates* (common) | If we want to trust all certificates in case of overriding the endpoint | false | boolean
 | *uriEndpointOverride* (common) | Set the overriding uri endpoint. This option needs to be used in combination with overrideEndpoint option |  | String
 | *useDefaultCredentialsProvider* (common) | Set whether the S3 client should expect to load credentials through a default credentials provider or to expect static credentials to be passed in. | false | boolean
@@ -129,7 +130,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (61 parameters):
+=== Query Parameters (62 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -145,6 +146,7 @@ with the following path and query parameters:
 | *proxyPort* (common) | Specify a proxy port to be used inside the client definition. |  | Integer
 | *proxyProtocol* (common) | To define a proxy protocol when instantiating the S3 client. There are 2 enums and the value can be one of: HTTP, HTTPS | HTTPS | Protocol
 | *region* (common) | The region in which S3 client needs to work. When using this parameter, the configuration will expect the lowercase name of the region (for example ap-east-1) You'll need to use the name Region.EU_WEST_1.id() |  | String
+| *streamMode* (common) | If StreaMode is true a different way of uploading will be provided | false | boolean
 | *trustAllCertificates* (common) | If we want to trust all certificates in case of overriding the endpoint | false | boolean
 | *uriEndpointOverride* (common) | Set the overriding uri endpoint. This option needs to be used in combination with overrideEndpoint option |  | String
 | *useDefaultCredentialsProvider* (common) | Set whether the S3 client should expect to load credentials through a default credentials provider or to expect static credentials to be passed in. | false | boolean
diff --git a/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3ComponentConfigurer.java b/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3ComponentConfigurer.java
index 77bb7d0..6f0e69b 100644
--- a/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3ComponentConfigurer.java
+++ b/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3ComponentConfigurer.java
@@ -98,6 +98,8 @@ public class AWS2S3ComponentConfigurer extends PropertyConfigurerSupport impleme
         case "secretKey": getOrCreateConfiguration(target).setSecretKey(property(camelContext, java.lang.String.class, value)); return true;
         case "storageclass":
         case "storageClass": getOrCreateConfiguration(target).setStorageClass(property(camelContext, java.lang.String.class, value)); return true;
+        case "streammode":
+        case "streamMode": getOrCreateConfiguration(target).setStreamMode(property(camelContext, boolean.class, value)); return true;
         case "trustallcertificates":
         case "trustAllCertificates": getOrCreateConfiguration(target).setTrustAllCertificates(property(camelContext, boolean.class, value)); return true;
         case "uriendpointoverride":
@@ -190,6 +192,8 @@ public class AWS2S3ComponentConfigurer extends PropertyConfigurerSupport impleme
         case "secretKey": return java.lang.String.class;
         case "storageclass":
         case "storageClass": return java.lang.String.class;
+        case "streammode":
+        case "streamMode": return boolean.class;
         case "trustallcertificates":
         case "trustAllCertificates": return boolean.class;
         case "uriendpointoverride":
@@ -278,6 +282,8 @@ public class AWS2S3ComponentConfigurer extends PropertyConfigurerSupport impleme
         case "secretKey": return getOrCreateConfiguration(target).getSecretKey();
         case "storageclass":
         case "storageClass": return getOrCreateConfiguration(target).getStorageClass();
+        case "streammode":
+        case "streamMode": return getOrCreateConfiguration(target).isStreamMode();
         case "trustallcertificates":
         case "trustAllCertificates": return getOrCreateConfiguration(target).isTrustAllCertificates();
         case "uriendpointoverride":
diff --git a/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java b/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java
index 30d87c0..255f2bb 100644
--- a/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java
+++ b/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointConfigurer.java
@@ -121,6 +121,8 @@ public class AWS2S3EndpointConfigurer extends PropertyConfigurerSupport implemen
         case "startScheduler": target.setStartScheduler(property(camelContext, boolean.class, value)); return true;
         case "storageclass":
         case "storageClass": target.getConfiguration().setStorageClass(property(camelContext, java.lang.String.class, value)); return true;
+        case "streammode":
+        case "streamMode": target.getConfiguration().setStreamMode(property(camelContext, boolean.class, value)); return true;
         case "timeunit":
         case "timeUnit": target.setTimeUnit(property(camelContext, java.util.concurrent.TimeUnit.class, value)); return true;
         case "trustallcertificates":
@@ -247,6 +249,8 @@ public class AWS2S3EndpointConfigurer extends PropertyConfigurerSupport implemen
         case "startScheduler": return boolean.class;
         case "storageclass":
         case "storageClass": return java.lang.String.class;
+        case "streammode":
+        case "streamMode": return boolean.class;
         case "timeunit":
         case "timeUnit": return java.util.concurrent.TimeUnit.class;
         case "trustallcertificates":
@@ -369,6 +373,8 @@ public class AWS2S3EndpointConfigurer extends PropertyConfigurerSupport implemen
         case "startScheduler": return target.isStartScheduler();
         case "storageclass":
         case "storageClass": return target.getConfiguration().getStorageClass();
+        case "streammode":
+        case "streamMode": return target.getConfiguration().isStreamMode();
         case "timeunit":
         case "timeUnit": return target.getTimeUnit();
         case "trustallcertificates":
diff --git a/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointUriFactory.java b/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointUriFactory.java
index f88438a..63752dd 100644
--- a/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointUriFactory.java
+++ b/components/camel-aws/camel-aws2-s3/src/generated/java/org/apache/camel/component/aws2/s3/AWS2S3EndpointUriFactory.java
@@ -20,10 +20,11 @@ public class AWS2S3EndpointUriFactory extends org.apache.camel.support.component
     private static final Set<String> PROPERTY_NAMES;
     private static final Set<String> SECRET_PROPERTY_NAMES;
     static {
-        Set<String> props = new HashSet<>(62);
+        Set<String> props = new HashSet<>(63);
         props.add("customerAlgorithm");
         props.add("fileName");
         props.add("useCustomerKey");
+        props.add("streamMode");
         props.add("bucketNameOrArn");
         props.add("customerKeyId");
         props.add("prefix");
diff --git a/components/camel-aws/camel-aws2-s3/src/generated/resources/org/apache/camel/component/aws2/s3/aws2-s3.json b/components/camel-aws/camel-aws2-s3/src/generated/resources/org/apache/camel/component/aws2/s3/aws2-s3.json
index d2cbe25..4aab19d 100644
--- a/components/camel-aws/camel-aws2-s3/src/generated/resources/org/apache/camel/component/aws2/s3/aws2-s3.json
+++ b/components/camel-aws/camel-aws2-s3/src/generated/resources/org/apache/camel/component/aws2/s3/aws2-s3.json
@@ -33,6 +33,7 @@
     "proxyPort": { "kind": "property", "displayName": "Proxy Port", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Specify a proxy port to be used inside the client definition." },
     "proxyProtocol": { "kind": "property", "displayName": "Proxy Protocol", "group": "common", "label": "", "required": false, "type": "object", "javaType": "software.amazon.awssdk.core.Protocol", "enum": [ "HTTP", "HTTPS" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "HTTPS", "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "To define a proxy protocol when instantiating th [...]
     "region": { "kind": "property", "displayName": "Region", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "The region in which S3 client needs to work. When using this parameter, the configuration will expect the lowercase name of the region (f [...]
+    "streamMode": { "kind": "property", "displayName": "Stream Mode", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "If StreaMode is true a different way of uploading will be provided" },
     "trustAllCertificates": { "kind": "property", "displayName": "Trust All Certificates", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "If we want to trust all certificates in case of overriding the endpoint" },
     "uriEndpointOverride": { "kind": "property", "displayName": "Uri Endpoint Override", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Set the overriding uri endpoint. This option needs to be used in combination with overrideEndpoint option" },
     "useDefaultCredentialsProvider": { "kind": "property", "displayName": "Use Default Credentials Provider", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Set whether the S3 client should expect to load credentials through a def [...]
@@ -78,6 +79,7 @@
     "proxyPort": { "kind": "parameter", "displayName": "Proxy Port", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Specify a proxy port to be used inside the client definition." },
     "proxyProtocol": { "kind": "parameter", "displayName": "Proxy Protocol", "group": "common", "label": "", "required": false, "type": "object", "javaType": "software.amazon.awssdk.core.Protocol", "enum": [ "HTTP", "HTTPS" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "HTTPS", "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "To define a proxy protocol when instantiating t [...]
     "region": { "kind": "parameter", "displayName": "Region", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "The region in which S3 client needs to work. When using this parameter, the configuration will expect the lowercase name of the region ( [...]
+    "streamMode": { "kind": "parameter", "displayName": "Stream Mode", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "If StreaMode is true a different way of uploading will be provided" },
     "trustAllCertificates": { "kind": "parameter", "displayName": "Trust All Certificates", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "If we want to trust all certificates in case of overriding the endpoint" },
     "uriEndpointOverride": { "kind": "parameter", "displayName": "Uri Endpoint Override", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Set the overriding uri endpoint. This option needs to be used in combination with overrideEndpoint option" },
     "useDefaultCredentialsProvider": { "kind": "parameter", "displayName": "Use Default Credentials Provider", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.aws2.s3.AWS2S3Configuration", "configurationField": "configuration", "description": "Set whether the S3 client should expect to load credentials through a de [...]
diff --git a/components/camel-aws/camel-aws2-s3/src/main/docs/aws2-s3-component.adoc b/components/camel-aws/camel-aws2-s3/src/main/docs/aws2-s3-component.adoc
index 78bcbae..e497bde 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/docs/aws2-s3-component.adoc
+++ b/components/camel-aws/camel-aws2-s3/src/main/docs/aws2-s3-component.adoc
@@ -47,7 +47,7 @@ from("aws2-s3://helloBucket?accessKey=yourAccessKey&secretKey=yourSecretKey&pref
 
 
 // component options: START
-The AWS 2 S3 Storage Service component supports 43 options, which are listed below.
+The AWS 2 S3 Storage Service component supports 44 options, which are listed below.
 
 
 
@@ -65,6 +65,7 @@ The AWS 2 S3 Storage Service component supports 43 options, which are listed bel
 | *proxyPort* (common) | Specify a proxy port to be used inside the client definition. |  | Integer
 | *proxyProtocol* (common) | To define a proxy protocol when instantiating the S3 client. There are 2 enums and the value can be one of: HTTP, HTTPS | HTTPS | Protocol
 | *region* (common) | The region in which S3 client needs to work. When using this parameter, the configuration will expect the lowercase name of the region (for example ap-east-1) You'll need to use the name Region.EU_WEST_1.id() |  | String
+| *streamMode* (common) | If StreaMode is true a different way of uploading will be provided | false | boolean
 | *trustAllCertificates* (common) | If we want to trust all certificates in case of overriding the endpoint | false | boolean
 | *uriEndpointOverride* (common) | Set the overriding uri endpoint. This option needs to be used in combination with overrideEndpoint option |  | String
 | *useDefaultCredentialsProvider* (common) | Set whether the S3 client should expect to load credentials through a default credentials provider or to expect static credentials to be passed in. | false | boolean
@@ -129,7 +130,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (61 parameters):
+=== Query Parameters (62 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -145,6 +146,7 @@ with the following path and query parameters:
 | *proxyPort* (common) | Specify a proxy port to be used inside the client definition. |  | Integer
 | *proxyProtocol* (common) | To define a proxy protocol when instantiating the S3 client. There are 2 enums and the value can be one of: HTTP, HTTPS | HTTPS | Protocol
 | *region* (common) | The region in which S3 client needs to work. When using this parameter, the configuration will expect the lowercase name of the region (for example ap-east-1) You'll need to use the name Region.EU_WEST_1.id() |  | String
+| *streamMode* (common) | If StreaMode is true a different way of uploading will be provided | false | boolean
 | *trustAllCertificates* (common) | If we want to trust all certificates in case of overriding the endpoint | false | boolean
 | *uriEndpointOverride* (common) | Set the overriding uri endpoint. This option needs to be used in combination with overrideEndpoint option |  | String
 | *useDefaultCredentialsProvider* (common) | Set whether the S3 client should expect to load credentials through a default credentials provider or to expect static credentials to be passed in. | false | boolean
diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java
index 439aa56..aec9bf7 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Configuration.java
@@ -110,6 +110,8 @@ public class AWS2S3Configuration implements Cloneable {
     private String uriEndpointOverride;
     @UriParam(defaultValue = "false")
     private boolean pojoRequest;
+    @UriParam(defaultValue = "false")
+    private boolean streamMode;
 
     public long getPartSize() {
         return partSize;
@@ -575,6 +577,17 @@ public class AWS2S3Configuration implements Cloneable {
         this.amazonS3Presigner = amazonS3Presigner;
     }
 
+    public boolean isStreamMode() {
+        return streamMode;
+    }
+
+    /**
+     * If StreaMode is true a different way of uploading will be provided
+     */
+    public void setStreamMode(boolean streamMode) {
+        this.streamMode = streamMode;
+    }
+
     public AWS2S3Configuration copy() {
         try {
             return (AWS2S3Configuration) super.clone();
diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java
index 9cad4ee..f8aa480 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java
@@ -72,7 +72,11 @@ public class AWS2S3Endpoint extends ScheduledPollEndpoint {
 
     @Override
     public Producer createProducer() throws Exception {
-        return new AWS2S3Producer(this);
+        if (!configuration.isStreamMode()) {
+            return new AWS2S3Producer(this);
+        } else {
+            return new AWS2S3StreamUploadProducer(this);
+        }
     }
 
     @Override
diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3StreamUploadProducer.java
new file mode 100644
index 0000000..63843d3
--- /dev/null
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3StreamUploadProducer.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.s3;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.s3.model.*;
+import software.amazon.awssdk.utils.IoUtils;
+
+/**
+ * A Producer which sends messages to the Amazon Web Service Simple Storage Service
+ * <a href="http://aws.amazon.com/s3/">AWS S3</a>
+ */
+public class AWS2S3StreamUploadProducer extends DefaultProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AWS2S3StreamUploadProducer.class);
+
+    private transient String s3ProducerToString;
+
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    CreateMultipartUploadResponse initResponse;
+    int index = 1;
+    List<CompletedPart> completedParts = new ArrayList<CompletedPart>();
+    int part = 0;
+
+    public AWS2S3StreamUploadProducer(final Endpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public void process(final Exchange exchange) throws Exception {
+        streamUpload(exchange);
+    }
+
+    public void streamUpload(final Exchange exchange) throws Exception {
+        File filePayload = null;
+        InputStream is = exchange.getIn().getMandatoryBody(InputStream.class);
+
+        buffer.write(IoUtils.toByteArray(is));
+        final String keyName = determineKey(exchange);
+        String dynamicKeyName;
+        if (part > 0) {
+            dynamicKeyName = keyName + "-" + part;
+        } else {
+            dynamicKeyName = keyName;
+        }
+        CreateMultipartUploadRequest.Builder createMultipartUploadRequest
+                = CreateMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName()).key(dynamicKeyName);
+
+        String storageClass = determineStorageClass(exchange);
+        if (storageClass != null) {
+            createMultipartUploadRequest.storageClass(storageClass);
+        }
+
+        String cannedAcl = exchange.getIn().getHeader(AWS2S3Constants.CANNED_ACL, String.class);
+        if (cannedAcl != null) {
+            ObjectCannedACL objectAcl = ObjectCannedACL.valueOf(cannedAcl);
+            createMultipartUploadRequest.acl(objectAcl);
+        }
+
+        BucketCannedACL acl = exchange.getIn().getHeader(AWS2S3Constants.ACL, BucketCannedACL.class);
+        if (acl != null) {
+            // note: if cannedacl and acl are both specified the last one will
+            // be used. refer to
+            // PutObjectRequest#setAccessControlList for more details
+            createMultipartUploadRequest.acl(acl.toString());
+        }
+
+        if (getConfiguration().isUseAwsKMS()) {
+            createMultipartUploadRequest.ssekmsKeyId(getConfiguration().getAwsKMSKeyId());
+            createMultipartUploadRequest.serverSideEncryption(ServerSideEncryption.AWS_KMS);
+        }
+
+        if (getConfiguration().isUseCustomerKey()) {
+            if (ObjectHelper.isNotEmpty(getConfiguration().getCustomerKeyId())) {
+                createMultipartUploadRequest.sseCustomerKey(getConfiguration().getCustomerKeyId());
+            }
+            if (ObjectHelper.isNotEmpty(getConfiguration().getCustomerKeyMD5())) {
+                createMultipartUploadRequest.sseCustomerKeyMD5(getConfiguration().getCustomerKeyMD5());
+            }
+            if (ObjectHelper.isNotEmpty(getConfiguration().getCustomerAlgorithm())) {
+                createMultipartUploadRequest.sseCustomerAlgorithm(getConfiguration().getCustomerAlgorithm());
+            }
+        }
+
+        LOG.trace("Initiating multipart upload [{}] from exchange [{}]...", createMultipartUploadRequest, exchange);
+        CompleteMultipartUploadResponse uploadResult = null;
+        if (index == 1) {
+            initResponse
+                    = getEndpoint().getS3Client().createMultipartUpload(createMultipartUploadRequest.build());
+            //final long contentLength = Long.valueOf(objectMetadata.get("Content-Length"));
+            completedParts = new ArrayList<CompletedPart>();
+            long partSize = getConfiguration().getPartSize();
+        }
+
+        long filePosition = 0;
+
+        try {
+            if (buffer.size() >= 5000000 || index == 100) {
+
+                UploadPartRequest uploadRequest = UploadPartRequest.builder().bucket(getConfiguration().getBucketName())
+                        .key(dynamicKeyName).uploadId(initResponse.uploadId())
+                        .partNumber(index).build();
+
+                LOG.trace("Uploading part [{}] for {}", index, keyName);
+
+                String etag = getEndpoint().getS3Client()
+                        .uploadPart(uploadRequest, RequestBody.fromBytes(buffer.toByteArray())).eTag();
+                CompletedPart partUpload = CompletedPart.builder().partNumber(index).eTag(etag).build();
+                completedParts.add(partUpload);
+                buffer.reset();
+                part++;
+            }
+
+            if (index == 100) {
+                CompletedMultipartUpload completeMultipartUpload
+                        = CompletedMultipartUpload.builder().parts(completedParts).build();
+                CompleteMultipartUploadRequest compRequest
+                        = CompleteMultipartUploadRequest.builder().multipartUpload(completeMultipartUpload)
+                                .bucket(getConfiguration().getBucketName()).key(dynamicKeyName)
+                                .uploadId(initResponse.uploadId())
+                                .build();
+
+                uploadResult = getEndpoint().getS3Client().completeMultipartUpload(compRequest);
+                index = 0;
+            }
+
+        } catch (Exception e) {
+            getEndpoint().getS3Client()
+                    .abortMultipartUpload(AbortMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName())
+                            .key(dynamicKeyName).uploadId(initResponse.uploadId()).build());
+            throw e;
+        }
+
+        index++;
+
+        Message message = getMessageForResponse(exchange);
+
+    }
+
+    private AWS2S3Operations determineOperation(Exchange exchange) {
+        AWS2S3Operations operation = exchange.getIn().getHeader(AWS2S3Constants.S3_OPERATION, AWS2S3Operations.class);
+        if (operation == null) {
+            operation = getConfiguration().getOperation();
+        }
+        return operation;
+    }
+
+    private Map<String, String> determineMetadata(final Exchange exchange) {
+        Map<String, String> objectMetadata = new HashMap<String, String>();
+
+        Long contentLength = exchange.getIn().getHeader(AWS2S3Constants.CONTENT_LENGTH, Long.class);
+        if (contentLength != null) {
+            objectMetadata.put("Content-Length", String.valueOf(contentLength));
+        }
+
+        String contentType = exchange.getIn().getHeader(AWS2S3Constants.CONTENT_TYPE, String.class);
+        if (contentType != null) {
+            objectMetadata.put("Content-Type", String.valueOf(contentType));
+        }
+
+        String cacheControl = exchange.getIn().getHeader(AWS2S3Constants.CACHE_CONTROL, String.class);
+        if (cacheControl != null) {
+            objectMetadata.put("Cache-Control", String.valueOf(cacheControl));
+        }
+
+        String contentDisposition = exchange.getIn().getHeader(AWS2S3Constants.CONTENT_DISPOSITION, String.class);
+        if (contentDisposition != null) {
+            objectMetadata.put("Content-Disposition", String.valueOf(contentDisposition));
+        }
+
+        String contentEncoding = exchange.getIn().getHeader(AWS2S3Constants.CONTENT_ENCODING, String.class);
+        if (contentEncoding != null) {
+            objectMetadata.put("Content-Encoding", String.valueOf(contentEncoding));
+        }
+
+        String contentMD5 = exchange.getIn().getHeader(AWS2S3Constants.CONTENT_MD5, String.class);
+        if (contentMD5 != null) {
+            objectMetadata.put("Content-Md5", String.valueOf(contentMD5));
+        }
+
+        return objectMetadata;
+    }
+
+    /**
+     * Reads the bucket name from the header of the given exchange. If not provided, it's read from the endpoint
+     * configuration.
+     *
+     * @param  exchange                 The exchange to read the header from.
+     * @return                          The bucket name.
+     * @throws IllegalArgumentException if the header could not be determined.
+     */
+    private String determineBucketName(final Exchange exchange) {
+        String bucketName = exchange.getIn().getHeader(AWS2S3Constants.BUCKET_NAME, String.class);
+
+        if (ObjectHelper.isEmpty(bucketName)) {
+            bucketName = getConfiguration().getBucketName();
+            LOG.trace("AWS S3 Bucket name header is missing, using default one [{}]", bucketName);
+        }
+
+        if (bucketName == null) {
+            throw new IllegalArgumentException("AWS S3 Bucket name header is missing or not configured.");
+        }
+
+        return bucketName;
+    }
+
+    private String determineKey(final Exchange exchange) {
+        String key = exchange.getIn().getHeader(AWS2S3Constants.KEY, String.class);
+        if (ObjectHelper.isEmpty(key)) {
+            key = getConfiguration().getKeyName();
+        }
+        if (key == null) {
+            throw new IllegalArgumentException("AWS S3 Key header missing.");
+        }
+        return key;
+    }
+
+    private String determineStorageClass(final Exchange exchange) {
+        String storageClass = exchange.getIn().getHeader(AWS2S3Constants.STORAGE_CLASS, String.class);
+        if (storageClass == null) {
+            storageClass = getConfiguration().getStorageClass();
+        }
+
+        return storageClass;
+    }
+
+    private ByteArrayOutputStream determineLengthInputStream(InputStream is) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        byte[] bytes = new byte[1024];
+        int count;
+        while ((count = is.read(bytes)) > 0) {
+            out.write(bytes, 0, count);
+        }
+        return out;
+    }
+
+    protected AWS2S3Configuration getConfiguration() {
+        return getEndpoint().getConfiguration();
+    }
+
+    @Override
+    public String toString() {
+        if (s3ProducerToString == null) {
+            s3ProducerToString = "S3Producer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
+        }
+        return s3ProducerToString;
+    }
+
+    @Override
+    public AWS2S3Endpoint getEndpoint() {
+        return (AWS2S3Endpoint) super.getEndpoint();
+    }
+
+    public static Message getMessageForResponse(final Exchange exchange) {
+        return exchange.getMessage();
+    }
+
+}
diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadOperationIntegrationTest.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadOperationIntegrationTest.java
new file mode 100644
index 0000000..e567caf
--- /dev/null
+++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadOperationIntegrationTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.s3.integration;
+
+import org.apache.camel.*;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+
+@Disabled("Must be manually tested. Provide your own accessKey and secretKey!")
+public class S3StreamUploadOperationIntegrationTest extends CamelTestSupport {
+
+    @BindToRegistry("amazonS3Client")
+    S3Client client
+            = S3Client.builder()
+                    .credentialsProvider(StaticCredentialsProvider.create(
+                            AwsBasicCredentials.create("xxxx", "yyyy")))
+                    .region(Region.EU_WEST_1).build();
+
+    @EndpointInject
+    private ProducerTemplate template;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    @Test
+    public void sendIn() throws Exception {
+        result.expectedMessageCount(1);
+
+        for (int i = 0; i < 1000; i++) {
+            template.sendBody("direct:stream1", "Andrea\n");
+        }
+        for (int i = 0; i < 1000; i++) {
+            template.sendBody("direct:stream2", "Luca\n");
+        }
+
+        Thread.sleep(30000);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                String awsEndpoint1
+                        = "aws2-s3://mycamel-1?autoCreateBucket=true&streamMode=true&keyName=fileTest";
+                String awsEndpoint2
+                        = "aws2-s3://mycamel-1?autoCreateBucket=true&streamMode=true&keyName=fileTestParallel";
+
+                from("direct:stream1").toD(awsEndpoint1).to("mock:result");
+                from("direct:stream2").toD(awsEndpoint2).to("mock:result");
+            }
+        };
+    }
+
+}
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2S3ComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2S3ComponentBuilderFactory.java
index 4cfa061..c5e6b6e 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2S3ComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2S3ComponentBuilderFactory.java
@@ -228,6 +228,21 @@ public interface Aws2S3ComponentBuilderFactory {
             return this;
         }
         /**
+         * If StreaMode is true a different way of uploading will be provided.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: common
+         * 
+         * @param streamMode the value to set
+         * @return the dsl builder
+         */
+        default Aws2S3ComponentBuilder streamMode(boolean streamMode) {
+            doSetProperty("streamMode", streamMode);
+            return this;
+        }
+        /**
          * If we want to trust all certificates in case of overriding the
          * endpoint.
          * 
@@ -801,6 +816,7 @@ public interface Aws2S3ComponentBuilderFactory {
             case "proxyPort": getOrCreateConfiguration((AWS2S3Component) component).setProxyPort((java.lang.Integer) value); return true;
             case "proxyProtocol": getOrCreateConfiguration((AWS2S3Component) component).setProxyProtocol((software.amazon.awssdk.core.Protocol) value); return true;
             case "region": getOrCreateConfiguration((AWS2S3Component) component).setRegion((java.lang.String) value); return true;
+            case "streamMode": getOrCreateConfiguration((AWS2S3Component) component).setStreamMode((boolean) value); return true;
             case "trustAllCertificates": getOrCreateConfiguration((AWS2S3Component) component).setTrustAllCertificates((boolean) value); return true;
             case "uriEndpointOverride": getOrCreateConfiguration((AWS2S3Component) component).setUriEndpointOverride((java.lang.String) value); return true;
             case "useDefaultCredentialsProvider": getOrCreateConfiguration((AWS2S3Component) component).setUseDefaultCredentialsProvider((boolean) value); return true;
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/AWS2S3EndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/AWS2S3EndpointBuilderFactory.java
index 16396386..0ae939c 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/AWS2S3EndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/AWS2S3EndpointBuilderFactory.java
@@ -328,6 +328,37 @@ public interface AWS2S3EndpointBuilderFactory {
             return this;
         }
         /**
+         * If StreaMode is true a different way of uploading will be provided.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: common
+         * 
+         * @param streamMode the value to set
+         * @return the dsl builder
+         */
+        default AWS2S3EndpointConsumerBuilder streamMode(boolean streamMode) {
+            doSetProperty("streamMode", streamMode);
+            return this;
+        }
+        /**
+         * If StreaMode is true a different way of uploading will be provided.
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: common
+         * 
+         * @param streamMode the value to set
+         * @return the dsl builder
+         */
+        default AWS2S3EndpointConsumerBuilder streamMode(String streamMode) {
+            doSetProperty("streamMode", streamMode);
+            return this;
+        }
+        /**
          * If we want to trust all certificates in case of overriding the
          * endpoint.
          * 
@@ -1849,6 +1880,37 @@ public interface AWS2S3EndpointBuilderFactory {
             return this;
         }
         /**
+         * If StreaMode is true a different way of uploading will be provided.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: common
+         * 
+         * @param streamMode the value to set
+         * @return the dsl builder
+         */
+        default AWS2S3EndpointProducerBuilder streamMode(boolean streamMode) {
+            doSetProperty("streamMode", streamMode);
+            return this;
+        }
+        /**
+         * If StreaMode is true a different way of uploading will be provided.
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: common
+         * 
+         * @param streamMode the value to set
+         * @return the dsl builder
+         */
+        default AWS2S3EndpointProducerBuilder streamMode(String streamMode) {
+            doSetProperty("streamMode", streamMode);
+            return this;
+        }
+        /**
          * If we want to trust all certificates in case of overriding the
          * endpoint.
          * 
@@ -2594,6 +2656,37 @@ public interface AWS2S3EndpointBuilderFactory {
             return this;
         }
         /**
+         * If StreaMode is true a different way of uploading will be provided.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: common
+         * 
+         * @param streamMode the value to set
+         * @return the dsl builder
+         */
+        default AWS2S3EndpointBuilder streamMode(boolean streamMode) {
+            doSetProperty("streamMode", streamMode);
+            return this;
+        }
+        /**
+         * If StreaMode is true a different way of uploading will be provided.
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: common
+         * 
+         * @param streamMode the value to set
+         * @return the dsl builder
+         */
+        default AWS2S3EndpointBuilder streamMode(String streamMode) {
+            doSetProperty("streamMode", streamMode);
+            return this;
+        }
+        /**
          * If we want to trust all certificates in case of overriding the
          * endpoint.
          * 
diff --git a/docs/components/modules/ROOT/pages/aws2-s3-component.adoc b/docs/components/modules/ROOT/pages/aws2-s3-component.adoc
index d3d355f..579bf3f 100644
--- a/docs/components/modules/ROOT/pages/aws2-s3-component.adoc
+++ b/docs/components/modules/ROOT/pages/aws2-s3-component.adoc
@@ -49,7 +49,7 @@ from("aws2-s3://helloBucket?accessKey=yourAccessKey&secretKey=yourSecretKey&pref
 
 
 // component options: START
-The AWS 2 S3 Storage Service component supports 43 options, which are listed below.
+The AWS 2 S3 Storage Service component supports 44 options, which are listed below.
 
 
 
@@ -67,6 +67,7 @@ The AWS 2 S3 Storage Service component supports 43 options, which are listed bel
 | *proxyPort* (common) | Specify a proxy port to be used inside the client definition. |  | Integer
 | *proxyProtocol* (common) | To define a proxy protocol when instantiating the S3 client. There are 2 enums and the value can be one of: HTTP, HTTPS | HTTPS | Protocol
 | *region* (common) | The region in which S3 client needs to work. When using this parameter, the configuration will expect the lowercase name of the region (for example ap-east-1) You'll need to use the name Region.EU_WEST_1.id() |  | String
+| *streamMode* (common) | If StreaMode is true a different way of uploading will be provided | false | boolean
 | *trustAllCertificates* (common) | If we want to trust all certificates in case of overriding the endpoint | false | boolean
 | *uriEndpointOverride* (common) | Set the overriding uri endpoint. This option needs to be used in combination with overrideEndpoint option |  | String
 | *useDefaultCredentialsProvider* (common) | Set whether the S3 client should expect to load credentials through a default credentials provider or to expect static credentials to be passed in. | false | boolean
@@ -131,7 +132,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (61 parameters):
+=== Query Parameters (62 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -147,6 +148,7 @@ with the following path and query parameters:
 | *proxyPort* (common) | Specify a proxy port to be used inside the client definition. |  | Integer
 | *proxyProtocol* (common) | To define a proxy protocol when instantiating the S3 client. There are 2 enums and the value can be one of: HTTP, HTTPS | HTTPS | Protocol
 | *region* (common) | The region in which S3 client needs to work. When using this parameter, the configuration will expect the lowercase name of the region (for example ap-east-1) You'll need to use the name Region.EU_WEST_1.id() |  | String
+| *streamMode* (common) | If StreaMode is true a different way of uploading will be provided | false | boolean
 | *trustAllCertificates* (common) | If we want to trust all certificates in case of overriding the endpoint | false | boolean
 | *uriEndpointOverride* (common) | Set the overriding uri endpoint. This option needs to be used in combination with overrideEndpoint option |  | String
 | *useDefaultCredentialsProvider* (common) | Set whether the S3 client should expect to load credentials through a default credentials provider or to expect static credentials to be passed in. | false | boolean