You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/10/27 07:40:28 UTC

[camel] 01/03: CAMEL-11224 aws-sqs producer does not support new FIFO queues

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b139e822bcb79cf51ca5a3b9add5e6ef8e3a313d
Author: Jon McEwen <jo...@boohoo.com>
AuthorDate: Tue Oct 10 10:36:18 2017 +0100

    CAMEL-11224 aws-sqs producer does not support new FIFO queues
---
 ...ts.java => ConstantMessageGroupIdStrategy.java} | 21 +++---
 .../{SqsConstants.java => ExchangeIdStrategy.java} | 21 +++---
 ...{SqsConstants.java => HeaderValueStrategy.java} | 21 +++---
 .../camel/component/aws/sqs/NullStrategy.java      | 12 +++
 .../camel/component/aws/sqs/SqsComponent.java      |  4 +
 .../camel/component/aws/sqs/SqsConfiguration.java  | 56 ++++++++++++++
 .../camel/component/aws/sqs/SqsConstants.java      |  1 +
 .../camel/component/aws/sqs/SqsProducer.java       | 35 ++++++---
 ...s.java => StringValueFromExchangeStrategy.java} | 19 ++---
 .../component/aws/sqs/SqsConfigurationTest.java    | 32 ++++++++
 .../camel/component/aws/sqs/SqsProducerTest.java   | 87 ++++++++++++++++++++++
 11 files changed, 251 insertions(+), 58 deletions(-)

diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ConstantMessageGroupIdStrategy.java
similarity index 67%
copy from components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
copy to components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ConstantMessageGroupIdStrategy.java
index 61891c9..44caf36 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ConstantMessageGroupIdStrategy.java
@@ -16,16 +16,13 @@
  */
 package org.apache.camel.component.aws.sqs;
 
-/**
- * Constants used in Camel AWS SQS module
- *
- */
-public interface SqsConstants {
+import org.apache.camel.Exchange;
+
+public class ConstantMessageGroupIdStrategy implements StringValueFromExchangeStrategy {
+
+    @Override
+    public String value(Exchange exchange) {
+        return "CamelSingleMessageGroup";
+    }
 
-    String ATTRIBUTES = "CamelAwsSqsAttributes";
-    String MESSAGE_ATTRIBUTES = "CamelAwsSqsMessageAttributes";
-    String MD5_OF_BODY = "CamelAwsSqsMD5OfBody";
-    String MESSAGE_ID = "CamelAwsSqsMessageId";
-    String RECEIPT_HANDLE = "CamelAwsSqsReceiptHandle";
-    String DELAY_HEADER = "CamelAwsSqsDelaySeconds";
-}
\ No newline at end of file
+}
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ExchangeIdStrategy.java
similarity index 67%
copy from components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
copy to components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ExchangeIdStrategy.java
index 61891c9..6b0dac1 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ExchangeIdStrategy.java
@@ -16,16 +16,13 @@
  */
 package org.apache.camel.component.aws.sqs;
 
-/**
- * Constants used in Camel AWS SQS module
- *
- */
-public interface SqsConstants {
+import org.apache.camel.Exchange;
+
+public class ExchangeIdStrategy implements StringValueFromExchangeStrategy {
+
+    @Override
+    public String value(Exchange exchange) {
+        return exchange.getExchangeId();
+    }
 
-    String ATTRIBUTES = "CamelAwsSqsAttributes";
-    String MESSAGE_ATTRIBUTES = "CamelAwsSqsMessageAttributes";
-    String MD5_OF_BODY = "CamelAwsSqsMD5OfBody";
-    String MESSAGE_ID = "CamelAwsSqsMessageId";
-    String RECEIPT_HANDLE = "CamelAwsSqsReceiptHandle";
-    String DELAY_HEADER = "CamelAwsSqsDelaySeconds";
-}
\ No newline at end of file
+}
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/HeaderValueStrategy.java
similarity index 67%
copy from components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
copy to components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/HeaderValueStrategy.java
index 61891c9..0bef8c2 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/HeaderValueStrategy.java
@@ -16,16 +16,13 @@
  */
 package org.apache.camel.component.aws.sqs;
 
