You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2022/12/02 08:08:36 UTC
[camel-kamelets] 04/28: Add CloudEvent output type on AWS S3 Kamelet source
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git
commit 96534b4fded6d900a93abbd7acaf8cd5c0c99618
Author: Christoph Deppisch <cd...@redhat.com>
AuthorDate: Thu Nov 17 17:09:49 2022 +0100
Add CloudEvent output type on AWS S3 Kamelet source
---
kamelets/aws-ddb-sink.kamelet.yaml | 2 +-
kamelets/aws-s3-source.kamelet.yaml | 2 +-
library/camel-kamelets-utils/pom.xml | 5 ++
.../kamelets/utils/format/DataTypeProcessor.java | 6 ++
.../aws2/s3/AWS2S3CloudEventOutputType.java | 62 ++++++++++++++
.../converter/standard/JsonModelDataType.java | 6 +-
.../camel/datatype/converter/aws2-s3-cloudevents | 18 ++++
.../utils/format/DataTypeProcessorTest.java | 98 ++++++++++++++++++++++
.../DefaultDataTypeConverterResolverTest.java | 3 +
.../utils/format/DefaultDataTypeRegistryTest.java | 6 ++
.../s3/AWS2S3CloudEventOutputTypeTest.java} | 58 +++++--------
.../converter/standard/JsonModelDataTypeTest.java | 2 +-
.../format/converter/test/UppercaseDataType.java | 31 +++++++
.../services/org/apache/camel/DataTypeConverter | 18 ++++
.../camel/datatype/converter/camel-lowercase | 18 ++++
.../resources/kamelets/aws-ddb-sink.kamelet.yaml | 2 +-
.../resources/kamelets/aws-s3-source.kamelet.yaml | 2 +-
test/aws-s3/aws-s3-uri-binding.yaml | 2 +-
18 files changed, 295 insertions(+), 46 deletions(-)
diff --git a/kamelets/aws-ddb-sink.kamelet.yaml b/kamelets/aws-ddb-sink.kamelet.yaml
index a4e7a114..952ecfa1 100644
--- a/kamelets/aws-ddb-sink.kamelet.yaml
+++ b/kamelets/aws-ddb-sink.kamelet.yaml
@@ -124,7 +124,7 @@ spec:
- key: format
value: '{{inputFormat}}'
- key: registry
- value: '{{dataTypeRegistry}}'
+ value: '#bean:{{dataTypeRegistry}}'
from:
uri: "kamelet:source"
steps:
diff --git a/kamelets/aws-s3-source.kamelet.yaml b/kamelets/aws-s3-source.kamelet.yaml
index a63af7dc..d937f6e5 100644
--- a/kamelets/aws-s3-source.kamelet.yaml
+++ b/kamelets/aws-s3-source.kamelet.yaml
@@ -130,7 +130,7 @@ spec:
- key: format
value: '{{outputFormat}}'
- key: registry
- value: '{{dataTypeRegistry}}'
+ value: '#bean:{{dataTypeRegistry}}'
- name: renameHeaders
type: "#class:org.apache.camel.kamelets.utils.headers.DuplicateNamingHeaders"
property:
diff --git a/library/camel-kamelets-utils/pom.xml b/library/camel-kamelets-utils/pom.xml
index 5b1441f3..2aba210d 100644
--- a/library/camel-kamelets-utils/pom.xml
+++ b/library/camel-kamelets-utils/pom.xml
@@ -82,6 +82,11 @@
<artifactId>camel-aws2-s3</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-cloudevents</artifactId>
+ <!--<scope>provided</scope>-->
+ </dependency>
<!-- Test scoped dependencies -->
<dependency>
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DataTypeProcessor.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DataTypeProcessor.java
index 81c58330..def0f2b8 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DataTypeProcessor.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DataTypeProcessor.java
@@ -31,6 +31,8 @@ import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
*/
public class DataTypeProcessor implements Processor, CamelContextAware {
+ public static final String DATA_TYPE_FORMAT_PROPERTY = "CamelDataTypeFormat";
+
private CamelContext camelContext;
private DefaultDataTypeRegistry registry;
@@ -42,6 +44,10 @@ public class DataTypeProcessor implements Processor, CamelContextAware {
@Override
public void process(Exchange exchange) throws Exception {
+ if (exchange.hasProperties() && exchange.getProperties().containsKey(DATA_TYPE_FORMAT_PROPERTY)) {
+ format = exchange.getProperty(DATA_TYPE_FORMAT_PROPERTY, String.class);
+ }
+
if (format == null || format.isEmpty()) {
return;
}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java
new file mode 100644
index 00000000..655a4cef
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.aws2.s3;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.aws2.s3.AWS2S3Constants;
+import org.apache.camel.component.cloudevents.CloudEvent;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+
+/**
+ * Output data type represents AWS S3 get object response as CloudEvent V1. The data type sets Camel specific
+ * CloudEvent headers on the exchange.
+ */
+@DataType(scheme = "aws2-s3", name = "cloudevents")
+public class AWS2S3CloudEventOutputType implements DataTypeConverter {
+
+ @Override
+ public void convert(Exchange exchange) {
+ final Map<String, Object> headers = exchange.getMessage().getHeaders();
+
+ headers.put(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "kamelet:aws-s3-source");
+ headers.put(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, exchange.getMessage().getHeader(AWS2S3Constants.BUCKET_NAME, String.class));
+ headers.put(CloudEvent.CAMEL_CLOUD_EVENT_SUBJECT, exchange.getMessage().getHeader(AWS2S3Constants.KEY, String.class));
+ headers.put(CloudEvent.CAMEL_CLOUD_EVENT_TIME, getEventTime(exchange));
+ headers.put(CloudEvent.CAMEL_CLOUD_EVENT_DATA_CONTENT_TYPE, exchange.getMessage().getHeader(AWS2S3Constants.CONTENT_TYPE, String.class));
+
+ String encoding = exchange.getMessage().getHeader(AWS2S3Constants.CONTENT_ENCODING, String.class);
+ if (encoding != null) {
+ headers.put(CloudEvent.CAMEL_CLOUD_EVENT_DATA_CONTENT_ENCODING, encoding);
+ }
+
+ exchange.getMessage().removeHeaders("CamelAwsS3*");
+ }
+
+ private String getEventTime(Exchange exchange) {
+ final ZonedDateTime created
+ = ZonedDateTime.ofInstant(Instant.ofEpochMilli(exchange.getCreated()), ZoneId.systemDefault());
+ return DateTimeFormatter.ISO_INSTANT.format(created);
+ }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java
index 047e6dd5..d8d4ca4e 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java
@@ -36,15 +36,15 @@ import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
@DataType(name = "jsonObject")
public class JsonModelDataType implements DataTypeConverter {
- public static final String JSON_DATA_TYPE_KEY = "CamelJsonModelDataType";
+ public static final String DATA_TYPE_MODEL_PROPERTY = "CamelDataTypeModel";
@Override
public void convert(Exchange exchange) {
- if (!exchange.hasProperties() || !exchange.getProperties().containsKey(JSON_DATA_TYPE_KEY)) {
+ if (!exchange.hasProperties() || !exchange.getProperties().containsKey(DATA_TYPE_MODEL_PROPERTY)) {
return;
}
- String type = exchange.getProperty(JSON_DATA_TYPE_KEY, String.class);
+ String type = exchange.getProperty(DATA_TYPE_MODEL_PROPERTY, String.class);
try (JacksonDataFormat dataFormat = new JacksonDataFormat(new ObjectMapper(), Class.forName(type))) {
Object unmarshalled = dataFormat.unmarshal(exchange, getBodyAsStream(exchange));
exchange.getMessage().setBody(unmarshalled);
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-cloudevents b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-cloudevents
new file mode 100644
index 00000000..fafdd926
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-cloudevents
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.kamelets.utils.format.converter.aws2.s3.AWS2S3CloudEventOutputType
\ No newline at end of file
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DataTypeProcessorTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DataTypeProcessorTest.java
new file mode 100644
index 00000000..0140b6f9
--- /dev/null
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DataTypeProcessorTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class DataTypeProcessorTest {
+
+ private final DefaultCamelContext camelContext = new DefaultCamelContext();
+
+ private final DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry();
+
+ private final DataTypeProcessor processor = new DataTypeProcessor();
+
+ @BeforeEach
+ void setup() {
+ CamelContextAware.trySetCamelContext(processor, camelContext);
+ CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
+ processor.setRegistry(dataTypeRegistry);
+ }
+
+ @Test
+ public void shouldApplyDataTypeConverterFromAnnotationLookup() throws Exception {
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setBody(new ByteArrayInputStream("Test".getBytes(StandardCharsets.UTF_8)));
+ processor.setFormat("uppercase");
+ processor.process(exchange);
+
+ assertEquals(String.class, exchange.getMessage().getBody().getClass());
+ assertEquals("TEST", exchange.getMessage().getBody());
+ }
+
+ @Test
+ public void shouldApplyDataTypeConverterFromResourceLookup() throws Exception {
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setBody(new ByteArrayInputStream("Test".getBytes(StandardCharsets.UTF_8)));
+ processor.setFormat("lowercase");
+ processor.process(exchange);
+
+ assertEquals(String.class, exchange.getMessage().getBody().getClass());
+ assertEquals("test", exchange.getMessage().getBody());
+ }
+
+ @Test
+ public void shouldHandleUnknownDataType() throws Exception {
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setBody(new ByteArrayInputStream("Test".getBytes(StandardCharsets.UTF_8)));
+ processor.setScheme("foo");
+ processor.setFormat("unknown");
+ processor.process(exchange);
+
+ assertEquals(ByteArrayInputStream.class, exchange.getMessage().getBody().getClass());
+ assertEquals("Test", exchange.getMessage().getBody(String.class));
+ }
+
+ public static class LowercaseDataType implements DataTypeConverter {
+
+ @Override
+ public void convert(Exchange exchange) {
+ exchange.getMessage().setBody(exchange.getMessage().getBody(String.class).toLowerCase());
+ }
+
+ @Override
+ public String getName() {
+ return "lowercase";
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java
index 1972b047..b281f314 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java
@@ -56,6 +56,9 @@ class DefaultDataTypeConverterResolverTest {
converter = resolver.resolve("foo", "json", camelContext);
Assertions.assertTrue(converter.isPresent());
Assertions.assertEquals(FooConverter.class, converter.get().getClass());
+
+ converter = resolver.resolve("camel", "lowercase", camelContext);
+ Assertions.assertTrue(converter.isPresent());
}
public static class FooConverter implements DataTypeConverter {
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
index e077b369..c72e7897 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
@@ -22,6 +22,7 @@ import java.util.Optional;
import org.apache.camel.CamelContextAware;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType;
+import org.apache.camel.kamelets.utils.format.converter.test.UppercaseDataType;
import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -49,6 +50,11 @@ class DefaultDataTypeRegistryTest {
Assertions.assertTrue(converter.isPresent());
Assertions.assertEquals(DefaultDataTypeConverter.class, converter.get().getClass());
Assertions.assertEquals(byte[].class, ((DefaultDataTypeConverter) converter.get()).getType());
+ converter = dataTypeRegistry.lookup( "lowercase");
+ Assertions.assertTrue(converter.isPresent());
+ converter = dataTypeRegistry.lookup( "uppercase");
+ Assertions.assertTrue(converter.isPresent());
+ Assertions.assertEquals(UppercaseDataType.class, converter.get().getClass());
}
}
\ No newline at end of file
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java
similarity index 55%
copy from library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java
copy to library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java
index c175cc6d..10c51708 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java
@@ -15,13 +15,16 @@
* limitations under the License.
*/
-package org.apache.camel.kamelets.utils.format.converter.standard;
+package org.apache.camel.kamelets.utils.format.converter.aws2.s3;
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
import java.util.Optional;
-import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
+import org.apache.camel.component.aws2.s3.AWS2S3Constants;
+import org.apache.camel.component.cloudevents.CloudEvents;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry;
import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
@@ -31,54 +34,35 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class JsonModelDataTypeTest {
+class AWS2S3CloudEventOutputTypeTest {
private final DefaultCamelContext camelContext = new DefaultCamelContext();
- private final JsonModelDataType dataType = new JsonModelDataType();
+ private final AWS2S3CloudEventOutputType outputType = new AWS2S3CloudEventOutputType();
@Test
- void shouldMapFromStringToJsonModel() throws Exception {
+ void shouldMapToCloudEvent() throws Exception {
Exchange exchange = new DefaultExchange(camelContext);
- exchange.setProperty(JsonModelDataType.JSON_DATA_TYPE_KEY, Person.class.getName());
- exchange.getMessage().setBody("{ \"name\": \"Sheldon\", \"age\": 29}");
- dataType.convert(exchange);
-
- assertEquals(Person.class, exchange.getMessage().getBody().getClass());
- assertEquals("Sheldon", exchange.getMessage().getBody(Person.class).getName());
+ exchange.getMessage().setHeader(AWS2S3Constants.KEY, "test1.txt");
+ exchange.getMessage().setHeader(AWS2S3Constants.BUCKET_NAME, "myBucket");
+ exchange.getMessage().setHeader(AWS2S3Constants.CONTENT_TYPE, "text/plain");
+ exchange.getMessage().setHeader(AWS2S3Constants.CONTENT_ENCODING, StandardCharsets.UTF_8.toString());
+ exchange.getMessage().setBody(new ByteArrayInputStream("Test1".getBytes(StandardCharsets.UTF_8)));
+ outputType.convert(exchange);
+
+ Assertions.assertTrue(exchange.getMessage().hasHeaders());
+ Assertions.assertFalse(exchange.getMessage().getHeaders().containsKey(AWS2S3Constants.KEY));
+ assertEquals("kamelet:aws-s3-source", exchange.getMessage().getHeader(CloudEvents.CAMEL_CLOUD_EVENT_TYPE));
+ assertEquals("test1.txt", exchange.getMessage().getHeader(CloudEvents.CAMEL_CLOUD_EVENT_SUBJECT));
+ assertEquals("myBucket", exchange.getMessage().getHeader(CloudEvents.CAMEL_CLOUD_EVENT_SOURCE));
}
@Test
public void shouldLookupDataType() throws Exception {
DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry();
CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
- Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("jsonObject");
+ Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("aws2-s3", "cloudevents");
Assertions.assertTrue(converter.isPresent());
}
-
- public static class Person {
- @JsonProperty
- private String name;
-
- @JsonProperty
- private Long age;
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public Long getAge() {
- return age;
- }
-
- public void setAge(Long age) {
- this.age = age;
- }
- }
-
}
\ No newline at end of file
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java
index c175cc6d..d93da234 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java
@@ -41,7 +41,7 @@ public class JsonModelDataTypeTest {
void shouldMapFromStringToJsonModel() throws Exception {
Exchange exchange = new DefaultExchange(camelContext);
- exchange.setProperty(JsonModelDataType.JSON_DATA_TYPE_KEY, Person.class.getName());
+ exchange.setProperty(JsonModelDataType.DATA_TYPE_MODEL_PROPERTY, Person.class.getName());
exchange.getMessage().setBody("{ \"name\": \"Sheldon\", \"age\": 29}");
dataType.convert(exchange);
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/test/UppercaseDataType.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/test/UppercaseDataType.java
new file mode 100644
index 00000000..60604f73
--- /dev/null
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/test/UppercaseDataType.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.test;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+
+@DataType(name = "uppercase")
+public class UppercaseDataType implements DataTypeConverter {
+
+ @Override
+ public void convert(Exchange exchange) {
+ exchange.getMessage().setBody(exchange.getMessage().getBody(String.class).toUpperCase());
+ }
+}
diff --git a/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/DataTypeConverter b/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/DataTypeConverter
new file mode 100644
index 00000000..bf3aaf0d
--- /dev/null
+++ b/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/DataTypeConverter
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.camel.kamelets.utils.format.converter.test
\ No newline at end of file
diff --git a/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/camel-lowercase b/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/camel-lowercase
new file mode 100644
index 00000000..b140a56b
--- /dev/null
+++ b/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/camel-lowercase
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.kamelets.utils.format.DataTypeProcessorTest$LowercaseDataType
\ No newline at end of file
diff --git a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml
index a4e7a114..952ecfa1 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml
@@ -124,7 +124,7 @@ spec:
- key: format
value: '{{inputFormat}}'
- key: registry
- value: '{{dataTypeRegistry}}'
+ value: '#bean:{{dataTypeRegistry}}'
from:
uri: "kamelet:source"
steps:
diff --git a/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml
index a63af7dc..d937f6e5 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml
@@ -130,7 +130,7 @@ spec:
- key: format
value: '{{outputFormat}}'
- key: registry
- value: '{{dataTypeRegistry}}'
+ value: '#bean:{{dataTypeRegistry}}'
- name: renameHeaders
type: "#class:org.apache.camel.kamelets.utils.headers.DuplicateNamingHeaders"
property:
diff --git a/test/aws-s3/aws-s3-uri-binding.yaml b/test/aws-s3/aws-s3-uri-binding.yaml
index 50522818..14d420f9 100644
--- a/test/aws-s3/aws-s3-uri-binding.yaml
+++ b/test/aws-s3/aws-s3-uri-binding.yaml
@@ -28,7 +28,7 @@ spec:
properties:
bucketNameOrArn: ${aws.s3.bucketNameOrArn}
overrideEndpoint: true
- outputFormat: json
+ outputFormat: cloudevents
uriEndpointOverride: ${YAKS_TESTCONTAINERS_LOCALSTACK_S3_URL}
accessKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_ACCESS_KEY}
secretKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_SECRET_KEY}