You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/28 09:05:50 UTC

[1/2] camel git commit: Updated doc

Repository: camel
Updated Branches:
  refs/heads/master 3b4b52216 -> e9b01196a


Updated doc


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e9b01196
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e9b01196
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e9b01196

Branch: refs/heads/master
Commit: e9b01196aa1d904888ebbf3380700579ec503418
Parents: 9f16e39
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Apr 28 09:02:40 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Apr 28 09:05:44 2016 +0200

----------------------------------------------------------------------
 components/camel-etcd/src/main/docs/etcd.adoc | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e9b01196/components/camel-etcd/src/main/docs/etcd.adoc
----------------------------------------------------------------------
diff --git a/components/camel-etcd/src/main/docs/etcd.adoc b/components/camel-etcd/src/main/docs/etcd.adoc
index d19db9f..3632b0d 100644
--- a/components/camel-etcd/src/main/docs/etcd.adoc
+++ b/components/camel-etcd/src/main/docs/etcd.adoc
@@ -8,8 +8,9 @@
 
 
 
+
 // endpoint options: START
-The etcd component supports 14 endpoint options which are listed below:
+The etcd component supports 15 endpoint options which are listed below:
 
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
 |=======================================================================
@@ -25,6 +26,7 @@ The etcd component supports 14 endpoint options which are listed below:
 | timeToLive | producer |  | Integer | To set the lifespan of a key in milliseconds.
 | exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange
 | synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
+| fromIndex | advance | 0 | Long | The index to watch from
 | password | security |  | String | The password to use for basic authentication.
 | sslContextParameters | security |  | SSLContextParameters | To configure security using SSLContextParameters.
 | userName | security |  | String | The user name to use for basic authentication.
@@ -40,6 +42,7 @@ The etcd component supports 14 endpoint options which are listed below:
 
 
 
+
 // component options: START
 The etcd component has no options.
 // component options: END


[2/2] camel git commit: CAMEL-9784: aws s3 consumer should keep polling if deleteAfterRead is false, otherwise it only poll data one time and then never anymore.

Posted by da...@apache.org.
CAMEL-9784: aws s3 consumer should keep polling if deleteAfterRead is false, otherwise it only poll data one time and then never anymore.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9f16e397
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9f16e397
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9f16e397

Branch: refs/heads/master
Commit: 9f16e397cb8823f9317bd08a7f0f87ee05e1d5b8
Parents: 3b4b522
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Apr 28 09:02:28 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Apr 28 09:05:44 2016 +0200

----------------------------------------------------------------------
 components/camel-aws/src/main/docs/aws-s3.adoc  |  4 +-
 .../camel/component/aws/s3/S3Configuration.java | 12 ++--
 .../camel/component/aws/s3/S3Consumer.java      | 72 +++++++++-----------
 3 files changed, 42 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9f16e397/components/camel-aws/src/main/docs/aws-s3.adoc
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/docs/aws-s3.adoc b/components/camel-aws/src/main/docs/aws-s3.adoc
index 6bf206a..e291580 100644
--- a/components/camel-aws/src/main/docs/aws-s3.adoc
+++ b/components/camel-aws/src/main/docs/aws-s3.adoc
@@ -38,6 +38,7 @@ The AWS S3 Storage Service component has no options.
 
 
 
+
 // endpoint options: START
 The AWS S3 Storage Service component supports 38 endpoint options which are listed below:
 
@@ -53,7 +54,7 @@ The AWS S3 Storage Service component supports 38 endpoint options which are list
 | proxyPort | common |  | Integer | Camel 2.16: Specify a proxy port to be used inside the client definition.
 | secretKey | common |  | String | Amazon AWS Secret Key
 | bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN/ERROR level and ignored.
-| deleteAfterRead | consumer | true | boolean | Delete objects from S3 after they have been retrieved. The delete is only performed if the Exchange is committed. If a rollback occurs the object is not deleted.
+| deleteAfterRead | consumer | true | boolean | Delete objects from S3 after they have been retrieved. The delete is only performed if the Exchange is committed. If a rollback occurs the object is not deleted. If this option is false then the same objects will be retrieve over and over again on the polls. Therefore you need to use the Idempotent Consumer EIP in the route to filter out duplicates. You can filter using the link S3ConstantsBUCKET_NAME and link S3ConstantsKEY headers or only the link S3ConstantsKEY header.
 | fileName | consumer |  | String | To get the object from the bucket with the given file name
 | includeBody | consumer | true | boolean | Camel 2.17: If it is true the exchange body will be set to a stream to the contents of the file. If false the headers will be set with the S3 object metadata but the body will be null.
 | maxMessagesPerPoll | consumer | 10 | int | Gets the maximum number of messages as a limit to poll at each polling. Is default unlimited but use 0 or negative number to disable it as unlimited.
