You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ig...@apache.org on 2022/08/11 10:52:12 UTC

[camel] branch main updated: CAMEL-18380: camel-kafka: Support using TypeConverter as a fallback in the header serializer (#8146)

This is an automated email from the ASF dual-hosted git repository.

igarashitm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 8c33b7beb1a CAMEL-18380: camel-kafka: Support using TypeConverter as a fallback in the header serializer (#8146)
8c33b7beb1a is described below

commit 8c33b7beb1a544d2308090c48df3d75f34847de5
Author: Tomohisa Igarashi <tm...@gmail.com>
AuthorDate: Thu Aug 11 06:52:03 2022 -0400

    CAMEL-18380: camel-kafka: Support using TypeConverter as a fallback in the header serializer (#8146)
---
 components/camel-kafka/pom.xml                     |  6 ++++++
 .../kafka/serde/DefaultKafkaHeaderSerializer.java  | 22 +++++++++++++++++++++-
 .../serde/DefaultKafkaHeaderSerializerTest.java    |  6 +++++-
 3 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/components/camel-kafka/pom.xml b/components/camel-kafka/pom.xml
index b25d7acb729..2cf93c82245 100644
--- a/components/camel-kafka/pom.xml
+++ b/components/camel-kafka/pom.xml
@@ -76,6 +76,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-jackson</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <!-- Required by the admin client-->
         <dependency>
             <groupId>org.apache.camel</groupId>
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java
index c9d6b5f0fb7..79846df5843 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java
@@ -18,12 +18,15 @@ package org.apache.camel.component.kafka.serde;
 
 import java.nio.ByteBuffer;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DefaultKafkaHeaderSerializer implements KafkaHeaderSerializer {
+public class DefaultKafkaHeaderSerializer implements KafkaHeaderSerializer, CamelContextAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultKafkaHeaderSerializer.class);
+    private CamelContext camelContext;
 
     @Override
     public byte[] serialize(final String key, final Object value) {
@@ -46,9 +49,26 @@ public class DefaultKafkaHeaderSerializer implements KafkaHeaderSerializer {
         } else if (value instanceof byte[]) {
             return (byte[]) value;
         }
+        if (camelContext != null) {
+            byte[] converted = camelContext.getTypeConverter().tryConvertTo(byte[].class, value);
+            if (converted != null) {
+                return converted;
+            }
+        }
+
         LOG.debug("Cannot propagate header value of type[{}], skipping... "
                   + "Supported types: String, Integer, Long, Double, byte[].",
                 value != null ? value.getClass() : "null");
         return null;
     }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializerTest.java
index 69de631b0e3..9cfe65dc42f 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializerTest.java
@@ -19,6 +19,8 @@ package org.apache.camel.component.kafka.serde;
 import java.util.Arrays;
 import java.util.Collection;
 
+import com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.camel.impl.DefaultCamelContext;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
@@ -26,11 +28,12 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 
 public class DefaultKafkaHeaderSerializerTest {
 
-    private KafkaHeaderSerializer serializer = new DefaultKafkaHeaderSerializer();
+    private DefaultKafkaHeaderSerializer serializer = new DefaultKafkaHeaderSerializer();
 
     @ParameterizedTest
     @MethodSource("primeNumbers")
     public void serialize(Object value, byte[] expectedResult) {
+        serializer.setCamelContext(new DefaultCamelContext());
         byte[] result = serializer.serialize("someKey", value);
 
         assertArrayEquals(expectedResult, result);
@@ -44,6 +47,7 @@ public class DefaultKafkaHeaderSerializerTest {
                 { 22.0D, new byte[] { 64, 54, 0, 0, 0, 0, 0, 0 } }, // double
                 { "someValue", "someValue".getBytes() }, // string
                 { new byte[] { 0, 2, -43 }, new byte[] { 0, 2, -43 } }, // byte[]
+                { new TextNode("foo"), "foo".getBytes() }, // jackson TextNode
                 { null, null }, // null
                 { new Object(), null } // unknown
                                       // type