You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/11/04 17:33:20 UTC

[camel] 01/04: CAMEL-17167 - Camel-AWS2-SQS: Message attributes can be at most 10

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 930f6e1ee878e21df671f569267337f1916371a8
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Nov 4 16:33:02 2021 +0100

    CAMEL-17167 - Camel-AWS2-SQS: Message attributes can be at most 10
---
 .../camel/component/aws2/sqs/Sqs2Producer.java     | 95 ++++++++++++----------
 .../SqsProducerSendLocalstackMaxAttributesIT.java  | 70 ++++++++++++++++
 2 files changed, 120 insertions(+), 45 deletions(-)

diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Producer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Producer.java
index aeab1bc..790ed8e 100644
--- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Producer.java
+++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Producer.java
@@ -58,6 +58,8 @@ public class Sqs2Producer extends DefaultProducer {
 
     private static final Logger LOG = LoggerFactory.getLogger(Sqs2Producer.class);
 
+    private static final int MAX_ATTRIBUTES = 10;
+
     private transient String sqsProducerToString;
 
     public Sqs2Producer(Sqs2Endpoint endpoint) {
@@ -294,53 +296,56 @@ public class Sqs2Producer extends DefaultProducer {
             // only put the message header which is not filtered into the
             // message attribute
             if (!headerFilterStrategy.applyFilterToCamelHeaders(entry.getKey(), entry.getValue(), exchange)) {
-                Object value = entry.getValue();
-                if (value instanceof String && !((String) value).isEmpty()) {
-                    MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
-                    mav.dataType("String");
-                    mav.stringValue((String) value);
-                    result.put(entry.getKey(), mav.build());
-                } else if (value instanceof ByteBuffer) {
-                    MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
-                    mav.dataType("Binary");
-                    mav.binaryValue(SdkBytes.fromByteBuffer((ByteBuffer) value));
-                    result.put(entry.getKey(), mav.build());
-                } else if (value instanceof Boolean) {
-                    MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
-                    mav.dataType("Number.Boolean");
-                    mav.stringValue(((Boolean) value) ? "1" : "0");
-                    result.put(entry.getKey(), mav.build());
-                } else if (value instanceof Number) {
-                    MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
-                    final String dataType;
-                    if (value instanceof Integer) {
-                        dataType = "Number.int";
-                    } else if (value instanceof Byte) {
-                        dataType = "Number.byte";
-                    } else if (value instanceof Double) {
-                        dataType = "Number.double";
-                    } else if (value instanceof Float) {
-                        dataType = "Number.float";
-                    } else if (value instanceof Long) {
-                        dataType = "Number.long";
-                    } else if (value instanceof Short) {
-                        dataType = "Number.short";
+                // We are going to put the first MAX_ATTRIBUTES headers, because this is the maximum Attributes an SQS Message could accept
+                if (result.size() < MAX_ATTRIBUTES) {
+                    Object value = entry.getValue();
+                    if (value instanceof String && !((String) value).isEmpty()) {
+                        MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
+                        mav.dataType("String");
+                        mav.stringValue((String) value);
+                        result.put(entry.getKey(), mav.build());
+                    } else if (value instanceof ByteBuffer) {
+                        MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
+                        mav.dataType("Binary");
+                        mav.binaryValue(SdkBytes.fromByteBuffer((ByteBuffer) value));
+                        result.put(entry.getKey(), mav.build());
+                    } else if (value instanceof Boolean) {
+                        MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
+                        mav.dataType("Number.Boolean");
+                        mav.stringValue(((Boolean) value) ? "1" : "0");
+                        result.put(entry.getKey(), mav.build());
+                    } else if (value instanceof Number) {
+                        MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
+                        final String dataType;
+                        if (value instanceof Integer) {
+                            dataType = "Number.int";
+                        } else if (value instanceof Byte) {
+                            dataType = "Number.byte";
+                        } else if (value instanceof Double) {
+                            dataType = "Number.double";
+                        } else if (value instanceof Float) {
+                            dataType = "Number.float";
+                        } else if (value instanceof Long) {
+                            dataType = "Number.long";
+                        } else if (value instanceof Short) {
+                            dataType = "Number.short";
+                        } else {
+                            dataType = "Number";
+                        }
+                        mav.dataType(dataType);
+                        mav.stringValue(value.toString());
+                        result.put(entry.getKey(), mav.build());
+                    } else if (value instanceof Date) {
+                        MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
+                        mav.dataType("String");
+                        mav.stringValue(value.toString());
+                        result.put(entry.getKey(), mav.build());
                     } else {
-                        dataType = "Number";
+                        // cannot translate the message header to message attribute
+                        // value
+                        LOG.warn("Cannot put the message header key={}, value={} into Sqs MessageAttribute", entry.getKey(),
+                                entry.getValue());
                     }
-                    mav.dataType(dataType);
-                    mav.stringValue(value.toString());
-                    result.put(entry.getKey(), mav.build());
-                } else if (value instanceof Date) {
-                    MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
-                    mav.dataType("String");
-                    mav.stringValue(value.toString());
-                    result.put(entry.getKey(), mav.build());
-                } else {
-                    // cannot translate the message header to message attribute
-                    // value
-                    LOG.warn("Cannot put the message header key={}, value={} into Sqs MessageAttribute", entry.getKey(),
-                            entry.getValue());
                 }
             }
         }
diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsProducerSendLocalstackMaxAttributesIT.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsProducerSendLocalstackMaxAttributesIT.java
new file mode 100644
index 0000000..544e18a
--- /dev/null
+++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsProducerSendLocalstackMaxAttributesIT.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.sqs.integration;
+
+import org.apache.camel.*;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+
+public class SqsProducerSendLocalstackMaxAttributesIT extends Aws2SQSBaseTest {
+
+    @EndpointInject("direct:start")
+    private ProducerTemplate template;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    @Test
+    public void sendInOnly() throws Exception {
+        result.expectedMessageCount(1);
+
+        Exchange exchange = template.send("direct:start", ExchangePattern.InOnly, new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader("value1", "value1");
+                exchange.getIn().setHeader("value2", "value2");
+                exchange.getIn().setHeader("value3", "value3");
+                exchange.getIn().setHeader("value4", "value4");
+                exchange.getIn().setHeader("value5", "value5");
+                exchange.getIn().setHeader("value6", "value6");
+                exchange.getIn().setHeader("value7", "value7");
+                exchange.getIn().setHeader("value8", "value8");
+                exchange.getIn().setHeader("value9", "value9");
+                exchange.getIn().setHeader("value10", "value10");
+                exchange.getIn().setHeader("value11", "value11");
+                exchange.getIn().setBody("Test");
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").startupOrder(2)
+                        .toF("aws2-sqs://%s?autoCreateQueue=true", sharedNameGenerator.getName());
+
+                fromF("aws2-sqs://%s?deleteAfterRead=true&autoCreateQueue=true", sharedNameGenerator.getName())
+                        .startupOrder(1).log("${body}").to("mock:result");
+            }
+        };
+    }
+}