You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2022/03/29 12:28:05 UTC

[camel-kamelets] branch main updated (1e6dbbf -> 7c959e1)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git.


    from 1e6dbbf  AWS EC2: Introduce optional overrideEndpoint and uriEndpointOverride for testing purpose
     new e572451  chore: Add header deserialize option on Kafka source
     new 7c959e1  chore: Kafka source uses local bean for header deserialization

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/yaks-tests.yaml                  |   1 +
 kamelets/kafka-source.kamelet.yaml                 |  23 ++++-
 .../kafka/KafkaHeaderDeserializer.java             |  96 ++++++++++++++++++
 .../kafka/KafkaHeaderDeserializerTest.java         | 112 +++++++++++++++++++++
 .../resources/kamelets/kafka-source.kamelet.yaml   |  23 ++++-
 docs/local-build.sh => test/kafka/install.sh       |  18 ++--
 .../kafka-sink-test.yaml}                          |  21 ++--
 .../kafka-sink.feature}                            |  42 ++++----
 .../kafka-source-test.yaml}                        |  24 ++---
 test/kafka/kafka-source.feature                    |  66 ++++++++++++
 docs/source-watch.yml => test/kafka/uninstall.sh   |  15 +--
 test/{mail-sink => kafka}/yaks-config.yaml         |  23 +++--
 12 files changed, 388 insertions(+), 76 deletions(-)
 create mode 100644 library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java
 create mode 100644 library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java
 copy docs/local-build.sh => test/kafka/install.sh (62%)
 mode change 100755 => 100644
 copy test/{mail-sink/timer-to-mail.yaml => kafka/kafka-sink-test.yaml} (78%)
 copy test/{insert-field-action/insert-field-action.feature => kafka/kafka-sink.feature} (54%)
 copy test/{mail-sink/timer-to-mail.yaml => kafka/kafka-source-test.yaml} (75%)
 create mode 100644 test/kafka/kafka-source.feature
 copy docs/source-watch.yml => test/kafka/uninstall.sh (80%)
 copy test/{mail-sink => kafka}/yaks-config.yaml (77%)

[camel-kamelets] 01/02: chore: Add header deserialize option on Kafka source

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git

commit e5724510000d173d9d24a0f01326e58c6db288c6
Author: Christoph Deppisch <cd...@redhat.com>
AuthorDate: Thu Mar 17 19:57:06 2022 +0100

    chore: Add header deserialize option on Kafka source
    
    - Adds utility class to auto deserialize message headers from byte[] to String
    - Option must be explicitly enabled on the source Kamelet
    - Exclude non String Kafka headers from deserialization (kafka.HEADERS and CamelKafkaManualCommit)
---
 .github/workflows/yaks-tests.yaml                  |   1 +
 kamelets/kafka-source.kamelet.yaml                 |  21 +++-
 .../kafka/KafkaHeaderDeserializer.java             |  89 +++++++++++++++++
 .../kafka/KafkaHeaderDeserializerTest.java         | 108 +++++++++++++++++++++
 .../resources/kamelets/kafka-source.kamelet.yaml   |  21 +++-
 test/kafka/install.sh                              |  28 ++++++
 test/kafka/kafka-sink-test.yaml                    |  42 ++++++++
 test/kafka/kafka-sink.feature                      |  45 +++++++++
 test/kafka/kafka-source-test.yaml                  |  37 +++++++
 test/kafka/kafka-source.feature                    |  66 +++++++++++++
 test/kafka/uninstall.sh                            |  22 +++++
 test/kafka/yaks-config.yaml                        |  48 +++++++++
 12 files changed, 522 insertions(+), 6 deletions(-)

diff --git a/.github/workflows/yaks-tests.yaml b/.github/workflows/yaks-tests.yaml
index b4b7f91..e4b2d9d 100644
--- a/.github/workflows/yaks-tests.yaml
+++ b/.github/workflows/yaks-tests.yaml
@@ -115,6 +115,7 @@ jobs:
         yaks run test/timer-source $YAKS_RUN_OPTIONS
         yaks run test/earthquake-source $YAKS_RUN_OPTIONS
         yaks run test/rest-openapi-sink $YAKS_RUN_OPTIONS
