You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in this rendering.
Posted to commits@camel.apache.org by da...@apache.org on 2023/08/31 18:26:57 UTC

[camel] branch main updated: CAMEL-17027 make camel-aws-s3 threadsafe so it can be used concurrently (#11255)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new df3e080a138 CAMEL-17027 make camel-aws-s3 threadsafe so it can be used concurrently (#11255)
df3e080a138 is described below

commit df3e080a13814f530488557a7737eef90f840e6a
Author: Jono Morris <jo...@xtra.co.nz>
AuthorDate: Fri Sep 1 06:26:50 2023 +1200

    CAMEL-17027 make camel-aws-s3 threadsafe so it can be used concurrently (#11255)
---
 .../aws2/s3/stream/AWS2S3StreamUploadProducer.java | 153 +++++++++++++--------
 ...IT.java => S3StreamUploadMultipartAsyncIT.java} |  50 ++++---
 ...IT.java => S3StreamUploadOperationAsyncIT.java} |  33 ++---
 .../s3/integration/S3StreamUploadTimeoutIT.java    |  29 ++--
 4 files changed, 165 insertions(+), 100 deletions(-)

diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
index e491479f961..e0a9e69beb2 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
@@ -61,14 +61,9 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
 
     private static final Logger LOG = LoggerFactory.getLogger(AWS2S3StreamUploadProducer.class);
     private static final String TIMEOUT_CHECKER_EXECUTOR_NAME = "S3_Streaming_Upload_Timeout_Checker";
-    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-    CreateMultipartUploadResponse initResponse;
-    AtomicInteger index = new AtomicInteger(1);
-    List<CompletedPart> completedParts;
-    AtomicInteger part = new AtomicInteger();
-    UUID id;
-    String dynamicKeyName;
-    CompleteMultipartUploadResponse uploadResult;
+    private AtomicInteger part = new AtomicInteger();
+    private UploadState uploadAggregate = null;
+    private final Object lock = new Object();
     private transient String s3ProducerToString;
     private ScheduledExecutorService timeoutCheckerExecutorService;
 
