You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@camel.apache.org by da...@apache.org on 2023/08/31 18:26:57 UTC
[camel] branch main updated: CAMEL-17027 make camel-aws-s3 threadsafe so it can be used concurrently (#11255)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new df3e080a138 CAMEL-17027 make camel-aws-s3 threadsafe so it can be used concurrently (#11255)
df3e080a138 is described below
commit df3e080a13814f530488557a7737eef90f840e6a
Author: Jono Morris <jo...@xtra.co.nz>
AuthorDate: Fri Sep 1 06:26:50 2023 +1200
CAMEL-17027 make camel-aws-s3 threadsafe so it can be used concurrently (#11255)
---
.../aws2/s3/stream/AWS2S3StreamUploadProducer.java | 153 +++++++++++++--------
...IT.java => S3StreamUploadMultipartAsyncIT.java} | 50 ++++---
...IT.java => S3StreamUploadOperationAsyncIT.java} | 33 ++---
.../s3/integration/S3StreamUploadTimeoutIT.java | 29 ++--
4 files changed, 165 insertions(+), 100 deletions(-)
diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
index e491479f961..e0a9e69beb2 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
@@ -61,14 +61,9 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
private static final Logger LOG = LoggerFactory.getLogger(AWS2S3StreamUploadProducer.class);
private static final String TIMEOUT_CHECKER_EXECUTOR_NAME = "S3_Streaming_Upload_Timeout_Checker";
- ByteArrayOutputStream buffer = new ByteArrayOutputStream();
- CreateMultipartUploadResponse initResponse;
- AtomicInteger index = new AtomicInteger(1);
- List<CompletedPart> completedParts;
- AtomicInteger part = new AtomicInteger();
- UUID id;
- String dynamicKeyName;
- CompleteMultipartUploadResponse uploadResult;
+ private AtomicInteger part = new AtomicInteger();
+ private UploadState uploadAggregate = null;
+ private final Object lock = new Object();
private transient String s3ProducerToString;
private ScheduledExecutorService timeoutCheckerExecutorService;
@@ -94,12 +89,10 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
@Override
protected void doStop() throws Exception {
- if (ObjectHelper.isNotEmpty(initResponse)) {
- if (ObjectHelper.isNotEmpty(initResponse.uploadId())) {
- if (index.get() > 0) {
- uploadPart();
- completeUpload();
- }
+ synchronized (lock) {
+ if (ObjectHelper.isNotEmpty(uploadAggregate)) {
+ uploadPart(uploadAggregate);
+ completeUpload(uploadAggregate);
}
}
if (timeoutCheckerExecutorService != null) {
@@ -117,12 +110,11 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
@Override
public void run() {
- if (ObjectHelper.isNotEmpty(initResponse)) {
- if (ObjectHelper.isNotEmpty(initResponse.uploadId())) {
- if (index.get() > 0) {
- uploadPart();
- completeUpload();
- }
+ synchronized (lock) {
+ if (ObjectHelper.isNotEmpty(uploadAggregate)) {
+ uploadPart(uploadAggregate);
+ completeUpload(uploadAggregate);
+ uploadAggregate = null;
}
}
}
@@ -132,21 +124,52 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
public void process(final Exchange exchange) throws Exception {
InputStream is = exchange.getIn().getMandatoryBody(InputStream.class);
+ UploadState state = null;
int totalSize = 0;
byte[] b;
while ((b = AWS2S3Utils.toByteArray(is, getConfiguration().getBufferSize())).length > 0) {
totalSize += b.length;
- buffer.write(b);
+ synchronized (lock) {
+ // aggregate with previously received exchanges
+ if (ObjectHelper.isNotEmpty(uploadAggregate)) {
+ uploadAggregate.buffer.write(b);
+ uploadAggregate.index++;
+
+ if (uploadAggregate.buffer.size() >= getConfiguration().getBatchSize()
+ || uploadAggregate.index == getConfiguration().getBatchMessageNumber()) {
+
+ uploadPart(uploadAggregate);
+ CompleteMultipartUploadResponse uploadResult = completeUpload(uploadAggregate);
+ this.uploadAggregate = null;
+
+ Message message = getMessageForResponse(exchange);
+ message.setHeader(AWS2S3Constants.E_TAG, uploadResult.eTag());
+ if (uploadResult.versionId() != null) {
+ message.setHeader(AWS2S3Constants.VERSION_ID, uploadResult.versionId());
+ }
+ }
+ continue;
+ }
+ }
+
+ if (state == null) {
+ state = new UploadState();
+ } else {
+ state.index++;
+ }
+ state.buffer.write(b);
final String keyName = getConfiguration().getKeyName();
final String fileName = AWS2S3Utils.determineFileName(keyName);
final String extension = AWS2S3Utils.determineFileExtension(keyName);
- if (index.get() == 1 && getConfiguration().getNamingStrategy().equals(AWSS3NamingStrategyEnum.random)) {
- id = UUID.randomUUID();
+ if (state.index == 1 && getConfiguration().getNamingStrategy().equals(AWSS3NamingStrategyEnum.random)) {
+ state.id = UUID.randomUUID();
}
- dynamicKeyName = fileNameToUpload(fileName, getConfiguration().getNamingStrategy(), extension, part, id);
+ state.dynamicKeyName = fileNameToUpload(fileName, getConfiguration().getNamingStrategy(), extension,
+ state.part, state.id);
CreateMultipartUploadRequest.Builder createMultipartUploadRequest
- = CreateMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName()).key(dynamicKeyName);
+ = CreateMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName())
+ .key(state.dynamicKeyName);
String storageClass = AWS2S3Utils.determineStorageClass(exchange, getConfiguration());
if (storageClass != null) {
@@ -170,80 +193,85 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
AWS2S3Utils.setEncryption(createMultipartUploadRequest, getConfiguration());
LOG.trace("Initiating multipart upload [{}] from exchange [{}]...", createMultipartUploadRequest, exchange);
- if (index.get() == 1) {
- initResponse
+ if (state.index == 1) {
+ state.initResponse
= getEndpoint().getS3Client().createMultipartUpload(createMultipartUploadRequest.build());
- completedParts = new ArrayList<>();
}
try {
if (totalSize >= getConfiguration().getBatchSize()
- || buffer.size() >= getConfiguration().getBatchSize()
- || index.get() == getConfiguration().getBatchMessageNumber()) {
+ || state.buffer.size() >= getConfiguration().getBatchSize()
+ || state.index == getConfiguration().getBatchMessageNumber()) {
- uploadPart();
- completeUpload();
+ uploadPart(state);
+ CompleteMultipartUploadResponse uploadResult = completeUpload(state);
Message message = getMessageForResponse(exchange);
message.setHeader(AWS2S3Constants.E_TAG, uploadResult.eTag());
if (uploadResult.versionId() != null) {
message.setHeader(AWS2S3Constants.VERSION_ID, uploadResult.versionId());
}
+ state = null;
}
-
+
} catch (Exception e) {
getEndpoint().getS3Client()
.abortMultipartUpload(AbortMultipartUploadRequest.builder().bucket(getConfiguration().getBucketName())
- .key(dynamicKeyName).uploadId(initResponse.uploadId()).build());
+ .key(state.dynamicKeyName).uploadId(state.initResponse.uploadId()).build());
throw e;
}
+ }
- index.getAndIncrement();
+ if (ObjectHelper.isNotEmpty(state)) {
+ // exchange wasn't large enough to send, batch it with subsequent exchanges.
+ synchronized (lock) {
+ if (this.uploadAggregate == null) {
+ this.uploadAggregate = state;
+ }
+ }
}
}
- private void completeUpload() {
+ private CompleteMultipartUploadResponse completeUpload(UploadState state) {
CompletedMultipartUpload completeMultipartUpload
- = CompletedMultipartUpload.builder().parts(completedParts).build();
+ = CompletedMultipartUpload.builder().parts(state.completedParts).build();
CompleteMultipartUploadRequest compRequest
= CompleteMultipartUploadRequest.builder().multipartUpload(completeMultipartUpload)
- .bucket(getConfiguration().getBucketName()).key(dynamicKeyName)
- .uploadId(initResponse.uploadId())
+ .bucket(getConfiguration().getBucketName()).key(state.dynamicKeyName)
+ .uploadId(state.initResponse.uploadId())
.build();
- uploadResult = getEndpoint().getS3Client().completeMultipartUpload(compRequest);
+ CompleteMultipartUploadResponse uploadResult = getEndpoint().getS3Client().completeMultipartUpload(compRequest);
// Converting the index to String can cause extra overhead
if (LOG.isInfoEnabled()) {
LOG.info("Completed upload for the part {} with etag {} at index {}", part, uploadResult.eTag(),
- index);
+ state.index);
}
-
- index.getAndSet(0);
- initResponse = null;
+ return uploadResult;
}
- private void uploadPart() {
+ private void uploadPart(UploadState state) {
UploadPartRequest uploadRequest = UploadPartRequest.builder().bucket(getConfiguration().getBucketName())
- .key(dynamicKeyName).uploadId(initResponse.uploadId())
- .partNumber(index.get()).build();
+ .key(state.dynamicKeyName).uploadId(state.initResponse.uploadId())
+ .partNumber(state.index).build();
- LOG.trace("Uploading part {} at index {} for {}", part, index, getConfiguration().getKeyName());
+ LOG.trace("Uploading part {} at index {} for {}", state.part, state.index, getConfiguration().getKeyName());
String etag = getEndpoint().getS3Client()
- .uploadPart(uploadRequest, RequestBody.fromBytes(buffer.toByteArray())).eTag();
- CompletedPart partUpload = CompletedPart.builder().partNumber(index.get()).eTag(etag).build();
- completedParts.add(partUpload);
- buffer.reset();
+ .uploadPart(uploadRequest, RequestBody.fromBytes(state.buffer.toByteArray())).eTag();
+ CompletedPart partUpload = CompletedPart.builder().partNumber(state.index).eTag(etag).build();
+ state.completedParts.add(partUpload);
+ state.buffer.reset();
part.getAndIncrement();
}
private String fileNameToUpload(
- String fileName, AWSS3NamingStrategyEnum strategy, String ext, AtomicInteger part, UUID id) {
+ String fileName, AWSS3NamingStrategyEnum strategy, String ext, int part, UUID id) {
String dynamicKeyName;
switch (strategy) {
case progressive:
- if (part.get() > 0) {
+ if (part > 0) {
if (ObjectHelper.isNotEmpty(ext)) {
dynamicKeyName = fileName + "-" + part + ext;
} else {
@@ -258,7 +286,7 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
}
break;
case random:
- if (part.get() > 0) {
+ if (part > 0) {
if (ObjectHelper.isNotEmpty(ext)) {
dynamicKeyName = fileName + "-" + id.toString() + ext;
} else {
@@ -327,4 +355,19 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
return exchange.getMessage();
}
+ private class UploadState {
+ int index;
+ int part;
+ List<CompletedPart> completedParts = new ArrayList<>();
+ ByteArrayOutputStream buffer;
+ String dynamicKeyName;
+ UUID id;
+ CreateMultipartUploadResponse initResponse;
+
+ UploadState() {
+ index = 1;
+ part = AWS2S3StreamUploadProducer.this.part.get();
+ buffer = new ByteArrayOutputStream();
+ }
+ }
}
diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadTimeoutIT.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartAsyncIT.java
similarity index 58%
copy from components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadTimeoutIT.java
copy to components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartAsyncIT.java
index ddabcc00f64..27714d37710 100644
--- a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadTimeoutIT.java
+++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadMultipartAsyncIT.java
@@ -16,23 +16,28 @@
*/
package org.apache.camel.component.aws2.s3.integration;
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.aws2.s3.AWS2S3Constants;
import org.apache.camel.component.aws2.s3.AWS2S3Operations;
import org.apache.camel.component.mock.MockEndpoint;
-import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.s3.model.S3Object;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class S3StreamUploadTimeoutIT extends Aws2S3Base {
+public class S3StreamUploadMultipartAsyncIT extends Aws2S3Base {
@EndpointInject
private ProducerTemplate template;
@@ -42,26 +47,37 @@ public class S3StreamUploadTimeoutIT extends Aws2S3Base {
@Test
public void sendIn() throws Exception {
- result.expectedMessageCount(23);
+ result.expectedMessageCount(10);
+ for (int i = 0; i < 10; i++) {
- for (int i = 0; i < 23; i++) {
- template.sendBody("direct:stream1", "Andrea\n");
+ final CompletableFuture<Exchange> future = template.asyncSend("direct:stream1", new Processor() {
+
+ @Override
+ public void process(Exchange exchange) {
+ exchange.getIn().setHeader(AWS2S3Constants.KEY, "empty.bin");
+ exchange.getIn().setBody(new File("src/test/resources/empty.bin"));
+ }
+ });
+
+ assertDoesNotThrow(() -> future.get(5, TimeUnit.SECONDS));
}
- Awaitility.await().atMost(11, TimeUnit.SECONDS)
- .untilAsserted(() -> MockEndpoint.assertIsSatisfied(context));
+ MockEndpoint.assertIsSatisfied(context, 10, TimeUnit.SECONDS);
- Awaitility.await().atMost(11, TimeUnit.SECONDS)
- .untilAsserted(() -> {
- Exchange ex = template.request("direct:listObjects", this::process);
+ Exchange ex = template.request("direct:listObjects", new Processor() {
- List<S3Object> resp = ex.getMessage().getBody(List.class);
- assertEquals(1, resp.size());
- });
- }
+ @Override
+ public void process(Exchange exchange) {
+ exchange.getIn().setHeader(AWS2S3Constants.S3_OPERATION, AWS2S3Operations.listObjects);
+ }
+ });
+
+ // file size: 5,242,880 bytes, 10 * (5 chunks of 1,000,000 + remainder of 242,880)
+ List<S3Object> resp = ex.getMessage().getBody(List.class);
+ assertEquals(60, resp.size());
- private void process(Exchange exchange) {
- exchange.getIn().setHeader(AWS2S3Constants.S3_OPERATION, AWS2S3Operations.listObjects);
+ assertEquals(10 * Files.size(Paths.get("src/test/resources/empty.bin")),
+ resp.stream().mapToLong(S3Object::size).sum());
}
@Override
@@ -70,7 +86,7 @@ public class S3StreamUploadTimeoutIT extends Aws2S3Base {
@Override
public void configure() {
String awsEndpoint1
- = "aws2-s3://mycamel-1?autoCreateBucket=true&streamingUploadMode=true&keyName=fileTest.txt&batchMessageNumber=25&namingStrategy=random&streamingUploadTimeout=10000";
+ = "aws2-s3://mycamel-1?autoCreateBucket=true&streamingUploadMode=true&keyName=fileTest.txt&batchMessageNumber=25&namingStrategy=random";
from("direct:stream1").to(awsEndpoint1).to("mock:result");
diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadTimeoutIT.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadOperationAsyncIT.java
similarity index 70%
copy from components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadTimeoutIT.java
copy to components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadOperationAsyncIT.java
index ddabcc00f64..5dcb9f68480 100644
--- a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadTimeoutIT.java
+++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadOperationAsyncIT.java
@@ -17,22 +17,24 @@
package org.apache.camel.component.aws2.s3.integration;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.aws2.s3.AWS2S3Constants;
import org.apache.camel.component.aws2.s3.AWS2S3Operations;
import org.apache.camel.component.mock.MockEndpoint;
-import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.s3.model.S3Object;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class S3StreamUploadTimeoutIT extends Aws2S3Base {
+public class S3StreamUploadOperationAsyncIT extends Aws2S3Base {
@EndpointInject
private ProducerTemplate template;
@@ -42,26 +44,25 @@ public class S3StreamUploadTimeoutIT extends Aws2S3Base {
@Test
public void sendIn() throws Exception {
- result.expectedMessageCount(23);
+ result.expectedMessageCount(1000);
- for (int i = 0; i < 23; i++) {
- template.sendBody("direct:stream1", "Andrea\n");
+ for (int i = 0; i < 1000; i++) {
+ final CompletableFuture<Object> future = template.asyncSendBody("direct:stream1", "Andrea\n");
+ assertDoesNotThrow(() -> future.get(5, TimeUnit.SECONDS));
}
- Awaitility.await().atMost(11, TimeUnit.SECONDS)
- .untilAsserted(() -> MockEndpoint.assertIsSatisfied(context));
+ MockEndpoint.assertIsSatisfied(context, 10, TimeUnit.SECONDS);
- Awaitility.await().atMost(11, TimeUnit.SECONDS)
- .untilAsserted(() -> {
- Exchange ex = template.request("direct:listObjects", this::process);
+ Exchange ex = template.request("direct:listObjects", new Processor() {
- List<S3Object> resp = ex.getMessage().getBody(List.class);
- assertEquals(1, resp.size());
- });
- }
+ @Override
+ public void process(Exchange exchange) {
+ exchange.getIn().setHeader(AWS2S3Constants.S3_OPERATION, AWS2S3Operations.listObjects);
+ }
+ });
- private void process(Exchange exchange) {
- exchange.getIn().setHeader(AWS2S3Constants.S3_OPERATION, AWS2S3Operations.listObjects);
+ List<S3Object> resp = ex.getMessage().getBody(List.class);
+ assertEquals(40, resp.size());
}
@Override
diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadTimeoutIT.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadTimeoutIT.java
index ddabcc00f64..f1eef645c19 100644
--- a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadTimeoutIT.java
+++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3StreamUploadTimeoutIT.java
@@ -42,22 +42,27 @@ public class S3StreamUploadTimeoutIT extends Aws2S3Base {
@Test
public void sendIn() throws Exception {
- result.expectedMessageCount(23);
- for (int i = 0; i < 23; i++) {
- template.sendBody("direct:stream1", "Andrea\n");
- }
+ for(int i = 1; i <= 2; i++) {
+ int count = i * 23;
+
+ result.expectedMessageCount(count);
+
+ for (int j = 0; j < 23; j++) {
+ template.sendBody("direct:stream1", "Andrea\n");
+ }
- Awaitility.await().atMost(11, TimeUnit.SECONDS)
- .untilAsserted(() -> MockEndpoint.assertIsSatisfied(context));
+ Awaitility.await().atMost(11, TimeUnit.SECONDS)
+ .untilAsserted(() -> MockEndpoint.assertIsSatisfied(context));
- Awaitility.await().atMost(11, TimeUnit.SECONDS)
- .untilAsserted(() -> {
- Exchange ex = template.request("direct:listObjects", this::process);
+ Awaitility.await().atMost(11, TimeUnit.SECONDS)
+ .untilAsserted(() -> {
+ Exchange ex = template.request("direct:listObjects", this::process);
- List<S3Object> resp = ex.getMessage().getBody(List.class);
- assertEquals(1, resp.size());
- });
+ List<S3Object> resp = ex.getMessage().getBody(List.class);
+ assertEquals(1, resp.size());
+ });
+ }
}
private void process(Exchange exchange) {