You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/03/02 17:38:03 UTC

[camel-kamelets] branch main updated: chore: Add data-type-action Kamelet

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git


The following commit(s) were added to refs/heads/main by this push:
     new 331abebb chore: Add data-type-action Kamelet
331abebb is described below

commit 331abebb446f48aa26a7b84599f74b3753ef6146
Author: Christoph Deppisch <cd...@redhat.com>
AuthorDate: Wed Mar 1 20:17:11 2023 +0100

    chore: Add data-type-action Kamelet
    
    - Adds new action Kamelet to leverage Kamelet input/ouput data types
    - Enables users to apply data type conversion as part of a KameletBinding
    - Introduces new data type implementation representing the CloudEvents Http binding
    - Add some YAKS tests to verify data-type-action Kamelet
    - Use Kamelets from local directory when running YAKS tests in GitHub workflow
---
 .github/workflows/yaks-tests.yaml                  |  9 ++-
 docs/modules/ROOT/nav.adoc                         |  1 +
 kamelets/data-type-action.kamelet.yaml             | 70 ++++++++++++++++++++++
 .../aws2/s3/AWS2S3CloudEventOutputType.java        | 25 ++------
 .../converter/http/HttpCloudEventOutputType.java   | 52 ++++++++++++++++
 .../utils/format/converter/utils/CloudEvents.java  | 45 ++++++++++++++
 .../services/org/apache/camel/DataTypeConverter    |  1 +
 .../converter/http-cloudevents}                    |  4 +-
 .../aws2/s3/AWS2S3CloudEventOutputTypeTest.java    |  9 +--
 .../HttpCloudEventOutputTypeTest.java}             | 38 +++++++-----
 .../kamelets/data-type-action.kamelet.yaml         | 70 ++++++++++++++++++++++
 test/aws-s3/README.md                              |  2 +-
 test/aws-s3/aws-s3-knative-broker.feature          | 49 +++++++++++++++
 ...ding.feature => aws-s3-knative-channel.feature} |  0
 test/aws-s3/aws-s3-knative-cloudevents.feature     | 50 ++++++++++++++++
 test/aws-s3/aws-s3-to-http.feature                 | 49 +++++++++++++++
 test/aws-s3/aws-s3-to-http.yaml                    | 68 +++++++++++++++++++++
 test/aws-s3/aws-s3-to-knative-broker.yaml          | 53 ++++++++++++++++
 test/aws-s3/yaks-config.yaml                       |  2 +
 .../data-type-action/data-type-action-binding.yaml | 48 +++++++++++++++
 test/data-type-action/data-type-action.feature     | 33 ++++++++++
 test/{aws-s3 => data-type-action}/yaks-config.yaml | 35 +----------
 .../insert-field-action.feature                    |  2 +-
 test/rest-openapi-sink/rest-openapi-sink.feature   |  1 -
 .../timer-to-http/application.properties           |  6 +-
 test/timer-to-http/timer-to-http.feature           | 25 ++++++++
 test/timer-to-http/timer-to-http.groovy            | 24 ++++++++
 .../timer-to-http/yaks-config.yaml                 | 25 ++++++--
 28 files changed, 706 insertions(+), 90 deletions(-)

diff --git a/.github/workflows/yaks-tests.yaml b/.github/workflows/yaks-tests.yaml
index 6a234b22..2d52f915 100644
--- a/.github/workflows/yaks-tests.yaml
+++ b/.github/workflows/yaks-tests.yaml
@@ -42,7 +42,7 @@ concurrency:
 
 env:
   YAKS_VERSION: 0.14.2
-  YAKS_RUN_OPTIONS: "--timeout=15m --local -e YAKS_CAMELK_MAX_ATTEMPTS=10 -e YAKS_JBANG_CAMEL_VERSION=4.0.0-M1"
+  YAKS_RUN_OPTIONS: "--timeout=15m --local -e YAKS_CAMELK_MAX_ATTEMPTS=10 -e YAKS_JBANG_CAMEL_VERSION=4.0.0-M1 -e YAKS_JBANG_KAMELETS_LOCAL_DIR=../../../kamelets"
 
 jobs:
   test:
@@ -58,6 +58,9 @@ jobs:
         distribution: 'temurin'
         java-version: 17
         cache: 'maven'
+    - name: Build Kamelet libraries
+      run: |
+        ./mvnw clean install -DskipTests
     - name: Get YAKS CLI
       run: |
         curl --fail -L --silent https://github.com/citrusframework/yaks/releases/download/v${YAKS_VERSION}/yaks-${YAKS_VERSION}-linux-64bit.tar.gz -o yaks.tar.gz
@@ -93,10 +96,12 @@ jobs:
         
         yaks run test/mail-sink $YAKS_RUN_OPTIONS
         yaks run test/timer-source $YAKS_RUN_OPTIONS
+        yaks run test/timer-to-http $YAKS_RUN_OPTIONS
+        yaks run test/data-type-action $YAKS_RUN_OPTIONS
         yaks run test/earthquake-source $YAKS_RUN_OPTIONS
         yaks run test/rest-openapi-sink $YAKS_RUN_OPTIONS
         yaks run test/kafka $YAKS_RUN_OPTIONS
-    - uses: actions/upload-artifact@v2
+    - uses: actions/upload-artifact@v3
       if: failure()
       with:
         name: dumps
diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc
index 55ad5a07..f09da690 100644
--- a/docs/modules/ROOT/nav.adoc
+++ b/docs/modules/ROOT/nav.adoc
@@ -51,6 +51,7 @@
 * xref:chunk-template-action.adoc[]
 * xref:couchbase-sink.adoc[]
 * xref:cron-source.adoc[]
+* xref:data-type-action.adoc[]
 * xref:delay-action.adoc[]
 * xref:dns-dig-action.adoc[]
 * xref:dns-ip-action.adoc[]
