You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/08/13 11:50:39 UTC
[1/2] git commit: CAMEL-7654 Supports Message Attributes with
HeaderFilterStrategy in AWS SQS component
Repository: camel
Updated Branches:
refs/heads/master e7c9e405a -> 1e9878801
CAMEL-7654 Supports Message Attributes with HeaderFilterStrategy in AWS SQS component
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1e987880
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1e987880
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1e987880
Branch: refs/heads/master
Commit: 1e98788019bc1b4ebdd26fcbaa248ffa7a9715df
Parents: 2ca8187
Author: Willem Jiang <wi...@gmail.com>
Authored: Wed Aug 13 17:44:15 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Wed Aug 13 17:49:50 2014 +0800
----------------------------------------------------------------------
.../camel/component/aws/sqs/SqsEndpoint.java | 27 ++++++++++++++--
.../aws/sqs/SqsHeaderFilterStrategy.java | 30 ++++++++++++++++++
.../camel/component/aws/sqs/SqsProducer.java | 33 +++++++++++++-------
.../component/aws/sqs/SqsProducerTest.java | 8 +++--
4 files changed, 81 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/1e987880/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
index b48fbe6..8ed85c5 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
@@ -31,7 +31,6 @@ import com.amazonaws.services.sqs.model.ListQueuesResult;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.QueueAttributeName;
import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
-
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
@@ -40,6 +39,8 @@ import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.ScheduledPollEndpoint;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.HeaderFilterStrategyAware;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +50,7 @@ import org.slf4j.LoggerFactory;
* Defines the <a href="http://camel.apache.org/aws.html">AWS SQS Endpoint</a>.
*
*/
-public class SqsEndpoint extends ScheduledPollEndpoint {
+public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterStrategyAware {
private static final Logger LOG = LoggerFactory.getLogger(SqsEndpoint.class);
@@ -57,12 +58,21 @@ public class SqsEndpoint extends ScheduledPollEndpoint {
private String queueUrl;
private SqsConfiguration configuration;
private int maxMessagesPerPoll;
+ private HeaderFilterStrategy headerFilterStrategy;
public SqsEndpoint(String uri, SqsComponent component, SqsConfiguration configuration) {
super(uri, component);
this.configuration = configuration;
}
+
+ public HeaderFilterStrategy getHeaderFilterStrategy() {
+ return headerFilterStrategy;
+ }
+ public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) {
+ this.headerFilterStrategy = strategy;
+ }
+
public Producer createProducer() throws Exception {
return new SqsProducer(this);
}
@@ -87,6 +97,11 @@ public class SqsEndpoint extends ScheduledPollEndpoint {
if (ObjectHelper.isNotEmpty(getConfiguration().getAmazonSQSEndpoint())) {
client.setEndpoint(getConfiguration().getAmazonSQSEndpoint());
}
+
+ // fall back to the default SqsHeaderFilterStrategy if no headerFilterStrategy has been set
+ if (headerFilterStrategy == null) {
+ headerFilterStrategy = new SqsHeaderFilterStrategy();
+ }
// If both region and Account ID is provided the queue URL can be built manually.
// This allows accessing queues where you don't have permission to list queues or query queues
@@ -191,10 +206,16 @@ public class SqsEndpoint extends ScheduledPollEndpoint {
message.setHeader(SqsConstants.ATTRIBUTES, msg.getAttributes());
message.setHeader(SqsConstants.MESSAGE_ATTRIBUTES, msg.getMessageAttributes());
+ //Need to apply the SqsHeaderFilterStrategy this time
+ HeaderFilterStrategy headerFilterStrategy = getHeaderFilterStrategy();
//add all sqs message attributes as camel message headers so that knowledge of
//the Sqs class MessageAttributeValue will not leak to the client
for (Entry<String, MessageAttributeValue> entry : msg.getMessageAttributes().entrySet()) {
- message.setHeader(entry.getKey(), translateValue(entry.getValue()));
+ String header = entry.getKey();
+ Object value = translateValue(entry.getValue());
+ if (!headerFilterStrategy.applyFilterToExternalHeaders(header, value, exchange)) {
+ message.setHeader(header, value);
+ }
}
return exchange;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/1e987880/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsHeaderFilterStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsHeaderFilterStrategy.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsHeaderFilterStrategy.java
new file mode 100644
index 0000000..fb5f425
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsHeaderFilterStrategy.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.sqs;
+
+import org.apache.camel.impl.DefaultHeaderFilterStrategy;
+
+public class SqsHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
+ public SqsHeaderFilterStrategy() {
+ initialize();
+ }
+
+ protected void initialize() {
+ // out filter for header names beginning with "Camel" or "org.apache.camel"; NOTE(review): inside [...] '|' is a literal and A-z also spans [ \ ] ^ _ and backtick
+ setOutFilterPattern("(Camel|org\\.apache\\.camel)[\\.|a-z|A-z|0-9]*");
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1e987880/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
index 0a4c948..874e021 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
@@ -25,11 +25,11 @@ import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;
-
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.NoFactoryAvailableException;
import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +50,7 @@ public class SqsProducer extends DefaultProducer {
public void process(Exchange exchange) throws Exception {
String body = exchange.getIn().getBody(String.class);
SendMessageRequest request = new SendMessageRequest(getQueueUrl(), body);
- request.setMessageAttributes(translateAttributes(exchange.getIn().getHeaders()));
+ request.setMessageAttributes(translateAttributes(exchange.getIn().getHeaders(), exchange));
addDelay(request, exchange);
LOG.trace("Sending request [{}] from exchange [{}]...", request, exchange);
@@ -106,19 +106,28 @@ public class SqsProducer extends DefaultProducer {
return "SqsProducer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
}
- private Map<String, MessageAttributeValue> translateAttributes(Map<String, Object> headers) {
+ private Map<String, MessageAttributeValue> translateAttributes(Map<String, Object> headers, Exchange exchange) {
Map<String, MessageAttributeValue> result = new HashMap<String, MessageAttributeValue>();
+ HeaderFilterStrategy headerFilterStrategy = getEndpoint().getHeaderFilterStrategy();
for (Entry<String, Object> entry : headers.entrySet()) {
- Object value = entry.getValue();
- MessageAttributeValue mav = new MessageAttributeValue();
- if (value instanceof String) {
- mav.setDataType("String");
- mav.withStringValue((String)value);
- } else if (value instanceof ByteBuffer) {
- mav.setDataType("Binary");
- mav.withBinaryValue((ByteBuffer)value);
+ // only put the message header which is not filtered into the message attribute
+ if (!headerFilterStrategy.applyFilterToCamelHeaders(entry.getKey(), entry.getValue(), exchange)) {
+ Object value = entry.getValue();
+ if (value instanceof String) {
+ MessageAttributeValue mav = new MessageAttributeValue();
+ mav.setDataType("String");
+ mav.withStringValue((String)value);
+ result.put(entry.getKey(), mav);
+ } else if (value instanceof ByteBuffer) {
+ MessageAttributeValue mav = new MessageAttributeValue();
+ mav.setDataType("Binary");
+ mav.withBinaryValue((ByteBuffer)value);
+ result.put(entry.getKey(), mav);
+ } else {
+ // only String and ByteBuffer values are translated to a MessageAttributeValue; anything else is skipped with a warning
+ LOG.warn("Cannot put the message header key={}, value={} into Sqs MessageAttribute", entry.getKey(), entry.getValue());
+ }
}
- result.put(entry.getKey(), mav);
}
return result;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/1e987880/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
index e6742ca..bed350c 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
@@ -23,17 +23,16 @@ import java.util.Map;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;
-
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
+import org.apache.camel.spi.HeaderFilterStrategy;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
@@ -54,6 +53,8 @@ public class SqsProducerTest {
private static final ByteBuffer SAMPLE_MESSAGE_HEADER_VALUE_2 = ByteBuffer.wrap(new byte[10]);
private static final String SAMPLE_MESSAGE_HEADER_NAME_3 = "header_name_3";
private static final String SAMPLE_MESSAGE_HEADER_VALUE_3 = "heder_value_3";
+ private static final String SAMPLE_MESSAGE_HEADER_NAME_4 = "CamelHeader_1";
+ private static final String SAMPLE_MESSAGE_HEADER_VALUE_4 = "testValue";
Exchange exchange = mock(Exchange.class, RETURNS_DEEP_STUBS);
@@ -72,6 +73,7 @@ public class SqsProducerTest {
underTest = new SqsProducer(sqsEndpoint);
sendMessageResult = new SendMessageResult().withMD5OfMessageBody(MESSAGE_MD5).withMessageId(MESSAGE_ID);
sqsConfiguration = new SqsConfiguration();
+ HeaderFilterStrategy headerFilterStrategy = new SqsHeaderFilterStrategy();
sqsConfiguration.setDelaySeconds(Integer.valueOf(0));
when(sqsEndpoint.getClient()).thenReturn(amazonSQSClient);
when(sqsEndpoint.getConfiguration()).thenReturn(sqsConfiguration);
@@ -81,6 +83,7 @@ public class SqsProducerTest {
when(exchange.getPattern()).thenReturn(ExchangePattern.InOnly);
when(inMessage.getBody(String.class)).thenReturn(SAMPLE_MESSAGE_BODY);
when(sqsEndpoint.getQueueUrl()).thenReturn(QUEUE_URL);
+ when(sqsEndpoint.getHeaderFilterStrategy()).thenReturn(headerFilterStrategy);
}
@Test
@@ -173,6 +176,7 @@ public class SqsProducerTest {
headers.put(SAMPLE_MESSAGE_HEADER_NAME_1, SAMPLE_MESSAGE_HEADER_VALUE_1);
headers.put(SAMPLE_MESSAGE_HEADER_NAME_2, SAMPLE_MESSAGE_HEADER_VALUE_2);
headers.put(SAMPLE_MESSAGE_HEADER_NAME_3, SAMPLE_MESSAGE_HEADER_VALUE_3);
+ headers.put(SAMPLE_MESSAGE_HEADER_NAME_4, SAMPLE_MESSAGE_HEADER_VALUE_4);
when(inMessage.getHeaders()).thenReturn(headers);
underTest.process(exchange);
[2/2] git commit: CAMEL-7654 Support Message attribute for the
SQSProducer
Posted by ni...@apache.org.
CAMEL-7654 Support Message attribute for the SQSProducer
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2ca8187c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2ca8187c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2ca8187c
Branch: refs/heads/master
Commit: 2ca8187cf079b011b7f4438d803cd51c96378dc9
Parents: e7c9e40
Author: Willem Jiang <wi...@gmail.com>
Authored: Wed Aug 13 16:58:21 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Wed Aug 13 17:49:50 2014 +0800
----------------------------------------------------------------------
.../camel/component/aws/sqs/SqsEndpoint.java | 18 +++++
.../camel/component/aws/sqs/SqsProducer.java | 24 +++++++
.../component/aws/sqs/SqsProducerTest.java | 72 ++++++++++++++++++++
3 files changed, 114 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2ca8187c/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
index 6447d7b..b48fbe6 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.aws.sqs;
import java.util.HashMap;
+import java.util.Map.Entry;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
@@ -27,6 +28,7 @@ import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
import com.amazonaws.services.sqs.model.ListQueuesResult;
+import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.QueueAttributeName;
import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
@@ -188,6 +190,12 @@ public class SqsEndpoint extends ScheduledPollEndpoint {
message.setHeader(SqsConstants.RECEIPT_HANDLE, msg.getReceiptHandle());
message.setHeader(SqsConstants.ATTRIBUTES, msg.getAttributes());
message.setHeader(SqsConstants.MESSAGE_ATTRIBUTES, msg.getMessageAttributes());
+
+ //add all sqs message attributes as camel message headers so that knowledge of
+ //the Sqs class MessageAttributeValue will not leak to the client
+ for (Entry<String, MessageAttributeValue> entry : msg.getMessageAttributes().entrySet()) {
+ message.setHeader(entry.getKey(), translateValue(entry.getValue()));
+ }
return exchange;
}
@@ -231,4 +239,14 @@ public class SqsEndpoint extends ScheduledPollEndpoint {
public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
this.maxMessagesPerPoll = maxMessagesPerPoll;
}
+
+ private Object translateValue(MessageAttributeValue mav) {
+ Object result = null;
+ if (mav.getStringValue() != null) {
+ result = mav.getStringValue();
+ } else if (mav.getBinaryValue() != null) {
+ result = mav.getBinaryValue();
+ }
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/2ca8187c/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
index b4fa72b..0a4c948 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
@@ -16,7 +16,13 @@
*/
package org.apache.camel.component.aws.sqs;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;
@@ -44,6 +50,7 @@ public class SqsProducer extends DefaultProducer {
public void process(Exchange exchange) throws Exception {
String body = exchange.getIn().getBody(String.class);
SendMessageRequest request = new SendMessageRequest(getQueueUrl(), body);
+ request.setMessageAttributes(translateAttributes(exchange.getIn().getHeaders()));
addDelay(request, exchange);
LOG.trace("Sending request [{}] from exchange [{}]...", request, exchange);
@@ -98,4 +105,21 @@ public class SqsProducer extends DefaultProducer {
public String toString() {
return "SqsProducer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
}
+
+ private Map<String, MessageAttributeValue> translateAttributes(Map<String, Object> headers) {
+ Map<String, MessageAttributeValue> result = new HashMap<String, MessageAttributeValue>();
+ for (Entry<String, Object> entry : headers.entrySet()) {
+ Object value = entry.getValue();
+ MessageAttributeValue mav = new MessageAttributeValue();
+ if (value instanceof String) {
+ mav.setDataType("String");
+ mav.withStringValue((String)value);
+ } else if (value instanceof ByteBuffer) {
+ mav.setDataType("Binary");
+ mav.withBinaryValue((ByteBuffer)value);
+ }
+ result.put(entry.getKey(), mav);
+ }
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/2ca8187c/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
index efc4f0e..e6742ca 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
@@ -16,6 +16,10 @@
*/
package org.apache.camel.component.aws.sqs;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;
@@ -31,6 +35,7 @@ import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
@@ -43,6 +48,12 @@ public class SqsProducerTest {
private static final String MESSAGE_MD5 = "00000000000000000000000000000000";
private static final String MESSAGE_ID = "11111111111111111111111111111111";
private static final String QUEUE_URL = "some://queue/url";
+ private static final String SAMPLE_MESSAGE_HEADER_NAME_1 = "header_name_1";
+ private static final String SAMPLE_MESSAGE_HEADER_VALUE_1 = "heder_value_1";
+ private static final String SAMPLE_MESSAGE_HEADER_NAME_2 = "header_name_2";
+ private static final ByteBuffer SAMPLE_MESSAGE_HEADER_VALUE_2 = ByteBuffer.wrap(new byte[10]);
+ private static final String SAMPLE_MESSAGE_HEADER_NAME_3 = "header_name_3";
+ private static final String SAMPLE_MESSAGE_HEADER_VALUE_3 = "heder_value_3";
Exchange exchange = mock(Exchange.class, RETURNS_DEEP_STUBS);
@@ -121,5 +132,66 @@ public class SqsProducerTest {
underTest.process(exchange);
verify(inMessage).setHeader(SqsConstants.MD5_OF_BODY, MESSAGE_MD5);
}
+
+ @Test
+ public void isAttributeMessageStringHeaderOnTheRequest() throws Exception {
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(SAMPLE_MESSAGE_HEADER_NAME_1, SAMPLE_MESSAGE_HEADER_VALUE_1);
+ when(inMessage.getHeaders()).thenReturn(headers);
+ underTest.process(exchange);
+
+ ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
+ verify(amazonSQSClient).sendMessage(capture.capture());
+
+ assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_1,
+ capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_1)
+ .getStringValue());
+ assertNull(capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_1)
+ .getBinaryValue());
+ }
+
+ @Test
+ public void isAttributeMessageByteBufferHeaderOnTheRequest() throws Exception {
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(SAMPLE_MESSAGE_HEADER_NAME_2, SAMPLE_MESSAGE_HEADER_VALUE_2);
+ when(inMessage.getHeaders()).thenReturn(headers);
+ underTest.process(exchange);
+
+ ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
+ verify(amazonSQSClient).sendMessage(capture.capture());
+
+ assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_2,
+ capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_2)
+ .getBinaryValue());
+ assertNull(capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_2)
+ .getStringValue());
+ }
+
+ @Test
+ public void isAllAttributeMessagesOnTheRequest() throws Exception {
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(SAMPLE_MESSAGE_HEADER_NAME_1, SAMPLE_MESSAGE_HEADER_VALUE_1);
+ headers.put(SAMPLE_MESSAGE_HEADER_NAME_2, SAMPLE_MESSAGE_HEADER_VALUE_2);
+ headers.put(SAMPLE_MESSAGE_HEADER_NAME_3, SAMPLE_MESSAGE_HEADER_VALUE_3);
+ when(inMessage.getHeaders()).thenReturn(headers);
+ underTest.process(exchange);
+
+ ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
+ verify(amazonSQSClient).sendMessage(capture.capture());
+
+ assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_1,
+ capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_1)
+ .getStringValue());
+ assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_2,
+ capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_2)
+ .getBinaryValue());
+ assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_3,
+ capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_3)
+ .getStringValue());
+ assertEquals(3, capture.getValue().getMessageAttributes().size());
+ }
+
+
+
}
\ No newline at end of file