You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2015/10/11 17:19:17 UTC

camel git commit: CAMEL-8431 Fixed the issue where Camel keeps consuming the same file when deleteAfterRead is false

Repository: camel
Updated Branches:
  refs/heads/master 0e8d8419e -> a2ba808d4


CAMEL-8431 Fixed the issue where Camel keeps consuming the same file when deleteAfterRead is false


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a2ba808d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a2ba808d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a2ba808d

Branch: refs/heads/master
Commit: a2ba808d48c1e1f346bbbcef3e47ef1937e16715
Parents: 0e8d841
Author: Willem Jiang <wi...@gmail.com>
Authored: Sun Oct 11 23:19:00 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Sun Oct 11 23:19:00 2015 +0800

----------------------------------------------------------------------
 .../camel/component/aws/s3/S3Consumer.java      | 71 ++++++++++++--------
 1 file changed, 42 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a2ba808d/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
index 2f355f9..7de2b9d 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
@@ -49,6 +49,7 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
     
     private static final Logger LOG = LoggerFactory.getLogger(S3Consumer.class);
     private String marker;
+    private boolean filesConsumed;
 
     public S3Consumer(S3Endpoint endpoint, Processor processor) throws NoFactoryAvailableException {
         super(endpoint, processor);
@@ -63,37 +64,49 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
         String fileName = getConfiguration().getFileName();
         String bucketName = getConfiguration().getBucketName();
         Queue<Exchange> exchanges;
-
-        if (fileName != null) {
-            LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName);
-
-            S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName));
-            exchanges = createExchanges(s3Object);
-        } else {
-            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
-        
-            ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
-            listObjectsRequest.setBucketName(bucketName);
-            listObjectsRequest.setPrefix(getConfiguration().getPrefix());
-            if (maxMessagesPerPoll > 0) {
-                listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
-            }
-            if (marker != null && !getConfiguration().isDeleteAfterRead()) {
-                listObjectsRequest.setMarker(marker);
-            }
         
-            ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
-            // we only setup the marker if the file is not deleted
-            if (!getConfiguration().isDeleteAfterRead()) {
-                // where marker is track
-                marker = listObjects.getMarker();
-            }
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+        if (filesConsumed) {
+            exchanges = new LinkedList<Exchange>();
+        } else {
+            if (fileName != null) {
+                LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName);
+    
+                S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName));
+                exchanges = createExchanges(s3Object);
+                if (!getConfiguration().isDeleteAfterRead()) {
+                    filesConsumed = true;
+                }
+            } else {
+                LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+            
+                ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
+                listObjectsRequest.setBucketName(bucketName);
+                listObjectsRequest.setPrefix(getConfiguration().getPrefix());
+                if (maxMessagesPerPoll > 0) {
+                    listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
+                }
+                if (marker != null && !getConfiguration().isDeleteAfterRead()) {
+                    listObjectsRequest.setMarker(marker);
+                }
+            
+                ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
+                // we only setup the marker if the file is not deleted
+                if (!getConfiguration().isDeleteAfterRead()) {
+                    // if the listing is truncated, the nextMarker should not be null
+                    if (listObjects.getNextMarker() != null) {
+                        marker = listObjects.getNextMarker();
+                    } else {
+                        // if there is no marker, the files are consumed, we should not pull it again
+                        filesConsumed = true;
+                    }
+                }
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+                }
+            
+                exchanges = createExchanges(listObjects.getObjectSummaries());
             }
-        
-            exchanges = createExchanges(listObjects.getObjectSummaries());
-        }
+        }    
         return processBatch(CastUtils.cast(exchanges));
     }