+        yaks run test/kafka $YAKS_RUN_OPTIONS
     - name: YAKS Report
       if: failure()
       run: |
diff --git a/kamelets/kafka-source.kamelet.yaml b/kamelets/kafka-source.kamelet.yaml
index 82607c9..6bce42c 100644
--- a/kamelets/kafka-source.kamelet.yaml
+++ b/kamelets/kafka-source.kamelet.yaml
@@ -62,12 +62,12 @@ spec:
         default: SASL_SSL
       saslMechanism:
         title: SASL Mechanism
-        description: The Simple Authentication and Security Layer (SASL) Mechanism used. 
+        description: The Simple Authentication and Security Layer (SASL) Mechanism used.
         type: string
         default: PLAIN
       user:
         title: Username
-        description: Username to authenticate to Kafka 
+        description: Username to authenticate to Kafka
         type: string
         x-descriptors:
         - urn:camel:group:credentials
@@ -117,7 +117,16 @@ spec:
         x-descriptors:
         - urn:keda:metadata:consumerGroup
         - urn:keda:required
+      deserializeHeaders:
+        title: Automatically Deserialize Headers
+        description: When enabled the Kamelet source will deserialize all message headers to String representation.
+        type: boolean
+        x-descriptors:
+          - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+        default: false
   dependencies:
+    - github:apache.camel-kamelets:camel-kamelets-utils:main-SNAPSHOT
+    - "camel:core"
     - "camel:kafka"
     - "camel:kamelet"
   template:
@@ -134,4 +143,10 @@ spec:
         autoOffsetReset: "{{autoOffsetReset}}"
         groupId: "{{?consumerGroup}}"
       steps:
-      - to: "kamelet:sink"
+        - set-property:
+            name: deserializeHeaders
+            constant: "{{deserializeHeaders}}"
+        - bean: "org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
+        - remove-property:
+            name: deserializeHeaders
+        - to: "kamelet:sink"
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java
new file mode 100644
index 0000000..7cab1ee
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.serialization.kafka;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.support.SimpleTypeConverter;
+
+/**
+ * Header deserializer used in Kafka source Kamelet. Automatically converts all message headers to String.
+ * Uses given type converter implementation set on the Camel context to convert values. If no type converter is set
+ * the deserializer uses its own fallback conversion implementation.
+ */
+public class KafkaHeaderDeserializer {
+
+    private final SimpleTypeConverter defaultTypeConverter = new SimpleTypeConverter(true, KafkaHeaderDeserializer::convert);
+
+    public void process(@ExchangeProperty("deserializeHeaders") boolean deserializeHeaders, Exchange exchange) throws Exception {
+        if (!deserializeHeaders) {
+            return;
+        }
+
+        Map<String, Object> headers = exchange.getMessage().getHeaders();
+
+        TypeConverter typeConverter = exchange.getContext().getTypeConverter();
+        if (typeConverter == null) {
+            typeConverter = defaultTypeConverter;
+        }
+
+        for (Map.Entry<String, Object> header : headers.entrySet()) {
+            if (shouldDeserialize(header)) {
+                header.setValue(typeConverter.convertTo(String.class, header.getValue()));
+            }
+        }
+    }
+
+    /**
+     * Fallback conversion strategy supporting null values, String and byte[]. Converts headers to respective
+     * String representation or null.
+     * @param type target type, always String in this case.
+     * @param exchange the exchange containing all headers to convert.
+     * @param value the current value to convert.
+     * @return String representation of given value or null if value itself is null.
+     */
+    private static Object convert(Class<?> type, Exchange exchange, Object value) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof String) {
+            return value;
+        }
+
+        if (value instanceof byte[]) {
+            return new String((byte[]) value, StandardCharsets.UTF_8);
+        }
+
+        return value.toString();
+    }
+
+    /**
+     * Exclude special Kafka headers from auto deserialization.
+     * @param entry
+     * @return
+     */
+    private boolean shouldDeserialize(Map.Entry<String, Object> entry) {
+        return !entry.getKey().equals(KafkaConstants.HEADERS) && !entry.getKey().equals(KafkaConstants.MANUAL_COMMIT);
+    }
+}
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java
new file mode 100644
index 0000000..d5d92f5
--- /dev/null
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.serialization.kafka;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.apache.camel.support.SimpleTypeConverter;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class KafkaHeaderDeserializerTest {
+
+    private DefaultCamelContext camelContext;
+
+    private final KafkaHeaderDeserializer processor = new KafkaHeaderDeserializer();
+
+    @BeforeEach
+    void setup() {
+        this.camelContext = new DefaultCamelContext();
+    }
+
+    @Test
+    void shouldDeserializeHeaders() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setHeader("foo", "bar");
+        exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8));
+        exchange.getMessage().setHeader("fooNull", null);
+        exchange.getMessage().setHeader("number", 1L);
+
+        processor.process(true, exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
+        Assertions.assertEquals("barBytes", exchange.getMessage().getHeader("fooBytes"));
+        Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("fooNull"));
+        Assertions.assertNull(exchange.getMessage().getHeader("fooNull"));
+        Assertions.assertEquals("1", exchange.getMessage().getHeader("number"));
+    }
+
+    @Test
+    void shouldDeserializeHeadersViaTypeConverter() throws Exception {
+        camelContext.setTypeConverter(new SimpleTypeConverter(true, (type, exchange, value) -> "converted"));
+
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setHeader("foo", "bar");
+        exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8));
+        exchange.getMessage().setHeader("fooNull", null);
+
+        processor.process(true, exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertEquals("converted", exchange.getMessage().getHeader("foo"));
+        Assertions.assertEquals("converted", exchange.getMessage().getHeader("fooBytes"));
+        Assertions.assertEquals("converted", exchange.getMessage().getHeader("fooNull"));
+    }
+
+    @Test
+    void shouldFallbackToDefaultConverter() throws Exception {
+        camelContext.setTypeConverter(null);
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setHeader("foo", "bar");
+        exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8));
+
+        processor.process(true, exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
+        Assertions.assertEquals("barBytes", exchange.getMessage().getHeader("fooBytes"));
+    }
+
+    @Test
+    void shouldNotDeserializeHeadersWhenDisabled() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setHeader("foo", "bar");
+        exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8));
+
+        processor.process(false, exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
+        Assertions.assertTrue(exchange.getMessage().getHeader("fooBytes") instanceof byte[]);
+        Assertions.assertEquals(Arrays.toString("barBytes".getBytes(StandardCharsets.UTF_8)), Arrays.toString((byte[]) exchange.getMessage().getHeader("fooBytes")));
+    }
+}
diff --git a/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml
index 82607c9..6bce42c 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml
@@ -62,12 +62,12 @@ spec:
         default: SASL_SSL
       saslMechanism:
         title: SASL Mechanism
