You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/28 09:05:34 UTC
[2/2] camel git commit: CAMEL-9784: aws s3 consumer should keep
polling if deleteAfterRead is false,
otherwise it only poll data one time and then never anymore.
CAMEL-9784: aws s3 consumer should keep polling if deleteAfterRead is false, otherwise it only poll data one time and then never anymore.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9216caf3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9216caf3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9216caf3
Branch: refs/heads/camel-2.16.x
Commit: 9216caf3d1983d1b661d27deaf633fcfc6a59c09
Parents: 4055428
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Apr 28 09:02:28 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Apr 28 09:05:07 2016 +0200
----------------------------------------------------------------------
.../camel/component/aws/s3/S3Configuration.java | 7 +-
.../camel/component/aws/s3/S3Consumer.java | 72 +++++++++-----------
2 files changed, 37 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/9216caf3/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
index 36bab52..97d4e40 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
@@ -176,7 +176,12 @@ public class S3Configuration implements Cloneable {
}
/**
- * Delete objects from S3 after it has been retrieved.
+ * Delete objects from S3 after they have been retrieved. The delete is only performed if the Exchange is committed.
+ * If a rollback occurs, the object is not deleted.
+ * <p/>
+ * If this option is false, then the same objects will be retrieve over and over again on the polls. Therefore you
+ * need to use the Idempotent Consumer EIP in the route to filter out duplicates. You can filter using the
+ * {@link S3Constants#BUCKET_NAME} and {@link S3Constants#KEY} headers, or only the {@link S3Constants#KEY} header.
*/
public void setDeleteAfterRead(boolean deleteAfterRead) {
this.deleteAfterRead = deleteAfterRead;
http://git-wip-us.apache.org/repos/asf/camel/blob/9216caf3/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
index eab0508..5fb4936 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
@@ -43,13 +43,11 @@ import org.slf4j.LoggerFactory;
/**
* A Consumer of messages from the Amazon Web Service Simple Storage Service
* <a href="http://aws.amazon.com/s3/">AWS S3</a>
- *
*/
public class S3Consumer extends ScheduledBatchPollingConsumer {
private static final Logger LOG = LoggerFactory.getLogger(S3Consumer.class);
private String marker;
- private boolean filesConsumed;
private transient String s3ConsumerToString;
public S3Consumer(S3Endpoint endpoint, Processor processor) throws NoFactoryAvailableException {
@@ -66,48 +64,40 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
String bucketName = getConfiguration().getBucketName();
Queue<Exchange> exchanges;
- if (filesConsumed) {
- exchanges = new LinkedList<Exchange>();
+ if (fileName != null) {
+ LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName);
+
+ S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName));
+ exchanges = createExchanges(s3Object);
} else {
- if (fileName != null) {
- LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName);
-
- S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName));
- exchanges = createExchanges(s3Object);
- if (!getConfiguration().isDeleteAfterRead()) {
- filesConsumed = true;
- }
+ LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+ ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
+ listObjectsRequest.setBucketName(bucketName);
+ listObjectsRequest.setPrefix(getConfiguration().getPrefix());
+ if (maxMessagesPerPoll > 0) {
+ listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
+ }
+ // if there was a marker from previous poll then use that to continue from where we left last time
+ if (marker != null) {
+ LOG.trace("Resuming from marker: {}", marker);
+ listObjectsRequest.setMarker(marker);
+ }
+
+ ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
+ if (listObjects.isTruncated()) {
+ marker = listObjects.getNextMarker();
+ LOG.trace("Returned list is truncated, so setting next marker: {}", marker);
} else {
- LOG.trace("Queueing objects in bucket [{}]...", bucketName);
-
- ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
- listObjectsRequest.setBucketName(bucketName);
- listObjectsRequest.setPrefix(getConfiguration().getPrefix());
- if (maxMessagesPerPoll > 0) {
- listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
- }
- if (marker != null && !getConfiguration().isDeleteAfterRead()) {
- listObjectsRequest.setMarker(marker);
- }
-
- ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
- // we only setup the marker if the file is not deleted
- if (!getConfiguration().isDeleteAfterRead()) {
- // if the marker is truncated, the nextMarker should not be null
- if (listObjects.getNextMarker() != null) {
- marker = listObjects.getNextMarker();
- } else {
- // if there is no marker, the files are consumed, we should not pull it again
- filesConsumed = true;
- }
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
- }
-
- exchanges = createExchanges(listObjects.getObjectSummaries());
+ // no more data so clear marker
+ marker = null;
}
- }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+ }
+
+ exchanges = createExchanges(listObjects.getObjectSummaries());
+ }
return processBatch(CastUtils.cast(exchanges));
}