You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ig...@apache.org on 2022/08/11 10:52:12 UTC
[camel] branch main updated: CAMEL-18380: camel-kafka: Support using TypeConverter as a fallback in the header serializer (#8146)
This is an automated email from the ASF dual-hosted git repository.
igarashitm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 8c33b7beb1a CAMEL-18380: camel-kafka: Support using TypeConverter as a fallback in the header serializer (#8146)
8c33b7beb1a is described below
commit 8c33b7beb1a544d2308090c48df3d75f34847de5
Author: Tomohisa Igarashi <tm...@gmail.com>
AuthorDate: Thu Aug 11 06:52:03 2022 -0400
CAMEL-18380: camel-kafka: Support using TypeConverter as a fallback in the header serializer (#8146)
---
components/camel-kafka/pom.xml | 6 ++++++
.../kafka/serde/DefaultKafkaHeaderSerializer.java | 22 +++++++++++++++++++++-
.../serde/DefaultKafkaHeaderSerializerTest.java | 6 +++++-
3 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/components/camel-kafka/pom.xml b/components/camel-kafka/pom.xml
index b25d7acb729..2cf93c82245 100644
--- a/components/camel-kafka/pom.xml
+++ b/components/camel-kafka/pom.xml
@@ -76,6 +76,12 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-jackson</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<!-- Required by the admin client-->
<dependency>
<groupId>org.apache.camel</groupId>
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java
index c9d6b5f0fb7..79846df5843 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java
@@ -18,12 +18,15 @@ package org.apache.camel.component.kafka.serde;
import java.nio.ByteBuffer;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DefaultKafkaHeaderSerializer implements KafkaHeaderSerializer {
+public class DefaultKafkaHeaderSerializer implements KafkaHeaderSerializer, CamelContextAware {
private static final Logger LOG = LoggerFactory.getLogger(DefaultKafkaHeaderSerializer.class);
+ private CamelContext camelContext;
@Override
public byte[] serialize(final String key, final Object value) {
@@ -46,9 +49,26 @@ public class DefaultKafkaHeaderSerializer implements KafkaHeaderSerializer {
} else if (value instanceof byte[]) {
return (byte[]) value;
}
+ if (camelContext != null) {
+ byte[] converted = camelContext.getTypeConverter().tryConvertTo(byte[].class, value);
+ if (converted != null) {
+ return converted;
+ }
+ }
+
LOG.debug("Cannot propagate header value of type[{}], skipping... "
+ "Supported types: String, Integer, Long, Double, byte[].",
value != null ? value.getClass() : "null");
return null;
}
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializerTest.java
index 69de631b0e3..9cfe65dc42f 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializerTest.java
@@ -19,6 +19,8 @@ package org.apache.camel.component.kafka.serde;
import java.util.Arrays;
import java.util.Collection;
+import com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@@ -26,11 +28,12 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals;
public class DefaultKafkaHeaderSerializerTest {
- private KafkaHeaderSerializer serializer = new DefaultKafkaHeaderSerializer();
+ private DefaultKafkaHeaderSerializer serializer = new DefaultKafkaHeaderSerializer();
@ParameterizedTest
@MethodSource("primeNumbers")
public void serialize(Object value, byte[] expectedResult) {
+ serializer.setCamelContext(new DefaultCamelContext());
byte[] result = serializer.serialize("someKey", value);
assertArrayEquals(expectedResult, result);
@@ -44,6 +47,7 @@ public class DefaultKafkaHeaderSerializerTest {
{ 22.0D, new byte[] { 64, 54, 0, 0, 0, 0, 0, 0 } }, // double
{ "someValue", "someValue".getBytes() }, // string
{ new byte[] { 0, 2, -43 }, new byte[] { 0, 2, -43 } }, // byte[]
+ { new TextNode("foo"), "foo".getBytes() }, // jackson TextNode
{ null, null }, // null
{ new Object(), null } // unknown
// type