-        description: The Simple Authentication and Security Layer (SASL) Mechanism used. 
+        description: The Simple Authentication and Security Layer (SASL) Mechanism used.
         type: string
         default: PLAIN
       user:
         title: Username
-        description: Username to authenticate to Kafka 
+        description: Username to authenticate to Kafka
         type: string
         x-descriptors:
         - urn:camel:group:credentials
@@ -117,7 +117,16 @@ spec:
         x-descriptors:
         - urn:keda:metadata:consumerGroup
         - urn:keda:required
+      deserializeHeaders:
+        title: Automatically Deserialize Headers
+        description: When enabled the Kamelet source will deserialize all message headers to String representation.
+        type: boolean
+        x-descriptors:
+          - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+        default: false
   dependencies:
+    - github:apache.camel-kamelets:camel-kamelets-utils:main-SNAPSHOT
+    - "camel:core"
     - "camel:kafka"
     - "camel:kamelet"
   template:
@@ -134,4 +143,10 @@ spec:
         autoOffsetReset: "{{autoOffsetReset}}"
         groupId: "{{?consumerGroup}}"
       steps:
-      - to: "kamelet:sink"
+        - set-property:
+            name: deserializeHeaders
+            constant: "{{deserializeHeaders}}"
+        - bean: "org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
+        - remove-property:
+            name: deserializeHeaders
+        - to: "kamelet:sink"
diff --git a/test/kafka/install.sh b/test/kafka/install.sh
new file mode 100644
index 0000000..c5415ba
--- /dev/null
+++ b/test/kafka/install.sh
@@ -0,0 +1,28 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Install Kafka
+kubectl create -f https://strimzi.io/install/latest?namespace=default
+
+# Apply the `Kafka` Cluster CR file
+kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-ephemeral-single.yaml
+
+# wait for everything to start
+kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s
+
+# create default topic
+kubectl apply -f https://strimzi.io/examples/latest/topic/kafka-topic.yaml
diff --git a/test/kafka/kafka-sink-test.yaml b/test/kafka/kafka-sink-test.yaml
new file mode 100644
index 0000000..ffce924
--- /dev/null
+++ b/test/kafka/kafka-sink-test.yaml
@@ -0,0 +1,42 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: camel.apache.org/v1alpha1
+kind: KameletBinding
+metadata:
+  name: kafka-sink-test
+spec:
+  source:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1alpha1
+      name: timer-source
+    properties:
+      period: 5000
+      contentType: application/json
+      message: ${message}
+  sink:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1alpha1
+      name: kafka-sink
+    properties:
+      bootstrapServers: ${bootstrap.server.host}.${YAKS_NAMESPACE}:${bootstrap.server.port}
+      user: ${user}
+      password: ${password}
+      topic: ${topic}
+      securityProtocol: ${securityProtocol}
diff --git a/test/kafka/kafka-sink.feature b/test/kafka/kafka-sink.feature
new file mode 100644
index 0000000..bc106f7
--- /dev/null
+++ b/test/kafka/kafka-sink.feature
@@ -0,0 +1,45 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+Feature: Kafka Kamelet sink
+
+  Background:
+    Given variable user is ""
+    Given variable password is ""
+    Given variables
+      | bootstrap.server.host     | my-cluster-kafka-bootstrap |
+      | bootstrap.server.port     | 9092 |
+      | securityProtocol          | PLAINTEXT |
+      | topic                     | my-topic |
+      | message                   | Camel K rocks! |
+    Given Kafka topic: ${topic}
+    Given Kafka topic partition: 0
+
+  Scenario: Create Kamelet binding
+    Given Camel K resource polling configuration
+      | maxAttempts          | 200   |
+      | delayBetweenAttempts | 2000  |
+    When load KameletBinding kafka-sink-test.yaml
+    Then Camel K integration kafka-sink-test should be running
+
+  Scenario: Receive message on Kafka topic and verify sink output
+    Given Kafka connection
+      | url         | ${bootstrap.server.host}.${YAKS_NAMESPACE}:${bootstrap.server.port} |
+    Then receive Kafka message with body: ${message}
+
+  Scenario: Remove resources
+    Given delete KameletBinding kafka-sink-test
diff --git a/test/kafka/kafka-source-test.yaml b/test/kafka/kafka-source-test.yaml
new file mode 100644
index 0000000..7fa6532
--- /dev/null
+++ b/test/kafka/kafka-source-test.yaml
@@ -0,0 +1,37 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: camel.apache.org/v1alpha1
+kind: KameletBinding
+metadata:
+  name: kafka-source-test
+spec:
+  source:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1alpha1
+      name: kafka-source
+    properties:
+      bootstrapServers: ${bootstrap.server.host}.${YAKS_NAMESPACE}:${bootstrap.server.port}
+      user: ${user}
+      password: ${password}
+      topic: ${topic}
+      securityProtocol: ${securityProtocol}
+      deserializeHeaders: ${deserializeHeaders}
+  sink:
+    uri: http://kafka-to-http-service.${YAKS_NAMESPACE}/result
+
diff --git a/test/kafka/kafka-source.feature b/test/kafka/kafka-source.feature
new file mode 100644
index 0000000..b7759bc
--- /dev/null
+++ b/test/kafka/kafka-source.feature
@@ -0,0 +1,66 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+Feature: Kafka Kamelet source
+
+  Background:
+    Given variable user is ""
+    Given variable password is ""
+    Given variables
+      | bootstrap.server.host     | my-cluster-kafka-bootstrap |
+      | bootstrap.server.port     | 9092 |
+      | securityProtocol          | PLAINTEXT |
+      | deserializeHeaders        | true |
+      | topic                     | my-topic |
+      | source                    | Kafka Kamelet source |
+      | message                   | Camel K rocks! |
+    Given Kafka topic: ${topic}
+    Given Kafka topic partition: 0
+    Given HTTP server timeout is 15000 ms
+    Given HTTP server "kafka-to-http-service"
+
+  Scenario: Create Http server
+    Given create Kubernetes service kafka-to-http-service with target port 8080
+
+  Scenario: Create Kamelet binding
+    Given Camel K resource polling configuration
+      | maxAttempts          | 200   |
+      | delayBetweenAttempts | 2000  |
+    When load KameletBinding kafka-source-test.yaml
+    Then Camel K integration kafka-source-test should be running
+    And Camel K integration kafka-source-test should print Subscribing ${topic}-Thread 0 to topic ${topic}
+    And sleep 10sec
+
+  Scenario: Send message to Kafka topic and verify sink output
+    Given variable key is "citrus:randomNumber(4)"
+    Given Kafka connection
+      | url         | ${bootstrap.server.host}.${YAKS_NAMESPACE}:${bootstrap.server.port} |
+    Given Kafka message key: ${key}
+    When send Kafka message with body and headers: ${message}
+      | event-source | ${source} |
+    Then expect HTTP request body: ${message}
+    Then expect HTTP request headers
+      | event-source    | ${source} |
+      | kafka.TOPIC     | ${topic}  |
+      | kafka.KEY       | ${key}    |
+      | kafka.PARTITION | 0         |
+    And receive POST /result
+    And send HTTP 200 OK
+
+  Scenario: Remove resources
+    Given delete KameletBinding kafka-source-test
+    And delete Kubernetes service kafka-to-http-service
diff --git a/test/kafka/uninstall.sh b/test/kafka/uninstall.sh
new file mode 100644
index 0000000..81b3599
--- /dev/null
+++ b/test/kafka/uninstall.sh
@@ -0,0 +1,22 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# delete Kafka cluster
+kubectl delete kafka/my-cluster
+
+# delete default topic
+kubectl delete kafkatopic/my-topic
diff --git a/test/kafka/yaks-config.yaml b/test/kafka/yaks-config.yaml
new file mode 100644
index 0000000..3d1997b
--- /dev/null
+++ b/test/kafka/yaks-config.yaml
@@ -0,0 +1,48 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+config:
+  namespace:
+    temporary: false
+  runtime:
+    env:
+      - name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: YAKS_KUBERNETES_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: CITRUS_TYPE_CONVERTER
+        value: camel
+    resources:
+      - kafka-source-test.yaml
+      - kafka-sink-test.yaml
+  dump:
+    enabled: true
+    failedOnly: true
+    includes:
+      - app=camel-k
+pre:
+  - name: Install Kafka
+    if: env:CI=true
+    script: install.sh
+  - name: Setup Kafka roles
+    if: env:CI=true
+    run: |
+      yaks role --add strimzi
+post:
+  - name: Uninstall Kafka
+    if: env:CI=true
+    script: uninstall.sh