@@ -94,12 +89,10 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
 
     @Override
     protected void doStop() throws Exception {
-        if (ObjectHelper.isNotEmpty(initResponse)) {
-            if (ObjectHelper.isNotEmpty(initResponse.uploadId())) {
-                if (index.get() > 0) {
-                    uploadPart();
-                    completeUpload();
-                }
+        synchronized (lock) {
+            if (ObjectHelper.isNotEmpty(uploadAggregate)) {
+                uploadPart(uploadAggregate);
+                completeUpload(uploadAggregate);
             }
         }
         if (timeoutCheckerExecutorService != null) {
@@ -117,12 +110,11 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
 
         @Override
         public void run() {
-            if (ObjectHelper.isNotEmpty(initResponse)) {
-                if (ObjectHelper.isNotEmpty(initResponse.uploadId())) {
-                    if (index.get() > 0) {
-                        uploadPart();
-                        completeUpload();
-                    }
+            synchronized (lock) {
+                if (ObjectHelper.isNotEmpty(uploadAggregate)) {
+                    uploadPart(uploadAggregate);
+                    completeUpload(uploadAggregate);
+                    uploadAggregate = null;
                 }
             }
         }
@@ -132,21 +124,52 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
     public void process(final Exchange exchange) throws Exception {
         InputStream is = exchange.getIn().getMandatoryBody(InputStream.class);
 
+        UploadState state = null;
         int totalSize = 0;
         byte[] b;
         while ((b = AWS2S3Utils.toByteArray(is, getConfiguration().getBufferSize())).length > 0) {
             totalSize += b.length;
-            buffer.write(b);
+            synchronized (lock) {
+                // aggregate with previously received exchanges
+                if (ObjectHelper.isNotEmpty(uploadAggregate)) {
+                    uploadAggregate.buffer.write(b);
+                    uploadAggregate.index++;
+
+                    if (uploadAggregate.buffer.size() >= getConfiguration().getBatchSize()
+                            || uploadAggregate.index == getConfiguration().getBatchMessageNumber()) {
+
+                        uploadPart(uploadAggregate);
+                        CompleteMultipartUploadResponse uploadResult = completeUpload(uploadAggregate);
+                        this.uploadAggregate = null;
+
+                        Message message = getMessageForResponse(exchange);
+                        message.setHeader(AWS2S3Constants.E_TAG, uploadResult.eTag());
+                        if (uploadResult.versionId() != null) {
+                            message.setHeader(AWS2S3Constants.VERSION_ID, uploadResult.versionId());
+                        }
+                    }
+                    continue;
+                }
+            }
+
+            if (state == null) {
+                state = new UploadState();
+            } else {
+                state.index++;
+            }
+            state.buffer.write(b);
 
             final String keyName = getConfiguration().getKeyName();
             final String fileName = AWS2S3Utils.determineFileName(keyName);
             final String extension = AWS2S3Utils.determineFileExtension(keyName);
-            if (index.get() == 1 && getConfiguration().getNamingStrategy().equals(AWSS3NamingStrategyEnum.random)) {
-                id = UUID.randomUUID();
+            if (state.index == 1 && getConfiguration().getNamingStrategy().equals(AWSS3NamingStrategyEnum.random)) {
+                state.id = UUID.randomUUID();
             }
-            dynamicKeyName = fileNameToUpload(fileName, getConfiguration().getNamingStrategy(), extension, part, id);
+            state.dynamicKeyName = fileNameToUpload(fileName, getConfiguration().getNamingStrategy(), extension,
+                    state.part, state.id);
             CreateMultipartUploadRequest.Builder createMultipartUploadRequest
-                    = CreateMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName()).key(dynamicKeyName);
+                    = CreateMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName())
+                            .key(state.dynamicKeyName);
 
             String storageClass = AWS2S3Utils.determineStorageClass(exchange, getConfiguration());
             if (storageClass != null) {
@@ -170,80 +193,85 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
             AWS2S3Utils.setEncryption(createMultipartUploadRequest, getConfiguration());
 
             LOG.trace("Initiating multipart upload [{}] from exchange [{}]...", createMultipartUploadRequest, exchange);
-            if (index.get() == 1) {
-                initResponse
+            if (state.index == 1) {
+                state.initResponse
                         = getEndpoint().getS3Client().createMultipartUpload(createMultipartUploadRequest.build());
-                completedParts = new ArrayList<>();
             }
 
             try {
                 if (totalSize >= getConfiguration().getBatchSize()
-                        || buffer.size() >= getConfiguration().getBatchSize()
-                        || index.get() == getConfiguration().getBatchMessageNumber()) {
+                        || state.buffer.size() >= getConfiguration().getBatchSize()
+                        || state.index == getConfiguration().getBatchMessageNumber()) {
 
-                    uploadPart();
-                    completeUpload();
+                    uploadPart(state);
+                    CompleteMultipartUploadResponse uploadResult = completeUpload(state);
 
                     Message message = getMessageForResponse(exchange);
                     message.setHeader(AWS2S3Constants.E_TAG, uploadResult.eTag());
                     if (uploadResult.versionId() != null) {
                         message.setHeader(AWS2S3Constants.VERSION_ID, uploadResult.versionId());
                     }
+                    state = null;
                 }
-
+                
             } catch (Exception e) {
                 getEndpoint().getS3Client()
                         .abortMultipartUpload(AbortMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName())
-                                .key(dynamicKeyName).uploadId(initResponse.uploadId()).build());
+                                .key(state.dynamicKeyName).uploadId(state.initResponse.uploadId()).build());
                 throw e;
             }
+        }
 
-            index.getAndIncrement();
+        if (ObjectHelper.isNotEmpty(state)) {
+            // exchange wasn't large enough to send, batch it with subsequent exchanges.
+            synchronized (lock) {
+                if (this.uploadAggregate == null) {
+                    this.uploadAggregate = state;
+                }
+            }
         }
     }
 
-    private void completeUpload() {
+    private CompleteMultipartUploadResponse completeUpload(UploadState state) {
         CompletedMultipartUpload completeMultipartUpload
-                = CompletedMultipartUpload.builder().parts(completedParts).build();
+                = CompletedMultipartUpload.builder().parts(state.completedParts).build();
         CompleteMultipartUploadRequest compRequest
                 = CompleteMultipartUploadRequest.builder().multipartUpload(completeMultipartUpload)
-                        .bucket(getConfiguration().getBucketName()).key(dynamicKeyName)
-                        .uploadId(initResponse.uploadId())
+                        .bucket(getConfiguration().getBucketName()).key(state.dynamicKeyName)
+                        .uploadId(state.initResponse.uploadId())
                         .build();
 
-        uploadResult = getEndpoint().getS3Client().completeMultipartUpload(compRequest);
+        CompleteMultipartUploadResponse uploadResult = getEndpoint().getS3Client().completeMultipartUpload(compRequest);
 
         // Converting the index to String can cause extra overhead
         if (LOG.isInfoEnabled()) {
             LOG.info("Completed upload for the part {} with etag {} at index {}", part, uploadResult.eTag(),
-                    index);
+                    state.index);
         }
-
-        index.getAndSet(0);
-        initResponse = null;
+        return uploadResult;
     }
 
-    private void uploadPart() {
+    private void uploadPart(UploadState state) {
         UploadPartRequest uploadRequest = UploadPartRequest.builder().bucket(getConfiguration().getBucketName())
-                .key(dynamicKeyName).uploadId(initResponse.uploadId())
-                .partNumber(index.get()).build();
+                .key(state.dynamicKeyName).uploadId(state.initResponse.uploadId())
+                .partNumber(state.index).build();
 
-        LOG.trace("Uploading part {} at index {} for {}", part, index, getConfiguration().getKeyName());
+        LOG.trace("Uploading part {} at index {} for {}", state.part, state.index, getConfiguration().getKeyName());
 
         String etag = getEndpoint().getS3Client()
-                .uploadPart(uploadRequest, RequestBody.fromBytes(buffer.toByteArray())).eTag();
-        CompletedPart partUpload = CompletedPart.builder().partNumber(index.get()).eTag(etag).build();
-        completedParts.add(partUpload);
-        buffer.reset();
+                .uploadPart(uploadRequest, RequestBody.fromBytes(state.buffer.toByteArray())).eTag();
+        CompletedPart partUpload = CompletedPart.builder().partNumber(state.index).eTag(etag).build();
+        state.completedParts.add(partUpload);
+        state.buffer.reset();
         part.getAndIncrement();
     }
 
     private String fileNameToUpload(
-            String fileName, AWSS3NamingStrategyEnum strategy, String ext, AtomicInteger part, UUID id) {
+            String fileName, AWSS3NamingStrategyEnum strategy, String ext, int part, UUID id) {
         String dynamicKeyName;
         switch (strategy) {
             case progressive:
-                if (part.get() > 0) {
+                if (part > 0) {
                     if (ObjectHelper.isNotEmpty(ext)) {
                         dynamicKeyName = fileName + "-" + part + ext;
                     } else {
@@ -258,7 +286,7 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
                 }
                 break;
             case random:
-                if (part.get() > 0) {
+                if (part > 0) {
                     if (ObjectHelper.isNotEmpty(ext)) {
                         dynamicKeyName = fileName + "-" + id.toString() + ext;
                     } else {
@@ -327,4 +355,19 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
         return exchange.getMessage();
     }
 
+    private class UploadState {
+        int index;
+        int part;
+        List<CompletedPart> completedParts = new ArrayList<>();
+        ByteArrayOutputStream buffer;
+        String dynamicKeyName;
+        UUID id;
+        CreateMultipartUploadResponse initResponse;
+
+        UploadState() {
+            index = 1;
+            part = AWS2S3StreamUploadProducer.this.part.get();
+            buffer = new ByteArrayOutputStream();
+        }
+    }
 }
diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadTimeoutIT.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartAsyncIT.java
similarity index 58%
copy from components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadTimeoutIT.java
copy to components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartAsyncIT.java
index ddabcc00f64..27714d37710 100644
--- a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadTimeoutIT.java
+++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartAsyncIT.java
@@ -16,23 +16,28 @@
  */
 package org.apache.camel.component.aws2.s3.integration;
 
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.aws2.s3.AWS2S3Constants;
 import org.apache.camel.component.aws2.s3.AWS2S3Operations;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-public class S3StreamUploadTimeoutIT extends Aws2S3Base {
+public class S3StreamUploadMultipartAsyncIT extends Aws2S3Base {
 
     @EndpointInject
     private ProducerTemplate template;
@@ -42,26 +47,37 @@ public class S3StreamUploadTimeoutIT extends Aws2S3Base {
 
     @Test
     public void sendIn() throws Exception {
-        result.expectedMessageCount(23);
+        result.expectedMessageCount(10);
+        for (int i = 0; i < 10; i++) {
 
-        for (int i = 0; i < 23; i++) {
-            template.sendBody("direct:stream1", "Andrea\n");
+            final CompletableFuture<Exchange> future = template.asyncSend("direct:stream1", new Processor() {
+
+                @Override
+                public void process(Exchange exchange) {
+                    exchange.getIn().setHeader(AWS2S3Constants.KEY, "empty.bin");
+                    exchange.getIn().setBody(new File("src/test/resources/empty.bin"));
+                }
+            });
+
+            assertDoesNotThrow(() -> future.get(5, TimeUnit.SECONDS));
         }
 
-        Awaitility.await().atMost(11, TimeUnit.SECONDS)
-                .untilAsserted(() -> MockEndpoint.assertIsSatisfied(context));
+        MockEndpoint.assertIsSatisfied(context, 10, TimeUnit.SECONDS);
 
-        Awaitility.await().atMost(11, TimeUnit.SECONDS)
-                .untilAsserted(() -> {
-                    Exchange ex = template.request("direct:listObjects", this::process);
+        Exchange ex = template.request("direct:listObjects", new Processor() {
 
-                    List<S3Object> resp = ex.getMessage().getBody(List.class);
-                    assertEquals(1, resp.size());
-                });
-    }
+            @Override
+            public void process(Exchange exchange) {
+                exchange.getIn().setHeader(AWS2S3Constants.S3_OPERATION, AWS2S3Operations.listObjects);
+            }
+        });
+
+        // file size: 5,242,880 bytes, 10 * (5 chunks of 1,000,000 + remainder of 242,880)
+        List<S3Object> resp = ex.getMessage().getBody(List.class);
+        assertEquals(60, resp.size());
 
-    private void process(Exchange exchange) {
-        exchange.getIn().setHeader(AWS2S3Constants.S3_OPERATION, AWS2S3Operations.listObjects);
+        assertEquals(10 * Files.size(Paths.get("src/test/resources/empty.bin")),
+                resp.stream().mapToLong(S3Object::size).sum());
     }
 
     @Override
@@ -70,7 +86,7 @@ public class S3StreamUploadTimeoutIT extends Aws2S3Base {
             @Override
             public void configure() {
                 String awsEndpoint1
-                        = "aws2-s3://mycamel-1?autoCreateBucket=true&streamingUploadMode=true&keyName=fileTest.txt&batchMessageNumber=25&namingStrategy=random&streamingUploadTimeout=10000";
+                        = "aws2-s3://mycamel-1?autoCreateBucket=true&streamingUploadMode=true&keyName=fileTest.txt&batchMessageNumber=25&namingStrategy=random";
 
                 from("direct:stream1").to(awsEndpoint1).to("mock:result");
 
diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadTimeoutIT.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadOperationAsyncIT.java
similarity index 70%
copy from components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadTimeoutIT.java
copy to components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadOperationAsyncIT.java
index ddabcc00f64..5dcb9f68480 100644
--- a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadTimeoutIT.java
+++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadOperationAsyncIT.java
@@ -17,22 +17,24 @@
 package org.apache.camel.component.aws2.s3.integration;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.aws2.s3.AWS2S3Constants;
 import org.apache.camel.component.aws2.s3.AWS2S3Operations;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-public class S3StreamUploadTimeoutIT extends Aws2S3Base {
+public class S3StreamUploadOperationAsyncIT extends Aws2S3Base {
 
     @EndpointInject
     private ProducerTemplate template;
@@ -42,26 +44,25 @@ public class S3StreamUploadTimeoutIT extends Aws2S3Base {
 
     @Test
     public void sendIn() throws Exception {
-        result.expectedMessageCount(23);
+        result.expectedMessageCount(1000);
 
-        for (int i = 0; i < 23; i++) {
-            template.sendBody("direct:stream1", "Andrea\n");
+        for (int i = 0; i < 1000; i++) {
+            final CompletableFuture<Object> future = template.asyncSendBody("direct:stream1", "Andrea\n");
+            assertDoesNotThrow(() -> future.get(5, TimeUnit.SECONDS));
         }
 
-        Awaitility.await().atMost(11, TimeUnit.SECONDS)
-                .untilAsserted(() -> MockEndpoint.assertIsSatisfied(context));
+        MockEndpoint.assertIsSatisfied(context, 10, TimeUnit.SECONDS);
 
-        Awaitility.await().atMost(11, TimeUnit.SECONDS)
-                .untilAsserted(() -> {
-                    Exchange ex = template.request("direct:listObjects", this::process);
+        Exchange ex = template.request("direct:listObjects", new Processor() {
 
-                    List<S3Object> resp = ex.getMessage().getBody(List.class);
-                    assertEquals(1, resp.size());
-                });
-    }
+            @Override
+            public void process(Exchange exchange) {
+                exchange.getIn().setHeader(AWS2S3Constants.S3_OPERATION, AWS2S3Operations.listObjects);
+            }
+        });
 
-    private void process(Exchange exchange) {
-        exchange.getIn().setHeader(AWS2S3Constants.S3_OPERATION, AWS2S3Operations.listObjects);
+        List<S3Object> resp = ex.getMessage().getBody(List.class);
+        assertEquals(40, resp.size());
     }
 
     @Override
diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadTimeoutIT.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadTimeoutIT.java
index ddabcc00f64..f1eef645c19 100644
--- a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadTimeoutIT.java
+++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadTimeoutIT.java
@@ -42,22 +42,27 @@ public class S3StreamUploadTimeoutIT extends Aws2S3Base {
 
     @Test
     public void sendIn() throws Exception {
-        result.expectedMessageCount(23);
 
-        for (int i = 0; i < 23; i++) {
-            template.sendBody("direct:stream1", "Andrea\n");
-        }
+        for(int i = 1; i <= 2; i++) {
+            int count = i * 23;
+            
+            result.expectedMessageCount(count);
+
+            for (int j = 0; j < 23; j++) {
+                template.sendBody("direct:stream1", "Andrea\n");
+            }
 
-        Awaitility.await().atMost(11, TimeUnit.SECONDS)
-                .untilAsserted(() -> MockEndpoint.assertIsSatisfied(context));
+            Awaitility.await().atMost(11, TimeUnit.SECONDS)
+                    .untilAsserted(() -> MockEndpoint.assertIsSatisfied(context));
 
-        Awaitility.await().atMost(11, TimeUnit.SECONDS)
-                .untilAsserted(() -> {
-                    Exchange ex = template.request("direct:listObjects", this::process);
+            Awaitility.await().atMost(11, TimeUnit.SECONDS)
+                    .untilAsserted(() -> {
+                        Exchange ex = template.request("direct:listObjects", this::process);
 
-                    List<S3Object> resp = ex.getMessage().getBody(List.class);
-                    assertEquals(1, resp.size());
-                });
+                        List<S3Object> resp = ex.getMessage().getBody(List.class);
+                        assertEquals(1, resp.size());
+                    });
+        }
     }
 
     private void process(Exchange exchange) {