You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/28 09:05:34 UTC

[2/2] camel git commit: CAMEL-9784: aws s3 consumer should keep polling if deleteAfterRead is false, otherwise it only poll data one time and then never anymore.

CAMEL-9784: aws s3 consumer should keep polling if deleteAfterRead is false, otherwise it only poll data one time and then never anymore.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9216caf3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9216caf3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9216caf3

Branch: refs/heads/camel-2.16.x
Commit: 9216caf3d1983d1b661d27deaf633fcfc6a59c09
Parents: 4055428
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Apr 28 09:02:28 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Apr 28 09:05:07 2016 +0200

----------------------------------------------------------------------
 .../camel/component/aws/s3/S3Configuration.java |  7 +-
 .../camel/component/aws/s3/S3Consumer.java      | 72 +++++++++-----------
 2 files changed, 37 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9216caf3/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
index 36bab52..97d4e40 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
@@ -176,7 +176,12 @@ public class S3Configuration implements Cloneable {
     }
 
     /**
-     * Delete objects from S3 after it has been retrieved.
+     * Delete objects from S3 after they have been retrieved.  The delete is only performed if the Exchange is committed.
+     * If a rollback occurs, the object is not deleted.
+     * <p/>
+     * If this option is false, then the same objects will be retrieve over and over again on the polls. Therefore you
+     * need to use the Idempotent Consumer EIP in the route to filter out duplicates. You can filter using the
+     * {@link S3Constants#BUCKET_NAME} and {@link S3Constants#KEY} headers, or only the {@link S3Constants#KEY} header.
      */
     public void setDeleteAfterRead(boolean deleteAfterRead) {
         this.deleteAfterRead = deleteAfterRead;

http://git-wip-us.apache.org/repos/asf/camel/blob/9216caf3/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
index eab0508..5fb4936 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
@@ -43,13 +43,11 @@ import org.slf4j.LoggerFactory;
 /**
  * A Consumer of messages from the Amazon Web Service Simple Storage Service
  * <a href="http://aws.amazon.com/s3/">AWS S3</a>
- * 
  */
 public class S3Consumer extends ScheduledBatchPollingConsumer {
     
     private static final Logger LOG = LoggerFactory.getLogger(S3Consumer.class);
     private String marker;
-    private boolean filesConsumed;
     private transient String s3ConsumerToString;
 
     public S3Consumer(S3Endpoint endpoint, Processor processor) throws NoFactoryAvailableException {
@@ -66,48 +64,40 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
         String bucketName = getConfiguration().getBucketName();
         Queue<Exchange> exchanges;
         
-        if (filesConsumed) {
-            exchanges = new LinkedList<Exchange>();
+        if (fileName != null) {
+            LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName);
+
+            S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName));
+            exchanges = createExchanges(s3Object);
         } else {
-            if (fileName != null) {
-                LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName);
-    
-                S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName));
-                exchanges = createExchanges(s3Object);
-                if (!getConfiguration().isDeleteAfterRead()) {
-                    filesConsumed = true;
-                }
+            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+            ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
+            listObjectsRequest.setBucketName(bucketName);
+            listObjectsRequest.setPrefix(getConfiguration().getPrefix());
+            if (maxMessagesPerPoll > 0) {
+                listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
+            }
+            // if there was a marker from previous poll then use that to continue from where we left last time
+            if (marker != null) {
+                LOG.trace("Resuming from marker: {}", marker);
+                listObjectsRequest.setMarker(marker);
+            }
+
+            ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
+            if (listObjects.isTruncated()) {
+                marker = listObjects.getNextMarker();
+                LOG.trace("Returned list is truncated, so setting next marker: {}", marker);
             } else {
-                LOG.trace("Queueing objects in bucket [{}]...", bucketName);
-            
-                ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
-                listObjectsRequest.setBucketName(bucketName);
-                listObjectsRequest.setPrefix(getConfiguration().getPrefix());
-                if (maxMessagesPerPoll > 0) {
-                    listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
-                }
-                if (marker != null && !getConfiguration().isDeleteAfterRead()) {
-                    listObjectsRequest.setMarker(marker);
-                }
-            
-                ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
-                // we only setup the marker if the file is not deleted
-                if (!getConfiguration().isDeleteAfterRead()) {
-                    // if the marker is truncated, the nextMarker should not be null
-                    if (listObjects.getNextMarker() != null) {
-                        marker = listObjects.getNextMarker();
-                    } else {
-                        // if there is no marker, the files are consumed, we should not pull it again
-                        filesConsumed = true;
-                    }
-                }
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
-                }
-            
-                exchanges = createExchanges(listObjects.getObjectSummaries());
+                // no more data so clear marker
+                marker = null;
             }
-        }    
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+            }
+
+            exchanges = createExchanges(listObjects.getObjectSummaries());
+        }
         return processBatch(CastUtils.cast(exchanges));
     }