You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2015/10/11 17:30:04 UTC
[1/2] camel git commit: CAMEL-8431 Fixed the issue that camel keeps
consuming the same file when deleteAfterRead is false
Repository: camel
Updated Branches:
refs/heads/camel-2.15.x f233f53b8 -> 7624bc4cb
refs/heads/camel-2.16.x 62b4310a8 -> e56c64e1f
CAMEL-8431 Fixed the issue that camel keeps consuming the same file when deleteAfterRead is false
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e56c64e1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e56c64e1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e56c64e1
Branch: refs/heads/camel-2.16.x
Commit: e56c64e1ff4693bfffc8867c687ada83e3444a2f
Parents: 62b4310
Author: Willem Jiang <wi...@gmail.com>
Authored: Sun Oct 11 23:19:00 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Sun Oct 11 23:22:11 2015 +0800
----------------------------------------------------------------------
.../camel/component/aws/s3/S3Consumer.java | 71 ++++++++++++--------
1 file changed, 42 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e56c64e1/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
index 2f355f9..7de2b9d 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
@@ -49,6 +49,7 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
private static final Logger LOG = LoggerFactory.getLogger(S3Consumer.class);
private String marker;
+ private boolean filesConsumed;
public S3Consumer(S3Endpoint endpoint, Processor processor) throws NoFactoryAvailableException {
super(endpoint, processor);
@@ -63,37 +64,49 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
String fileName = getConfiguration().getFileName();
String bucketName = getConfiguration().getBucketName();
Queue<Exchange> exchanges;
-
- if (fileName != null) {
- LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName);
-
- S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName));
- exchanges = createExchanges(s3Object);
- } else {
- LOG.trace("Queueing objects in bucket [{}]...", bucketName);
-
- ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
- listObjectsRequest.setBucketName(bucketName);
- listObjectsRequest.setPrefix(getConfiguration().getPrefix());
- if (maxMessagesPerPoll > 0) {
- listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
- }
- if (marker != null && !getConfiguration().isDeleteAfterRead()) {
- listObjectsRequest.setMarker(marker);
- }
- ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
- // we only setup the marker if the file is not deleted
- if (!getConfiguration().isDeleteAfterRead()) {
- // where marker is track
- marker = listObjects.getMarker();
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+ if (filesConsumed) {
+ exchanges = new LinkedList<Exchange>();
+ } else {
+ if (fileName != null) {
+ LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName);
+
+ S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName));
+ exchanges = createExchanges(s3Object);
+ if (!getConfiguration().isDeleteAfterRead()) {
+ filesConsumed = true;
+ }
+ } else {
+ LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+ ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
+ listObjectsRequest.setBucketName(bucketName);
+ listObjectsRequest.setPrefix(getConfiguration().getPrefix());
+ if (maxMessagesPerPoll > 0) {
+ listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
+ }
+ if (marker != null && !getConfiguration().isDeleteAfterRead()) {
+ listObjectsRequest.setMarker(marker);
+ }
+
+ ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
+ // we only setup the marker if the file is not deleted
+ if (!getConfiguration().isDeleteAfterRead()) {
+ // if the listing is truncated, the nextMarker should not be null
+ if (listObjects.getNextMarker() != null) {
+ marker = listObjects.getNextMarker();
+ } else {
+ // if there is no next marker, all files have been consumed, so we should not poll again
+ filesConsumed = true;
+ }
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+ }
+
+ exchanges = createExchanges(listObjects.getObjectSummaries());
}
-
- exchanges = createExchanges(listObjects.getObjectSummaries());
- }
+ }
return processBatch(CastUtils.cast(exchanges));
}
[2/2] camel git commit: CAMEL-8431 Fixed the issue that camel keeps
consuming the same file when deleteAfterRead is false
Posted by ni...@apache.org.
CAMEL-8431 Fixed the issue that camel keeps consuming the same file when deleteAfterRead is false
Conflicts:
components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7624bc4c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7624bc4c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7624bc4c
Branch: refs/heads/camel-2.15.x
Commit: 7624bc4cb28b04d681efff0d4a1844481d5f6f63
Parents: f233f53
Author: Willem Jiang <wi...@gmail.com>
Authored: Sun Oct 11 23:19:00 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Sun Oct 11 23:29:23 2015 +0800
----------------------------------------------------------------------
.../camel/component/aws/s3/S3Consumer.java | 94 +++++++++++---------
1 file changed, 54 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/7624bc4c/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
index bac2649..7201924 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
@@ -43,12 +43,13 @@ import org.slf4j.LoggerFactory;
/**
* A Consumer of messages from the Amazon Web Service Simple Storage Service
* <a href="http://aws.amazon.com/s3/">AWS S3</a>
- *
+ *
*/
public class S3Consumer extends ScheduledBatchPollingConsumer {
-
+
private static final Logger LOG = LoggerFactory.getLogger(S3Consumer.class);
private String marker;
+ private boolean filesConsumed;
public S3Consumer(S3Endpoint endpoint, Processor processor) throws NoFactoryAvailableException {
super(endpoint, processor);
@@ -59,56 +60,69 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
// must reset for each poll
shutdownRunningTask = null;
pendingExchanges = 0;
-
+
String fileName = getConfiguration().getFileName();
String bucketName = getConfiguration().getBucketName();
- Queue<Exchange> exchanges = null;
- if (fileName != null) {
- LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName);
+ Queue<Exchange> exchanges;
- S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName));
- exchanges = createExchanges(s3Object);
+ if (filesConsumed) {
+ exchanges = new LinkedList<Exchange>();
} else {
- LOG.trace("Queueing objects in bucket [{}]...", bucketName);
-
- ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
- listObjectsRequest.setBucketName(bucketName);
- listObjectsRequest.setPrefix(getConfiguration().getPrefix());
- if (maxMessagesPerPoll > 0) {
- listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
- }
- if (marker != null && !getConfiguration().isDeleteAfterRead()) {
- listObjectsRequest.setMarker(marker);
- }
-
- ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
- // we only setup the marker if the file is not deleted
- if (!getConfiguration().isDeleteAfterRead()) {
- // where marker is track
- marker = listObjects.getMarker();
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+ if (fileName != null) {
+ LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName);
+
+ S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName));
+ exchanges = createExchanges(s3Object);
+ if (!getConfiguration().isDeleteAfterRead()) {
+ filesConsumed = true;
+ }
+ } else {
+ LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+ ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
+ listObjectsRequest.setBucketName(bucketName);
+ listObjectsRequest.setPrefix(getConfiguration().getPrefix());
+ if (maxMessagesPerPoll > 0) {
+ listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
+ }
+ if (marker != null && !getConfiguration().isDeleteAfterRead()) {
+ listObjectsRequest.setMarker(marker);
+ }
+
+ ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
+ // we only setup the marker if the file is not deleted
+ if (!getConfiguration().isDeleteAfterRead()) {
+ // if the listing is truncated, the nextMarker should not be null
+ if (listObjects.getNextMarker() != null) {
+ marker = listObjects.getNextMarker();
+ } else {
+ // if there is no next marker, all files have been consumed, so we should not poll again
+ filesConsumed = true;
+ }
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+ }
+
+ exchanges = createExchanges(listObjects.getObjectSummaries());
}
-
- exchanges = createExchanges(listObjects.getObjectSummaries());
}
return processBatch(CastUtils.cast(exchanges));
}
-
+
protected Queue<Exchange> createExchanges(S3Object s3Object) {
Queue<Exchange> answer = new LinkedList<Exchange>();
Exchange exchange = getEndpoint().createExchange(s3Object);
answer.add(exchange);
return answer;
}
-
+
protected Queue<Exchange> createExchanges(List<S3ObjectSummary> s3ObjectSummaries) {
if (LOG.isTraceEnabled()) {
LOG.trace("Received {} messages in this poll", s3ObjectSummaries.size());
}
-
+
Queue<Exchange> answer = new LinkedList<Exchange>();
for (S3ObjectSummary s3ObjectSummary : s3ObjectSummaries) {
S3Object s3Object = getAmazonS3Client().getObject(s3ObjectSummary.getBucketName(), s3ObjectSummary.getKey());
@@ -118,7 +132,7 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
return answer;
}
-
+
public int processBatch(Queue<Object> exchanges) throws Exception {
int total = exchanges.size();
@@ -160,7 +174,7 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
return total;
}
-
+
/**
* Strategy to delete the message after being processed.
*
@@ -171,9 +185,9 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
if (getConfiguration().isDeleteAfterRead()) {
String bucketName = exchange.getIn().getHeader(S3Constants.BUCKET_NAME, String.class);
String key = exchange.getIn().getHeader(S3Constants.KEY, String.class);
-
+
LOG.trace("Deleting object from bucket {} with key {}...", bucketName, key);
-
+
getAmazonS3Client().deleteObject(bucketName, key);
LOG.trace("Deleted object from bucket {} with key {}...", bucketName, key);
@@ -200,11 +214,11 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
protected S3Configuration getConfiguration() {
return getEndpoint().getConfiguration();
}
-
+
protected AmazonS3 getAmazonS3Client() {
return getEndpoint().getS3Client();
}
-
+
@Override
public S3Endpoint getEndpoint() {
return (S3Endpoint) super.getEndpoint();
@@ -214,4 +228,4 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
public String toString() {
return "S3Consumer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
}
-}
\ No newline at end of file
+}