You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/10/27 07:40:28 UTC
[camel] 01/03: CAMEL-11224 aws-sqs producer does not support new
FIFO queues
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit b139e822bcb79cf51ca5a3b9add5e6ef8e3a313d
Author: Jon McEwen <jo...@boohoo.com>
AuthorDate: Tue Oct 10 10:36:18 2017 +0100
CAMEL-11224 aws-sqs producer does not support new FIFO queues
---
...ts.java => ConstantMessageGroupIdStrategy.java} | 21 +++---
.../{SqsConstants.java => ExchangeIdStrategy.java} | 21 +++---
...{SqsConstants.java => HeaderValueStrategy.java} | 21 +++---
.../camel/component/aws/sqs/NullStrategy.java | 12 +++
.../camel/component/aws/sqs/SqsComponent.java | 4 +
.../camel/component/aws/sqs/SqsConfiguration.java | 56 ++++++++++++++
.../camel/component/aws/sqs/SqsConstants.java | 1 +
.../camel/component/aws/sqs/SqsProducer.java | 35 ++++++---
...s.java => StringValueFromExchangeStrategy.java} | 19 ++---
.../component/aws/sqs/SqsConfigurationTest.java | 32 ++++++++
.../camel/component/aws/sqs/SqsProducerTest.java | 87 ++++++++++++++++++++++
11 files changed, 251 insertions(+), 58 deletions(-)
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ConstantMessageGroupIdStrategy.java
similarity index 67%
copy from components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
copy to components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ConstantMessageGroupIdStrategy.java
index 61891c9..44caf36 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ConstantMessageGroupIdStrategy.java
@@ -16,16 +16,13 @@
*/
package org.apache.camel.component.aws.sqs;
-/**
- * Constants used in Camel AWS SQS module
- *
- */
-public interface SqsConstants {
+import org.apache.camel.Exchange;
+
+public class ConstantMessageGroupIdStrategy implements StringValueFromExchangeStrategy {
+
+ @Override
+ public String value(Exchange exchange) {
+ return "CamelSingleMessageGroup";
+ }
- String ATTRIBUTES = "CamelAwsSqsAttributes";
- String MESSAGE_ATTRIBUTES = "CamelAwsSqsMessageAttributes";
- String MD5_OF_BODY = "CamelAwsSqsMD5OfBody";
- String MESSAGE_ID = "CamelAwsSqsMessageId";
- String RECEIPT_HANDLE = "CamelAwsSqsReceiptHandle";
- String DELAY_HEADER = "CamelAwsSqsDelaySeconds";
-}
\ No newline at end of file
+}
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ExchangeIdStrategy.java
similarity index 67%
copy from components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
copy to components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ExchangeIdStrategy.java
index 61891c9..6b0dac1 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ExchangeIdStrategy.java
@@ -16,16 +16,13 @@
*/
package org.apache.camel.component.aws.sqs;
-/**
- * Constants used in Camel AWS SQS module
- *
- */
-public interface SqsConstants {
+import org.apache.camel.Exchange;
+
+public class ExchangeIdStrategy implements StringValueFromExchangeStrategy {
+
+ @Override
+ public String value(Exchange exchange) {
+ return exchange.getExchangeId();
+ }
- String ATTRIBUTES = "CamelAwsSqsAttributes";
- String MESSAGE_ATTRIBUTES = "CamelAwsSqsMessageAttributes";
- String MD5_OF_BODY = "CamelAwsSqsMD5OfBody";
- String MESSAGE_ID = "CamelAwsSqsMessageId";
- String RECEIPT_HANDLE = "CamelAwsSqsReceiptHandle";
- String DELAY_HEADER = "CamelAwsSqsDelaySeconds";
-}
\ No newline at end of file
+}
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/HeaderValueStrategy.java
similarity index 67%
copy from components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
copy to components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/HeaderValueStrategy.java
index 61891c9..0bef8c2 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/HeaderValueStrategy.java
@@ -16,16 +16,13 @@
*/
package org.apache.camel.component.aws.sqs;
-/**
- * Constants used in Camel AWS SQS module
- *
- */
-public interface SqsConstants {
+import org.apache.camel.Exchange;
+
+public class HeaderValueStrategy implements StringValueFromExchangeStrategy {
+
+ @Override
+ public String value(Exchange exchange) {
+ return exchange.getIn().getHeader(SqsConstants.MESSAGE_GROUP_ID_HEADER, String.class);
+ }
- String ATTRIBUTES = "CamelAwsSqsAttributes";
- String MESSAGE_ATTRIBUTES = "CamelAwsSqsMessageAttributes";
- String MD5_OF_BODY = "CamelAwsSqsMD5OfBody";
- String MESSAGE_ID = "CamelAwsSqsMessageId";
- String RECEIPT_HANDLE = "CamelAwsSqsReceiptHandle";
- String DELAY_HEADER = "CamelAwsSqsDelaySeconds";
-}
\ No newline at end of file
+}
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/NullStrategy.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/NullStrategy.java
new file mode 100644
index 0000000..bbb5ab3
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/NullStrategy.java
@@ -0,0 +1,12 @@
+package org.apache.camel.component.aws.sqs;
+
+import org.apache.camel.Exchange;
+
+public class NullStrategy implements StringValueFromExchangeStrategy {
+
+ @Override
+ public String value(Exchange exchange) {
+ return null;
+ }
+
+}
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java
index f865670..653b956 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java
@@ -62,6 +62,10 @@ public class SqsComponent extends UriEndpointComponent {
throw new IllegalArgumentException("Extending message visibility (extendMessageVisibility) requires visibilityTimeout to be set on the Endpoint.");
}
+ if (configuration.isFifoQueue() && configuration.getMessageGroupIdStrategy() == null) {
+ throw new IllegalArgumentException("messageGroupIdStrategy must be set for FIFO queues.");
+ }
+
SqsEndpoint sqsEndpoint = new SqsEndpoint(uri, this, configuration);
sqsEndpoint.setConsumerProperties(parameters);
return sqsEndpoint;
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
index 3683e44..dbab657f 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
@@ -67,6 +67,10 @@ public class SqsConfiguration {
// producer properties
@UriParam(label = "producer")
private Integer delaySeconds;
+ @UriParam(label = "producer")
+ private StringValueFromExchangeStrategy messageGroupIdStrategy;
+ @UriParam(label = "producer", defaultValue = "useExchangeId")
+ private StringValueFromExchangeStrategy messageDeduplicationIdStrategy = new ExchangeIdStrategy();
// queue properties
@UriParam(label = "queue")
@@ -83,6 +87,18 @@ public class SqsConfiguration {
private String redrivePolicy;
/**
+ * Whether or not the queue is a FIFO queue
+ */
+ public boolean isFifoQueue() {
+ // AWS docs suggest this is valid derivation.
+ // FIFO queue names must end with .fifo, and standard queues cannot
+ if (queueName.endsWith(".fifo")) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
* The region with which the AWS-SQS client wants to work with.
* Only works if Camel creates the AWS-SQS client, i.e., if you explicitly set amazonSQSClient,
* then this setting will have no effect. You would have to set it on the client you create directly
@@ -364,4 +380,44 @@ public class SqsConfiguration {
public void setProxyPort(Integer proxyPort) {
this.proxyPort = proxyPort;
}
+
+ /**
+ * Since *Camel 2.20*. Only for FIFO queues. Strategy for setting the messageGroupId on the message.
+ * Can be one of the following options: *useConstant*, *useExchangeId*, *useHeaderValue*.
+ * For the *useHeaderValue* option, the value of header "CamelAwsMessageGroupId" will be used.
+ */
+ public void setMessageGroupIdStrategy(String strategy) {
+ if ("useConstant".equalsIgnoreCase(strategy)) {
+ messageGroupIdStrategy = new ConstantMessageGroupIdStrategy();
+ } else if ("useExchangeId".equalsIgnoreCase(strategy)) {
+ messageGroupIdStrategy = new ExchangeIdStrategy();
+ } else if ("useHeaderValue".equalsIgnoreCase(strategy)) {
+ messageGroupIdStrategy = new HeaderValueStrategy();
+ } else {
+ throw new IllegalArgumentException("Unrecognised MessageGroupIdStrategy: " + strategy);
+ }
+ }
+
+ public StringValueFromExchangeStrategy getMessageGroupIdStrategy() {
+ return messageGroupIdStrategy;
+ }
+
+ public StringValueFromExchangeStrategy getMessageDeduplicationIdStrategy() {
+ return messageDeduplicationIdStrategy;
+ }
+
+ /**
+ * Since *Camel 2.20*. Only for FIFO queues. Strategy for setting the messageDeduplicationId on the message.
+ * Can be one of the following options: *useExchangeId*, *useContentBasedDeduplication*.
+ * For the *useContentBasedDeduplication* option, no messageDeduplicationId will be set on the message.
+ */
+ public void setMessageDeduplicationIdStrategy(String strategy) {
+ if ("useExchangeId".equalsIgnoreCase(strategy)) {
+ messageDeduplicationIdStrategy = new ExchangeIdStrategy();
+ } else if ("useContentBasedDeduplication".equalsIgnoreCase(strategy)) {
+ messageDeduplicationIdStrategy = new NullStrategy();
+ } else {
+ throw new IllegalArgumentException("Unrecognised MessageDeduplicationIdStrategy: " + strategy);
+ }
+ }
}
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
index 61891c9..5fc53c6 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
@@ -28,4 +28,5 @@ public interface SqsConstants {
String MESSAGE_ID = "CamelAwsSqsMessageId";
String RECEIPT_HANDLE = "CamelAwsSqsReceiptHandle";
String DELAY_HEADER = "CamelAwsSqsDelaySeconds";
+ String MESSAGE_GROUP_ID_HEADER = "CamelAwsMessageGroupId";
}
\ No newline at end of file
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
index 682d75e..e3d2f00 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
@@ -42,9 +42,9 @@ import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageFo
*
*/
public class SqsProducer extends DefaultProducer {
-
+
private static final Logger LOG = LoggerFactory.getLogger(SqsProducer.class);
-
+
private transient String sqsProducerToString;
public SqsProducer(SqsEndpoint endpoint) throws NoFactoryAvailableException {
@@ -56,18 +56,33 @@ public class SqsProducer extends DefaultProducer {
SendMessageRequest request = new SendMessageRequest(getQueueUrl(), body);
request.setMessageAttributes(translateAttributes(exchange.getIn().getHeaders(), exchange));
addDelay(request, exchange);
+ configureFifoAttributes(request, exchange);
LOG.trace("Sending request [{}] from exchange [{}]...", request, exchange);
-
+
SendMessageResult result = getClient().sendMessage(request);
-
+
LOG.trace("Received result [{}]", result);
-
+
Message message = getMessageForResponse(exchange);
message.setHeader(SqsConstants.MESSAGE_ID, result.getMessageId());
message.setHeader(SqsConstants.MD5_OF_BODY, result.getMD5OfMessageBody());
}
+ private void configureFifoAttributes(SendMessageRequest request, Exchange exchange) {
+ if (getEndpoint().getConfiguration().isFifoQueue()) {
+ // use strategies
+ StringValueFromExchangeStrategy messageGroupIdStrategy = getEndpoint().getConfiguration().getMessageGroupIdStrategy();
+ String messageGroupId = messageGroupIdStrategy.value(exchange);
+ request.setMessageGroupId(messageGroupId);
+
+ StringValueFromExchangeStrategy messageDeduplicationIdStrategy = getEndpoint().getConfiguration().getMessageDeduplicationIdStrategy();
+ String messageDeduplicationId = messageDeduplicationIdStrategy.value(exchange);
+ request.setMessageDeduplicationId(messageDeduplicationId);
+
+ }
+ }
+
private void addDelay(SendMessageRequest request, Exchange exchange) {
Integer headerValue = exchange.getIn().getHeader(SqsConstants.DELAY_HEADER, Integer.class);
Integer delayValue;
@@ -81,20 +96,20 @@ public class SqsProducer extends DefaultProducer {
LOG.trace("found delay: " + delayValue);
request.setDelaySeconds(delayValue == null ? Integer.valueOf(0) : delayValue);
}
-
+
protected AmazonSQS getClient() {
return getEndpoint().getClient();
}
-
+
protected String getQueueUrl() {
return getEndpoint().getQueueUrl();
}
-
+
@Override
public SqsEndpoint getEndpoint() {
return (SqsEndpoint) super.getEndpoint();
}
-
+
@Override
public String toString() {
if (sqsProducerToString == null) {
@@ -102,7 +117,7 @@ public class SqsProducer extends DefaultProducer {
}
return sqsProducerToString;
}
-
+
private Map<String, MessageAttributeValue> translateAttributes(Map<String, Object> headers, Exchange exchange) {
Map<String, MessageAttributeValue> result = new HashMap<String, MessageAttributeValue>();
HeaderFilterStrategy headerFilterStrategy = getEndpoint().getHeaderFilterStrategy();
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/StringValueFromExchangeStrategy.java
similarity index 67%
copy from components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
copy to components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/StringValueFromExchangeStrategy.java
index 61891c9..31eafb6 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/StringValueFromExchangeStrategy.java
@@ -16,16 +16,11 @@
*/
package org.apache.camel.component.aws.sqs;
-/**
- * Constants used in Camel AWS SQS module
- *
- */
-public interface SqsConstants {
+import org.apache.camel.Exchange;
+
+@FunctionalInterface
+public interface StringValueFromExchangeStrategy {
+
+ String value(Exchange exchange);
- String ATTRIBUTES = "CamelAwsSqsAttributes";
- String MESSAGE_ATTRIBUTES = "CamelAwsSqsMessageAttributes";
- String MD5_OF_BODY = "CamelAwsSqsMD5OfBody";
- String MESSAGE_ID = "CamelAwsSqsMessageId";
- String RECEIPT_HANDLE = "CamelAwsSqsReceiptHandle";
- String DELAY_HEADER = "CamelAwsSqsDelaySeconds";
-}
\ No newline at end of file
+}
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConfigurationTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConfigurationTest.java
new file mode 100644
index 0000000..ace5173
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConfigurationTest.java
@@ -0,0 +1,32 @@
+package org.apache.camel.component.aws.sqs;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class SqsConfigurationTest {
+
+ @Test
+ public void itReturnsAnInformativeErrorForBadMessageGroupIdStrategy() throws Exception {
+ SqsConfiguration sqsConfiguration = new SqsConfiguration();
+ try {
+ sqsConfiguration.setMessageGroupIdStrategy("useUnknownStrategy");
+ fail("Should have thrown exception");
+ } catch (Exception e) {
+ assertTrue("Bad error message: " + e.getMessage(), e.getMessage().startsWith("Unrecognised MessageGroupIdStrategy"));
+ }
+ }
+
+
+ @Test
+ public void itReturnsAnInformativeErrorForBadMessageDeduplicationIdStrategy() throws Exception {
+ SqsConfiguration sqsConfiguration = new SqsConfiguration();
+ try {
+ sqsConfiguration.setMessageDeduplicationIdStrategy("useUnknownStrategy");
+ fail("Should have thrown exception");
+ } catch (Exception e) {
+ assertTrue("Bad error message: " + e.getMessage(), e.getMessage().startsWith("Unrecognised MessageDeduplicationIdStrategy"));
+ }
+ }
+
+}
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
index bed350c..c1d24f1 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
@@ -55,6 +55,7 @@ public class SqsProducerTest {
private static final String SAMPLE_MESSAGE_HEADER_VALUE_3 = "heder_value_3";
private static final String SAMPLE_MESSAGE_HEADER_NAME_4 = "CamelHeader_1";
private static final String SAMPLE_MESSAGE_HEADER_VALUE_4 = "testValue";
+ private static final String SAMPLE_EXCHANGE_ID = "ID:whatever-the-hostname-is-32818-1506943497897-1:1:8:1:75939";
Exchange exchange = mock(Exchange.class, RETURNS_DEEP_STUBS);
@@ -75,12 +76,14 @@ public class SqsProducerTest {
sqsConfiguration = new SqsConfiguration();
HeaderFilterStrategy headerFilterStrategy = new SqsHeaderFilterStrategy();
sqsConfiguration.setDelaySeconds(Integer.valueOf(0));
+ sqsConfiguration.setQueueName("queueName");
when(sqsEndpoint.getClient()).thenReturn(amazonSQSClient);
when(sqsEndpoint.getConfiguration()).thenReturn(sqsConfiguration);
when(amazonSQSClient.sendMessage(any(SendMessageRequest.class))).thenReturn(sendMessageResult);
when(exchange.getOut()).thenReturn(outMessage);
when(exchange.getIn()).thenReturn(inMessage);
when(exchange.getPattern()).thenReturn(ExchangePattern.InOnly);
+ when(exchange.getExchangeId()).thenReturn(SAMPLE_EXCHANGE_ID);
when(inMessage.getBody(String.class)).thenReturn(SAMPLE_MESSAGE_BODY);
when(sqsEndpoint.getQueueUrl()).thenReturn(QUEUE_URL);
when(sqsEndpoint.getHeaderFilterStrategy()).thenReturn(headerFilterStrategy);
@@ -194,8 +197,92 @@ public class SqsProducerTest {
.getStringValue());
assertEquals(3, capture.getValue().getMessageAttributes().size());
}
+
+ @Test
+ public void itSetsMessageGroupIdUsingConstantStrategy() throws Exception {
+ sqsConfiguration.setQueueName("queueName.fifo");
+ sqsConfiguration.setMessageGroupIdStrategy("useConstant");
+
+ underTest.process(exchange);
+
+ ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
+ verify(amazonSQSClient).sendMessage(capture.capture());
+
+ assertEquals("CamelSingleMessageGroup", capture.getValue().getMessageGroupId());
+
+ }
+ @Test
+ public void itSetsMessageGroupIdUsingExchangeIdStrategy() throws Exception {
+ sqsConfiguration.setQueueName("queueName.fifo");
+ sqsConfiguration.setMessageGroupIdStrategy("useExchangeId");
+
+ underTest.process(exchange);
+
+ ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
+ verify(amazonSQSClient).sendMessage(capture.capture());
+
+ assertEquals(SAMPLE_EXCHANGE_ID, capture.getValue().getMessageGroupId());
+
+ }
+ @Test
+ public void itSetsMessageGroupIdUsingHeaderValueStrategy() throws Exception {
+ sqsConfiguration.setQueueName("queueName.fifo");
+ sqsConfiguration.setMessageGroupIdStrategy("useHeaderValue");
+ when(inMessage.getHeader(SqsConstants.MESSAGE_GROUP_ID_HEADER, String.class)).thenReturn("my-group-id");
+
+ underTest.process(exchange);
+ ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
+ verify(amazonSQSClient).sendMessage(capture.capture());
+
+ assertEquals("my-group-id", capture.getValue().getMessageGroupId());
+
+ }
+
+ @Test
+ public void itSetsMessageDedpulicationIdUsingExchangeIdStrategy() throws Exception {
+ sqsConfiguration.setQueueName("queueName.fifo");
+ sqsConfiguration.setMessageGroupIdStrategy("useConstant");
+ sqsConfiguration.setMessageDeduplicationIdStrategy("useExchangeId");
+
+ underTest.process(exchange);
+
+ ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
+ verify(amazonSQSClient).sendMessage(capture.capture());
+
+ assertEquals(SAMPLE_EXCHANGE_ID, capture.getValue().getMessageDeduplicationId());
+
+ }
+
+ @Test
+ public void itSetsMessageDedpulicationIdUsingExchangeIdStrategyAsDefault() throws Exception {
+ sqsConfiguration.setQueueName("queueName.fifo");
+ sqsConfiguration.setMessageGroupIdStrategy("useConstant");
+
+ underTest.process(exchange);
+
+ ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
+ verify(amazonSQSClient).sendMessage(capture.capture());
+
+ assertEquals(SAMPLE_EXCHANGE_ID, capture.getValue().getMessageDeduplicationId());
+
+ }
+
+ @Test
+ public void itDoesNotSetMessageDedpulicationIdUsingContentBasedDeduplicationStrategy() throws Exception {
+ sqsConfiguration.setQueueName("queueName.fifo");
+ sqsConfiguration.setMessageGroupIdStrategy("useConstant");
+ sqsConfiguration.setMessageDeduplicationIdStrategy("useContentBasedDeduplication");
+
+ underTest.process(exchange);
+
+ ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
+ verify(amazonSQSClient).sendMessage(capture.capture());
+
+ assertNull(capture.getValue().getMessageDeduplicationId());
+
+ }
}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.