You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/08/13 11:50:40 UTC

[2/2] git commit: CAMEL-7654 Support Message attribute for the SQSProducer

CAMEL-7654 Support Message attribute for the SQSProducer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2ca8187c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2ca8187c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2ca8187c

Branch: refs/heads/master
Commit: 2ca8187cf079b011b7f4438d803cd51c96378dc9
Parents: e7c9e40
Author: Willem Jiang <wi...@gmail.com>
Authored: Wed Aug 13 16:58:21 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Wed Aug 13 17:49:50 2014 +0800

----------------------------------------------------------------------
 .../camel/component/aws/sqs/SqsEndpoint.java    | 18 +++++
 .../camel/component/aws/sqs/SqsProducer.java    | 24 +++++++
 .../component/aws/sqs/SqsProducerTest.java      | 72 ++++++++++++++++++++
 3 files changed, 114 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2ca8187c/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
index 6447d7b..b48fbe6 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.aws.sqs;
 
 import java.util.HashMap;
+import java.util.Map.Entry;
 
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.BasicAWSCredentials;
@@ -27,6 +28,7 @@ import com.amazonaws.services.sqs.model.CreateQueueResult;
 import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
 import com.amazonaws.services.sqs.model.GetQueueUrlResult;
 import com.amazonaws.services.sqs.model.ListQueuesResult;
+import com.amazonaws.services.sqs.model.MessageAttributeValue;
 import com.amazonaws.services.sqs.model.QueueAttributeName;
 import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
 
@@ -188,6 +190,12 @@ public class SqsEndpoint extends ScheduledPollEndpoint {
         message.setHeader(SqsConstants.RECEIPT_HANDLE, msg.getReceiptHandle());
         message.setHeader(SqsConstants.ATTRIBUTES, msg.getAttributes());
         message.setHeader(SqsConstants.MESSAGE_ATTRIBUTES, msg.getMessageAttributes());
+        
+        //add all sqs message attributes as camel message headers so that knowledge of 
+        //the Sqs class MessageAttributeValue will not leak to the client
+        for (Entry<String, MessageAttributeValue> entry : msg.getMessageAttributes().entrySet()) {
+            message.setHeader(entry.getKey(), translateValue(entry.getValue()));
+        }
         return exchange;
     }
 
@@ -231,4 +239,14 @@ public class SqsEndpoint extends ScheduledPollEndpoint {
     public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
         this.maxMessagesPerPoll = maxMessagesPerPoll;
     }
