You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2015/10/11 17:30:04 UTC

[1/2] camel git commit: CAMEL-8431 Fixed the issue that camel keeps consuming the same file when deleteAfterRead is false

Repository: camel
Updated Branches:
  refs/heads/camel-2.15.x f233f53b8 -> 7624bc4cb
  refs/heads/camel-2.16.x 62b4310a8 -> e56c64e1f


CAMEL-8431 Fixed the issue that camel keeps consuming the same file when deleteAfterRead is false


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e56c64e1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e56c64e1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e56c64e1

Branch: refs/heads/camel-2.16.x
Commit: e56c64e1ff4693bfffc8867c687ada83e3444a2f
Parents: 62b4310
Author: Willem Jiang <wi...@gmail.com>
Authored: Sun Oct 11 23:19:00 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Sun Oct 11 23:22:11 2015 +0800

----------------------------------------------------------------------
 .../camel/component/aws/s3/S3Consumer.java      | 71 ++++++++++++--------
 1 file changed, 42 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e56c64e1/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
index 2f355f9..7de2b9d 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
@@ -49,6 +49,7 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
     
     private static final Logger LOG = LoggerFactory.getLogger(S3Consumer.class);
     private String marker;
+    private boolean filesConsumed;
 
     public S3Consumer(S3Endpoint endpoint, Processor processor) throws NoFactoryAvailableException {
         super(endpoint, processor);
@@ -63,37 +64,49 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
         String fileName = getConfiguration().getFileName();
         String bucketName = getConfiguration().getBucketName();
         Queue<Exchange> exchanges;
-
-        if (fileName != null) {
-            LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName);
-
-            S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName));
-            exchanges = createExchanges(s3Object);
-        } else {
-            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
-        
-            ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
-            listObjectsRequest.setBucketName(bucketName);
-            listObjectsRequest.setPrefix(getConfiguration().getPrefix());
-            if (maxMessagesPerPoll > 0) {
-                listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
-            }
-            if (marker != null && !getConfiguration().isDeleteAfterRead()) {
-                listObjectsRequest.setMarker(marker);
-            }
         
-            ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
-            // we only setup the marker if the file is not deleted
-            if (!getConfiguration().isDeleteAfterRead()) {
-                // where marker is track
-                marker = listObjects.getMarker();
-            }
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+        if (filesConsumed) {
+            exchanges = new LinkedList<Exchange>();
+        } else {
+            if (fileName != null) {
+                LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName);
+    
+                S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName));
+                exchanges = createExchanges(s3Object);
+                if (!getConfiguration().isDeleteAfterRead()) {
+                    filesConsumed = true;
+                }
+            } else {
+                LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+            
+                ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
+                listObjectsRequest.setBucketName(bucketName);
+                listObjectsRequest.setPrefix(getConfiguration().getPrefix());
+                if (maxMessagesPerPoll > 0) {
+                    listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
+                }
+                if (marker != null && !getConfiguration().isDeleteAfterRead()) {
+                    listObjectsRequest.setMarker(marker);
+                }
+            
+                ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
+                // we only setup the marker if the file is not deleted
+                if (!getConfiguration().isDeleteAfterRead()) {
+                    // if the listing is truncated, the nextMarker should not be null
+                    if (listObjects.getNextMarker() != null) {
+                        marker = listObjects.getNextMarker();
+                    } else {
+                        // if there is no next marker, all the files are consumed, so we should not poll them again
+                        filesConsumed = true;
+                    }
+                }
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+                }
+            
+                exchanges = createExchanges(listObjects.getObjectSummaries());
             }
-        
-            exchanges = createExchanges(listObjects.getObjectSummaries());
-        }
+        }    
         return processBatch(CastUtils.cast(exchanges));
     }
     


[2/2] camel git commit: CAMEL-8431 Fixed the issue that camel keeps consuming the same file when deleteAfterRead is false

Posted by ni...@apache.org.
CAMEL-8431 Fixed the issue that camel keeps consuming the same file when deleteAfterRead is false

Conflicts:
	components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7624bc4c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7624bc4c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7624bc4c

Branch: refs/heads/camel-2.15.x
Commit: 7624bc4cb28b04d681efff0d4a1844481d5f6f63
Parents: f233f53
Author: Willem Jiang <wi...@gmail.com>
Authored: Sun Oct 11 23:19:00 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Sun Oct 11 23:29:23 2015 +0800

----------------------------------------------------------------------
 .../camel/component/aws/s3/S3Consumer.java      | 94 +++++++++++---------
 1 file changed, 54 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7624bc4c/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
