You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2022/12/02 08:08:36 UTC

[camel-kamelets] 04/28: Add CloudEvent output type on AWS S3 Kamelet source

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git

commit 96534b4fded6d900a93abbd7acaf8cd5c0c99618
Author: Christoph Deppisch <cd...@redhat.com>
AuthorDate: Thu Nov 17 17:09:49 2022 +0100

    Add CloudEvent output type on AWS S3 Kamelet source
---
 kamelets/aws-ddb-sink.kamelet.yaml                 |  2 +-
 kamelets/aws-s3-source.kamelet.yaml                |  2 +-
 library/camel-kamelets-utils/pom.xml               |  5 ++
 .../kamelets/utils/format/DataTypeProcessor.java   |  6 ++
 .../aws2/s3/AWS2S3CloudEventOutputType.java        | 62 ++++++++++++++
 .../converter/standard/JsonModelDataType.java      |  6 +-
 .../camel/datatype/converter/aws2-s3-cloudevents   | 18 ++++
 .../utils/format/DataTypeProcessorTest.java        | 98 ++++++++++++++++++++++
 .../DefaultDataTypeConverterResolverTest.java      |  3 +
 .../utils/format/DefaultDataTypeRegistryTest.java  |  6 ++
 .../s3/AWS2S3CloudEventOutputTypeTest.java}        | 58 +++++--------
 .../converter/standard/JsonModelDataTypeTest.java  |  2 +-
 .../format/converter/test/UppercaseDataType.java   | 31 +++++++
 .../services/org/apache/camel/DataTypeConverter    | 18 ++++
 .../camel/datatype/converter/camel-lowercase       | 18 ++++
 .../resources/kamelets/aws-ddb-sink.kamelet.yaml   |  2 +-
 .../resources/kamelets/aws-s3-source.kamelet.yaml  |  2 +-
 test/aws-s3/aws-s3-uri-binding.yaml                |  2 +-
 18 files changed, 295 insertions(+), 46 deletions(-)

diff --git a/kamelets/aws-ddb-sink.kamelet.yaml b/kamelets/aws-ddb-sink.kamelet.yaml
index a4e7a114..952ecfa1 100644
--- a/kamelets/aws-ddb-sink.kamelet.yaml
+++ b/kamelets/aws-ddb-sink.kamelet.yaml
@@ -124,7 +124,7 @@ spec:
         - key: format
           value: '{{inputFormat}}'
         - key: registry
-          value: '{{dataTypeRegistry}}'
+          value: '#bean:{{dataTypeRegistry}}'
     from:
       uri: "kamelet:source"
       steps:
diff --git a/kamelets/aws-s3-source.kamelet.yaml b/kamelets/aws-s3-source.kamelet.yaml
index a63af7dc..d937f6e5 100644
--- a/kamelets/aws-s3-source.kamelet.yaml
+++ b/kamelets/aws-s3-source.kamelet.yaml
@@ -130,7 +130,7 @@ spec:
           - key: format
             value: '{{outputFormat}}'
           - key: registry
-            value: '{{dataTypeRegistry}}'
+            value: '#bean:{{dataTypeRegistry}}'
       - name: renameHeaders
         type: "#class:org.apache.camel.kamelets.utils.headers.DuplicateNamingHeaders"
         property:
diff --git a/library/camel-kamelets-utils/pom.xml b/library/camel-kamelets-utils/pom.xml
index 5b1441f3..2aba210d 100644
--- a/library/camel-kamelets-utils/pom.xml
+++ b/library/camel-kamelets-utils/pom.xml
@@ -82,6 +82,11 @@
             <artifactId>camel-aws2-s3</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-cloudevents</artifactId>
