You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/11/04 17:50:54 UTC
[camel] 01/03: CAMEL-17167 - Camel-AWS2-SQS: Message attributes can
be at most 10
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch camel-3.11.x
in repository https://gitbox.apache.org/repos/asf/camel.git
commit e99f81cd233f81eece7a0f56e7e2c40b9d9f3d81
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Nov 4 16:33:02 2021 +0100
CAMEL-17167 - Camel-AWS2-SQS: Message attributes can be at most 10
---
.../camel/component/aws2/sqs/Sqs2Producer.java | 95 ++++++++++++----------
.../SqsProducerSendLocalstackMaxAttributesIT.java | 70 ++++++++++++++++
2 files changed, 120 insertions(+), 45 deletions(-)
diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Producer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Producer.java
index e168517..710f495 100644
--- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Producer.java
+++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Producer.java
@@ -56,6 +56,8 @@ public class Sqs2Producer extends DefaultProducer {
private static final Logger LOG = LoggerFactory.getLogger(Sqs2Producer.class);
+ private static final int MAX_ATTRIBUTES = 10;
+
private transient String sqsProducerToString;
public Sqs2Producer(Sqs2Endpoint endpoint) {
@@ -279,53 +281,56 @@ public class Sqs2Producer extends DefaultProducer {
// only put the message header which is not filtered into the
// message attribute
if (!headerFilterStrategy.applyFilterToCamelHeaders(entry.getKey(), entry.getValue(), exchange)) {
- Object value = entry.getValue();
- if (value instanceof String && !((String) value).isEmpty()) {
- MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
- mav.dataType("String");
- mav.stringValue((String) value);
- result.put(entry.getKey(), mav.build());
- } else if (value instanceof ByteBuffer) {
- MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
- mav.dataType("Binary");
- mav.binaryValue(SdkBytes.fromByteBuffer((ByteBuffer) value));
- result.put(entry.getKey(), mav.build());
- } else if (value instanceof Boolean) {
- MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
- mav.dataType("Number.Boolean");
- mav.stringValue(((Boolean) value) ? "1" : "0");
- result.put(entry.getKey(), mav.build());
- } else if (value instanceof Number) {
- MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
- final String dataType;
- if (value instanceof Integer) {
- dataType = "Number.int";
- } else if (value instanceof Byte) {
- dataType = "Number.byte";
- } else if (value instanceof Double) {
- dataType = "Number.double";
- } else if (value instanceof Float) {
- dataType = "Number.float";
- } else if (value instanceof Long) {
- dataType = "Number.long";
- } else if (value instanceof Short) {
- dataType = "Number.short";
+ // We are going to put the first MAX_ATTRIBUTES headers, because this is the maximum Attributes an SQS Message could accept
+ if (result.size() < MAX_ATTRIBUTES) {
+ Object value = entry.getValue();
+ if (value instanceof String && !((String) value).isEmpty()) {
+ MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
+ mav.dataType("String");
+ mav.stringValue((String) value);
+ result.put(entry.getKey(), mav.build());
+ } else if (value instanceof ByteBuffer) {
+ MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
+ mav.dataType("Binary");
+ mav.binaryValue(SdkBytes.fromByteBuffer((ByteBuffer) value));
+ result.put(entry.getKey(), mav.build());
+ } else if (value instanceof Boolean) {
+ MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
+ mav.dataType("Number.Boolean");
+ mav.stringValue(((Boolean) value) ? "1" : "0");
+ result.put(entry.getKey(), mav.build());
+ } else if (value instanceof Number) {
+ MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
+ final String dataType;
+ if (value instanceof Integer) {
+ dataType = "Number.int";
+ } else if (value instanceof Byte) {
+ dataType = "Number.byte";
+ } else if (value instanceof Double) {
+ dataType = "Number.double";
+ } else if (value instanceof Float) {
+ dataType = "Number.float";
+ } else if (value instanceof Long) {
+ dataType = "Number.long";
+ } else if (value instanceof Short) {
+ dataType = "Number.short";
+ } else {
+ dataType = "Number";
+ }
+ mav.dataType(dataType);
+ mav.stringValue(value.toString());
+ result.put(entry.getKey(), mav.build());
+ } else if (value instanceof Date) {
+ MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
+ mav.dataType("String");
+ mav.stringValue(value.toString());
+ result.put(entry.getKey(), mav.build());
} else {
- dataType = "Number";
+ // cannot translate the message header to message attribute
+ // value
+ LOG.warn("Cannot put the message header key={}, value={} into Sqs MessageAttribute", entry.getKey(),
+ entry.getValue());
}
- mav.dataType(dataType);
- mav.stringValue(value.toString());
- result.put(entry.getKey(), mav.build());
- } else if (value instanceof Date) {
- MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
- mav.dataType("String");
- mav.stringValue(value.toString());
- result.put(entry.getKey(), mav.build());
- } else {
- // cannot translate the message header to message attribute
- // value
- LOG.warn("Cannot put the message header key={}, value={} into Sqs MessageAttribute", entry.getKey(),
- entry.getValue());
}
}
}
diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsProducerSendLocalstackMaxAttributesIT.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsProducerSendLocalstackMaxAttributesIT.java
new file mode 100644
index 0000000..544e18a
--- /dev/null
+++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsProducerSendLocalstackMaxAttributesIT.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.sqs.integration;
+
+import org.apache.camel.*;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+
+public class SqsProducerSendLocalstackMaxAttributesIT extends Aws2SQSBaseTest {
+
+ @EndpointInject("direct:start")
+ private ProducerTemplate template;
+
+ @EndpointInject("mock:result")
+ private MockEndpoint result;
+
+ @Test
+ public void sendInOnly() throws Exception {
+ result.expectedMessageCount(1);
+
+ Exchange exchange = template.send("direct:start", ExchangePattern.InOnly, new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader("value1", "value1");
+ exchange.getIn().setHeader("value2", "value2");
+ exchange.getIn().setHeader("value3", "value3");
+ exchange.getIn().setHeader("value4", "value4");
+ exchange.getIn().setHeader("value5", "value5");
+ exchange.getIn().setHeader("value6", "value6");
+ exchange.getIn().setHeader("value7", "value7");
+ exchange.getIn().setHeader("value8", "value8");
+ exchange.getIn().setHeader("value9", "value9");
+ exchange.getIn().setHeader("value10", "value10");
+ exchange.getIn().setHeader("value11", "value11");
+ exchange.getIn().setBody("Test");
+ }
+ });
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").startupOrder(2)
+ .toF("aws2-sqs://%s?autoCreateQueue=true", sharedNameGenerator.getName());
+
+ fromF("aws2-sqs://%s?deleteAfterRead=true&autoCreateQueue=true", sharedNameGenerator.getName())
+ .startupOrder(1).log("${body}").to("mock:result");
+ }
+ };
+ }
+}