You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/03/07 10:59:13 UTC
[camel] branch main updated: CAMEL-17602: camel-aws-sqs - consumer should map headers back to thei… (#7127)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new d143b27 CAMEL-17602: camel-aws-sqs - consumer should map headers back to thei… (#7127)
d143b27 is described below
commit d143b278ada22a16630a5bbc3ddf8b88b9e80e27
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Mar 7 11:58:31 2022 +0100
CAMEL-17602: camel-aws-sqs - consumer should map headers back to thei… (#7127)
* CAMEL-17602: camel-aws-sqs - consumer should map headers back to their original type such as boolean, integer etc.
---
.../camel/component/aws2/sqs/Sqs2Consumer.java | 15 +--
.../component/aws2/sqs/Sqs2MessageHelper.java | 109 +++++++++++++++++++++
.../camel/component/aws2/sqs/Sqs2Producer.java | 51 +---------
.../ROOT/pages/camel-3x-upgrade-guide-3_16.adoc | 7 ++
4 files changed, 122 insertions(+), 60 deletions(-)
diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
index f50e097..b8a1694 100644
--- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
+++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
@@ -312,11 +312,10 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
// Need to apply the SqsHeaderFilterStrategy this time
HeaderFilterStrategy headerFilterStrategy = getEndpoint().getHeaderFilterStrategy();
// add all sqs message attributes as camel message headers so that
- // knowledge of
- // the Sqs class MessageAttributeValue will not leak to the client
+ // knowledge of the Sqs class MessageAttributeValue will not leak to the client
for (Map.Entry<String, MessageAttributeValue> entry : msg.messageAttributes().entrySet()) {
String header = entry.getKey();
- Object value = translateValue(entry.getValue());
+ Object value = Sqs2MessageHelper.fromMessageAttributeValue(entry.getValue());
if (!headerFilterStrategy.applyFilterToExternalHeaders(header, value, exchange)) {
message.setHeader(header, value);
}
@@ -324,16 +323,6 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
return exchange;
}
- private static Object translateValue(MessageAttributeValue mav) {
- Object result = null;
- if (mav.stringValue() != null) {
- result = mav.stringValue();
- } else if (mav.binaryValue() != null) {
- result = mav.binaryValue();
- }
- return result;
- }
-
@Override
public String toString() {
if (sqsConsumerToString == null) {
diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2MessageHelper.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2MessageHelper.java
new file mode 100644
index 0000000..24a587f
--- /dev/null
+++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2MessageHelper.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.sqs;
+
+import java.nio.ByteBuffer;
+import java.util.Date;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
+
+public final class Sqs2MessageHelper {
+
+ private Sqs2MessageHelper() {
+ }
+
+ public static MessageAttributeValue toMessageAttributeValue(Object value) {
+ if (value instanceof String && !((String) value).isEmpty()) {
+ MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
+ mav.dataType("String");
+ mav.stringValue((String) value);
+ return mav.build();
+ } else if (value instanceof ByteBuffer) {
+ MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
+ mav.dataType("Binary");
+ mav.binaryValue(SdkBytes.fromByteBuffer((ByteBuffer) value));
+ return mav.build();
+ } else if (value instanceof Boolean) {
+ MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
+ mav.dataType("Number.Boolean");
+ mav.stringValue(((Boolean) value) ? "1" : "0");
+ return mav.build();
+ } else if (value instanceof Number) {
+ MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
+ final String dataType;
+ if (value instanceof Integer) {
+ dataType = "Number.int";
+ } else if (value instanceof Byte) {
+ dataType = "Number.byte";
+ } else if (value instanceof Double) {
+ dataType = "Number.double";
+ } else if (value instanceof Float) {
+ dataType = "Number.float";
+ } else if (value instanceof Long) {
+ dataType = "Number.long";
+ } else if (value instanceof Short) {
+ dataType = "Number.short";
+ } else {
+ dataType = "Number";
+ }
+ mav.dataType(dataType);
+ mav.stringValue(value.toString());
+ return mav.build();
+ } else if (value instanceof Date) {
+ MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
+ mav.dataType("String");
+ mav.stringValue(value.toString());
+ return mav.build();
+ }
+
+ return null;
+ }
+
+ public static Object fromMessageAttributeValue(MessageAttributeValue mav) {
+ if (mav == null) {
+ return null;
+ }
+ if (mav.binaryValue() != null) {
+ return mav.binaryValue();
+ } else if (mav.stringValue() != null) {
+ String s = mav.stringValue();
+ String dt = mav.dataType();
+ if (dt == null || "String".equals(dt)) {
+ return s;
+ } else if ("Number.Boolean".equals(dt)) {
+ return "1".equals(s) ? Boolean.TRUE : Boolean.FALSE;
+ } else if ("Number.int".equals(dt)) {
+ return Integer.valueOf(s);
+ } else if ("Number.byte".equals(dt)) {
+ return Byte.valueOf(s);
+ } else if ("Number.double".equals(dt)) {
+ return Double.valueOf(s);
+ } else if ("Number.float".equals(dt)) {
+ return Float.valueOf(s);
+ } else if ("Number.long".equals(dt)) {
+ return Long.valueOf(s);
+ } else if ("Number.short".equals(dt)) {
+ return Short.valueOf(s);
+ }
+ return s;
+ }
+
+ return null;
+ }
+
+}
diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Producer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Producer.java
index d9ec0a3..3ca128b 100644
--- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Producer.java
+++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Producer.java
@@ -16,10 +16,8 @@
*/
package org.apache.camel.component.aws2.sqs;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -33,7 +31,6 @@ import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse;
@@ -302,51 +299,11 @@ public class Sqs2Producer extends DefaultProducer {
if (!headerFilterStrategy.applyFilterToCamelHeaders(entry.getKey(), entry.getValue(), exchange)) {
// We are going to put the first MAX_ATTRIBUTES headers, because this is the maximum Attributes an SQS Message could accept
if (result.size() < MAX_ATTRIBUTES) {
- Object value = entry.getValue();
- if (value instanceof String && !((String) value).isEmpty()) {
- MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
- mav.dataType("String");
- mav.stringValue((String) value);
- result.put(entry.getKey(), mav.build());
- } else if (value instanceof ByteBuffer) {
- MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
- mav.dataType("Binary");
- mav.binaryValue(SdkBytes.fromByteBuffer((ByteBuffer) value));
- result.put(entry.getKey(), mav.build());
- } else if (value instanceof Boolean) {
- MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
- mav.dataType("Number.Boolean");
- mav.stringValue(((Boolean) value) ? "1" : "0");
- result.put(entry.getKey(), mav.build());
- } else if (value instanceof Number) {
- MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
- final String dataType;
- if (value instanceof Integer) {
- dataType = "Number.int";
- } else if (value instanceof Byte) {
- dataType = "Number.byte";
- } else if (value instanceof Double) {
- dataType = "Number.double";
- } else if (value instanceof Float) {
- dataType = "Number.float";
- } else if (value instanceof Long) {
- dataType = "Number.long";
- } else if (value instanceof Short) {
- dataType = "Number.short";
- } else {
- dataType = "Number";
- }
- mav.dataType(dataType);
- mav.stringValue(value.toString());
- result.put(entry.getKey(), mav.build());
- } else if (value instanceof Date) {
- MessageAttributeValue.Builder mav = MessageAttributeValue.builder();
- mav.dataType("String");
- mav.stringValue(value.toString());
- result.put(entry.getKey(), mav.build());
+ MessageAttributeValue mav = Sqs2MessageHelper.toMessageAttributeValue(entry.getValue());
+ if (mav != null) {
+ result.put(entry.getKey(), mav);
} else {
- // cannot translate the message header to message attribute
- // value
+ // cannot translate the message header to message attribute value
LOG.warn("Cannot put the message header key={}, value={} into Sqs MessageAttribute", entry.getKey(),
entry.getValue());
}
diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_16.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_16.adoc
index 0e54c46..e6cde7c 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_16.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_16.adoc
@@ -321,6 +321,13 @@ and the `configure` method now takes a `CamelContext` as argument.
The option `baseURI` is renamed to `baseUri`.
+=== camel-aws
+
+The `camel-aws2-sqs` component will now map message headers back to their original type,
+such as boolean, integer, etc. This requires using Camel for both sending and receiving,
+as AWS only has string or binary types, so Camel stores custom metadata in the message header
+to know its original type.
+
=== camel-stream
The producer will now by default append new line character to end of output.