diff --git a/kamelets/data-type-action.kamelet.yaml b/kamelets/data-type-action.kamelet.yaml
new file mode 100644
index 00000000..fc94067b
--- /dev/null
+++ b/kamelets/data-type-action.kamelet.yaml
@@ -0,0 +1,70 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+apiVersion: camel.apache.org/v1alpha1
+kind: Kamelet
+metadata:
+  name: data-type-action
+  annotations:
+    camel.apache.org/kamelet.support.level: "Stable"
+    camel.apache.org/catalog.version: "4.0.0-SNAPSHOT"
+    camel.apache.org/kamelet.icon: "data:image/svg+xml;base64,PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiIHN0YW5kYWxvbmU9Im5vIj8+CjxzdmcKICAgeG1sbnM6ZGM9Imh0dHA6Ly9wdXJsLm9yZy9kYy9lbGVtZW50cy8xLjEvIgogICB4bWxuczpjYz0iaHR0cDovL2NyZWF0aXZlY29tbW9ucy5vcmcvbnMjIgogICB4bWxuczpyZGY9Imh0dHA6Ly93d3cudzMub3JnLzE5OTkvMDIvMjItcmRmLXN5bnRheC1ucyMiCiAgIHhtbG5zOnN2Zz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciCiAgIHhtbG5zPSJodHRwOi8vd3d3LnczLm9yZy8yMDAwL3N2ZyIKICAgeG1sbnM6c29kaXBvZGk9Imh0dHA6Ly9zb2RpcG [...]
+    camel.apache.org/provider: "Apache Software Foundation"
+    camel.apache.org/kamelet.group: "Actions"
+    camel.apache.org/kamelet.namespace: "Transformation"
+  labels:
+    camel.apache.org/kamelet.type: "action"
+spec:
+  definition:
+    title: "Data Type Action"
+    description: |-
+      Applies a given data type with respective data transformation.
+    required:
+      - format
+    type: object
+    properties:
+      scheme:
+        title: Component Scheme
+        description: The data type component scheme enables users to apply Camel component specific data type conversions.
+        type: string
+        default: "camel"
+        example: "camel"
+      format:
+        title: Data Type Format
+        description: Defines the data type that will be applied by this action. The Kameelet catalog supports different data types and performs automatic message conversion according to the given type.
+        type: string
+  dependencies:
+    - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.0.0-SNAPSHOT"
+    - "camel:kamelet"
+    - "camel:core"
+  template:
+    beans:
+      - name: dataTypeRegistry
+        type: "#class:org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry"
+      - name: dataTypeProcessor
+        type: "#class:org.apache.camel.kamelets.utils.format.DataTypeProcessor"
+        property:
+          - key: scheme
+            value: '{{scheme}}'
+          - key: format
+            value: '{{format}}'
+          - key: registry
+            value: '#bean:{{dataTypeRegistry}}'
+    from:
+      uri: "kamelet:source"
+      steps:
+      - process:
+          ref: "{{dataTypeProcessor}}"
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java
index 4bc87192..d8847886 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java
@@ -17,14 +17,11 @@
 
 package org.apache.camel.kamelets.utils.format.converter.aws2.s3;
 
-import java.time.Instant;
-import java.time.ZoneId;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
 import java.util.Map;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.component.aws2.s3.AWS2S3Constants;
+import org.apache.camel.kamelets.utils.format.converter.utils.CloudEvents;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
 import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
 
