You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2022/03/29 12:28:07 UTC

[camel-kamelets] 02/02: chore: Kafka source uses local bean for header deserialization

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git

commit 7c959e1cb8c5dba287098f936c801b1f26d340dd
Author: Christoph Deppisch <cd...@redhat.com>
AuthorDate: Mon Mar 21 20:07:28 2022 +0100

    chore: Kafka source uses local bean for header deserialization
---
 kamelets/kafka-source.kamelet.yaml                        | 14 ++++++++------
 .../serialization/kafka/KafkaHeaderDeserializer.java      | 15 +++++++++++----
 .../serialization/kafka/KafkaHeaderDeserializerTest.java  | 12 ++++++++----
 .../src/main/resources/kamelets/kafka-source.kamelet.yaml | 14 ++++++++------
 4 files changed, 35 insertions(+), 20 deletions(-)

diff --git a/kamelets/kafka-source.kamelet.yaml b/kamelets/kafka-source.kamelet.yaml
index 6bce42c..d7518b2 100644
--- a/kamelets/kafka-source.kamelet.yaml
+++ b/kamelets/kafka-source.kamelet.yaml
@@ -130,6 +130,12 @@ spec:
     - "camel:kafka"
     - "camel:kamelet"
   template:
+    beans:
+      - name: kafkaHeaderDeserializer
+        type: "#class:org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
+        property:
+          - key: enabled
+            value: '{{deserializeHeaders}}'
     from:
       uri: "kafka:{{topic}}"
       parameters:
@@ -143,10 +149,6 @@ spec:
         autoOffsetReset: "{{autoOffsetReset}}"
         groupId: "{{?consumerGroup}}"
       steps:
-        - set-property:
-            name: deserializeHeaders
-            constant: "{{deserializeHeaders}}"
-        - bean: "org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
-        - remove-property:
-            name: deserializeHeaders
+        - process:
+            ref: "{{kafkaHeaderDeserializer}}"
         - to: "kamelet:sink"
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java
index 7cab1ee..3fc24cc 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeProperty;
+import org.apache.camel.Processor;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.support.SimpleTypeConverter;
@@ -31,12 +31,15 @@ import org.apache.camel.support.SimpleTypeConverter;
  * Uses given type converter implementation set on the Camel context to convert values. If no type converter is set
  * the deserializer uses its own fallback conversion implementation.
  */
-public class KafkaHeaderDeserializer {
+public class KafkaHeaderDeserializer implements Processor {
+
+    boolean enabled = false;
 
     private final SimpleTypeConverter defaultTypeConverter = new SimpleTypeConverter(true, KafkaHeaderDeserializer::convert);
 
-    public void process(@ExchangeProperty("deserializeHeaders") boolean deserializeHeaders, Exchange exchange) throws Exception {
-        if (!deserializeHeaders) {
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        if (!enabled) {
             return;
         }
 
@@ -86,4 +89,8 @@ public class KafkaHeaderDeserializer {
     private boolean shouldDeserialize(Map.Entry<String, Object> entry) {
         return !entry.getKey().equals(KafkaConstants.HEADERS) && !entry.getKey().equals(KafkaConstants.MANUAL_COMMIT);
     }
+
+    public void setEnabled(String enabled) {
+        this.enabled = Boolean.parseBoolean(enabled);
+    }
 }
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java
index d5d92f5..2d7e3bd 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java
@@ -48,7 +48,8 @@ class KafkaHeaderDeserializerTest {
         exchange.getMessage().setHeader("fooNull", null);
         exchange.getMessage().setHeader("number", 1L);
 
-        processor.process(true, exchange);
+        processor.enabled = true;
+        processor.process(exchange);
 
         Assertions.assertTrue(exchange.getMessage().hasHeaders());
         Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
@@ -68,7 +69,8 @@ class KafkaHeaderDeserializerTest {
         exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8));
         exchange.getMessage().setHeader("fooNull", null);
 
-        processor.process(true, exchange);
+        processor.enabled = true;
+        processor.process(exchange);
 
         Assertions.assertTrue(exchange.getMessage().hasHeaders());
         Assertions.assertEquals("converted", exchange.getMessage().getHeader("foo"));
@@ -84,7 +86,8 @@ class KafkaHeaderDeserializerTest {
         exchange.getMessage().setHeader("foo", "bar");
         exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8));
 
-        processor.process(true, exchange);
+        processor.enabled = true;
+        processor.process(exchange);
 
         Assertions.assertTrue(exchange.getMessage().hasHeaders());
         Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
@@ -98,7 +101,8 @@ class KafkaHeaderDeserializerTest {
         exchange.getMessage().setHeader("foo", "bar");
         exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8));
 
-        processor.process(false, exchange);
+        processor.enabled = false;
+        processor.process(exchange);
 
         Assertions.assertTrue(exchange.getMessage().hasHeaders());
         Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
diff --git a/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml
index 6bce42c..d7518b2 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml
@@ -130,6 +130,12 @@ spec:
     - "camel:kafka"
     - "camel:kamelet"
   template:
+    beans:
+      - name: kafkaHeaderDeserializer
+        type: "#class:org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
+        property:
+          - key: enabled
+            value: '{{deserializeHeaders}}'
     from:
       uri: "kafka:{{topic}}"
       parameters:
@@ -143,10 +149,6 @@ spec:
         autoOffsetReset: "{{autoOffsetReset}}"
         groupId: "{{?consumerGroup}}"
       steps:
-        - set-property:
-            name: deserializeHeaders
-            constant: "{{deserializeHeaders}}"
-        - bean: "org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
-        - remove-property:
-            name: deserializeHeaders
+        - process:
+            ref: "{{kafkaHeaderDeserializer}}"
         - to: "kamelet:sink"