+    
+    private Object translateValue(MessageAttributeValue mav) {
+        Object result = null;
+        if (mav.getStringValue() != null) {
+            result = mav.getStringValue();
+        } else if (mav.getBinaryValue() != null) {
+            result = mav.getBinaryValue();
+        }
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/2ca8187c/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
index b4fa72b..0a4c948 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
@@ -16,7 +16,13 @@
  */
 package org.apache.camel.component.aws.sqs;
 
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
 import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.MessageAttributeValue;
 import com.amazonaws.services.sqs.model.SendMessageRequest;
 import com.amazonaws.services.sqs.model.SendMessageResult;
 
@@ -44,6 +50,7 @@ public class SqsProducer extends DefaultProducer {
     public void process(Exchange exchange) throws Exception {
         String body = exchange.getIn().getBody(String.class);
         SendMessageRequest request = new SendMessageRequest(getQueueUrl(), body);
+        request.setMessageAttributes(translateAttributes(exchange.getIn().getHeaders()));
         addDelay(request, exchange);
 
         LOG.trace("Sending request [{}] from exchange [{}]...", request, exchange);
@@ -98,4 +105,21 @@ public class SqsProducer extends DefaultProducer {
     public String toString() {
         return "SqsProducer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
     }
+    
+    private Map<String, MessageAttributeValue> translateAttributes(Map<String, Object> headers) {
+        Map<String, MessageAttributeValue> result = new HashMap<String, MessageAttributeValue>();
+        for (Entry<String, Object> entry : headers.entrySet()) {
+            Object value = entry.getValue();
+            MessageAttributeValue mav = new MessageAttributeValue();
+            if (value instanceof String) {
+                mav.setDataType("String");
+                mav.withStringValue((String)value);
+            } else if (value instanceof ByteBuffer) {
+                mav.setDataType("Binary");
+                mav.withBinaryValue((ByteBuffer)value);
+            }
+            result.put(entry.getKey(), mav);
+        }
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/2ca8187c/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
index efc4f0e..e6742ca 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
@@ -16,6 +16,10 @@
  */
 package org.apache.camel.component.aws.sqs;
 
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
 import com.amazonaws.services.sqs.AmazonSQSClient;
 import com.amazonaws.services.sqs.model.SendMessageRequest;
 import com.amazonaws.services.sqs.model.SendMessageResult;
@@ -31,6 +35,7 @@ import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
@@ -43,6 +48,12 @@ public class SqsProducerTest {
     private static final String MESSAGE_MD5 = "00000000000000000000000000000000";
     private static final String MESSAGE_ID = "11111111111111111111111111111111";
     private static final String QUEUE_URL = "some://queue/url";
+    private static final String SAMPLE_MESSAGE_HEADER_NAME_1 = "header_name_1";
+    private static final String SAMPLE_MESSAGE_HEADER_VALUE_1 = "heder_value_1";
+    private static final String SAMPLE_MESSAGE_HEADER_NAME_2 = "header_name_2";
+    private static final ByteBuffer SAMPLE_MESSAGE_HEADER_VALUE_2 = ByteBuffer.wrap(new byte[10]);
+    private static final String SAMPLE_MESSAGE_HEADER_NAME_3 = "header_name_3";
+    private static final String SAMPLE_MESSAGE_HEADER_VALUE_3 = "heder_value_3";
     
     Exchange exchange = mock(Exchange.class, RETURNS_DEEP_STUBS);
 
@@ -121,5 +132,66 @@ public class SqsProducerTest {
         underTest.process(exchange);
         verify(inMessage).setHeader(SqsConstants.MD5_OF_BODY, MESSAGE_MD5);
     }
+    
+    @Test
+    public void isAttributeMessageStringHeaderOnTheRequest() throws Exception {
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put(SAMPLE_MESSAGE_HEADER_NAME_1, SAMPLE_MESSAGE_HEADER_VALUE_1);
+        when(inMessage.getHeaders()).thenReturn(headers);
+        underTest.process(exchange);
+
+        ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
+        verify(amazonSQSClient).sendMessage(capture.capture());
+
+        assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_1,
+                     capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_1)
+                         .getStringValue());
+        assertNull(capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_1)
+            .getBinaryValue());
+    }
+    
+    @Test
+    public void isAttributeMessageByteBufferHeaderOnTheRequest() throws Exception {
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put(SAMPLE_MESSAGE_HEADER_NAME_2, SAMPLE_MESSAGE_HEADER_VALUE_2);
+        when(inMessage.getHeaders()).thenReturn(headers);
+        underTest.process(exchange);
+
+        ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
+        verify(amazonSQSClient).sendMessage(capture.capture());
+
+        assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_2,
+                     capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_2)
+                         .getBinaryValue());
+        assertNull(capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_2)
+            .getStringValue());
+    }
+
+    @Test
+    public void isAllAttributeMessagesOnTheRequest() throws Exception {
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put(SAMPLE_MESSAGE_HEADER_NAME_1, SAMPLE_MESSAGE_HEADER_VALUE_1);
+        headers.put(SAMPLE_MESSAGE_HEADER_NAME_2, SAMPLE_MESSAGE_HEADER_VALUE_2);
+        headers.put(SAMPLE_MESSAGE_HEADER_NAME_3, SAMPLE_MESSAGE_HEADER_VALUE_3);
+        when(inMessage.getHeaders()).thenReturn(headers);
+        underTest.process(exchange);
+
+        ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class);
+        verify(amazonSQSClient).sendMessage(capture.capture());
+
+        assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_1,
+                     capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_1)
+                         .getStringValue());
+        assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_2,
+                     capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_2)
+                         .getBinaryValue());
+        assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_3,
+                     capture.getValue().getMessageAttributes().get(SAMPLE_MESSAGE_HEADER_NAME_3)
+                         .getStringValue());
+        assertEquals(3, capture.getValue().getMessageAttributes().size());
+    }
+    
+    
+
 
 }
\ No newline at end of file