You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/03/07 16:20:56 UTC

[3/4] camel git commit: CAMEL-10953 add message attribute support to sns producer

CAMEL-10953 add message attribute support to sns producer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/692ecaad
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/692ecaad
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/692ecaad

Branch: refs/heads/master
Commit: 692ecaad6e44fa4528a09464c6c230299d13cbb8
Parents: cace0ee
Author: Peter van Gestel <pe...@osudio.com>
Authored: Mon Mar 6 19:23:33 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 7 17:18:43 2017 +0100

----------------------------------------------------------------------
 .../camel/component/aws/sns/SnsEndpoint.java    | 26 ++++++++---
 .../aws/sns/SnsHeaderFilterStrategy.java        | 30 +++++++++++++
 .../camel/component/aws/sns/SnsProducer.java    | 45 ++++++++++++++++++--
 3 files changed, 92 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/692ecaad/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
index ac7384b..b9276da 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
@@ -33,11 +33,9 @@ import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.component.aws.sqs.SqsHeaderFilterStrategy;
 import org.apache.camel.impl.DefaultEndpoint;
-import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.UriEndpoint;
-import org.apache.camel.spi.UriParam;
-import org.apache.camel.spi.UriPath;
+import org.apache.camel.spi.*;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +45,7 @@ import org.slf4j.LoggerFactory;
  */
 @UriEndpoint(firstVersion = "2.8.0", scheme = "aws-sns", title = "AWS Simple Notification System", syntax = "aws-sns:topicNameOrArn",
     producerOnly = true, label = "cloud,mobile,messaging")
-public class SnsEndpoint extends DefaultEndpoint {
+public class SnsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(SnsEndpoint.class);
 
@@ -58,6 +56,8 @@ public class SnsEndpoint extends DefaultEndpoint {
     private String topicNameOrArn; // to support component docs
     @UriParam
     private SnsConfiguration configuration;
+    @UriParam
+    private HeaderFilterStrategy headerFilterStrategy;
 
     @Deprecated
     public SnsEndpoint(String uri, CamelContext context, SnsConfiguration configuration) {
@@ -69,6 +69,17 @@ public class SnsEndpoint extends DefaultEndpoint {
         this.configuration = configuration;
     }
 
+    public HeaderFilterStrategy getHeaderFilterStrategy() {
+        return headerFilterStrategy;
+    }
+
+    /**
+     * To use a custom HeaderFilterStrategy to map headers to/from Camel.
+     */
+    public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) {
+        this.headerFilterStrategy = strategy;
+    }
+
     public Consumer createConsumer(Processor processor) throws Exception {
         throw new UnsupportedOperationException("You cannot receive messages from this endpoint");
     }
@@ -92,6 +103,11 @@ public class SnsEndpoint extends DefaultEndpoint {
             LOG.trace("Updating the SNS region with : {} " + configuration.getAmazonSNSEndpoint());
             snsClient.setEndpoint(configuration.getAmazonSNSEndpoint());
         }
+
+        // check the setting the headerFilterStrategy
+        if (headerFilterStrategy == null) {
+            headerFilterStrategy = new SqsHeaderFilterStrategy();
+        }
         
         if (configuration.getTopicArn() == null) {
             try {

http://git-wip-us.apache.org/repos/asf/camel/blob/692ecaad/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsHeaderFilterStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsHeaderFilterStrategy.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsHeaderFilterStrategy.java
new file mode 100644
index 0000000..fb51835
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsHeaderFilterStrategy.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.sns;
+
+import org.apache.camel.impl.DefaultHeaderFilterStrategy;
+
+public class SnsHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
+    public SnsHeaderFilterStrategy() {
+        initialize();  
+    }
+
+    protected void initialize() {
+        // filter headers begin with "Camel" or "org.apache.camel"
+        setOutFilterPattern("(Camel|org\\.apache\\.camel)[\\.|a-z|A-z|0-9]*"); 
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/692ecaad/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
index 5781155..69c11f7 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
@@ -16,17 +16,23 @@
  */
 package org.apache.camel.component.aws.sns;
 
-import com.amazonaws.services.sns.model.PublishRequest;
-import com.amazonaws.services.sns.model.PublishResult;
+import com.amazonaws.services.sns.model.*;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.URISupport;
+import org.apache.camel.spi.HeaderFilterStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
 import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
 
 
@@ -51,7 +57,8 @@ public class SnsProducer extends DefaultProducer {
         request.setSubject(determineSubject(exchange));
         request.setMessageStructure(determineMessageStructure(exchange));
         request.setMessage(exchange.getIn().getBody(String.class));
-
+        request.setMessageAttributes(this.translateAttributes(exchange.getIn().getHeaders(), exchange));
+        
         LOG.trace("Sending request [{}] from exchange [{}]...", request, exchange);
         
         PublishResult result = getEndpoint().getSNSClient().publish(request);
@@ -79,7 +86,35 @@ public class SnsProducer extends DefaultProducer {
 
         return structure;
     }
-    
+	private Map<String, MessageAttributeValue> translateAttributes(Map<String, Object> headers, Exchange exchange) {
+        HashMap result = new HashMap();
+        HeaderFilterStrategy headerFilterStrategy = this.getEndpoint().getHeaderFilterStrategy();
+        Iterator var5 = headers.entrySet().iterator();
+
+        while(var5.hasNext()) {
+            Entry entry = (Entry)var5.next();
+            if(!headerFilterStrategy.applyFilterToCamelHeaders((String)entry.getKey(), entry.getValue(), exchange)) {
+                Object value = entry.getValue();
+                MessageAttributeValue mav;
+                if(value instanceof String) {
+                    mav = new MessageAttributeValue();
+                    mav.setDataType("String");
+                    mav.withStringValue((String)value);
+                    result.put(entry.getKey(), mav);
+                } else if(value instanceof ByteBuffer) {
+                    mav = new MessageAttributeValue();
+                    mav.setDataType("Binary");
+                    mav.withBinaryValue((ByteBuffer)value);
+                    result.put(entry.getKey(), mav);
+                } else {
+                    LOG.warn("Cannot put the message header key={}, value={} into Sqs MessageAttribute", entry.getKey(), entry.getValue());
+                }
+            }
+        }
+
+        return result;
+    }
+	
     protected SnsConfiguration getConfiguration() {
         return getEndpoint().getConfiguration();
     }
@@ -96,4 +131,6 @@ public class SnsProducer extends DefaultProducer {
     public SnsEndpoint getEndpoint() {
         return (SnsEndpoint) super.getEndpoint();
     }
+	
+	
 }
\ No newline at end of file