You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2015/10/11 17:19:17 UTC
camel git commit: CAMEL-8431 Fixed the issue that camel keeps
consuming the same file when deleteAfterRead is false
Repository: camel
Updated Branches:
refs/heads/master 0e8d8419e -> a2ba808d4
CAMEL-8431 Fixed the issue that camel keeps consuming the same file when deleteAfterRead is false
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a2ba808d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a2ba808d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a2ba808d
Branch: refs/heads/master
Commit: a2ba808d48c1e1f346bbbcef3e47ef1937e16715
Parents: 0e8d841
Author: Willem Jiang <wi...@gmail.com>
Authored: Sun Oct 11 23:19:00 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Sun Oct 11 23:19:00 2015 +0800
----------------------------------------------------------------------
.../camel/component/aws/s3/S3Consumer.java | 71 ++++++++++++--------
1 file changed, 42 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a2ba808d/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
index 2f355f9..7de2b9d 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
@@ -49,6 +49,7 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
private static final Logger LOG = LoggerFactory.getLogger(S3Consumer.class);
private String marker;
+ private boolean filesConsumed;
public S3Consumer(S3Endpoint endpoint, Processor processor) throws NoFactoryAvailableException {
super(endpoint, processor);
@@ -63,37 +64,49 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
String fileName = getConfiguration().getFileName();
String bucketName = getConfiguration().getBucketName();
Queue<Exchange> exchanges;
-
- if (fileName != null) {
- LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName);
-
- S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName));
- exchanges = createExchanges(s3Object);
- } else {
- LOG.trace("Queueing objects in bucket [{}]...", bucketName);
-
- ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
- listObjectsRequest.setBucketName(bucketName);
- listObjectsRequest.setPrefix(getConfiguration().getPrefix());
- if (maxMessagesPerPoll > 0) {
- listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
- }
- if (marker != null && !getConfiguration().isDeleteAfterRead()) {
- listObjectsRequest.setMarker(marker);
- }
- ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
- // we only setup the marker if the file is not deleted
- if (!getConfiguration().isDeleteAfterRead()) {
- // where marker is track
- marker = listObjects.getMarker();
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+ if (filesConsumed) {
+ exchanges = new LinkedList<Exchange>();
+ } else {
+ if (fileName != null) {
+ LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName);
+
+ S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName));
+ exchanges = createExchanges(s3Object);
+ if (!getConfiguration().isDeleteAfterRead()) {
+ filesConsumed = true;
+ }
+ } else {
+ LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+ ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
+ listObjectsRequest.setBucketName(bucketName);
+ listObjectsRequest.setPrefix(getConfiguration().getPrefix());
+ if (maxMessagesPerPoll > 0) {
+ listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
+ }
+ if (marker != null && !getConfiguration().isDeleteAfterRead()) {
+ listObjectsRequest.setMarker(marker);
+ }
+
+ ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
+ // we only setup the marker if the file is not deleted
+ if (!getConfiguration().isDeleteAfterRead()) {
+ // if the marker is truncated, the nextMarker should not be null
+ if (listObjects.getNextMarker() != null) {
+ marker = listObjects.getNextMarker();
+ } else {
+ // if there is no marker, the files are consumed, we should not pull it again
+ filesConsumed = true;
+ }
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+ }
+
+ exchanges = createExchanges(listObjects.getObjectSummaries());
}
-
- exchanges = createExchanges(listObjects.getObjectSummaries());
- }
+ }
return processBatch(CastUtils.cast(exchanges));
}