[camel-kamelets] 02/02: chore: Kafka source uses local bean for header deserialization

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git

commit 7c959e1cb8c5dba287098f936c801b1f26d340dd
Author: Christoph Deppisch <cd...@redhat.com>
AuthorDate: Mon Mar 21 20:07:28 2022 +0100

    chore: Kafka source uses local bean for header deserialization
---
 kamelets/kafka-source.kamelet.yaml                        | 14 ++++++++------
 .../serialization/kafka/KafkaHeaderDeserializer.java      | 15 +++++++++++----
 .../serialization/kafka/KafkaHeaderDeserializerTest.java  | 12 ++++++++----
 .../src/main/resources/kamelets/kafka-source.kamelet.yaml | 14 ++++++++------
 4 files changed, 35 insertions(+), 20 deletions(-)

diff --git a/kamelets/kafka-source.kamelet.yaml b/kamelets/kafka-source.kamelet.yaml
index 6bce42c..d7518b2 100644
--- a/kamelets/kafka-source.kamelet.yaml
+++ b/kamelets/kafka-source.kamelet.yaml
@@ -130,6 +130,12 @@ spec:
     - "camel:kafka"
     - "camel:kamelet"
   template:
+    beans:
+      - name: kafkaHeaderDeserializer
+        type: "#class:org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
+        property:
+          - key: enabled
+            value: '{{deserializeHeaders}}'
     from:
       uri: "kafka:{{topic}}"
       parameters:
