You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/04/17 07:27:18 UTC
[camel] 02/06: CAMEL-14618 - Camel-aws-s3: Add an option to consumer to be able to move the consumed files to another bucket, added test
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 359f01442438c5de5f89cd42364aa9e8329a3cbd
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Apr 17 08:41:49 2020 +0200
CAMEL-14618 - Camel-aws-s3: Add an option to consumer to be able to move the consumed files to another bucket, added test
---
.../camel/component/aws2/s3/AWS2S3Consumer.java | 19 +++++++----------
.../s3/integration/S3ConsumerIntegrationTest.java | 24 +++++++++++++++++++---
2 files changed, 28 insertions(+), 15 deletions(-)
diff --git a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
index 0fa3fda..b99fe70 100644
--- a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
+++ b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
@@ -215,16 +215,7 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer {
*/
protected void processCommit(Exchange exchange) {
try {
- if (getConfiguration().isDeleteAfterRead()) {
- String bucketName = exchange.getIn().getHeader(AWS2S3Constants.BUCKET_NAME, String.class);
- String key = exchange.getIn().getHeader(AWS2S3Constants.KEY, String.class);
-
- LOG.trace("Deleting object from bucket {} with key {}...", bucketName, key);
-
- getAmazonS3Client().deleteObject(DeleteObjectRequest.builder().bucket(getConfiguration().getBucketName()).key(key).build());
-
- LOG.trace("Deleted object from bucket {} with key {}...", bucketName, key);
- } else if (getConfiguration().isMoveAfterRead()) {
+ if (getConfiguration().isMoveAfterRead()) {
String bucketName = exchange.getIn().getHeader(AWS2S3Constants.BUCKET_NAME, String.class);
String key = exchange.getIn().getHeader(AWS2S3Constants.KEY, String.class);
@@ -233,7 +224,11 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer {
getAmazonS3Client().copyObject(CopyObjectRequest.builder().destinationKey(key).destinationBucket(getConfiguration().getDestinationBucket()).copySource(bucketName + "/" + key).build());
LOG.trace("Moved object from bucket {} with key {} to bucket {}...", bucketName, key, getConfiguration().getDestinationBucket());
-
+ }
+ if (getConfiguration().isDeleteAfterRead()) {
+ String bucketName = exchange.getIn().getHeader(AWS2S3Constants.BUCKET_NAME, String.class);
+ String key = exchange.getIn().getHeader(AWS2S3Constants.KEY, String.class);
+
LOG.trace("Deleting object from bucket {} with key {}...", bucketName, key);
getAmazonS3Client().deleteObject(DeleteObjectRequest.builder().bucket(getConfiguration().getBucketName()).key(key).build());
@@ -241,7 +236,7 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer {
LOG.trace("Deleted object from bucket {} with key {}...", bucketName, key);
}
} catch (AwsServiceException e) {
- getExceptionHandler().handleException("Error occurred during deleting object. This exception is ignored.", exchange, e);
+ getExceptionHandler().handleException("Error occurred during moving or deleting object. This exception is ignored.", exchange, e);
}
}
diff --git a/components/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3ConsumerIntegrationTest.java b/components/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3ConsumerIntegrationTest.java
index 7a17893..b13669a 100644
--- a/components/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3ConsumerIntegrationTest.java
+++ b/components/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3ConsumerIntegrationTest.java
@@ -46,7 +46,7 @@ public class S3ConsumerIntegrationTest extends CamelTestSupport {
@Test
public void sendIn() throws Exception {
- result.expectedMessageCount(1);
+ result.expectedMessageCount(3);
template.send("direct:putObject", new Processor() {
@@ -56,8 +56,26 @@ public class S3ConsumerIntegrationTest extends CamelTestSupport {
exchange.getIn().setBody("Test");
}
});
+
+ template.send("direct:putObject", new Processor() {
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(AWS2S3Constants.KEY, "test1.txt");
+ exchange.getIn().setBody("Test1");
+ }
+ });
+
+ template.send("direct:putObject", new Processor() {
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(AWS2S3Constants.KEY, "test2.txt");
+ exchange.getIn().setBody("Test2");
+ }
+ });
- Thread.sleep(5000);
+ Thread.sleep(10000);
assertMockEndpointsSatisfied();
}
@@ -70,7 +88,7 @@ public class S3ConsumerIntegrationTest extends CamelTestSupport {
from("direct:putObject").startupOrder(1).to(awsEndpoint).to("mock:result");
- from("aws2-s3://mycamel?moveAfterRead=true&deleteAfterRead=false&destinationBucket=camel-kafka-connector").startupOrder(2).log("${header.key}");
+ from("aws2-s3://mycamel?moveAfterRead=true&destinationBucket=camel-kafka-connector").startupOrder(2).log("${body}");
}
};