+            <!--<scope>provided</scope>-->
+        </dependency>
 
         <!-- Test scoped dependencies -->
         <dependency>
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DataTypeProcessor.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DataTypeProcessor.java
index 81c58330..def0f2b8 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DataTypeProcessor.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DataTypeProcessor.java
@@ -31,6 +31,8 @@ import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
  */
 public class DataTypeProcessor implements Processor, CamelContextAware {
 
+    public static final String DATA_TYPE_FORMAT_PROPERTY = "CamelDataTypeFormat";
+
     private CamelContext camelContext;
 
     private DefaultDataTypeRegistry registry;
@@ -42,6 +44,10 @@ public class DataTypeProcessor implements Processor, CamelContextAware {
 
     @Override
     public void process(Exchange exchange) throws Exception {
+        if (exchange.hasProperties() && exchange.getProperties().containsKey(DATA_TYPE_FORMAT_PROPERTY)) {
+            format = exchange.getProperty(DATA_TYPE_FORMAT_PROPERTY, String.class);
+        }
+
         if (format == null || format.isEmpty()) {
             return;
         }
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java
new file mode 100644
index 00000000..655a4cef
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.aws2.s3;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.aws2.s3.AWS2S3Constants;
+import org.apache.camel.component.cloudevents.CloudEvent;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+
+/**
+ * Data type converter that represents an AWS S3 get object response as CloudEvent V1. It sets the Camel specific
+ * CloudEvent headers on the exchange message and afterwards removes the headers matching "CamelAwsS3*".
+ */
+@DataType(scheme = "aws2-s3", name = "cloudevents")
+public class AWS2S3CloudEventOutputType implements DataTypeConverter {
+
+    @Override
+    public void convert(Exchange exchange) {
+        final Map<String, Object> messageHeaders = exchange.getMessage().getHeaders();
+        messageHeaders.put(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "kamelet:aws-s3-source");
+        messageHeaders.put(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, s3Header(exchange, AWS2S3Constants.BUCKET_NAME));
+        messageHeaders.put(CloudEvent.CAMEL_CLOUD_EVENT_SUBJECT, s3Header(exchange, AWS2S3Constants.KEY));
+        messageHeaders.put(CloudEvent.CAMEL_CLOUD_EVENT_TIME, getEventTime(exchange));
+        messageHeaders.put(CloudEvent.CAMEL_CLOUD_EVENT_DATA_CONTENT_TYPE, s3Header(exchange, AWS2S3Constants.CONTENT_TYPE));
+        final String contentEncoding = s3Header(exchange, AWS2S3Constants.CONTENT_ENCODING);
+        if (contentEncoding != null) {
+            messageHeaders.put(CloudEvent.CAMEL_CLOUD_EVENT_DATA_CONTENT_ENCODING, contentEncoding);
+        }
+        exchange.getMessage().removeHeaders("CamelAwsS3*");
+    }
+
+    private static String s3Header(Exchange exchange, String name) {
+        return exchange.getMessage().getHeader(name, String.class);
+    }
+
+    private String getEventTime(Exchange exchange) {
+        final Instant createdAt = Instant.ofEpochMilli(exchange.getCreated());
+        return DateTimeFormatter.ISO_INSTANT.format(ZonedDateTime.ofInstant(createdAt, ZoneId.systemDefault()));
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java
index 047e6dd5..d8d4ca4e 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java
@@ -36,15 +36,15 @@ import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
 @DataType(name = "jsonObject")
 public class JsonModelDataType implements DataTypeConverter {
 
-    public static final String JSON_DATA_TYPE_KEY = "CamelJsonModelDataType";
+    public static final String DATA_TYPE_MODEL_PROPERTY = "CamelDataTypeModel";
 
     @Override
     public void convert(Exchange exchange) {
-        if (!exchange.hasProperties() || !exchange.getProperties().containsKey(JSON_DATA_TYPE_KEY)) {
+        if (!exchange.hasProperties() || !exchange.getProperties().containsKey(DATA_TYPE_MODEL_PROPERTY)) {
             return;
         }
 
-        String type = exchange.getProperty(JSON_DATA_TYPE_KEY, String.class);
+        String type = exchange.getProperty(DATA_TYPE_MODEL_PROPERTY, String.class);
         try (JacksonDataFormat dataFormat = new JacksonDataFormat(new ObjectMapper(), Class.forName(type))) {
             Object unmarshalled = dataFormat.unmarshal(exchange, getBodyAsStream(exchange));
             exchange.getMessage().setBody(unmarshalled);
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-cloudevents b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-cloudevents
new file mode 100644
index 00000000..fafdd926
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-cloudevents
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.kamelets.utils.format.converter.aws2.s3.AWS2S3CloudEventOutputType
\ No newline at end of file
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DataTypeProcessorTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DataTypeProcessorTest.java
new file mode 100644
index 00000000..0140b6f9
--- /dev/null
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DataTypeProcessorTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests how DataTypeProcessor converts the message body for known and unknown data type formats. */
+class DataTypeProcessorTest {
+
+    private final DefaultCamelContext camelContext = new DefaultCamelContext();
+    private final DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry();
+    private final DataTypeProcessor processor = new DataTypeProcessor();
+
+    @BeforeEach
+    void setup() {
+        CamelContextAware.trySetCamelContext(processor, camelContext);
+        CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
+        processor.setRegistry(dataTypeRegistry);
+    }
+
+    @Test
+    public void shouldApplyDataTypeConverterFromAnnotationLookup() throws Exception {
+        final Exchange exchange = newStreamBodyExchange();
+        processor.setFormat("uppercase");
+        processor.process(exchange);
+
+        assertEquals(String.class, exchange.getMessage().getBody().getClass());
+        assertEquals("TEST", exchange.getMessage().getBody());
+    }
+
+    @Test
+    public void shouldApplyDataTypeConverterFromResourceLookup() throws Exception {
+        final Exchange exchange = newStreamBodyExchange();
+        processor.setFormat("lowercase");
+        processor.process(exchange);
+
+        assertEquals(String.class, exchange.getMessage().getBody().getClass());
+        assertEquals("test", exchange.getMessage().getBody());
+    }
+
+    @Test
+    public void shouldHandleUnknownDataType() throws Exception {
+        final Exchange exchange = newStreamBodyExchange();
+        processor.setScheme("foo");
+        processor.setFormat("unknown");
+        processor.process(exchange);
+
+        assertEquals(ByteArrayInputStream.class, exchange.getMessage().getBody().getClass());
+        assertEquals("Test", exchange.getMessage().getBody(String.class));
+    }
+
+    /** Creates an exchange whose message body is an input stream over the UTF-8 bytes of "Test". */
+    private Exchange newStreamBodyExchange() {
+        final Exchange exchange = new DefaultExchange(camelContext);
+        exchange.getMessage().setBody(new ByteArrayInputStream("Test".getBytes(StandardCharsets.UTF_8)));
+        return exchange;
+    }
+
+    /** Test converter named "lowercase" that swaps the message body for its lower case String form. */
+    public static class LowercaseDataType implements DataTypeConverter {
+        @Override
+        public String getName() {
+            return "lowercase";
+        }
+
+        @Override
+        public void convert(Exchange exchange) {
+            final String text = exchange.getMessage().getBody(String.class);
+            exchange.getMessage().setBody(text.toLowerCase());
+        }
+    }
+}
\ No newline at end of file
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java
index 1972b047..b281f314 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java
@@ -56,6 +56,9 @@ class DefaultDataTypeConverterResolverTest {
         converter = resolver.resolve("foo", "json", camelContext);
         Assertions.assertTrue(converter.isPresent());
         Assertions.assertEquals(FooConverter.class, converter.get().getClass());
+
+        converter = resolver.resolve("camel", "lowercase", camelContext);
+        Assertions.assertTrue(converter.isPresent());
     }
 
     public static class FooConverter implements DataTypeConverter {
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
index e077b369..c72e7897 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
@@ -22,6 +22,7 @@ import java.util.Optional;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType;
+import org.apache.camel.kamelets.utils.format.converter.test.UppercaseDataType;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -49,6 +50,11 @@ class DefaultDataTypeRegistryTest {
         Assertions.assertTrue(converter.isPresent());
         Assertions.assertEquals(DefaultDataTypeConverter.class, converter.get().getClass());
         Assertions.assertEquals(byte[].class, ((DefaultDataTypeConverter) converter.get()).getType());
+        converter = dataTypeRegistry.lookup( "lowercase");
+        Assertions.assertTrue(converter.isPresent());
+        converter = dataTypeRegistry.lookup( "uppercase");
+        Assertions.assertTrue(converter.isPresent());
+        Assertions.assertEquals(UppercaseDataType.class, converter.get().getClass());
     }
 
 }
\ No newline at end of file
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java
similarity index 55%
copy from library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java
copy to library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java
index c175cc6d..10c51708 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java
@@ -15,13 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.camel.kamelets.utils.format.converter.standard;
+package org.apache.camel.kamelets.utils.format.converter.aws2.s3;
 
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.Optional;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
+import org.apache.camel.component.aws2.s3.AWS2S3Constants;
+import org.apache.camel.component.cloudevents.CloudEvents;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
@@ -31,54 +34,35 @@ import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-public class JsonModelDataTypeTest {
+class AWS2S3CloudEventOutputTypeTest {
 
     private final DefaultCamelContext camelContext = new DefaultCamelContext();
 
-    private final JsonModelDataType dataType = new JsonModelDataType();
+    private final AWS2S3CloudEventOutputType outputType = new AWS2S3CloudEventOutputType();
 
     @Test
-    void shouldMapFromStringToJsonModel() throws Exception {
+    void shouldMapToCloudEvent() throws Exception {
         Exchange exchange = new DefaultExchange(camelContext);
 
-        exchange.setProperty(JsonModelDataType.JSON_DATA_TYPE_KEY, Person.class.getName());
-        exchange.getMessage().setBody("{ \"name\": \"Sheldon\", \"age\": 29}");
-        dataType.convert(exchange);
-
-        assertEquals(Person.class, exchange.getMessage().getBody().getClass());
-        assertEquals("Sheldon", exchange.getMessage().getBody(Person.class).getName());
+        exchange.getMessage().setHeader(AWS2S3Constants.KEY, "test1.txt");
+        exchange.getMessage().setHeader(AWS2S3Constants.BUCKET_NAME, "myBucket");
+        exchange.getMessage().setHeader(AWS2S3Constants.CONTENT_TYPE, "text/plain");
+        exchange.getMessage().setHeader(AWS2S3Constants.CONTENT_ENCODING, StandardCharsets.UTF_8.toString());
+        exchange.getMessage().setBody(new ByteArrayInputStream("Test1".getBytes(StandardCharsets.UTF_8)));
+        outputType.convert(exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertFalse(exchange.getMessage().getHeaders().containsKey(AWS2S3Constants.KEY));
+        assertEquals("kamelet:aws-s3-source", exchange.getMessage().getHeader(CloudEvents.CAMEL_CLOUD_EVENT_TYPE));
+        assertEquals("test1.txt", exchange.getMessage().getHeader(CloudEvents.CAMEL_CLOUD_EVENT_SUBJECT));
+        assertEquals("myBucket", exchange.getMessage().getHeader(CloudEvents.CAMEL_CLOUD_EVENT_SOURCE));
     }
 
     @Test
     public void shouldLookupDataType() throws Exception {
         DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry();
         CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
-        Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("jsonObject");
+        Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("aws2-s3", "cloudevents");
         Assertions.assertTrue(converter.isPresent());
     }
-
-    public static class Person {
-        @JsonProperty
-        private String name;
-
-        @JsonProperty
-        private Long age;
-
-        public String getName() {
-            return name;
-        }
-
-        public void setName(String name) {
-            this.name = name;
-        }
-
-        public Long getAge() {
-            return age;
-        }
-
-        public void setAge(Long age) {
-            this.age = age;
-        }
-    }
-
 }
\ No newline at end of file
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java
index c175cc6d..d93da234 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java
@@ -41,7 +41,7 @@ public class JsonModelDataTypeTest {
     void shouldMapFromStringToJsonModel() throws Exception {
         Exchange exchange = new DefaultExchange(camelContext);
 
-        exchange.setProperty(JsonModelDataType.JSON_DATA_TYPE_KEY, Person.class.getName());
+        exchange.setProperty(JsonModelDataType.DATA_TYPE_MODEL_PROPERTY, Person.class.getName());
         exchange.getMessage().setBody("{ \"name\": \"Sheldon\", \"age\": 29}");
         dataType.convert(exchange);
 
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/test/UppercaseDataType.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/test/UppercaseDataType.java
new file mode 100644
index 00000000..60604f73
--- /dev/null
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/test/UppercaseDataType.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.test;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+
+@DataType(name = "uppercase")
+public class UppercaseDataType implements DataTypeConverter {
+    /** Replaces the message body with the upper case form of the body read as a String. */
+    @Override
+    public void convert(Exchange exchange) {
+        exchange.getMessage().setBody(exchange.getMessage().getBody(String.class).toUpperCase());
+    }
+}
diff --git a/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/DataTypeConverter b/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/DataTypeConverter
new file mode 100644
index 00000000..bf3aaf0d
--- /dev/null
+++ b/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/DataTypeConverter
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.camel.kamelets.utils.format.converter.test
\ No newline at end of file
diff --git a/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/camel-lowercase b/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/camel-lowercase
new file mode 100644
index 00000000..b140a56b
--- /dev/null
+++ b/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/camel-lowercase
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.kamelets.utils.format.DataTypeProcessorTest$LowercaseDataType
\ No newline at end of file
diff --git a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml
index a4e7a114..952ecfa1 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml
@@ -124,7 +124,7 @@ spec:
         - key: format
           value: '{{inputFormat}}'
         - key: registry
-          value: '{{dataTypeRegistry}}'
+          value: '#bean:{{dataTypeRegistry}}'
     from:
       uri: "kamelet:source"
       steps:
diff --git a/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml
index a63af7dc..d937f6e5 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml
@@ -130,7 +130,7 @@ spec:
           - key: format
             value: '{{outputFormat}}'
           - key: registry
-            value: '{{dataTypeRegistry}}'
+            value: '#bean:{{dataTypeRegistry}}'
       - name: renameHeaders
         type: "#class:org.apache.camel.kamelets.utils.headers.DuplicateNamingHeaders"
         property:
diff --git a/test/aws-s3/aws-s3-uri-binding.yaml b/test/aws-s3/aws-s3-uri-binding.yaml
index 50522818..14d420f9 100644
--- a/test/aws-s3/aws-s3-uri-binding.yaml
+++ b/test/aws-s3/aws-s3-uri-binding.yaml
@@ -28,7 +28,7 @@ spec:
     properties:
       bucketNameOrArn: ${aws.s3.bucketNameOrArn}
       overrideEndpoint: true
-      outputFormat: json
+      outputFormat: cloudevents
       uriEndpointOverride: ${YAKS_TESTCONTAINERS_LOCALSTACK_S3_URL}
       accessKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_ACCESS_KEY}
       secretKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_SECRET_KEY}