You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/04/06 06:40:03 UTC
[camel] 02/09: CAMEL-16185 - AWS S3: improve multipart support - streaming upload
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 844da25b4ebf690d2625a543dd154eb9fdabc37d
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Sun Apr 4 16:37:24 2021 +0200
CAMEL-16185 - AWS S3: improve multipart support - streaming upload
---
.../aws2/s3/stream/AWS2S3StreamUploadProducer.java | 89 ++++++++++++++--------
.../S3StreamUploadOperationLocalstackTest.java | 3 +
...va => S3StreamUploadTimeoutLocalstackTest.java} | 29 ++-----
3 files changed, 68 insertions(+), 53 deletions(-)
diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
index 32c6f28..f4d544d 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
@@ -66,9 +66,9 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
AtomicInteger part = new AtomicInteger();
UUID id;
String dynamicKeyName;
+ CompleteMultipartUploadResponse uploadResult;
private transient String s3ProducerToString;
private ScheduledExecutorService timeoutCheckerExecutorService;
- private boolean timeout;
@Override
protected void doStart() throws Exception {
@@ -76,7 +76,21 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
timeoutCheckerExecutorService
= getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
"timeout_checker");
- timeoutCheckerExecutorService.scheduleAtFixedRate(new AggregationIntervalTask(), 1, 1, TimeUnit.SECONDS);
+ timeoutCheckerExecutorService.scheduleAtFixedRate(new AggregationIntervalTask(), 10, 10, TimeUnit.SECONDS);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (ObjectHelper.isNotEmpty(initResponse)) {
+ if (ObjectHelper.isNotEmpty(initResponse.uploadId())) {
+ if (index.get() > 0) {
+ uploadPart();
+ completeUpload();
+ }
+ }
+ }
+ super.doStop();
+
}
/**
@@ -86,8 +100,14 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
@Override
public void run() {
- timeout = true;
- LOG.info("timeout triggered");
+ if (ObjectHelper.isNotEmpty(initResponse)) {
+ if (ObjectHelper.isNotEmpty(initResponse.uploadId())) {
+ if (index.get() > 0) {
+ uploadPart();
+ completeUpload();
+ }
+ }
+ }
}
}
@@ -157,41 +177,15 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
try {
if (buffer.size() >= getConfiguration().getBatchSize()
|| index.get() == getConfiguration().getBatchMessageNumber()) {
- LOG.info("Timeout " + timeout);
-
- UploadPartRequest uploadRequest = UploadPartRequest.builder().bucket(getConfiguration().getBucketName())
- .key(dynamicKeyName).uploadId(initResponse.uploadId())
- .partNumber(index.get()).build();
- LOG.trace("Uploading part {} at index {} for {}", part, index, keyName);
+ uploadPart();
+ completeUpload();
- String etag = getEndpoint().getS3Client()
- .uploadPart(uploadRequest, RequestBody.fromBytes(buffer.toByteArray())).eTag();
- CompletedPart partUpload = CompletedPart.builder().partNumber(index.get()).eTag(etag).build();
- completedParts.add(partUpload);
- buffer.reset();
- part.getAndIncrement();
- }
-
- if (index.get() == getConfiguration().getBatchMessageNumber() || timeout) {
- CompletedMultipartUpload completeMultipartUpload
- = CompletedMultipartUpload.builder().parts(completedParts).build();
- CompleteMultipartUploadRequest compRequest
- = CompleteMultipartUploadRequest.builder().multipartUpload(completeMultipartUpload)
- .bucket(getConfiguration().getBucketName()).key(dynamicKeyName)
- .uploadId(initResponse.uploadId())
- .build();
-
- CompleteMultipartUploadResponse uploadResult = getEndpoint().getS3Client().completeMultipartUpload(compRequest);
- LOG.info("Completed upload for the part {} with etag {} at index {}", part, uploadResult.eTag(),
- index);
Message message = getMessageForResponse(exchange);
message.setHeader(AWS2S3Constants.E_TAG, uploadResult.eTag());
if (uploadResult.versionId() != null) {
message.setHeader(AWS2S3Constants.VERSION_ID, uploadResult.versionId());
}
- timeout = false;
- index.getAndSet(0);
}
} catch (Exception e) {
@@ -204,6 +198,37 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
index.getAndIncrement();
}
+ private void completeUpload() {
+ CompletedMultipartUpload completeMultipartUpload
+ = CompletedMultipartUpload.builder().parts(completedParts).build();
+ CompleteMultipartUploadRequest compRequest
+ = CompleteMultipartUploadRequest.builder().multipartUpload(completeMultipartUpload)
+ .bucket(getConfiguration().getBucketName()).key(dynamicKeyName)
+ .uploadId(initResponse.uploadId())
+ .build();
+
+ uploadResult = getEndpoint().getS3Client().completeMultipartUpload(compRequest);
+ LOG.info("Completed upload for the part {} with etag {} at index {}", part, uploadResult.eTag(),
+ index);
+
+ index.getAndSet(0);
+ }
+
+ private void uploadPart() {
+ UploadPartRequest uploadRequest = UploadPartRequest.builder().bucket(getConfiguration().getBucketName())
+ .key(dynamicKeyName).uploadId(initResponse.uploadId())
+ .partNumber(index.get()).build();
+
+ LOG.trace("Uploading part {} at index {} for {}", part, index, getConfiguration().getKeyName());
+
+ String etag = getEndpoint().getS3Client()
+ .uploadPart(uploadRequest, RequestBody.fromBytes(buffer.toByteArray())).eTag();
+ CompletedPart partUpload = CompletedPart.builder().partNumber(index.get()).eTag(etag).build();
+ completedParts.add(partUpload);
+ buffer.reset();
+ part.getAndIncrement();
+ }
+
private String fileNameToUpload(
String fileName, AWSS3NamingStrategyEnum strategy, String ext, AtomicInteger part, UUID id) {
String dynamicKeyName;
diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java
index 6a37b9ad..0db9740 100644
--- a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java
+++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java
@@ -60,6 +60,9 @@ public class S3StreamUploadOperationLocalstackTest extends Aws2S3BaseTest {
Thread.sleep(30000);
List<S3Object> resp = ex.getMessage().getBody(List.class);
assertEquals(40, resp.size());
+ for (S3Object s3Object : resp) {
+ System.err.println(s3Object.key());
+ }
}
@Override
diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadTimeoutLocalstackTest.java
similarity index 65%
copy from components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java
copy to components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadTimeoutLocalstackTest.java
index 6a37b9ad..f42a153 100644
--- a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java
+++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadTimeoutLocalstackTest.java
@@ -1,19 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package org.apache.camel.component.aws2.s3.localstack;
import java.util.List;
@@ -31,7 +15,7 @@ import software.amazon.awssdk.services.s3.model.S3Object;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class S3StreamUploadOperationLocalstackTest extends Aws2S3BaseTest {
+public class S3StreamUploadTimeoutLocalstackTest extends Aws2S3BaseTest {
@EndpointInject
private ProducerTemplate template;
@@ -41,12 +25,13 @@ public class S3StreamUploadOperationLocalstackTest extends Aws2S3BaseTest {
@Test
public void sendIn() throws Exception {
- result.expectedMessageCount(1000);
+ result.expectedMessageCount(23);
- for (int i = 0; i < 1000; i++) {
+ for (int i = 0; i < 23; i++) {
template.sendBody("direct:stream1", "Andrea\n");
}
+ Thread.sleep(11000);
assertMockEndpointsSatisfied();
Exchange ex = template.request("direct:listObjects", new Processor() {
@@ -57,9 +42,11 @@ public class S3StreamUploadOperationLocalstackTest extends Aws2S3BaseTest {
}
});
- Thread.sleep(30000);
List<S3Object> resp = ex.getMessage().getBody(List.class);
- assertEquals(40, resp.size());
+ assertEquals(1, resp.size());
+ for (S3Object s3Object : resp) {
+ System.err.println(s3Object.key());
+ }
}
@Override