@@ -88,6 +89,7 @@ The AWS S3 Storage Service component supports 38 endpoint options which are list
 
 
 
+
 |=======================================================================
 
 Required S3 component options

http://git-wip-us.apache.org/repos/asf/camel/blob/9f16e397/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
index 1057763..83c33d2 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
@@ -171,10 +171,6 @@ public class S3Configuration implements Cloneable {
         this.region = region;
     }
 
-    public boolean isDeleteAfterRead() {
-        return deleteAfterRead;
-    }
-
     /**
      * *Camel 2.17*: If it is true, the exchange body will be set to a stream to the contents of the file.
      * If false, the headers will be set with the S3 object metadata, but the body will be null.
@@ -187,9 +183,17 @@ public class S3Configuration implements Cloneable {
         return includeBody;
     }
 
+    public boolean isDeleteAfterRead() {
+        return deleteAfterRead;
+    }
+
     /**
      * Delete objects from S3 after they have been retrieved.  The delete is only performed if the Exchange is committed.
      * If a rollback occurs, the object is not deleted.
+     * <p/>
+     * If this option is false, then the same objects will be retrieve over and over again on the polls. Therefore you
+     * need to use the Idempotent Consumer EIP in the route to filter out duplicates. You can filter using the
+     * {@link S3Constants#BUCKET_NAME} and {@link S3Constants#KEY} headers, or only the {@link S3Constants#KEY} header.
      */
     public void setDeleteAfterRead(boolean deleteAfterRead) {
         this.deleteAfterRead = deleteAfterRead;

http://git-wip-us.apache.org/repos/asf/camel/blob/9f16e397/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
index eab0508..5fb4936 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
@@ -43,13 +43,11 @@ import org.slf4j.LoggerFactory;
 /**
  * A Consumer of messages from the Amazon Web Service Simple Storage Service
  * <a href="http://aws.amazon.com/s3/">AWS S3</a>
- * 
  */
 public class S3Consumer extends ScheduledBatchPollingConsumer {
     
     private static final Logger LOG = LoggerFactory.getLogger(S3Consumer.class);
     private String marker;
-    private boolean filesConsumed;
     private transient String s3ConsumerToString;
 
     public S3Consumer(S3Endpoint endpoint, Processor processor) throws NoFactoryAvailableException {
@@ -66,48 +64,40 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
         String bucketName = getConfiguration().getBucketName();
         Queue<Exchange> exchanges;
         
-        if (filesConsumed) {
-            exchanges = new LinkedList<Exchange>();
+        if (fileName != null) {
+            LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName);
+
+            S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName));
+            exchanges = createExchanges(s3Object);
         } else {
-            if (fileName != null) {
-                LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName);
-    
-                S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName, fileName));
-                exchanges = createExchanges(s3Object);
-                if (!getConfiguration().isDeleteAfterRead()) {
-                    filesConsumed = true;
-                }
+            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+            ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
+            listObjectsRequest.setBucketName(bucketName);
+            listObjectsRequest.setPrefix(getConfiguration().getPrefix());
+            if (maxMessagesPerPoll > 0) {
+                listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
+            }
+            // if there was a marker from previous poll then use that to continue from where we left last time
+            if (marker != null) {
+                LOG.trace("Resuming from marker: {}", marker);
+                listObjectsRequest.setMarker(marker);
+            }
+
+            ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
+            if (listObjects.isTruncated()) {
+                marker = listObjects.getNextMarker();
+                LOG.trace("Returned list is truncated, so setting next marker: {}", marker);
             } else {
-                LOG.trace("Queueing objects in bucket [{}]...", bucketName);
-            
-                ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
-                listObjectsRequest.setBucketName(bucketName);
-                listObjectsRequest.setPrefix(getConfiguration().getPrefix());
-                if (maxMessagesPerPoll > 0) {
-                    listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
-                }
-                if (marker != null && !getConfiguration().isDeleteAfterRead()) {
-                    listObjectsRequest.setMarker(marker);
-                }
-            
-                ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
-                // we only setup the marker if the file is not deleted
-                if (!getConfiguration().isDeleteAfterRead()) {
-                    // if the marker is truncated, the nextMarker should not be null
-                    if (listObjects.getNextMarker() != null) {
-                        marker = listObjects.getNextMarker();
-                    } else {
-                        // if there is no marker, the files are consumed, we should not pull it again
-                        filesConsumed = true;
-                    }
-                }
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
-                }
-            
-                exchanges = createExchanges(listObjects.getObjectSummaries());
+                // no more data so clear marker
+                marker = null;
             }
-        }    
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+            }
+
+            exchanges = createExchanges(listObjects.getObjectSummaries());
+        }
         return processBatch(CastUtils.cast(exchanges));
     }