You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/04/06 06:40:03 UTC

[camel] 02/09: CAMEL-16185 - AWS S3: improve multipart support - streaming upload

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 844da25b4ebf690d2625a543dd154eb9fdabc37d
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Sun Apr 4 16:37:24 2021 +0200

    CAMEL-16185 - AWS S3: improve multipart support - streaming upload
---
 .../aws2/s3/stream/AWS2S3StreamUploadProducer.java | 89 ++++++++++++++--------
 .../S3StreamUploadOperationLocalstackTest.java     |  3 +
 ...va => S3StreamUploadTimeoutLocalstackTest.java} | 29 ++-----
 3 files changed, 68 insertions(+), 53 deletions(-)

diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
index 32c6f28..f4d544d 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
@@ -66,9 +66,9 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
     AtomicInteger part = new AtomicInteger();
     UUID id;
     String dynamicKeyName;
+    CompleteMultipartUploadResponse uploadResult;
     private transient String s3ProducerToString;
     private ScheduledExecutorService timeoutCheckerExecutorService;
-    private boolean timeout;
 
     @Override
     protected void doStart() throws Exception {
@@ -76,7 +76,21 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
         timeoutCheckerExecutorService
                 = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
                         "timeout_checker");
-        timeoutCheckerExecutorService.scheduleAtFixedRate(new AggregationIntervalTask(), 1, 1, TimeUnit.SECONDS);
+        timeoutCheckerExecutorService.scheduleAtFixedRate(new AggregationIntervalTask(), 10, 10, TimeUnit.SECONDS);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (ObjectHelper.isNotEmpty(initResponse)) {
+            if (ObjectHelper.isNotEmpty(initResponse.uploadId())) {
+                if (index.get() > 0) {
+                    uploadPart();
+                    completeUpload();
+                }
+            }
+        }
+        super.doStop();
+
     }
 
     /**
@@ -86,8 +100,14 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
 
         @Override
         public void run() {
-            timeout = true;
-            LOG.info("timeout triggered");
+            if (ObjectHelper.isNotEmpty(initResponse)) {
+                if (ObjectHelper.isNotEmpty(initResponse.uploadId())) {
+                    if (index.get() > 0) {
+                        uploadPart();
+                        completeUpload();
+                    }
+                }
+            }
         }
     }
 
@@ -157,41 +177,15 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
         try {
             if (buffer.size() >= getConfiguration().getBatchSize()
                     || index.get() == getConfiguration().getBatchMessageNumber()) {
-                LOG.info("Timeout " + timeout);
-
-                UploadPartRequest uploadRequest = UploadPartRequest.builder().bucket(getConfiguration().getBucketName())
-                        .key(dynamicKeyName).uploadId(initResponse.uploadId())
-                        .partNumber(index.get()).build();
 
-                LOG.trace("Uploading part {} at index {} for {}", part, index, keyName);
+                uploadPart();
+                completeUpload();
 
-                String etag = getEndpoint().getS3Client()
-                        .uploadPart(uploadRequest, RequestBody.fromBytes(buffer.toByteArray())).eTag();
-                CompletedPart partUpload = CompletedPart.builder().partNumber(index.get()).eTag(etag).build();
-                completedParts.add(partUpload);
-                buffer.reset();
-                part.getAndIncrement();
-            }
-
-            if (index.get() == getConfiguration().getBatchMessageNumber() || timeout) {
-                CompletedMultipartUpload completeMultipartUpload
-                        = CompletedMultipartUpload.builder().parts(completedParts).build();
-                CompleteMultipartUploadRequest compRequest
-                        = CompleteMultipartUploadRequest.builder().multipartUpload(completeMultipartUpload)
-                                .bucket(getConfiguration().getBucketName()).key(dynamicKeyName)
-                                .uploadId(initResponse.uploadId())
-                                .build();
-
-                CompleteMultipartUploadResponse uploadResult = getEndpoint().getS3Client().completeMultipartUpload(compRequest);
-                LOG.info("Completed upload for the part {} with etag {} at index {}", part, uploadResult.eTag(),
-                        index);
                 Message message = getMessageForResponse(exchange);
                 message.setHeader(AWS2S3Constants.E_TAG, uploadResult.eTag());
                 if (uploadResult.versionId() != null) {
                     message.setHeader(AWS2S3Constants.VERSION_ID, uploadResult.versionId());
                 }
-                timeout = false;
-                index.getAndSet(0);
             }
 
         } catch (Exception e) {
@@ -204,6 +198,37 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
         index.getAndIncrement();
     }
 
+    private void completeUpload() {
+        CompletedMultipartUpload completeMultipartUpload
+                = CompletedMultipartUpload.builder().parts(completedParts).build();
+        CompleteMultipartUploadRequest compRequest
+                = CompleteMultipartUploadRequest.builder().multipartUpload(completeMultipartUpload)
+                        .bucket(getConfiguration().getBucketName()).key(dynamicKeyName)
+                        .uploadId(initResponse.uploadId())
+                        .build();
+
+        uploadResult = getEndpoint().getS3Client().completeMultipartUpload(compRequest);
+        LOG.info("Completed upload for the part {} with etag {} at index {}", part, uploadResult.eTag(),
+                index);
+
+        index.getAndSet(0);
+    }
+
+    private void uploadPart() {
+        UploadPartRequest uploadRequest = UploadPartRequest.builder().bucket(getConfiguration().getBucketName())
+                .key(dynamicKeyName).uploadId(initResponse.uploadId())
+                .partNumber(index.get()).build();
+
+        LOG.trace("Uploading part {} at index {} for {}", part, index, getConfiguration().getKeyName());
+
+        String etag = getEndpoint().getS3Client()
+                .uploadPart(uploadRequest, RequestBody.fromBytes(buffer.toByteArray())).eTag();
+        CompletedPart partUpload = CompletedPart.builder().partNumber(index.get()).eTag(etag).build();
+        completedParts.add(partUpload);
+        buffer.reset();
+        part.getAndIncrement();
+    }
+
     private String fileNameToUpload(
             String fileName, AWSS3NamingStrategyEnum strategy, String ext, AtomicInteger part, UUID id) {
         String dynamicKeyName;
diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java
index 6a37b9ad..0db9740 100644
--- a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java
+++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java
@@ -60,6 +60,9 @@ public class S3StreamUploadOperationLocalstackTest extends Aws2S3BaseTest {
         Thread.sleep(30000);
         List<S3Object> resp = ex.getMessage().getBody(List.class);
         assertEquals(40, resp.size());
+        for (S3Object s3Object : resp) {
+            System.err.println(s3Object.key());
+        }
     }
 
     @Override
diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadTimeoutLocalstackTest.java
similarity index 65%
copy from components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java
copy to components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadTimeoutLocalstackTest.java
index 6a37b9ad..f42a153 100644
--- a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java
+++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadTimeoutLocalstackTest.java
@@ -1,19 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package org.apache.camel.component.aws2.s3.localstack;
 
 import java.util.List;
@@ -31,7 +15,7 @@ import software.amazon.awssdk.services.s3.model.S3Object;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-public class S3StreamUploadOperationLocalstackTest extends Aws2S3BaseTest {
+public class S3StreamUploadTimeoutLocalstackTest extends Aws2S3BaseTest {
 
     @EndpointInject
     private ProducerTemplate template;
@@ -41,12 +25,13 @@ public class S3StreamUploadOperationLocalstackTest extends Aws2S3BaseTest {
 
     @Test
     public void sendIn() throws Exception {
-        result.expectedMessageCount(1000);
+        result.expectedMessageCount(23);
 
-        for (int i = 0; i < 1000; i++) {
+        for (int i = 0; i < 23; i++) {
             template.sendBody("direct:stream1", "Andrea\n");
         }
 
+        Thread.sleep(11000);
         assertMockEndpointsSatisfied();
 
         Exchange ex = template.request("direct:listObjects", new Processor() {
@@ -57,9 +42,11 @@ public class S3StreamUploadOperationLocalstackTest extends Aws2S3BaseTest {
             }
         });
 
-        Thread.sleep(30000);
         List<S3Object> resp = ex.getMessage().getBody(List.class);
-        assertEquals(40, resp.size());
+        assertEquals(1, resp.size());
+        for (S3Object s3Object : resp) {
+            System.err.println(s3Object.key());
+        }
     }
 
     @Override