You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/11/08 13:04:27 UTC

[camel] 01/03: CAMEL-12921 - Camel-AWS SQS: Add an option to create a SQS delay queue

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c4a5f6f4d7b4d3fd01dfe644a9927b635b9b8731
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Nov 8 13:42:57 2018 +0100

    CAMEL-12921 - Camel-AWS SQS: Add an option to create a SQS delay queue
---
 components/camel-aws/src/main/docs/aws-sqs-component.adoc  |  3 ++-
 .../apache/camel/component/aws/sqs/SqsConfiguration.java   | 14 ++++++++++++++
 .../org/apache/camel/component/aws/sqs/SqsEndpoint.java    |  6 ++++++
 .../aws/sqs/springboot/SqsComponentConfiguration.java      | 13 +++++++++++++
 4 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/components/camel-aws/src/main/docs/aws-sqs-component.adoc b/components/camel-aws/src/main/docs/aws-sqs-component.adoc
index b285d8a..1b55eb9 100644
--- a/components/camel-aws/src/main/docs/aws-sqs-component.adoc
+++ b/components/camel-aws/src/main/docs/aws-sqs-component.adoc
@@ -68,7 +68,7 @@ with the following path and query parameters:
 |===
 
 
-==== Query Parameters (47 parameters):
+==== Query Parameters (48 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -94,6 +94,7 @@ with the following path and query parameters:
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | *pollStrategy* (consumer) | A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation to control error handling usually occurred during the poll operation before an Exchange have been created and being routed in Camel. |  | PollingConsumerPoll Strategy
+| *delayQueue* (producer) | Define if you want to apply delaySeconds option to the queue or on single messages | false | boolean
 | *delaySeconds* (producer) | Delay sending messages for a number of seconds. |  | Integer
 | *messageDeduplicationId Strategy* (producer) | Only for FIFO queues. Strategy for setting the messageDeduplicationId on the message. Can be one of the following options: useExchangeId, useContentBasedDeduplication. For the useContentBasedDeduplication option, no messageDeduplicationId will be set on the message. | useExchangeId | MessageDeduplicationId Strategy
 | *messageGroupIdStrategy* (producer) | Only for FIFO queues. Strategy for setting the messageGroupId on the message. Can be one of the following options: useConstant, useExchangeId, usePropertyValue. For the usePropertyValue option, the value of property CamelAwsMessageGroupId will be used. |  | MessageGroupIdStrategy
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
index 9714074..ce66e1b 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
@@ -69,6 +69,9 @@ public class SqsConfiguration implements Cloneable {
     // producer properties
     @UriParam(label = "producer")
     private Integer delaySeconds;
+    // producer properties
+    @UriParam(label = "producer")
+    private boolean delayQueue;
     @UriParam(label = "producer", enums = "useConstant,useExchangeId,usePropertyValue")
     private MessageGroupIdStrategy messageGroupIdStrategy;
     @UriParam(label = "producer", defaultValue = "useExchangeId", enums = "useExchangeId,useContentBasedDeduplication")
@@ -224,6 +227,17 @@ public class SqsConfiguration implements Cloneable {
         this.delaySeconds = delaySeconds;
     }
 
+    public boolean isDelayQueue() {
+        return delayQueue;
+    }
+
+    /**
+     * Define if you want to apply delaySeconds option to the queue or on single messages
+     */
+    public void setDelayQueue(boolean delayQueue) {
+        this.delayQueue = delayQueue;
+    }
+
     public Integer getMaximumMessageSize() {
         return maximumMessageSize;
     }
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
index 1182f12..de21934 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
@@ -183,6 +183,9 @@ public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterSt
         if (getConfiguration().getReceiveMessageWaitTimeSeconds() != null) {
             request.getAttributes().put(QueueAttributeName.ReceiveMessageWaitTimeSeconds.name(), String.valueOf(getConfiguration().getReceiveMessageWaitTimeSeconds()));
         }
+        if (getConfiguration().getDelaySeconds() != null && getConfiguration().isDelayQueue()) {
+            request.getAttributes().put(QueueAttributeName.DelaySeconds.name(), String.valueOf(getConfiguration().getDelaySeconds()));
+        }
         if (getConfiguration().getRedrivePolicy() != null) {
             request.getAttributes().put(QueueAttributeName.RedrivePolicy.name(), getConfiguration().getRedrivePolicy());
         }
@@ -212,6 +215,9 @@ public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterSt
         if (getConfiguration().getReceiveMessageWaitTimeSeconds() != null) {
             request.getAttributes().put(QueueAttributeName.ReceiveMessageWaitTimeSeconds.name(), String.valueOf(getConfiguration().getReceiveMessageWaitTimeSeconds()));
         }
+        if (getConfiguration().getDelaySeconds() != null && getConfiguration().isDelayQueue()) {
+            request.getAttributes().put(QueueAttributeName.DelaySeconds.name(), String.valueOf(getConfiguration().getDelaySeconds()));
+        }
         if (getConfiguration().getRedrivePolicy() != null) {
             request.getAttributes().put(QueueAttributeName.RedrivePolicy.name(), getConfiguration().getRedrivePolicy());
         }
diff --git a/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/sqs/springboot/SqsComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/sqs/springboot/SqsComponentConfiguration.java
index d06b7bd..c8ae146 100644
--- a/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/sqs/springboot/SqsComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/sqs/springboot/SqsComponentConfiguration.java
@@ -159,6 +159,11 @@ public class SqsComponentConfiguration
          */
         private Integer delaySeconds;
         /**
+         * Define if you want to apply delaySeconds option to the queue or on
+         * single messages
+         */
+        private Boolean delayQueue = false;
+        /**
          * The maximumMessageSize (in bytes) an SQS message can contain for this
          * queue.
          */
@@ -321,6 +326,14 @@ public class SqsComponentConfiguration
             this.delaySeconds = delaySeconds;
         }
 
+        public Boolean getDelayQueue() {
+            return delayQueue;
+        }
+
+        public void setDelayQueue(Boolean delayQueue) {
+            this.delayQueue = delayQueue;
+        }
+
         public Integer getMaximumMessageSize() {
             return maximumMessageSize;
         }