You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2022/03/29 12:28:07 UTC
[camel-kamelets] 02/02: chore: Kafka source uses local bean for header deserialization
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git
commit 7c959e1cb8c5dba287098f936c801b1f26d340dd
Author: Christoph Deppisch <cd...@redhat.com>
AuthorDate: Mon Mar 21 20:07:28 2022 +0100
chore: Kafka source uses local bean for header deserialization
---
kamelets/kafka-source.kamelet.yaml | 14 ++++++++------
.../serialization/kafka/KafkaHeaderDeserializer.java | 15 +++++++++++----
.../serialization/kafka/KafkaHeaderDeserializerTest.java | 12 ++++++++----
.../src/main/resources/kamelets/kafka-source.kamelet.yaml | 14 ++++++++------
4 files changed, 35 insertions(+), 20 deletions(-)
diff --git a/kamelets/kafka-source.kamelet.yaml b/kamelets/kafka-source.kamelet.yaml
index 6bce42c..d7518b2 100644
--- a/kamelets/kafka-source.kamelet.yaml
+++ b/kamelets/kafka-source.kamelet.yaml
@@ -130,6 +130,12 @@ spec:
- "camel:kafka"
- "camel:kamelet"
template:
+ beans:
+ - name: kafkaHeaderDeserializer
+ type: "#class:org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
+ property:
+ - key: enabled
+ value: '{{deserializeHeaders}}'
from:
uri: "kafka:{{topic}}"
parameters:
@@ -143,10 +149,6 @@ spec:
autoOffsetReset: "{{autoOffsetReset}}"
groupId: "{{?consumerGroup}}"
steps:
- - set-property:
- name: deserializeHeaders
- constant: "{{deserializeHeaders}}"
- - bean: "org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
- - remove-property:
- name: deserializeHeaders
+ - process:
+ ref: "{{kafkaHeaderDeserializer}}"
- to: "kamelet:sink"
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java
index 7cab1ee..3fc24cc 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeProperty;
+import org.apache.camel.Processor;
import org.apache.camel.TypeConverter;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.support.SimpleTypeConverter;
@@ -31,12 +31,15 @@ import org.apache.camel.support.SimpleTypeConverter;
* Uses given type converter implementation set on the Camel context to convert values. If no type converter is set
* the deserializer uses its own fallback conversion implementation.
*/
-public class KafkaHeaderDeserializer {
+public class KafkaHeaderDeserializer implements Processor {
+
+ boolean enabled = false;
private final SimpleTypeConverter defaultTypeConverter = new SimpleTypeConverter(true, KafkaHeaderDeserializer::convert);
- public void process(@ExchangeProperty("deserializeHeaders") boolean deserializeHeaders, Exchange exchange) throws Exception {
- if (!deserializeHeaders) {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ if (!enabled) {
return;
}
@@ -86,4 +89,8 @@ public class KafkaHeaderDeserializer {
private boolean shouldDeserialize(Map.Entry<String, Object> entry) {
return !entry.getKey().equals(KafkaConstants.HEADERS) && !entry.getKey().equals(KafkaConstants.MANUAL_COMMIT);
}
+
+ public void setEnabled(String enabled) {
+ this.enabled = Boolean.parseBoolean(enabled);
+ }
}
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java
index d5d92f5..2d7e3bd 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java
@@ -48,7 +48,8 @@ class KafkaHeaderDeserializerTest {
exchange.getMessage().setHeader("fooNull", null);
exchange.getMessage().setHeader("number", 1L);
- processor.process(true, exchange);
+ processor.enabled = true;
+ processor.process(exchange);
Assertions.assertTrue(exchange.getMessage().hasHeaders());
Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
@@ -68,7 +69,8 @@ class KafkaHeaderDeserializerTest {
exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8));
exchange.getMessage().setHeader("fooNull", null);
- processor.process(true, exchange);
+ processor.enabled = true;
+ processor.process(exchange);
Assertions.assertTrue(exchange.getMessage().hasHeaders());
Assertions.assertEquals("converted", exchange.getMessage().getHeader("foo"));
@@ -84,7 +86,8 @@ class KafkaHeaderDeserializerTest {
exchange.getMessage().setHeader("foo", "bar");
exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8));
- processor.process(true, exchange);
+ processor.enabled = true;
+ processor.process(exchange);
Assertions.assertTrue(exchange.getMessage().hasHeaders());
Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
@@ -98,7 +101,8 @@ class KafkaHeaderDeserializerTest {
exchange.getMessage().setHeader("foo", "bar");
exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8));
- processor.process(false, exchange);
+ processor.enabled = false;
+ processor.process(exchange);
Assertions.assertTrue(exchange.getMessage().hasHeaders());
Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
diff --git a/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml
index 6bce42c..d7518b2 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml
@@ -130,6 +130,12 @@ spec:
- "camel:kafka"
- "camel:kamelet"
template:
+ beans:
+ - name: kafkaHeaderDeserializer
+ type: "#class:org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
+ property:
+ - key: enabled
+ value: '{{deserializeHeaders}}'
from:
uri: "kafka:{{topic}}"
parameters:
@@ -143,10 +149,6 @@ spec:
autoOffsetReset: "{{autoOffsetReset}}"
groupId: "{{?consumerGroup}}"
steps:
- - set-property:
- name: deserializeHeaders
- constant: "{{deserializeHeaders}}"
- - bean: "org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
- - remove-property:
- name: deserializeHeaders
+ - process:
+ ref: "{{kafkaHeaderDeserializer}}"
- to: "kamelet:sink"