You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/03/07 16:20:54 UTC

[1/4] camel git commit: CAMEL-10953 handle pr comments

Repository: camel
Updated Branches:
  refs/heads/master cace0ee7e -> 9f1a4524e


CAMEL-10953 handle pr comments


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/af830c52
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/af830c52
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/af830c52

Branch: refs/heads/master
Commit: af830c52469b5288cf4ea688419dd4cdb497dda8
Parents: 255124e
Author: Peter van Gestel <pe...@osudio.com>
Authored: Tue Mar 7 16:54:06 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 7 17:18:43 2017 +0100

----------------------------------------------------------------------
 .../camel/component/aws/sns/SnsEndpoint.java    |  3 +--
 .../camel/component/aws/sns/SnsProducer.java    | 26 +++++++++-----------
 2 files changed, 12 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/af830c52/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
index 374a420..469ee4a 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
@@ -33,7 +33,6 @@ import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.component.aws.sqs.SqsHeaderFilterStrategy;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategyAware;
@@ -111,7 +110,7 @@ public class SnsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
 
         // check the setting the headerFilterStrategy
         if (headerFilterStrategy == null) {
-            headerFilterStrategy = new SqsHeaderFilterStrategy();
+            headerFilterStrategy = new SnsHeaderFilterStrategy();
         }
         
         if (configuration.getTopicArn() == null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/af830c52/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
index 087c436..36c4a15 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
@@ -18,7 +18,6 @@ package org.apache.camel.component.aws.sns;
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -89,31 +88,28 @@ public class SnsProducer extends DefaultProducer {
     }
 
     private Map<String, MessageAttributeValue> translateAttributes(Map<String, Object> headers, Exchange exchange) {
-        HashMap result = new HashMap();
-        HeaderFilterStrategy headerFilterStrategy = this.getEndpoint().getHeaderFilterStrategy();
-        Iterator var5 = headers.entrySet().iterator();
-
-        while (var5.hasNext()) {
-            Entry entry = (Entry) var5.next();
-            if (!headerFilterStrategy.applyFilterToCamelHeaders((String) entry.getKey(), entry.getValue(), exchange)) {
+        Map<String, MessageAttributeValue> result = new HashMap<String, MessageAttributeValue>();
+        HeaderFilterStrategy headerFilterStrategy = getEndpoint().getHeaderFilterStrategy();
+        for (Entry<String, Object> entry : headers.entrySet()) {
+            // only put the message header which is not filtered into the message attribute
+            if (!headerFilterStrategy.applyFilterToCamelHeaders(entry.getKey(), entry.getValue(), exchange)) {
                 Object value = entry.getValue();
-                MessageAttributeValue mav;
                 if (value instanceof String) {
-                    mav = new MessageAttributeValue();
+                    MessageAttributeValue mav = new MessageAttributeValue();
                     mav.setDataType("String");
-                    mav.withStringValue((String) value);
+                    mav.withStringValue((String)value);
                     result.put(entry.getKey(), mav);
                 } else if (value instanceof ByteBuffer) {
-                    mav = new MessageAttributeValue();
+                    MessageAttributeValue mav = new MessageAttributeValue();
                     mav.setDataType("Binary");
-                    mav.withBinaryValue((ByteBuffer) value);
+                    mav.withBinaryValue((ByteBuffer)value);
                     result.put(entry.getKey(), mav);
                 } else {
-                    LOG.warn("Cannot put the message header key={}, value={} into Sqs MessageAttribute", entry.getKey(), entry.getValue());
+                    // cannot translate the message header to message attribute value
+                    LOG.warn("Cannot put the message header key={}, value={} into Sns MessageAttribute", entry.getKey(), entry.getValue());
                 }
             }
         }
-
         return result;
     }
 


[4/4] camel git commit: Regen docs

Posted by da...@apache.org.
Regen docs


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9f1a4524
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9f1a4524
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9f1a4524

Branch: refs/heads/master
Commit: 9f1a4524efbc0d61b8b32d60d84c91c174dac94f
Parents: af830c5
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Mar 7 17:20:43 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 7 17:20:43 2017 +0100

----------------------------------------------------------------------
 components/camel-aws/src/main/docs/aws-sns-component.adoc | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9f1a4524/components/camel-aws/src/main/docs/aws-sns-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/docs/aws-sns-component.adoc b/components/camel-aws/src/main/docs/aws-sns-component.adoc
index c1892cd..bdb7cb5 100644
--- a/components/camel-aws/src/main/docs/aws-sns-component.adoc
+++ b/components/camel-aws/src/main/docs/aws-sns-component.adoc
@@ -50,7 +50,7 @@ The AWS Simple Notification System endpoint is configured using URI syntax:
 | topicNameOrArn |  | String | *Required* Topic name or ARN
 |=======================================================================
 
-#### Query Parameters (10 parameters):
+#### Query Parameters (11 parameters):
 
 [width="100%",cols="2,1,1m,1m,5",options="header"]
 |=======================================================================
@@ -58,6 +58,7 @@ The AWS Simple Notification System endpoint is configured using URI syntax:
 | accessKey | producer |  | String | Amazon AWS Access Key
 | amazonSNSClient | producer |  | AmazonSNS | To use the AmazonSNS as the client
 | amazonSNSEndpoint | producer |  | String | The region with which the AWS-SNS client wants to work with.
+| headerFilterStrategy | producer |  | HeaderFilterStrategy | To use a custom HeaderFilterStrategy to map headers to/from Camel.
 | messageStructure | producer |  | String | The message structure to use such as json
 | policy | producer |  | String | The policy for this queue
 | proxyHost | producer |  | String | To define a proxy host when instantiating the SQS client


[2/4] camel git commit: CAMEL-10953 solve checkstyle issues

Posted by da...@apache.org.
CAMEL-10953 solve checkstyle issues


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/255124eb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/255124eb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/255124eb

Branch: refs/heads/master
Commit: 255124eb83b2ab6c6d154fef438ceb25f5b74d43
Parents: 692ecaa
Author: Peter van Gestel <pe...@osudio.com>
Authored: Mon Mar 6 19:39:04 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 7 17:18:43 2017 +0100

----------------------------------------------------------------------
 .../camel/component/aws/sns/SnsEndpoint.java    |  7 ++-
 .../camel/component/aws/sns/SnsProducer.java    | 54 ++++++++++----------
 2 files changed, 34 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/255124eb/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
index b9276da..374a420 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
@@ -35,7 +35,12 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.aws.sqs.SqsHeaderFilterStrategy;
 import org.apache.camel.impl.DefaultEndpoint;
-import org.apache.camel.spi.*;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.HeaderFilterStrategyAware;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/camel/blob/255124eb/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
index 69c11f7..087c436 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
@@ -16,23 +16,24 @@
  */
 package org.apache.camel.component.aws.sns;
 
-import com.amazonaws.services.sns.model.*;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
 
+import com.amazonaws.services.sns.model.MessageAttributeValue;
+import com.amazonaws.services.sns.model.PublishRequest;
+import com.amazonaws.services.sns.model.PublishResult;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultProducer;
-import org.apache.camel.util.URISupport;
 import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
 import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
 
 
@@ -43,7 +44,7 @@ import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageFo
 public class SnsProducer extends DefaultProducer {
 
     private static final Logger LOG = LoggerFactory.getLogger(SnsProducer.class);
-    
+
     private transient String snsProducerToString;
 
     public SnsProducer(Endpoint endpoint) {
@@ -58,13 +59,13 @@ public class SnsProducer extends DefaultProducer {
         request.setMessageStructure(determineMessageStructure(exchange));
         request.setMessage(exchange.getIn().getBody(String.class));
         request.setMessageAttributes(this.translateAttributes(exchange.getIn().getHeaders(), exchange));
-        
+
         LOG.trace("Sending request [{}] from exchange [{}]...", request, exchange);
-        
+
         PublishResult result = getEndpoint().getSNSClient().publish(request);
 
         LOG.trace("Received result [{}]", result);
-        
+
         Message message = getMessageForResponse(exchange);
         message.setHeader(SnsConstants.MESSAGE_ID, result.getMessageId());
     }
@@ -74,7 +75,7 @@ public class SnsProducer extends DefaultProducer {
         if (subject == null) {
             subject = getConfiguration().getSubject();
         }
-        
+
         return subject;
     }
 
@@ -86,25 +87,26 @@ public class SnsProducer extends DefaultProducer {
 
         return structure;
     }
-	private Map<String, MessageAttributeValue> translateAttributes(Map<String, Object> headers, Exchange exchange) {
+
+    private Map<String, MessageAttributeValue> translateAttributes(Map<String, Object> headers, Exchange exchange) {
         HashMap result = new HashMap();
         HeaderFilterStrategy headerFilterStrategy = this.getEndpoint().getHeaderFilterStrategy();
         Iterator var5 = headers.entrySet().iterator();
 
-        while(var5.hasNext()) {
-            Entry entry = (Entry)var5.next();
-            if(!headerFilterStrategy.applyFilterToCamelHeaders((String)entry.getKey(), entry.getValue(), exchange)) {
+        while (var5.hasNext()) {
+            Entry entry = (Entry) var5.next();
+            if (!headerFilterStrategy.applyFilterToCamelHeaders((String) entry.getKey(), entry.getValue(), exchange)) {
                 Object value = entry.getValue();
                 MessageAttributeValue mav;
-                if(value instanceof String) {
+                if (value instanceof String) {
                     mav = new MessageAttributeValue();
                     mav.setDataType("String");
-                    mav.withStringValue((String)value);
+                    mav.withStringValue((String) value);
                     result.put(entry.getKey(), mav);
-                } else if(value instanceof ByteBuffer) {
+                } else if (value instanceof ByteBuffer) {
                     mav = new MessageAttributeValue();
                     mav.setDataType("Binary");
-                    mav.withBinaryValue((ByteBuffer)value);
+                    mav.withBinaryValue((ByteBuffer) value);
                     result.put(entry.getKey(), mav);
                 } else {
                     LOG.warn("Cannot put the message header key={}, value={} into Sqs MessageAttribute", entry.getKey(), entry.getValue());
@@ -114,11 +116,11 @@ public class SnsProducer extends DefaultProducer {
 
         return result;
     }
-	
+
     protected SnsConfiguration getConfiguration() {
         return getEndpoint().getConfiguration();
     }
-    
+
     @Override
     public String toString() {
         if (snsProducerToString == null) {
@@ -126,11 +128,11 @@ public class SnsProducer extends DefaultProducer {
         }
         return snsProducerToString;
     }
-    
+
     @Override
     public SnsEndpoint getEndpoint() {
         return (SnsEndpoint) super.getEndpoint();
     }
-	
-	
+
+
 }
\ No newline at end of file


[3/4] camel git commit: CAMEL-10953 add message attribute support to sns producer

Posted by da...@apache.org.
CAMEL-10953 add message attribute support to sns producer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/692ecaad
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/692ecaad
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/692ecaad

Branch: refs/heads/master
Commit: 692ecaad6e44fa4528a09464c6c230299d13cbb8
Parents: cace0ee
Author: Peter van Gestel <pe...@osudio.com>
Authored: Mon Mar 6 19:23:33 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 7 17:18:43 2017 +0100

----------------------------------------------------------------------
 .../camel/component/aws/sns/SnsEndpoint.java    | 26 ++++++++---
 .../aws/sns/SnsHeaderFilterStrategy.java        | 30 +++++++++++++
 .../camel/component/aws/sns/SnsProducer.java    | 45 ++++++++++++++++++--
 3 files changed, 92 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/692ecaad/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
index ac7384b..b9276da 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
@@ -33,11 +33,9 @@ import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.component.aws.sqs.SqsHeaderFilterStrategy;
 import org.apache.camel.impl.DefaultEndpoint;
-import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.UriEndpoint;
-import org.apache.camel.spi.UriParam;
-import org.apache.camel.spi.UriPath;
+import org.apache.camel.spi.*;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +45,7 @@ import org.slf4j.LoggerFactory;
  */
 @UriEndpoint(firstVersion = "2.8.0", scheme = "aws-sns", title = "AWS Simple Notification System", syntax = "aws-sns:topicNameOrArn",
     producerOnly = true, label = "cloud,mobile,messaging")
-public class SnsEndpoint extends DefaultEndpoint {
+public class SnsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(SnsEndpoint.class);
 
@@ -58,6 +56,8 @@ public class SnsEndpoint extends DefaultEndpoint {
     private String topicNameOrArn; // to support component docs
     @UriParam
     private SnsConfiguration configuration;
+    @UriParam
+    private HeaderFilterStrategy headerFilterStrategy;
 
     @Deprecated
     public SnsEndpoint(String uri, CamelContext context, SnsConfiguration configuration) {
@@ -69,6 +69,17 @@ public class SnsEndpoint extends DefaultEndpoint {
         this.configuration = configuration;
     }
 
+    public HeaderFilterStrategy getHeaderFilterStrategy() {
+        return headerFilterStrategy;
+    }
+
+    /**
+     * To use a custom HeaderFilterStrategy to map headers to/from Camel.
+     */
+    public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) {
+        this.headerFilterStrategy = strategy;
+    }
+
     public Consumer createConsumer(Processor processor) throws Exception {
         throw new UnsupportedOperationException("You cannot receive messages from this endpoint");
     }
@@ -92,6 +103,11 @@ public class SnsEndpoint extends DefaultEndpoint {
             LOG.trace("Updating the SNS region with : {} " + configuration.getAmazonSNSEndpoint());
             snsClient.setEndpoint(configuration.getAmazonSNSEndpoint());
         }
+
+        // check the setting the headerFilterStrategy
+        if (headerFilterStrategy == null) {
+            headerFilterStrategy = new SqsHeaderFilterStrategy();
+        }
         
         if (configuration.getTopicArn() == null) {
             try {

http://git-wip-us.apache.org/repos/asf/camel/blob/692ecaad/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsHeaderFilterStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsHeaderFilterStrategy.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsHeaderFilterStrategy.java
new file mode 100644
index 0000000..fb51835
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsHeaderFilterStrategy.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.sns;
+
+import org.apache.camel.impl.DefaultHeaderFilterStrategy;
+
+public class SnsHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
+    public SnsHeaderFilterStrategy() {
+        initialize();  
+    }
+
+    protected void initialize() {
+        // filter headers begin with "Camel" or "org.apache.camel"
+        setOutFilterPattern("(Camel|org\\.apache\\.camel)[\\.|a-z|A-z|0-9]*"); 
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/692ecaad/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
index 5781155..69c11f7 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
@@ -16,17 +16,23 @@
  */
 package org.apache.camel.component.aws.sns;
 
-import com.amazonaws.services.sns.model.PublishRequest;
-import com.amazonaws.services.sns.model.PublishResult;
+import com.amazonaws.services.sns.model.*;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.URISupport;
+import org.apache.camel.spi.HeaderFilterStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
 import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
 
 
@@ -51,7 +57,8 @@ public class SnsProducer extends DefaultProducer {
         request.setSubject(determineSubject(exchange));
         request.setMessageStructure(determineMessageStructure(exchange));
         request.setMessage(exchange.getIn().getBody(String.class));
-
+        request.setMessageAttributes(this.translateAttributes(exchange.getIn().getHeaders(), exchange));
+        
         LOG.trace("Sending request [{}] from exchange [{}]...", request, exchange);
         
         PublishResult result = getEndpoint().getSNSClient().publish(request);
@@ -79,7 +86,35 @@ public class SnsProducer extends DefaultProducer {
 
         return structure;
     }
-    
+	private Map<String, MessageAttributeValue> translateAttributes(Map<String, Object> headers, Exchange exchange) {
+        HashMap result = new HashMap();
+        HeaderFilterStrategy headerFilterStrategy = this.getEndpoint().getHeaderFilterStrategy();
+        Iterator var5 = headers.entrySet().iterator();
+
+        while(var5.hasNext()) {
+            Entry entry = (Entry)var5.next();
+            if(!headerFilterStrategy.applyFilterToCamelHeaders((String)entry.getKey(), entry.getValue(), exchange)) {
+                Object value = entry.getValue();
+                MessageAttributeValue mav;
+                if(value instanceof String) {
+                    mav = new MessageAttributeValue();
+                    mav.setDataType("String");
+                    mav.withStringValue((String)value);
+                    result.put(entry.getKey(), mav);
+                } else if(value instanceof ByteBuffer) {
+                    mav = new MessageAttributeValue();
+                    mav.setDataType("Binary");
+                    mav.withBinaryValue((ByteBuffer)value);
+                    result.put(entry.getKey(), mav);
+                } else {
+                    LOG.warn("Cannot put the message header key={}, value={} into Sqs MessageAttribute", entry.getKey(), entry.getValue());
+                }
+            }
+        }
+
+        return result;
+    }
+	
     protected SnsConfiguration getConfiguration() {
         return getEndpoint().getConfiguration();
     }
@@ -96,4 +131,6 @@ public class SnsProducer extends DefaultProducer {
     public SnsEndpoint getEndpoint() {
         return (SnsEndpoint) super.getEndpoint();
     }
+	
+	
 }
\ No newline at end of file