index bac2649..7201924 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
@@ -43,12 +43,13 @@ import org.slf4j.LoggerFactory;
 /**
  * A Consumer of messages from the Amazon Web Service Simple Storage Service
  * <a href="http://aws.amazon.com/s3/">AWS S3</a>
- * 
+ *
  */
 public class S3Consumer extends ScheduledBatchPollingConsumer {
-    
+
     private static final Logger LOG = LoggerFactory.getLogger(S3Consumer.class);
     private String marker;
+    private boolean filesConsumed;
 
     public S3Consumer(S3Endpoint endpoint, Processor processor) throws NoFactoryAvailableException {
         super(endpoint, processor);
@@ -59,56 +60,69 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
         // must reset for each poll
         shutdownRunningTask = null;
         pendingExchanges = 0;
-        
+
         String fileName = getConfiguration().getFileName();
         String bucketName = getConfiguration().getBucketName();
-        Queue<Exchange> exchanges = null;
 
-        if (fileName != null) {
-            LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName);
+        Queue<Exchange> exchanges;
 
-            S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName));
-            exchanges = createExchanges(s3Object);
+        if (filesConsumed) {
+            exchanges = new LinkedList<Exchange>();
         } else {
-            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
-        
-            ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
-            listObjectsRequest.setBucketName(bucketName);
-            listObjectsRequest.setPrefix(getConfiguration().getPrefix());
-            if (maxMessagesPerPoll > 0) {
-                listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
-            }
-            if (marker != null && !getConfiguration().isDeleteAfterRead()) {
-                listObjectsRequest.setMarker(marker);
-            }
-        
-            ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
-            // we only setup the marker if the file is not deleted
-            if (!getConfiguration().isDeleteAfterRead()) {
-                // where marker is track
-                marker = listObjects.getMarker();
-            }
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+            if (fileName != null) {
+                LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName);
+
+                S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName));
+                exchanges = createExchanges(s3Object);
+                if (!getConfiguration().isDeleteAfterRead()) {
+                    filesConsumed = true;
+                }
+            } else {
+                LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+                ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
+                listObjectsRequest.setBucketName(bucketName);
+                listObjectsRequest.setPrefix(getConfiguration().getPrefix());
+                if (maxMessagesPerPoll > 0) {
+                    listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
+                }
+                if (marker != null && !getConfiguration().isDeleteAfterRead()) {
+                    listObjectsRequest.setMarker(marker);
+                }
+
+                ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
+                // we only setup the marker if the file is not deleted
+                if (!getConfiguration().isDeleteAfterRead()) {
+                    // if the listing is truncated, the nextMarker should not be null
+                    if (listObjects.getNextMarker() != null) {
+                        marker = listObjects.getNextMarker();
+                    } else {
+                        // if there is no next marker, all the files are consumed, so we should not poll them again
+                        filesConsumed = true;
+                    }
+                }
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+                }
+
+                exchanges = createExchanges(listObjects.getObjectSummaries());
             }
-        
-            exchanges = createExchanges(listObjects.getObjectSummaries());
         }
         return processBatch(CastUtils.cast(exchanges));
     }
-    
+
     protected Queue<Exchange> createExchanges(S3Object s3Object) {
         Queue<Exchange> answer = new LinkedList<Exchange>();
         Exchange exchange = getEndpoint().createExchange(s3Object);
         answer.add(exchange);
         return answer;
     }
-    
+
     protected Queue<Exchange> createExchanges(List<S3ObjectSummary> s3ObjectSummaries) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Received {} messages in this poll", s3ObjectSummaries.size());
         }
-        
+
         Queue<Exchange> answer = new LinkedList<Exchange>();
         for (S3ObjectSummary s3ObjectSummary : s3ObjectSummaries) {
             S3Object s3Object = getAmazonS3Client().getObject(s3ObjectSummary.getBucketName(), s3ObjectSummary.getKey());
@@ -118,7 +132,7 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
 
         return answer;
     }
-    
+
     public int processBatch(Queue<Object> exchanges) throws Exception {
         int total = exchanges.size();
 
@@ -160,7 +174,7 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
 
         return total;
     }
-    
+
     /**
      * Strategy to delete the message after being processed.
      *
@@ -171,9 +185,9 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
             if (getConfiguration().isDeleteAfterRead()) {
                 String bucketName = exchange.getIn().getHeader(S3Constants.BUCKET_NAME, String.class);
                 String key = exchange.getIn().getHeader(S3Constants.KEY, String.class);
-                
+
                 LOG.trace("Deleting object from bucket {} with key {}...", bucketName, key);
-                
+
                 getAmazonS3Client().deleteObject(bucketName, key);
 
                 LOG.trace("Deleted object from bucket {} with key {}...", bucketName, key);
@@ -200,11 +214,11 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
     protected S3Configuration getConfiguration() {
         return getEndpoint().getConfiguration();
     }
-    
+
     protected AmazonS3 getAmazonS3Client() {
         return getEndpoint().getS3Client();
     }
-    
+
     @Override
     public S3Endpoint getEndpoint() {
         return (S3Endpoint) super.getEndpoint();
@@ -214,4 +228,4 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
     public String toString() {
         return "S3Consumer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
     }
-}
\ No newline at end of file
+}