You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/03/07 16:20:56 UTC
[3/4] camel git commit: CAMEL-10953 add message attribute support to
sns producer
CAMEL-10953 add message attribute support to sns producer
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/692ecaad
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/692ecaad
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/692ecaad
Branch: refs/heads/master
Commit: 692ecaad6e44fa4528a09464c6c230299d13cbb8
Parents: cace0ee
Author: Peter van Gestel <pe...@osudio.com>
Authored: Mon Mar 6 19:23:33 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 7 17:18:43 2017 +0100
----------------------------------------------------------------------
.../camel/component/aws/sns/SnsEndpoint.java | 26 ++++++++---
.../aws/sns/SnsHeaderFilterStrategy.java | 30 +++++++++++++
.../camel/component/aws/sns/SnsProducer.java | 45 ++++++++++++++++++--
3 files changed, 92 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/692ecaad/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
index ac7384b..b9276da 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsEndpoint.java
@@ -33,11 +33,9 @@ import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
+import org.apache.camel.component.aws.sqs.SqsHeaderFilterStrategy;
import org.apache.camel.impl.DefaultEndpoint;
-import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.UriEndpoint;
-import org.apache.camel.spi.UriParam;
-import org.apache.camel.spi.UriPath;
+import org.apache.camel.spi.*;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +45,7 @@ import org.slf4j.LoggerFactory;
*/
@UriEndpoint(firstVersion = "2.8.0", scheme = "aws-sns", title = "AWS Simple Notification System", syntax = "aws-sns:topicNameOrArn",
producerOnly = true, label = "cloud,mobile,messaging")
-public class SnsEndpoint extends DefaultEndpoint {
+public class SnsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware {
private static final Logger LOG = LoggerFactory.getLogger(SnsEndpoint.class);
@@ -58,6 +56,8 @@ public class SnsEndpoint extends DefaultEndpoint {
private String topicNameOrArn; // to support component docs
@UriParam
private SnsConfiguration configuration;
+ @UriParam
+ private HeaderFilterStrategy headerFilterStrategy;
@Deprecated
public SnsEndpoint(String uri, CamelContext context, SnsConfiguration configuration) {
@@ -69,6 +69,17 @@ public class SnsEndpoint extends DefaultEndpoint {
this.configuration = configuration;
}
+ public HeaderFilterStrategy getHeaderFilterStrategy() {
+ return headerFilterStrategy;
+ }
+
+ /**
+ * To use a custom HeaderFilterStrategy to map headers to/from Camel.
+ */
+ public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) {
+ this.headerFilterStrategy = strategy;
+ }
+
public Consumer createConsumer(Processor processor) throws Exception {
throw new UnsupportedOperationException("You cannot receive messages from this endpoint");
}
@@ -92,6 +103,11 @@ public class SnsEndpoint extends DefaultEndpoint {
LOG.trace("Updating the SNS region with : {} " + configuration.getAmazonSNSEndpoint());
snsClient.setEndpoint(configuration.getAmazonSNSEndpoint());
}
+
+ // check the setting the headerFilterStrategy
+ if (headerFilterStrategy == null) {
+ headerFilterStrategy = new SqsHeaderFilterStrategy();
+ }
if (configuration.getTopicArn() == null) {
try {
http://git-wip-us.apache.org/repos/asf/camel/blob/692ecaad/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsHeaderFilterStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsHeaderFilterStrategy.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsHeaderFilterStrategy.java
new file mode 100644
index 0000000..fb51835
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsHeaderFilterStrategy.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.sns;
+
+import org.apache.camel.impl.DefaultHeaderFilterStrategy;
+
+public class SnsHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
+ public SnsHeaderFilterStrategy() {
+ initialize();
+ }
+
+ protected void initialize() {
+ // filter headers begin with "Camel" or "org.apache.camel"
+ setOutFilterPattern("(Camel|org\\.apache\\.camel)[\\.|a-z|A-z|0-9]*");
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/692ecaad/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
index 5781155..69c11f7 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sns/SnsProducer.java
@@ -16,17 +16,23 @@
*/
package org.apache.camel.component.aws.sns;
-import com.amazonaws.services.sns.model.PublishRequest;
-import com.amazonaws.services.sns.model.PublishResult;
+import com.amazonaws.services.sns.model.*;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.URISupport;
+import org.apache.camel.spi.HeaderFilterStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
@@ -51,7 +57,8 @@ public class SnsProducer extends DefaultProducer {
request.setSubject(determineSubject(exchange));
request.setMessageStructure(determineMessageStructure(exchange));
request.setMessage(exchange.getIn().getBody(String.class));
-
+ request.setMessageAttributes(this.translateAttributes(exchange.getIn().getHeaders(), exchange));
+
LOG.trace("Sending request [{}] from exchange [{}]...", request, exchange);
PublishResult result = getEndpoint().getSNSClient().publish(request);
@@ -79,7 +86,35 @@ public class SnsProducer extends DefaultProducer {
return structure;
}
-
+ private Map<String, MessageAttributeValue> translateAttributes(Map<String, Object> headers, Exchange exchange) {
+ HashMap result = new HashMap();
+ HeaderFilterStrategy headerFilterStrategy = this.getEndpoint().getHeaderFilterStrategy();
+ Iterator var5 = headers.entrySet().iterator();
+
+ while(var5.hasNext()) {
+ Entry entry = (Entry)var5.next();
+ if(!headerFilterStrategy.applyFilterToCamelHeaders((String)entry.getKey(), entry.getValue(), exchange)) {
+ Object value = entry.getValue();
+ MessageAttributeValue mav;
+ if(value instanceof String) {
+ mav = new MessageAttributeValue();
+ mav.setDataType("String");
+ mav.withStringValue((String)value);
+ result.put(entry.getKey(), mav);
+ } else if(value instanceof ByteBuffer) {
+ mav = new MessageAttributeValue();
+ mav.setDataType("Binary");
+ mav.withBinaryValue((ByteBuffer)value);
+ result.put(entry.getKey(), mav);
+ } else {
+ LOG.warn("Cannot put the message header key={}, value={} into Sqs MessageAttribute", entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ return result;
+ }
+
protected SnsConfiguration getConfiguration() {
return getEndpoint().getConfiguration();
}
@@ -96,4 +131,6 @@ public class SnsProducer extends DefaultProducer {
public SnsEndpoint getEndpoint() {
return (SnsEndpoint) super.getEndpoint();
}
+
+
}
\ No newline at end of file