You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/04/17 07:27:18 UTC

[camel] 02/06: CAMEL-14618 - Camel-aws-s3: Add an option to consumer to be able to move the consumed files to another bucket, added test

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 359f01442438c5de5f89cd42364aa9e8329a3cbd
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Apr 17 08:41:49 2020 +0200

    CAMEL-14618 - Camel-aws-s3: Add an option to consumer to be able to move the consumed files to another bucket, added test
---
 .../camel/component/aws2/s3/AWS2S3Consumer.java    | 19 +++++++----------
 .../s3/integration/S3ConsumerIntegrationTest.java  | 24 +++++++++++++++++++---
 2 files changed, 28 insertions(+), 15 deletions(-)

diff --git a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
index 0fa3fda..b99fe70 100644
--- a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
+++ b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
@@ -215,16 +215,7 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer {
      */
     protected void processCommit(Exchange exchange) {
         try {
-            if (getConfiguration().isDeleteAfterRead()) {
-                String bucketName = exchange.getIn().getHeader(AWS2S3Constants.BUCKET_NAME, String.class);
-                String key = exchange.getIn().getHeader(AWS2S3Constants.KEY, String.class);
-
-                LOG.trace("Deleting object from bucket {} with key {}...", bucketName, key);
-
-                getAmazonS3Client().deleteObject(DeleteObjectRequest.builder().bucket(getConfiguration().getBucketName()).key(key).build());
-
-                LOG.trace("Deleted object from bucket {} with key {}...", bucketName, key);
-            } else if (getConfiguration().isMoveAfterRead()) {
+            if (getConfiguration().isMoveAfterRead()) {
                 String bucketName = exchange.getIn().getHeader(AWS2S3Constants.BUCKET_NAME, String.class);
                 String key = exchange.getIn().getHeader(AWS2S3Constants.KEY, String.class);
 
@@ -233,7 +224,11 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer {
                 getAmazonS3Client().copyObject(CopyObjectRequest.builder().destinationKey(key).destinationBucket(getConfiguration().getDestinationBucket()).copySource(bucketName + "/" + key).build());
 
                 LOG.trace("Moved object from bucket {} with key {} to bucket {}...", bucketName, key, getConfiguration().getDestinationBucket());
-                
+            }
+            if (getConfiguration().isDeleteAfterRead()) {
+                String bucketName = exchange.getIn().getHeader(AWS2S3Constants.BUCKET_NAME, String.class);
+                String key = exchange.getIn().getHeader(AWS2S3Constants.KEY, String.class);
+
                 LOG.trace("Deleting object from bucket {} with key {}...", bucketName, key);
 
                 getAmazonS3Client().deleteObject(DeleteObjectRequest.builder().bucket(getConfiguration().getBucketName()).key(key).build());
@@ -241,7 +236,7 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer {
                 LOG.trace("Deleted object from bucket {} with key {}...", bucketName, key);
             }
         } catch (AwsServiceException e) {
-            getExceptionHandler().handleException("Error occurred during deleting object. This exception is ignored.", exchange, e);
+            getExceptionHandler().handleException("Error occurred during moving or deleting object. This exception is ignored.", exchange, e);
         }
     }
 
diff --git a/components/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3ConsumerIntegrationTest.java b/components/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3ConsumerIntegrationTest.java
index 7a17893..b13669a 100644
--- a/components/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3ConsumerIntegrationTest.java
+++ b/components/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/integration/S3ConsumerIntegrationTest.java
@@ -46,7 +46,7 @@ public class S3ConsumerIntegrationTest extends CamelTestSupport {
 
     @Test
     public void sendIn() throws Exception {
-        result.expectedMessageCount(1);
+        result.expectedMessageCount(3);
 
         template.send("direct:putObject", new Processor() {
 
@@ -56,8 +56,26 @@ public class S3ConsumerIntegrationTest extends CamelTestSupport {
                 exchange.getIn().setBody("Test");
             }
         });
+        
+        template.send("direct:putObject", new Processor() {
+
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(AWS2S3Constants.KEY, "test1.txt");
+                exchange.getIn().setBody("Test1");
+            }
+        });
+        
+        template.send("direct:putObject", new Processor() {
+
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(AWS2S3Constants.KEY, "test2.txt");
+                exchange.getIn().setBody("Test2");
+            }
+        });
 
-        Thread.sleep(5000);
+        Thread.sleep(10000);
         assertMockEndpointsSatisfied();
     }
 
@@ -70,7 +88,7 @@ public class S3ConsumerIntegrationTest extends CamelTestSupport {
 
                 from("direct:putObject").startupOrder(1).to(awsEndpoint).to("mock:result");
 
-                from("aws2-s3://mycamel?moveAfterRead=true&deleteAfterRead=false&destinationBucket=camel-kafka-connector").startupOrder(2).log("${header.key}");
+                from("aws2-s3://mycamel?moveAfterRead=true&destinationBucket=camel-kafka-connector").startupOrder(2).log("${body}");
 
             }
         };