@@ -143,10 +149,6 @@ spec:
         autoOffsetReset: "{{autoOffsetReset}}"
         groupId: "{{?consumerGroup}}"
       steps:
-        - set-property:
-            name: deserializeHeaders
-            constant: "{{deserializeHeaders}}"
-        - bean: "org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
-        - remove-property:
-            name: deserializeHeaders
+        - process:
+            ref: "{{kafkaHeaderDeserializer}}"
         - to: "kamelet:sink"
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java
index 7cab1ee..3fc24cc 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeProperty;
+import org.apache.camel.Processor;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.support.SimpleTypeConverter;
@@ -31,12 +31,15 @@ import org.apache.camel.support.SimpleTypeConverter;
  * Uses given type converter implementation set on the Camel context to convert values. If no type converter is set
  * the deserializer uses its own fallback conversion implementation.
  */
-public class KafkaHeaderDeserializer {
+public class KafkaHeaderDeserializer implements Processor {
+
+    boolean enabled = false;
 
     private final SimpleTypeConverter defaultTypeConverter = new SimpleTypeConverter(true, KafkaHeaderDeserializer::convert);
 
-    public void process(@ExchangeProperty("deserializeHeaders") boolean deserializeHeaders, Exchange exchange) throws Exception {
-        if (!deserializeHeaders) {
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        if (!enabled) {
             return;
         }
 
@@ -86,4 +89,8 @@ public class KafkaHeaderDeserializer {
     private boolean shouldDeserialize(Map.Entry<String, Object> entry) {
         return !entry.getKey().equals(KafkaConstants.HEADERS) && !entry.getKey().equals(KafkaConstants.MANUAL_COMMIT);
     }
+
+    public void setEnabled(String enabled) {
+        this.enabled = Boolean.parseBoolean(enabled);
+    }
 }
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java
index d5d92f5..2d7e3bd 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java
@@ -48,7 +48,8 @@ class KafkaHeaderDeserializerTest {
         exchange.getMessage().setHeader("fooNull", null);
         exchange.getMessage().setHeader("number", 1L);
 
-        processor.process(true, exchange);
+        processor.enabled = true;
+        processor.process(exchange);
 
         Assertions.assertTrue(exchange.getMessage().hasHeaders());
         Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
@@ -68,7 +69,8 @@ class KafkaHeaderDeserializerTest {
         exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8));
         exchange.getMessage().setHeader("fooNull", null);
 
-        processor.process(true, exchange);
+        processor.enabled = true;
+        processor.process(exchange);
 
         Assertions.assertTrue(exchange.getMessage().hasHeaders());
         Assertions.assertEquals("converted", exchange.getMessage().getHeader("foo"));
@@ -84,7 +86,8 @@ class KafkaHeaderDeserializerTest {
         exchange.getMessage().setHeader("foo", "bar");
         exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8));
 
-        processor.process(true, exchange);
+        processor.enabled = true;
+        processor.process(exchange);
 
         Assertions.assertTrue(exchange.getMessage().hasHeaders());
         Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
@@ -98,7 +101,8 @@ class KafkaHeaderDeserializerTest {
         exchange.getMessage().setHeader("foo", "bar");
         exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8));
 
-        processor.process(false, exchange);
+        processor.enabled = false;
+        processor.process(exchange);
 
         Assertions.assertTrue(exchange.getMessage().hasHeaders());
         Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
diff --git a/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml
index 6bce42c..d7518b2 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml
@@ -130,6 +130,12 @@ spec:
     - "camel:kafka"
     - "camel:kamelet"
   template:
+    beans:
+      - name: kafkaHeaderDeserializer
+        type: "#class:org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
+        property:
+          - key: enabled
+            value: '{{deserializeHeaders}}'
     from:
       uri: "kafka:{{topic}}"
       parameters:
@@ -143,10 +149,6 @@ spec:
         autoOffsetReset: "{{autoOffsetReset}}"
         groupId: "{{?consumerGroup}}"
       steps:
-        - set-property:
-            name: deserializeHeaders
-            constant: "{{deserializeHeaders}}"
-        - bean: "org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
-        - remove-property:
-            name: deserializeHeaders
+        - process:
+            ref: "{{kafkaHeaderDeserializer}}"
         - to: "kamelet:sink"