-/**
- * Constants used in Camel AWS SQS module
- *
- */
-public interface SqsConstants {
+import org.apache.camel.Exchange;
+
+public class HeaderValueStrategy implements StringValueFromExchangeStrategy {
+
+    @Override
+    public String value(Exchange exchange) {
+        return exchange.getIn().getHeader(SqsConstants.MESSAGE_GROUP_ID_HEADER, String.class);
+    }
 
-    String ATTRIBUTES = "CamelAwsSqsAttributes";
-    String MESSAGE_ATTRIBUTES = "CamelAwsSqsMessageAttributes";
-    String MD5_OF_BODY = "CamelAwsSqsMD5OfBody";
-    String MESSAGE_ID = "CamelAwsSqsMessageId";
-    String RECEIPT_HANDLE = "CamelAwsSqsReceiptHandle";
-    String DELAY_HEADER = "CamelAwsSqsDelaySeconds";
-}
\ No newline at end of file
+}
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/NullStrategy.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/NullStrategy.java
new file mode 100644
index 0000000..bbb5ab3
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/NullStrategy.java
@@ -0,0 +1,12 @@
+package org.apache.camel.component.aws.sqs;
+
+import org.apache.camel.Exchange;
+
+public class NullStrategy implements StringValueFromExchangeStrategy {
+
+    @Override
+    public String value(Exchange exchange) {
+        return null;
+    }
+
+}
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java
index f865670..653b956 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java
@@ -62,6 +62,10 @@ public class SqsComponent extends UriEndpointComponent {
             throw new IllegalArgumentException("Extending message visibility (extendMessageVisibility) requires visibilityTimeout to be set on the Endpoint.");
         }
         