@@ -35,24 +32,14 @@ import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
 @DataType(scheme = "aws2-s3", name = "cloudevents", mediaType = "application/octet-stream")
 public class AWS2S3CloudEventOutputType implements DataTypeConverter {
 
-    static final String CAMEL_CLOUD_EVENT_TYPE = "CamelCloudEventType";
-    static final String CAMEL_CLOUD_EVENT_SOURCE = "CamelCloudEventSource";
-    static final String CAMEL_CLOUD_EVENT_SUBJECT = "CamelCloudEventSubject";
-    static final String CAMEL_CLOUD_EVENT_TIME = "CamelCloudEventTime";
-
     @Override
     public void convert(Exchange exchange) {
         final Map<String, Object> headers = exchange.getMessage().getHeaders();
 
-        headers.put(CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event.aws.s3.getObject");
-        headers.put(CAMEL_CLOUD_EVENT_SOURCE, "aws.s3.bucket." + exchange.getMessage().getHeader(AWS2S3Constants.BUCKET_NAME, String.class));
-        headers.put(CAMEL_CLOUD_EVENT_SUBJECT, exchange.getMessage().getHeader(AWS2S3Constants.KEY, String.class));
-        headers.put(CAMEL_CLOUD_EVENT_TIME, getEventTime(exchange));
-    }
-
-    private String getEventTime(Exchange exchange) {
-        final ZonedDateTime created
-                = ZonedDateTime.ofInstant(Instant.ofEpochMilli(exchange.getCreated()), ZoneId.systemDefault());
-        return DateTimeFormatter.ISO_INSTANT.format(created);
+        headers.put(CloudEvents.CAMEL_CLOUD_EVENT_ID, exchange.getExchangeId());
+        headers.put(CloudEvents.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event.aws.s3.getObject");
+        headers.put(CloudEvents.CAMEL_CLOUD_EVENT_SOURCE, "aws.s3.bucket." + exchange.getMessage().getHeader(AWS2S3Constants.BUCKET_NAME, String.class));
+        headers.put(CloudEvents.CAMEL_CLOUD_EVENT_SUBJECT, exchange.getMessage().getHeader(AWS2S3Constants.KEY, String.class));
+        headers.put(CloudEvents.CAMEL_CLOUD_EVENT_TIME, CloudEvents.getEventTime(exchange));
     }
 }
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputType.java
new file mode 100644
index 00000000..bbad4637
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputType.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.http;
+
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.kamelets.utils.format.converter.utils.CloudEvents;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+
+/**
+ * Output data type represents the CloudEvent V1 Http binding. The data type reads Camel specific
+ * CloudEvent headers and transforms these to Http headers according to the CloudEvents Http binding specification.
+ *
+ * By default, sets the Http content type header to application/json when not set explicitly.
+ */
+@DataType(scheme = "http", name = "cloudevents", mediaType = "application/json")
+public class HttpCloudEventOutputType implements DataTypeConverter {
+
+    @Override
+    public void convert(Exchange exchange) {
+        final Map<String, Object> headers = exchange.getMessage().getHeaders();
+
+        headers.put("ce-id", exchange.getExchangeId());
+        headers.put("ce-specversion", headers.getOrDefault(CloudEvents.CAMEL_CLOUD_EVENT_VERSION, "1.0"));
+        headers.put("ce-type", headers.getOrDefault(CloudEvents.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event"));
+        headers.put("ce-source", headers.getOrDefault(CloudEvents.CAMEL_CLOUD_EVENT_SOURCE, "org.apache.camel"));
+
+        if (headers.containsKey(CloudEvents.CAMEL_CLOUD_EVENT_SUBJECT)) {
+            headers.put("ce-subject", headers.get(CloudEvents.CAMEL_CLOUD_EVENT_SUBJECT));
+        }
+
+        headers.put("ce-time", headers.getOrDefault(CloudEvents.CAMEL_CLOUD_EVENT_TIME, CloudEvents.getEventTime(exchange)));
+        headers.put(Exchange.CONTENT_TYPE, headers.getOrDefault(CloudEvents.CAMEL_CLOUD_EVENT_CONTENT_TYPE, "application/json"));
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/CloudEvents.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/CloudEvents.java
new file mode 100644
index 00000000..d88ef661
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/CloudEvents.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.utils;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+
+import org.apache.camel.Exchange;
+
+/**
+ * Helper class to manage CloudEvents specific Camel message headers and other utilities.
+ */
+public class CloudEvents {
+
+    public static final String CAMEL_CLOUD_EVENT_ID = "CamelCloudEventID";
+    public static final String CAMEL_CLOUD_EVENT_VERSION = "CamelCloudEventVersion";
+    public static final String CAMEL_CLOUD_EVENT_TYPE = "CamelCloudEventType";
+    public static final String CAMEL_CLOUD_EVENT_SOURCE = "CamelCloudEventSource";
+    public static final String CAMEL_CLOUD_EVENT_SUBJECT = "CamelCloudEventSubject";
+    public static final String CAMEL_CLOUD_EVENT_TIME = "CamelCloudEventTime";
+    public static final String CAMEL_CLOUD_EVENT_CONTENT_TYPE = Exchange.CONTENT_TYPE;
+
+    public static String getEventTime(Exchange exchange) {
+        final ZonedDateTime created
+                = ZonedDateTime.ofInstant(Instant.ofEpochMilli(exchange.getCreated()), ZoneId.systemDefault());
+        return DateTimeFormatter.ISO_INSTANT.format(created);
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter
index 81e10256..1cd0ed78 100644
--- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter
+++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter
@@ -18,3 +18,4 @@
 org.apache.camel.kamelets.utils.format.converter.standard
 org.apache.camel.kamelets.utils.format.converter.aws2.s3
 org.apache.camel.kamelets.utils.format.converter.aws2.ddb
+org.apache.camel.kamelets.utils.format.converter.http
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/http-cloudevents
similarity index 81%
copy from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter
copy to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/http-cloudevents
index 81e10256..e74fe8e2 100644
--- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter
+++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/http-cloudevents
@@ -15,6 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.camel.kamelets.utils.format.converter.standard
-org.apache.camel.kamelets.utils.format.converter.aws2.s3
-org.apache.camel.kamelets.utils.format.converter.aws2.ddb
+class=org.apache.camel.kamelets.utils.format.converter.http.HttpCloudEventOutputType
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java
index 084f4c16..53570dd9 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java
@@ -26,6 +26,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.component.aws2.s3.AWS2S3Constants;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry;
+import org.apache.camel.kamelets.utils.format.converter.utils.CloudEvents;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
 import org.apache.camel.support.DefaultExchange;
 import org.junit.jupiter.api.Assertions;
@@ -52,9 +53,9 @@ class AWS2S3CloudEventOutputTypeTest {
 
         Assertions.assertTrue(exchange.getMessage().hasHeaders());
         Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey(AWS2S3Constants.KEY));
-        assertEquals("org.apache.camel.event.aws.s3.getObject", exchange.getMessage().getHeader(AWS2S3CloudEventOutputType.CAMEL_CLOUD_EVENT_TYPE));
-        assertEquals("test1.txt", exchange.getMessage().getHeader(AWS2S3CloudEventOutputType.CAMEL_CLOUD_EVENT_SUBJECT));
-        assertEquals("aws.s3.bucket.myBucket", exchange.getMessage().getHeader(AWS2S3CloudEventOutputType.CAMEL_CLOUD_EVENT_SOURCE));
+        assertEquals("org.apache.camel.event.aws.s3.getObject", exchange.getMessage().getHeader(CloudEvents.CAMEL_CLOUD_EVENT_TYPE));
+        assertEquals("test1.txt", exchange.getMessage().getHeader(CloudEvents.CAMEL_CLOUD_EVENT_SUBJECT));
+        assertEquals("aws.s3.bucket.myBucket", exchange.getMessage().getHeader(CloudEvents.CAMEL_CLOUD_EVENT_SOURCE));
     }
 
     @Test
@@ -64,4 +65,4 @@ class AWS2S3CloudEventOutputTypeTest {
         Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("aws2-s3", "cloudevents");
         Assertions.assertTrue(converter.isPresent());
     }
-}
\ No newline at end of file
+}
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputTypeTest.java
similarity index 58%
copy from library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java
copy to library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputTypeTest.java
index 084f4c16..01e331e6 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputTypeTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel.kamelets.utils.format.converter.aws2.s3;
+package org.apache.camel.kamelets.utils.format.converter.http;
 
 import java.io.ByteArrayInputStream;
 import java.nio.charset.StandardCharsets;
@@ -23,45 +23,51 @@ import java.util.Optional;
 
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
-import org.apache.camel.component.aws2.s3.AWS2S3Constants;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry;
+import org.apache.camel.kamelets.utils.format.converter.utils.CloudEvents;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
 import org.apache.camel.support.DefaultExchange;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
-class AWS2S3CloudEventOutputTypeTest {
+class HttpCloudEventOutputTypeTest {
 
     private final DefaultCamelContext camelContext = new DefaultCamelContext();
 
-    private final AWS2S3CloudEventOutputType outputType = new AWS2S3CloudEventOutputType();
+    private final HttpCloudEventOutputType outputType = new HttpCloudEventOutputType();
 
     @Test
-    void shouldMapToCloudEvent() throws Exception {
+    void shouldMapToHttpCloudEvent() throws Exception {
         Exchange exchange = new DefaultExchange(camelContext);
 
-        exchange.getMessage().setHeader(AWS2S3Constants.KEY, "test1.txt");
-        exchange.getMessage().setHeader(AWS2S3Constants.BUCKET_NAME, "myBucket");
-        exchange.getMessage().setHeader(AWS2S3Constants.CONTENT_TYPE, "text/plain");
-        exchange.getMessage().setHeader(AWS2S3Constants.CONTENT_ENCODING, StandardCharsets.UTF_8.toString());
+        exchange.getMessage().setHeader(CloudEvents.CAMEL_CLOUD_EVENT_SUBJECT, "test1.txt");
+        exchange.getMessage().setHeader(CloudEvents.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event");
+        exchange.getMessage().setHeader(CloudEvents.CAMEL_CLOUD_EVENT_SOURCE, "org.apache.camel.test");
+        exchange.getMessage().setHeader(CloudEvents.CAMEL_CLOUD_EVENT_CONTENT_TYPE, "text/plain");
         exchange.getMessage().setBody(new ByteArrayInputStream("Test1".getBytes(StandardCharsets.UTF_8)));
+
         outputType.convert(exchange);
 
-        Assertions.assertTrue(exchange.getMessage().hasHeaders());
-        Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey(AWS2S3Constants.KEY));
-        assertEquals("org.apache.camel.event.aws.s3.getObject", exchange.getMessage().getHeader(AWS2S3CloudEventOutputType.CAMEL_CLOUD_EVENT_TYPE));
-        assertEquals("test1.txt", exchange.getMessage().getHeader(AWS2S3CloudEventOutputType.CAMEL_CLOUD_EVENT_SUBJECT));
-        assertEquals("aws.s3.bucket.myBucket", exchange.getMessage().getHeader(AWS2S3CloudEventOutputType.CAMEL_CLOUD_EVENT_SOURCE));
+        assertTrue(exchange.getMessage().hasHeaders());
+        assertEquals(exchange.getExchangeId(), exchange.getMessage().getHeader("ce-id"));
+        assertEquals("1.0", exchange.getMessage().getHeader("ce-specversion"));
+        assertEquals("org.apache.camel.event", exchange.getMessage().getHeader("ce-type"));
+        assertEquals("test1.txt", exchange.getMessage().getHeader("ce-subject"));
+        assertEquals("org.apache.camel.test", exchange.getMessage().getHeader("ce-source"));
+        assertTrue(exchange.getMessage().getHeaders().containsKey("ce-time"));
+        assertEquals("text/plain", exchange.getMessage().getHeader(Exchange.CONTENT_TYPE));
+        assertEquals("Test1", exchange.getMessage().getBody(String.class));
     }
 
     @Test
     public void shouldLookupDataType() throws Exception {
         DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry();
         CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
-        Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("aws2-s3", "cloudevents");
+        Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("http", "cloudevents");
         Assertions.assertTrue(converter.isPresent());
     }
-}
\ No newline at end of file
+}
diff --git a/library/camel-kamelets/src/main/resources/kamelets/data-type-action.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/data-type-action.kamelet.yaml
new file mode 100644
index 00000000..fc94067b
--- /dev/null
+++ b/library/camel-kamelets/src/main/resources/kamelets/data-type-action.kamelet.yaml
@@ -0,0 +1,70 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+apiVersion: camel.apache.org/v1alpha1
+kind: Kamelet
+metadata:
+  name: data-type-action
+  annotations:
+    camel.apache.org/kamelet.support.level: "Stable"
+    camel.apache.org/catalog.version: "4.0.0-SNAPSHOT"
+    camel.apache.org/kamelet.icon: "data:image/svg+xml;base64,PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiIHN0YW5kYWxvbmU9Im5vIj8+CjxzdmcKICAgeG1sbnM6ZGM9Imh0dHA6Ly9wdXJsLm9yZy9kYy9lbGVtZW50cy8xLjEvIgogICB4bWxuczpjYz0iaHR0cDovL2NyZWF0aXZlY29tbW9ucy5vcmcvbnMjIgogICB4bWxuczpyZGY9Imh0dHA6Ly93d3cudzMub3JnLzE5OTkvMDIvMjItcmRmLXN5bnRheC1ucyMiCiAgIHhtbG5zOnN2Zz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciCiAgIHhtbG5zPSJodHRwOi8vd3d3LnczLm9yZy8yMDAwL3N2ZyIKICAgeG1sbnM6c29kaXBvZGk9Imh0dHA6Ly9zb2RpcG [...]
+    camel.apache.org/provider: "Apache Software Foundation"
+    camel.apache.org/kamelet.group: "Actions"
+    camel.apache.org/kamelet.namespace: "Transformation"
+  labels:
+    camel.apache.org/kamelet.type: "action"
+spec:
+  definition:
+    title: "Data Type Action"
+    description: |-
+      Applies a given data type with respective data transformation.
+    required:
+      - format
+    type: object
+    properties:
+      scheme:
+        title: Component Scheme
+        description: The data type component scheme enables users to apply Camel component specific data type conversions.
+        type: string
+        default: "camel"
+        example: "camel"
+      format:
+        title: Data Type Format
+        description: Defines the data type that will be applied by this action. The Kameelet catalog supports different data types and performs automatic message conversion according to the given type.
+        type: string
+  dependencies:
+    - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.0.0-SNAPSHOT"
+    - "camel:kamelet"
+    - "camel:core"
+  template:
+    beans:
+      - name: dataTypeRegistry
+        type: "#class:org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry"
+      - name: dataTypeProcessor
+        type: "#class:org.apache.camel.kamelets.utils.format.DataTypeProcessor"
+        property:
+          - key: scheme
+            value: '{{scheme}}'
+          - key: format
+            value: '{{format}}'
+          - key: registry
+            value: '#bean:{{dataTypeRegistry}}'
+    from:
+      uri: "kamelet:source"
+      steps:
+      - process:
+          ref: "{{dataTypeProcessor}}"
diff --git a/test/aws-s3/README.md b/test/aws-s3/README.md
index e71f403f..7bf2aaec 100644
--- a/test/aws-s3/README.md
+++ b/test/aws-s3/README.md
@@ -70,7 +70,7 @@ $ yaks test aws-s3-uri-binding.feature
 To run tests with binding to Knative channel:
 
 ```shell script
-$ yaks test aws-s3-knative-binding.feature
+$ yaks test aws-s3-knative-channel.feature
 ```
 
 You will be provided with the test log output and the test results.
diff --git a/test/aws-s3/aws-s3-knative-broker.feature b/test/aws-s3/aws-s3-knative-broker.feature
new file mode 100644
index 00000000..fe935dc7
--- /dev/null
+++ b/test/aws-s3/aws-s3-knative-broker.feature
@@ -0,0 +1,49 @@
+@knative
+Feature: AWS S3 Kamelet - Knative broker binding
+
+  Background:
+    Given Knative event consumer timeout is 20000 ms
+    Given variables
+      | aws.s3.scheme | camel |
+      | aws.s3.format | string |
+      | aws.s3.bucketNameOrArn | mybucket |
+      | aws.s3.message | Hello from S3 Kamelet |
+      | aws.s3.key | hello.txt |
+
+  Scenario: Create infrastructure
+    # Start LocalStack container
+    Given Enable service S3
+    Given start LocalStack container
+    # Create AWS-S3 client
+    Given New global Camel context
+    Given load to Camel registry amazonS3Client.groovy
+    # Create Knative broker
+    Given create Knative broker default
+    And Knative broker default is running
+
+  Scenario: Verify AWS-S3 Kamelet to Knative binding
+    # Create binding
+    When load KameletBinding aws-s3-to-knative-broker.yaml
+    And KameletBinding aws-s3-to-knative-broker is available
+    And Camel K integration aws-s3-to-knative-broker is running
+    Then Camel K integration aws-s3-to-knative-broker should print Started aws-s3-to-knative-broker
+    # Verify Kamelet source
+    Given create Knative event consumer service event-consumer-service
+    Given create Knative trigger event-service-trigger on service event-consumer-service with filter on attributes
+      | type   | org.apache.camel.event |
+    Given Camel exchange message header CamelAwsS3Key="${aws.s3.key}"
+    Given send Camel exchange to("aws2-s3://${aws.s3.bucketNameOrArn}?amazonS3Client=#amazonS3Client") with body: ${aws.s3.message}
+    Then expect Knative event data: ${aws.s3.message}
+    And verify Knative event
+      | type            | org.apache.camel.event |
+      | source          | @ignore@ |
+      | id              | @ignore@ |
+
+  Scenario: Remove resources
+    # Remove Camel K resources
+    Given delete KameletBinding aws-s3-to-knative-broker
+    Given delete Kubernetes service event-consumer-service
+    # Remove Knative resources
+    Given delete Knative broker default
+    # Stop LocalStack container
+    Given stop LocalStack container
diff --git a/test/aws-s3/aws-s3-knative-binding.feature b/test/aws-s3/aws-s3-knative-channel.feature
similarity index 100%
rename from test/aws-s3/aws-s3-knative-binding.feature
rename to test/aws-s3/aws-s3-knative-channel.feature
diff --git a/test/aws-s3/aws-s3-knative-cloudevents.feature b/test/aws-s3/aws-s3-knative-cloudevents.feature
new file mode 100644
index 00000000..647c8717
--- /dev/null
+++ b/test/aws-s3/aws-s3-knative-cloudevents.feature
@@ -0,0 +1,50 @@
+@knative
+Feature: AWS S3 Kamelet - cloud events data type
+
+  Background:
+    Given Knative event consumer timeout is 20000 ms
+    Given variables
+      | aws.s3.scheme | aws2-s3 |
+      | aws.s3.format | cloudevents |
+      | aws.s3.bucketNameOrArn | mybucket |
+      | aws.s3.message | Hello from S3 Kamelet |
+      | aws.s3.key | hello.txt |
+
+  Scenario: Create infrastructure
+    # Start LocalStack container
+    Given Enable service S3
+    Given start LocalStack container
+    # Create AWS-S3 client
+    Given New global Camel context
+    Given load to Camel registry amazonS3Client.groovy
+    # Create Knative broker
+    Given create Knative broker default
+    And Knative broker default is running
+
+  Scenario: Verify AWS-S3 Kamelet to Knative binding
+    # Create binding
+    When load KameletBinding aws-s3-to-knative-broker.yaml
+    And KameletBinding aws-s3-to-knative-broker is available
+    And Camel K integration aws-s3-to-knative-broker is running
+    Then Camel K integration aws-s3-to-knative-broker should print Started aws-s3-to-knative-broker
+    # Verify Kamelet source
+    Given create Knative event consumer service event-consumer-service
+    Given create Knative trigger event-service-trigger on service event-consumer-service with filter on attributes
+      | type   | org.apache.camel.event.aws.s3.getObject |
+    Given Camel exchange message header CamelAwsS3Key="${aws.s3.key}"
+    Given send Camel exchange to("aws2-s3://${aws.s3.bucketNameOrArn}?amazonS3Client=#amazonS3Client") with body: ${aws.s3.message}
+    Then expect Knative event data: ${aws.s3.message}
+    And verify Knative event
+      | type            | org.apache.camel.event.aws.s3.getObject |
+      | source          | aws.s3.bucket.${aws.s3.bucketNameOrArn} |
+      | subject         | ${aws.s3.key} |
+      | id              | @ignore@ |
+
+  Scenario: Remove resources
+    # Remove Camel K resources
+    Given delete KameletBinding aws-s3-to-knative-broker
+    Given delete Kubernetes service event-consumer-service
+    # Remove Knative resources
+    Given delete Knative broker default
+    # Stop LocalStack container
+    Given stop LocalStack container
diff --git a/test/aws-s3/aws-s3-to-http.feature b/test/aws-s3/aws-s3-to-http.feature
new file mode 100644
index 00000000..59f1897d
--- /dev/null
+++ b/test/aws-s3/aws-s3-to-http.feature
@@ -0,0 +1,49 @@
+Feature: AWS S3 Kamelet - Http sink
+
+  Background:
+    Given Kubernetes timeout is 60000 ms
+    Given HTTP server timeout is 60000 ms
+    Given HTTP server "test-service"
+    Given variables
+      | aws.s3.bucketNameOrArn | mybucket |
+      | aws.s3.message | Hello from S3 Kamelet |
+      | aws.s3.key | hello.txt |
+
+  Scenario: Create infrastructure
+    # Create Http server
+    Given create Kubernetes service test-service
+    # Start LocalStack container
+    Given Enable service S3
+    Given start LocalStack container
+    # Create AWS-S3 client
+    Given New global Camel context
+    Given load to Camel registry amazonS3Client.groovy
+
+  Scenario: Verify AWS-S3 Kamelet to Http
+    # Create binding
+    When load KameletBinding aws-s3-to-http.yaml
+    And KameletBinding aws-s3-to-http is available
+    And Camel K integration aws-s3-to-http is running
+    Then Camel K integration aws-s3-to-http should print Started aws-s3-to-http
+    # Verify Kamelet source
+    Given Camel exchange message header CamelAwsS3Key="${aws.s3.key}"
+    Given send Camel exchange to("aws2-s3://${aws.s3.bucketNameOrArn}?amazonS3Client=#amazonS3Client") with body: ${aws.s3.message}
+    # Verify Http request
+    Given expect HTTP request body: ${aws.s3.message}
+    And expect HTTP request headers
+      | ce-specversion | 1.0 |
+      | ce-id          | @notEmpty()@ |
+      | ce-subject     | ${aws.s3.key} |
+      | ce-source      | aws.s3.bucket.${aws.s3.bucketNameOrArn} |
+      | ce-type        | org.apache.camel.event.aws.s3.getObject |
+      | ce-time        | @notEmpty()@ |
+      | Content-Type   | application/json;charset=UTF-8 |
+    When receive POST /incoming
+    Then send HTTP 201 CREATED
+
+  Scenario: Remove resources
+    # Remove Camel K resources
+    Given delete KameletBinding aws-s3-to-http
+    Given delete Kubernetes service test-service
+    # Stop LocalStack container
+    Given stop LocalStack container
diff --git a/test/aws-s3/aws-s3-to-http.yaml b/test/aws-s3/aws-s3-to-http.yaml
new file mode 100644
index 00000000..d0e2bd41
--- /dev/null
+++ b/test/aws-s3/aws-s3-to-http.yaml
@@ -0,0 +1,68 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: camel.apache.org/v1alpha1
+kind: KameletBinding
+metadata:
+  name: aws-s3-to-http
+spec:
+  source:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1alpha1
+      name: aws-s3-source
+    properties:
+      bucketNameOrArn: ${aws.s3.bucketNameOrArn}
+      overrideEndpoint: true
+      uriEndpointOverride: ${YAKS_TESTCONTAINERS_LOCALSTACK_S3_URL}
+      accessKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_ACCESS_KEY}
+      secretKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_SECRET_KEY}
+      region: ${YAKS_TESTCONTAINERS_LOCALSTACK_REGION}
+  steps:
+    - ref:
+        kind: Kamelet
+        apiVersion: camel.apache.org/v1alpha1
+        name: data-type-action
+      properties:
+        scheme: "aws2-s3"
+        format: "cloudevents"
+    - ref:
+        kind: Kamelet
+        apiVersion: camel.apache.org/v1alpha1
+        name: data-type-action
+      properties:
+        scheme: "http"
+        format: "cloudevents"
+    - ref:
+        kind: Kamelet
+        apiVersion: camel.apache.org/v1alpha1
+        name: drop-headers-action
+      properties:
+        pattern: "Camel*"
+    - ref:
+        kind: Kamelet
+        apiVersion: camel.apache.org/v1alpha1
+        name: log-sink
+      properties:
+        showHeaders: true
+  sink:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1alpha1
+      name: http-sink
+    properties:
+      url: yaks:resolveURL('test-service')/incoming
diff --git a/test/aws-s3/aws-s3-to-knative-broker.yaml b/test/aws-s3/aws-s3-to-knative-broker.yaml
new file mode 100644
index 00000000..dcb883d8
--- /dev/null
+++ b/test/aws-s3/aws-s3-to-knative-broker.yaml
@@ -0,0 +1,53 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: camel.apache.org/v1alpha1
+kind: KameletBinding
+metadata:
+  name: aws-s3-to-knative-broker
+spec:
+  source:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1alpha1
+      name: aws-s3-source
+    properties:
+      bucketNameOrArn: ${aws.s3.bucketNameOrArn}
+      overrideEndpoint: true
+      uriEndpointOverride: ${YAKS_TESTCONTAINERS_LOCALSTACK_S3_URL}
+      accessKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_ACCESS_KEY}
+      secretKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_SECRET_KEY}
+      region: ${YAKS_TESTCONTAINERS_LOCALSTACK_REGION}
+  steps:
+    - ref:
+        kind: Kamelet
+        apiVersion: camel.apache.org/v1alpha1
+        name: data-type-action
+      properties:
+        scheme: "${aws.s3.scheme}"
+        format: "${aws.s3.format}"
+    - ref:
+        kind: Kamelet
+        apiVersion: camel.apache.org/v1alpha1
+        name: log-sink
+      properties:
+        showHeaders: true
+  sink:
+    ref:
+      kind: Broker
+      apiVersion: eventing.knative.dev/v1
+      name: default
diff --git a/test/aws-s3/yaks-config.yaml b/test/aws-s3/yaks-config.yaml
index 90be09c8..ccbb58d7 100644
--- a/test/aws-s3/yaks-config.yaml
+++ b/test/aws-s3/yaks-config.yaml
@@ -44,7 +44,9 @@ config:
       - aws-s3-to-log-uri-based.groovy
       - aws-s3-to-log-secret-based.groovy
       - aws-s3-uri-binding.yaml
+      - aws-s3-to-http.yaml
       - aws-s3-to-knative-channel.yaml
+      - aws-s3-to-knative-broker.yaml
       - utils/knative-channel-to-log.yaml
     cucumber:
       tags:
diff --git a/test/data-type-action/data-type-action-binding.yaml b/test/data-type-action/data-type-action-binding.yaml
new file mode 100644
index 00000000..da02745c
--- /dev/null
+++ b/test/data-type-action/data-type-action-binding.yaml
@@ -0,0 +1,48 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: camel.apache.org/v1alpha1
+kind: KameletBinding
+metadata:
+  name: data-type-action-binding
+spec:
+  source:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1alpha1
+      name: timer-source
+    properties:
+      period: 5000
+      contentType: application/json
+      message: >
+        ${input}
+  steps:
+    - ref:
+        kind: Kamelet
+        apiVersion: camel.apache.org/v1alpha1
+        name: data-type-action
+      properties:
+        scheme: "http"
+        format: "cloudevents"
+    - ref:
+        kind: Kamelet
+        apiVersion: camel.apache.org/v1alpha1
+        name: log-action
+      properties:
+        showHeaders: true
+  sink:
+    uri: yaks:resolveURL('test-service')/result
diff --git a/test/data-type-action/data-type-action.feature b/test/data-type-action/data-type-action.feature
new file mode 100644
index 00000000..9aa87a97
--- /dev/null
+++ b/test/data-type-action/data-type-action.feature
@@ -0,0 +1,33 @@
+Feature: Data type action
+
+  Background:
+    Given HTTP server timeout is 15000 ms
+    Given HTTP server "test-service"
+
+  Scenario: Create Http server
+    Given create Kubernetes service test-service with target port 8080
+
+  Scenario: Create Kamelet binding
+    Given variable uuid is "citrus:randomUUID()"
+    Given variable input is
+    """
+    { "id": "${uuid}" }
+    """
+    When load KameletBinding data-type-action-binding.yaml
+    Then Camel K integration data-type-action-binding should be running
+
+    # Verify output message sent
+    Given expect HTTP request body: ${input}
+    And expect HTTP request headers
+      | ce-specversion | 1.0 |
+      | ce-id          | @notEmpty()@ |
+      | ce-source      | org.apache.camel |
+      | ce-type        | org.apache.camel.event |
+      | ce-time        | @notEmpty()@ |
+      | Content-Type   | application/json;charset=UTF-8 |
+    When receive POST /result
+    Then send HTTP 200 OK
+
+  Scenario: Remove resources
+    Given delete KameletBinding data-type-action-binding
+    And delete Kubernetes service test-service
diff --git a/test/aws-s3/yaks-config.yaml b/test/data-type-action/yaks-config.yaml
similarity index 57%
copy from test/aws-s3/yaks-config.yaml
copy to test/data-type-action/yaks-config.yaml
index 90be09c8..a0b3d590 100644
--- a/test/aws-s3/yaks-config.yaml
+++ b/test/data-type-action/yaks-config.yaml
@@ -19,52 +19,21 @@ config:
   namespace:
     temporary: false
   runtime:
-    testcontainers:
-      enabled: true
     env:
-      - name: YAKS_CAMEL_AUTO_REMOVE_RESOURCES
-        value: false
       - name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES
         value: false
-      - name: YAKS_KAMELETS_AUTO_REMOVE_RESOURCES
-        value: false
       - name: YAKS_KUBERNETES_AUTO_REMOVE_RESOURCES
         value: false
-      - name: YAKS_KNATIVE_AUTO_REMOVE_RESOURCES
-        value: false
       - name: YAKS_JBANG_CAMEL_DUMP_INTEGRATION_OUTPUT
         value: true
-      - name: YAKS_TESTCONTAINERS_AUTO_REMOVE_RESOURCES
-        value: false
-      - name: CITRUS_TYPE_CONVERTER
-        value: camel
-    resources:
-      - amazonS3Client.groovy
-      - aws-s3-credentials.properties
-      - aws-s3-to-log-uri-based.groovy
-      - aws-s3-to-log-secret-based.groovy
-      - aws-s3-uri-binding.yaml
-      - aws-s3-to-knative-channel.yaml
-      - utils/knative-channel-to-log.yaml
-    cucumber:
-      tags:
-        - "not @ignored and not @knative"
     settings:
       loggers:
         - name: INTEGRATION_STATUS
           level: INFO
         - name: INTEGRATION_LOGS
           level: INFO
-      dependencies:
-        - groupId: org.apache.camel
-          artifactId: camel-aws2-s3
-          version: "@camel.version@"
-        - groupId: software.amazon.awssdk
-          artifactId: s3
-          version: "@aws-java-sdk2.version@"
-        - groupId: org.apache.camel
-          artifactId: camel-jackson
-          version: "@camel.version@"
+    resources:
+      - data-type-action-binding.yaml
   dump:
     enabled: true
     failedOnly: true
diff --git a/test/insert-field-action/insert-field-action.feature b/test/insert-field-action/insert-field-action.feature
index 21037cc5..ebca10e8 100644
--- a/test/insert-field-action/insert-field-action.feature
+++ b/test/insert-field-action/insert-field-action.feature
@@ -41,7 +41,7 @@ Feature: Insert field Kamelet action
     """
     { "id": "@ignore@", "${field}": "${value}" }
     """
-    And HTTP request header Content-Type="application/json"
+    And expect HTTP request header: Content-Type="application/json;charset=UTF-8"
     When receive POST /result
     Then send HTTP 200 OK
 
diff --git a/test/rest-openapi-sink/rest-openapi-sink.feature b/test/rest-openapi-sink/rest-openapi-sink.feature
index dc1ee64d..868482fe 100644
--- a/test/rest-openapi-sink/rest-openapi-sink.feature
+++ b/test/rest-openapi-sink/rest-openapi-sink.feature
@@ -39,7 +39,6 @@ Feature: REST OpenAPI Kamelet sink
 
   Scenario: Verify proper addPet request message sent
     Given expect HTTP request body: ${pet}
-    And HTTP request header Content-Type is "application/json"
     When receive POST /petstore/pet
     And send HTTP 201 CREATED
 
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter b/test/timer-to-http/application.properties
similarity index 77%
copy from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter
copy to test/timer-to-http/application.properties
index 81e10256..831affa6 100644
--- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter
+++ b/test/timer-to-http/application.properties
@@ -6,7 +6,7 @@
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+#    http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,6 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.camel.kamelets.utils.format.converter.standard
-org.apache.camel.kamelets.utils.format.converter.aws2.s3
-org.apache.camel.kamelets.utils.format.converter.aws2.ddb
+message=YAKS rocks!
\ No newline at end of file
diff --git a/test/timer-to-http/timer-to-http.feature b/test/timer-to-http/timer-to-http.feature
new file mode 100644
index 00000000..9f8aacb3
--- /dev/null
+++ b/test/timer-to-http/timer-to-http.feature
@@ -0,0 +1,25 @@
+Feature: Verify Camel K integrations
+
+  Background:
+    Given HTTP server "test-service"
+    Given HTTP server listening on port 8080
+    Given HTTP request timeout is 6000 ms
+    Given Kubernetes timeout is 60000 ms
+
+  Scenario: Verify timer-to-http integration
+    # Create Http server
+    Given create Kubernetes service test-service with target port 8080
+
+    # Run Camel K integration
+    Given Camel K integration property file application.properties
+    When load Camel K integration timer-to-http.groovy
+    Then Camel K integration timer-to-http should be running
+
+    # Verify Http request and send response
+    Then expect HTTP request body: YAKS rocks!
+    And receive PUT /messages
+    And HTTP response body: Thank You!
+    And send HTTP 200 OK
+
+    # Verify Camel K integration logs
+    And Camel K integration timer-to-http should print Thank You!
diff --git a/test/timer-to-http/timer-to-http.groovy b/test/timer-to-http/timer-to-http.groovy
new file mode 100644
index 00000000..300c5e8d
--- /dev/null
+++ b/test/timer-to-http/timer-to-http.groovy
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// camel-k: language=groovy
+
+from('timer:tick?period=5000')
+    .setHeader("CamelHttpMethod", constant("PUT"))
+    .setBody().constant('{{message}}')
+    .to('yaks:resolveURL(test-service)/messages')
+    .to('log:info?showStreams=true')
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter b/test/timer-to-http/yaks-config.yaml
similarity index 56%
copy from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter
copy to test/timer-to-http/yaks-config.yaml
index 81e10256..a7387ea3 100644
--- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter
+++ b/test/timer-to-http/yaks-config.yaml
@@ -1,4 +1,4 @@
-#
+# ---------------------------------------------------------------------------
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -6,15 +6,28 @@
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+#      http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
+# ---------------------------------------------------------------------------
 
-org.apache.camel.kamelets.utils.format.converter.standard
-org.apache.camel.kamelets.utils.format.converter.aws2.s3
-org.apache.camel.kamelets.utils.format.converter.aws2.ddb
+config:
+  namespace:
+    temporary: false
+  runtime:
+    env:
+      - name: YAKS_JBANG_CAMEL_DUMP_INTEGRATION_OUTPUT
+        value: true
+    settings:
+      loggers:
+        - name: INTEGRATION_STATUS
+          level: INFO
+        - name: INTEGRATION_LOGS
+          level: INFO
+    resources:
+      - timer-to-http.groovy
+      - application.properties