+        if (configuration.isFifoQueue() && configuration.getMessageGroupIdStrategy() == null) {
+            throw new IllegalArgumentException("messageGroupIdStrategy must be set for FIFO queues.");
+        }
+        
         SqsEndpoint sqsEndpoint = new SqsEndpoint(uri, this, configuration);
         sqsEndpoint.setConsumerProperties(parameters);
         return sqsEndpoint;
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
index 3683e44..dbab657f 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
@@ -67,6 +67,10 @@ public class SqsConfiguration {
     // producer properties
     @UriParam(label = "producer")
     private Integer delaySeconds;
+    @UriParam(label = "producer")
+    private StringValueFromExchangeStrategy messageGroupIdStrategy;
+    @UriParam(label = "producer", defaultValue = "useExchangeId")
+    private StringValueFromExchangeStrategy messageDeduplicationIdStrategy = new ExchangeIdStrategy();
 
     // queue properties
     @UriParam(label = "queue")
@@ -83,6 +87,18 @@ public class SqsConfiguration {
     private String redrivePolicy;
 
     /**
+     * Whether or not the queue is a FIFO queue.
+     * Returns false when no queue name has been configured yet.
+     */
+    public boolean isFifoQueue() {
+        // AWS docs suggest this is valid derivation.
+        // FIFO queue names must end with .fifo, and standard queues cannot.
+        // Guard against an unset queue name so that callers do not fail
+        // with a NullPointerException before the name has been assigned.
+        return queueName != null && queueName.endsWith(".fifo");
+    }
+
+    /**
      * The region with which the AWS-SQS client wants to work with.
      * Only works if Camel creates the AWS-SQS client, i.e., if you explicitly set amazonSQSClient,
      * then this setting will have no effect. You would have to set it on the client you create directly
@@ -364,4 +380,44 @@ public class SqsConfiguration {
     public void setProxyPort(Integer proxyPort) {
         this.proxyPort = proxyPort;
     }
+
+    /**
+     * Since *Camel 2.20*. Only for FIFO queues. Strategy for setting the messageGroupId on the message.
+     * Can be one of the following options: *useConstant*, *useExchangeId*, *useHeaderValue*.
+     * For the *useHeaderValue* option, the value of header "CamelAwsMessageGroupId" will be used.
+     */
+    public void setMessageGroupIdStrategy(String strategy) {
+        if ("useConstant".equalsIgnoreCase(strategy)) {
+            messageGroupIdStrategy = new ConstantMessageGroupIdStrategy();
+        } else if ("useExchangeId".equalsIgnoreCase(strategy)) {
+            messageGroupIdStrategy = new ExchangeIdStrategy();
+        } else if ("useHeaderValue".equalsIgnoreCase(strategy)) {
+            messageGroupIdStrategy = new HeaderValueStrategy();
+        } else {
+            throw new IllegalArgumentException("Unrecognised MessageGroupIdStrategy: " + strategy);
+        }
+    }
+
+    public StringValueFromExchangeStrategy getMessageGroupIdStrategy() {
+        return messageGroupIdStrategy;
+    }
+
+    public StringValueFromExchangeStrategy getMessageDeduplicationIdStrategy() {
+        return messageDeduplicationIdStrategy;
+    }
+
+    /**
+     * Since *Camel 2.20*. Only for FIFO queues. Strategy for setting the messageDeduplicationId on the message.
+     * Can be one of the following options: *useExchangeId*, *useContentBasedDeduplication*.
+     * For the *useContentBasedDeduplication* option, no messageDeduplicationId will be set on the message.
+     */
+    public void setMessageDeduplicationIdStrategy(String strategy) {
+        if ("useExchangeId".equalsIgnoreCase(strategy)) {
+            messageDeduplicationIdStrategy = new ExchangeIdStrategy();
+        } else if ("useContentBasedDeduplication".equalsIgnoreCase(strategy)) {
+            messageDeduplicationIdStrategy = new NullStrategy();
+        } else {
+            throw new IllegalArgumentException("Unrecognised MessageDeduplicationIdStrategy: " + strategy);
+        }
+    }
 }
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
index 61891c9..5fc53c6 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
@@ -28,4 +28,5 @@ public interface SqsConstants {
     String MESSAGE_ID = "CamelAwsSqsMessageId";
     String RECEIPT_HANDLE = "CamelAwsSqsReceiptHandle";
     String DELAY_HEADER = "CamelAwsSqsDelaySeconds";
+    String MESSAGE_GROUP_ID_HEADER = "CamelAwsMessageGroupId";
 }
\ No newline at end of file
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
index 682d75e..e3d2f00 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
@@ -42,9 +42,9 @@ import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageFo
  * 
  */
 public class SqsProducer extends DefaultProducer {
-    
+
     private static final Logger LOG = LoggerFactory.getLogger(SqsProducer.class);
-    
+
     private transient String sqsProducerToString;
 
     public SqsProducer(SqsEndpoint endpoint) throws NoFactoryAvailableException {
@@ -56,18 +56,33 @@ public class SqsProducer extends DefaultProducer {
         SendMessageRequest request = new SendMessageRequest(getQueueUrl(), body);
         request.setMessageAttributes(translateAttributes(exchange.getIn().getHeaders(), exchange));
         addDelay(request, exchange);
+        configureFifoAttributes(request, exchange);
 
         LOG.trace("Sending request [{}] from exchange [{}]...", request, exchange);
-        
+
         SendMessageResult result = getClient().sendMessage(request);
-        
+
         LOG.trace("Received result [{}]", result);
-        
+
         Message message = getMessageForResponse(exchange);
         message.setHeader(SqsConstants.MESSAGE_ID, result.getMessageId());
         message.setHeader(SqsConstants.MD5_OF_BODY, result.getMD5OfMessageBody());
     }
 
+    /**
+     * Sets the message group id and message deduplication id on the request
+     * using the configured strategies; does nothing for non-FIFO queues.
+     */
+    private void configureFifoAttributes(SendMessageRequest request, Exchange exchange) {
+        SqsConfiguration config = getEndpoint().getConfiguration();
+        if (!config.isFifoQueue()) {
+            return;
+        }
+        // each strategy derives its value from the exchange being sent
+        request.setMessageGroupId(config.getMessageGroupIdStrategy().value(exchange));
+        request.setMessageDeduplicationId(config.getMessageDeduplicationIdStrategy().value(exchange));
+    }
+
     private void addDelay(SendMessageRequest request, Exchange exchange) {
         Integer headerValue = exchange.getIn().getHeader(SqsConstants.DELAY_HEADER, Integer.class);
         Integer delayValue;
@@ -81,20 +96,20 @@ public class SqsProducer extends DefaultProducer {
         LOG.trace("found delay: " + delayValue);
         request.setDelaySeconds(delayValue == null ? Integer.valueOf(0) : delayValue);
     }
-    
+
     protected AmazonSQS getClient() {
         return getEndpoint().getClient();
     }
-    
+
     protected String getQueueUrl() {
         return getEndpoint().getQueueUrl();
     }
-    
+
     @Override
     public SqsEndpoint getEndpoint() {
         return (SqsEndpoint) super.getEndpoint();
     }
-    
+
     @Override
     public String toString() {
         if (sqsProducerToString == null) {
@@ -102,7 +117,7 @@ public class SqsProducer extends DefaultProducer {
         }
         return sqsProducerToString;
     }
-    
+
     private Map<String, MessageAttributeValue> translateAttributes(Map<String, Object> headers, Exchange exchange) {
         Map<String, MessageAttributeValue> result = new HashMap<String, MessageAttributeValue>();
         HeaderFilterStrategy headerFilterStrategy = getEndpoint().getHeaderFilterStrategy();
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/StringValueFromExchangeStrategy.java
similarity index 67%
copy from components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
copy to components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/StringValueFromExchangeStrategy.java
index 61891c9..31eafb6 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/StringValueFromExchangeStrategy.java
@@ -16,16 +16,11 @@
  */
 package org.apache.camel.component.aws.sqs;
 
-/**
- * Constants used in Camel AWS SQS module
- *
- */
-public interface SqsConstants {
+import org.apache.camel.Exchange;
+
+@FunctionalInterface
+public interface StringValueFromExchangeStrategy {
+    
+    String value(Exchange exchange);
 
-    String ATTRIBUTES = "CamelAwsSqsAttributes";
-    String MESSAGE_ATTRIBUTES = "CamelAwsSqsMessageAttributes";
-    String MD5_OF_BODY = "CamelAwsSqsMD5OfBody";
-    String MESSAGE_ID = "CamelAwsSqsMessageId";
-    String RECEIPT_HANDLE = "CamelAwsSqsReceiptHandle";
-    String DELAY_HEADER = "CamelAwsSqsDelaySeconds";
-}
\ No newline at end of file
+}
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConfigurationTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConfigurationTest.java
new file mode 100644
index 0000000..ace5173
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConfigurationTest.java
@@ -0,0 +1,32 @@
+package org.apache.camel.component.aws.sqs;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+/** Verifies that unknown strategy names are rejected with an informative message. */
+public class SqsConfigurationTest {
+
+    @Test
+    public void itReturnsAnInformativeErrorForBadMessageGroupIdStrategy() throws Exception {
+        SqsConfiguration sqsConfiguration = new SqsConfiguration();
+        try {
+            sqsConfiguration.setMessageGroupIdStrategy("useUnknownStrategy");
+            fail("Should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            assertTrue("Bad error message: " + e.getMessage(), e.getMessage().startsWith("Unrecognised MessageGroupIdStrategy"));
+        }
+    }
+
+    @Test
+    public void itReturnsAnInformativeErrorForBadMessageDeduplicationIdStrategy() throws Exception {
+        SqsConfiguration sqsConfiguration = new SqsConfiguration();
+        try {
+            sqsConfiguration.setMessageDeduplicationIdStrategy("useUnknownStrategy");
+            fail("Should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            assertTrue("Bad error message: " + e.getMessage(), e.getMessage().startsWith("Unrecognised MessageDeduplicationIdStrategy"));
+        }
+    }
+
+}
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
index bed350c..c1d24f1 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
@@ -55,6 +55,7 @@ public class SqsProducerTest {
     private static final String SAMPLE_MESSAGE_HEADER_VALUE_3 = "heder_value_3";
     private static final String SAMPLE_MESSAGE_HEADER_NAME_4 = "CamelHeader_1";
     private static final String SAMPLE_MESSAGE_HEADER_VALUE_4 = "testValue";
+    private static final String SAMPLE_EXCHANGE_ID = "ID:whatever-the-hostname-is-32818-1506943497897-1:1:8:1:75939";
     
     Exchange exchange = mock(Exchange.class, RETURNS_DEEP_STUBS);
 
@@ -75,12 +76,14 @@ public class SqsProducerTest {
         sqsConfiguration = new SqsConfiguration();
         HeaderFilterStrategy headerFilterStrategy = new SqsHeaderFilterStrategy();
         sqsConfiguration.setDelaySeconds(Integer.valueOf(0));
+        sqsConfiguration.setQueueName("queueName");
         when(sqsEndpoint.getClient()).thenReturn(amazonSQSClient);
         when(sqsEndpoint.getConfiguration()).thenReturn(sqsConfiguration);
         when(amazonSQSClient.sendMessage(any(SendMessageRequest.class))).thenReturn(sendMessageResult);
         when(exchange.getOut()).thenReturn(outMessage);
         when(exchange.getIn()).thenReturn(inMessage);
         when(exchange.getPattern()).thenReturn(ExchangePattern.InOnly);
+        when(exchange.getExchangeId()).thenReturn(SAMPLE_EXCHANGE_ID);
         when(inMessage.getBody(String.class)).thenReturn(SAMPLE_MESSAGE_BODY);
         when(sqsEndpoint.getQueueUrl()).thenReturn(QUEUE_URL);
         when(sqsEndpoint.getHeaderFilterStrategy()).thenReturn(headerFilterStrategy);
@@ -194,8 +197,92 @@ public class SqsProducerTest {
                          .getStringValue());
         assertEquals(3, capture.getValue().getMessageAttributes().size());
     }
+
+    @Test
+    public void itSetsMessageGroupIdUsingConstantStrategy() throws Exception {
+        sqsConfiguration.setQueueName("queueName.fifo");
+        sqsConfiguration.setMessageGroupIdStrategy("useConstant");
+
+        underTest.process(exchange);
+
+        ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
+        verify(amazonSQSClient).sendMessage(capture.capture());
+
+        assertEquals("CamelSingleMessageGroup", capture.getValue().getMessageGroupId());
+        
+    }
     
+    @Test
+    public void itSetsMessageGroupIdUsingExchangeIdStrategy() throws Exception {
+        sqsConfiguration.setQueueName("queueName.fifo");
+        sqsConfiguration.setMessageGroupIdStrategy("useExchangeId");
+
+        underTest.process(exchange);
+
+        ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
+        verify(amazonSQSClient).sendMessage(capture.capture());
+
+        assertEquals(SAMPLE_EXCHANGE_ID, capture.getValue().getMessageGroupId());
+        
+    }
     
+    @Test
+    public void itSetsMessageGroupIdUsingHeaderValueStrategy() throws Exception {
+        sqsConfiguration.setQueueName("queueName.fifo");
+        sqsConfiguration.setMessageGroupIdStrategy("useHeaderValue");
+        when(inMessage.getHeader(SqsConstants.MESSAGE_GROUP_ID_HEADER, String.class)).thenReturn("my-group-id");
+
+        underTest.process(exchange);
 
+        ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
+        verify(amazonSQSClient).sendMessage(capture.capture());
+
+        assertEquals("my-group-id", capture.getValue().getMessageGroupId());
+        
+    }
+
+    @Test
+    public void itSetsMessageDedpulicationIdUsingExchangeIdStrategy() throws Exception {
+        sqsConfiguration.setQueueName("queueName.fifo");
+        sqsConfiguration.setMessageGroupIdStrategy("useConstant");
+        sqsConfiguration.setMessageDeduplicationIdStrategy("useExchangeId");
+
+        underTest.process(exchange);
+
+        ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
+        verify(amazonSQSClient).sendMessage(capture.capture());
+
+        assertEquals(SAMPLE_EXCHANGE_ID, capture.getValue().getMessageDeduplicationId());
+        
+    }
+
+    @Test
+    public void itSetsMessageDedpulicationIdUsingExchangeIdStrategyAsDefault() throws Exception {
+        sqsConfiguration.setQueueName("queueName.fifo");
+        sqsConfiguration.setMessageGroupIdStrategy("useConstant");
+
+        underTest.process(exchange);
+
+        ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
+        verify(amazonSQSClient).sendMessage(capture.capture());
+
+        assertEquals(SAMPLE_EXCHANGE_ID, capture.getValue().getMessageDeduplicationId());
+        
+    }
+
+    @Test
+    public void itDoesNotSetMessageDedpulicationIdUsingContentBasedDeduplicationStrategy() throws Exception {
+        sqsConfiguration.setQueueName("queueName.fifo");
+        sqsConfiguration.setMessageGroupIdStrategy("useConstant");
+        sqsConfiguration.setMessageDeduplicationIdStrategy("useContentBasedDeduplication");
+
+        underTest.process(exchange);
+
+        ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
+        verify(amazonSQSClient).sendMessage(capture.capture());
+
+        assertNull(capture.getValue().getMessageDeduplicationId());
+        
+    }
 
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.