You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cd...@apache.org on 2023/05/26 08:32:09 UTC

[camel-kamelets] branch main updated: chore: Enhance Kamelet data type implementations

This is an automated email from the ASF dual-hosted git repository.

cdeppisch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git


The following commit(s) were added to refs/heads/main by this push:
     new abf5c423 chore: Enhance Kamelet data type implementations
abf5c423 is described below

commit abf5c423eab77985827f8787fbb3f99ca5e101c3
Author: Christoph Deppisch <cd...@redhat.com>
AuthorDate: Wed May 24 20:14:04 2023 +0200

    chore: Enhance Kamelet data type implementations
    
    - Refactor existing data type converter names to align with respective mime type (e.g. application/json -> application-json, plain/text -> plain-text)
    - Add data type converter implementations for avro-binary, avro-x-struct, application-x-struct, application-json, application-x-java-object
    - Enhance Avro/Protobuf schema resolver to resolve schemas from given exchange property, content class, schema file classpath reference or explicit schema content
    - Add Json schema resolver
    - Add resolve-pojo-schema-action.kamelet to resolve either Json, Avro or Protobuf schemas delegating to respective schema resolver implementations
    - Use new schema resolver in avro/protobuf serialize/deserialize action Kamelets
---
 .github/workflows/yaks-tests.yaml                  |   3 +
 kamelets/avro-deserialize-action.kamelet.yaml      |  22 ++-
 kamelets/avro-serialize-action.kamelet.yaml        |  22 ++-
 kamelets/protobuf-deserialize-action.kamelet.yaml  |  15 +-
 kamelets/protobuf-serialize-action.kamelet.yaml    |  15 +-
 .../resolve-pojo-schema-action.kamelet.yaml        |  72 +++++-----
 .../utils/format/DefaultDataTypeRegistry.java      |   8 +-
 .../camel/kamelets/utils/format/MimeType.java      |  53 +++++++
 .../BinaryDataType.java => SchemaType.java}        |  37 +++--
 .../converter/avro/Avro.java}                      |  22 +--
 .../format/converter/avro/AvroBinaryDataType.java  |  81 +++++++++++
 .../format/converter/avro/AvroSchemaResolver.java  | 160 +++++++++++++++++++++
 .../format/converter/avro/AvroStructDataType.java  |  70 +++++++++
 .../converter/aws2/ddb/Ddb2JsonInputType.java      |   6 +-
 .../aws2/s3/AWS2S3CloudEventOutputType.java        |   2 +-
 .../ByteArrayDataType.java}                        |  15 +-
 .../converter/http/HttpCloudEventOutputType.java   |   2 +-
 .../StringDataType.java => json/Json.java}         |  21 +--
 .../utils/format/converter/json/JsonDataType.java  |  49 +++++++
 .../format/converter/json/JsonFormatSchema.java    |  22 +++
 .../format/converter/json/JsonSchemaResolver.java  | 155 ++++++++++++++++++++
 .../format/converter/json/JsonStructDataType.java  |  59 ++++++++
 .../format/converter/pojo/JavaObjectDataType.java  | 106 ++++++++++++++
 .../StringDataType.java => protobuf/Protobuf.java} |  21 +--
 .../converter/protobuf/ProtobufSchemaResolver.java | 159 ++++++++++++++++++++
 .../converter/standard/JsonModelDataType.java      |  97 -------------
 .../StringDataType.java}                           |  21 ++-
 .../utils/format/converter/utils/PojoHelper.java   |  89 ++++++++++++
 .../utils/format/converter/utils/SchemaHelper.java |  34 +++++
 .../format/schema/DelegatingSchemaResolver.java    | 119 +++++++++++++++
 .../InflightProtobufSchemaResolver.java            |  41 ------
 .../services/org/apache/camel/DataTypeConverter    |   6 +-
 .../{aws2-ddb-json => aws2-ddb-application-json}   |   0
 ...cloudevents => aws2-s3-application-cloudevents} |   0
 .../{camel-binary => camel-application-json}       |   2 +-
 ...l-jsonObject => camel-application-octet-stream} |   2 +-
 ...amel-binary => camel-application-x-java-object} |   2 +-
 .../{camel-binary => camel-application-x-struct}   |   2 +-
 .../converter/{camel-binary => camel-avro-binary}  |   2 +-
 .../{camel-binary => camel-avro-x-struct}          |   2 +-
 .../converter/{camel-string => camel-plain-text}   |   2 +-
 ...tp-cloudevents => http-application-cloudevents} |   0
 .../DefaultDataTypeConverterResolverTest.java      |   8 +-
 .../utils/format/DefaultDataTypeRegistryTest.java  |  16 +--
 .../converter/aws2/ddb/Ddb2JsonInputTypeTest.java  |  16 +--
 .../aws2/s3/AWS2S3CloudEventOutputTypeTest.java    |   2 +-
 .../http/HttpCloudEventOutputTypeTest.java         |   2 +-
 .../converter/standard/JsonModelDataTypeTest.java  | 102 -------------
 .../ByteArrayDataTypeTest.java}                    |   9 +-
 .../standard/{ => text}/StringDataTypeTest.java    |   5 +-
 .../utils/headers/DeDuplicateHeadersTest.java      |   9 +-
 .../utils/headers/DuplicateHeadersTest.java        |  10 +-
 .../camel/datatype/converter/camel-jsonObject      |  18 ---
 .../kamelets/avro-deserialize-action.kamelet.yaml  |  22 ++-
 .../kamelets/avro-serialize-action.kamelet.yaml    |  22 ++-
 .../protobuf-deserialize-action.kamelet.yaml       |  15 +-
 .../protobuf-serialize-action.kamelet.yaml         |  15 +-
 ...aml => resolve-pojo-schema-action.kamelet.yaml} |  72 +++++-----
 test/avro-data-type/README.md                      |  42 ++++++
 test/avro-data-type/User.avsc                      |  23 +++
 test/avro-data-type/avro-data-type.feature         |  22 +++
 .../avro-to-log-binding.yaml}                      |  28 ++--
 .../json-to-avro-binding.yaml}                     |  22 +--
 .../yaks-config.yaml}                              |  61 ++++----
 test/avro-serdes-action/README.md                  |  42 ++++++
 test/avro-serdes-action/User.avsc                  |  23 +++
 test/avro-serdes-action/avro-serdes-action.feature |  22 +++
 .../avro-to-log-binding.yaml}                      |  28 ++--
 .../json-to-avro-binding.yaml}                     |  16 ++-
 .../yaks-config.yaml}                              |  61 ++++----
 test/aws-s3/aws-s3-knative-broker.feature          |   2 +-
 test/aws-s3/aws-s3-knative-cloudevents.feature     |   2 +-
 test/aws-s3/aws-s3-to-http.yaml                    |   4 +-
 .../data-type-action/data-type-action-binding.yaml |   2 +-
 74 files changed, 1705 insertions(+), 656 deletions(-)

diff --git a/.github/workflows/yaks-tests.yaml b/.github/workflows/yaks-tests.yaml
index 2d52f915..1994831a 100644
--- a/.github/workflows/yaks-tests.yaml
+++ b/.github/workflows/yaks-tests.yaml
@@ -91,6 +91,9 @@ jobs:
         yaks run test/aws-ddb-sink $YAKS_RUN_OPTIONS
         yaks run test/aws-s3 $YAKS_RUN_OPTIONS
         
+        yaks run test/avro-data-type $YAKS_RUN_OPTIONS
+        yaks run test/avro-serdes-action $YAKS_RUN_OPTIONS
+        
         yaks run test/extract-field-action $YAKS_RUN_OPTIONS
         yaks run test/insert-field-action $YAKS_RUN_OPTIONS
         
diff --git a/kamelets/avro-deserialize-action.kamelet.yaml b/kamelets/avro-deserialize-action.kamelet.yaml
index 3bec55f7..ebff6605 100644
--- a/kamelets/avro-deserialize-action.kamelet.yaml
+++ b/kamelets/avro-deserialize-action.kamelet.yaml
@@ -33,8 +33,6 @@ spec:
     title: "Avro Deserialize Action"
     description: "Deserialize payload to Avro"
     type: object
-    required:
-    - schema
     properties:
       schema:
         title: Schema
@@ -54,23 +52,21 @@ spec:
   - "camel:core"
   - "camel:jackson-avro"
   template:
+    beans:
+      - name: schemaResolver
+        type: "#class:org.apache.camel.kamelets.utils.format.converter.avro.AvroSchemaResolver"
+        property:
+          - key: validate
+            value: '{{validate}}'
+          - key: schema
+            value: '{{schema:}}'
     from:
       uri: kamelet:source
       steps:
-      - set-property:
-          name: schema
-          constant: "{{schema}}"
-      - set-property:
-          name: validate
-          constant: "{{validate}}"
       - unmarshal:
           avro:
             library: Jackson
             unmarshalType: com.fasterxml.jackson.databind.JsonNode
-            schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightAvroSchemaResolver"
-      - remove-property:
-          name: schema
-      - remove-property:
-          name: validate
+            schemaResolver: "#bean:{{schemaResolver}}"
       - remove-header:
           name: "Content-Type"
diff --git a/kamelets/avro-serialize-action.kamelet.yaml b/kamelets/avro-serialize-action.kamelet.yaml
index 06830b27..93a50270 100644
--- a/kamelets/avro-serialize-action.kamelet.yaml
+++ b/kamelets/avro-serialize-action.kamelet.yaml
@@ -33,8 +33,6 @@ spec:
     title: "Avro Serialize Action"
     description: "Serialize payload to Avro"
     type: object
-    required:
-    - schema
     properties:
       schema:
         title: Schema
@@ -54,24 +52,22 @@ spec:
   - "camel:core"
   - "camel:jackson-avro"
   template:
+    beans:
+      - name: schemaResolver
+        type: "#class:org.apache.camel.kamelets.utils.format.converter.avro.AvroSchemaResolver"
+        property:
+          - key: validate
+            value: '{{validate}}'
+          - key: schema
+            value: '{{schema:}}'
     from:
       uri: kamelet:source
       steps:
-      - set-property:
-          name: schema
-          constant: "{{schema}}"
-      - set-property:
-          name: validate
-          constant: "{{validate}}"
       - marshal:
           avro:
             library: Jackson
             unmarshalType: com.fasterxml.jackson.databind.JsonNode
-            schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightAvroSchemaResolver"
-      - remove-property:
-          name: schema
-      - remove-property:
-          name: validate
+            schemaResolver: "#bean:{{schemaResolver}}"
       - set-header:
           name: "Content-Type"
           constant: "application/avro"
diff --git a/kamelets/protobuf-deserialize-action.kamelet.yaml b/kamelets/protobuf-deserialize-action.kamelet.yaml
index bf06dc4f..c0727a5f 100644
--- a/kamelets/protobuf-deserialize-action.kamelet.yaml
+++ b/kamelets/protobuf-deserialize-action.kamelet.yaml
@@ -32,8 +32,6 @@ spec:
     title: "Protobuf Deserialize Action"
     description: "Deserialize payload to Protobuf"
     type: object
-    required:
-    - schema
     properties:
       schema:
         title: Schema
@@ -46,18 +44,19 @@ spec:
   - "camel:core"
   - "camel:jackson-protobuf"
   template:
+    beans:
+      - name: schemaResolver
+        type: "#class:org.apache.camel.kamelets.utils.format.converter.protobuf.ProtobufSchemaResolver"
+        property:
+          - key: schema
+            value: '{{schema:}}'
     from:
       uri: kamelet:source
       steps:
-      - set-property:
-          name: schema
-          constant: "{{schema}}"
       - unmarshal:
           protobuf:
             library: Jackson
             unmarshalType: com.fasterxml.jackson.databind.JsonNode
-            schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightProtobufSchemaResolver"
-      - remove-property:
-          name: schema
+            schemaResolver: "#bean:{{schemaResolver}}"
       - remove-header:
           name: "Content-Type"
diff --git a/kamelets/protobuf-serialize-action.kamelet.yaml b/kamelets/protobuf-serialize-action.kamelet.yaml
index 56f321db..36218aec 100644
--- a/kamelets/protobuf-serialize-action.kamelet.yaml
+++ b/kamelets/protobuf-serialize-action.kamelet.yaml
@@ -32,8 +32,6 @@ spec:
     title: "Protobuf Serialize Action"
     description: "Serialize payload to Protobuf"
     type: object
-    required:
-    - schema
     properties:
       schema:
         title: Schema
@@ -46,19 +44,20 @@ spec:
   - "camel:core"
   - "camel:jackson-protobuf"
   template:
+    beans:
+      - name: schemaResolver
+        type: "#class:org.apache.camel.kamelets.utils.format.converter.protobuf.ProtobufSchemaResolver"
+        property:
+          - key: schema
+            value: '{{schema:}}'
     from:
       uri: kamelet:source
       steps:
-      - set-property:
-          name: schema
-          constant: "{{schema}}"
       - marshal:
           protobuf:
             library: Jackson
             unmarshalType: com.fasterxml.jackson.databind.JsonNode
-            schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightProtobufSchemaResolver"
-      - remove-property:
-          name: schema
+            schemaResolver: "#bean:{{schemaResolver}}"
       - set-header:
           name: "Content-Type"
           constant: "application/protobuf"
diff --git a/library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml b/kamelets/resolve-pojo-schema-action.kamelet.yaml
similarity index 75%
copy from library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml
copy to kamelets/resolve-pojo-schema-action.kamelet.yaml
index 3bec55f7..3bafc090 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml
+++ b/kamelets/resolve-pojo-schema-action.kamelet.yaml
@@ -18,7 +18,7 @@
 apiVersion: camel.apache.org/v1alpha1
 kind: Kamelet
 metadata:
-  name: avro-deserialize-action
+  name: resolve-pojo-schema-action
   annotations:
     camel.apache.org/kamelet.support.level: "Stable"
     camel.apache.org/catalog.version: "4.0.0-SNAPSHOT"
@@ -30,47 +30,47 @@ metadata:
     camel.apache.org/kamelet.type: "action"
 spec:
   definition:
-    title: "Avro Deserialize Action"
-    description: "Deserialize payload to Avro"
+    title: "Resolve Schema Action"
+    description: "Resolves schema from given mime type and payload. Sets the resolved schema, the schema type and its content class as properties for later reference."
     type: object
-    required:
-    - schema
     properties:
+      mimeType:
+        title: Mime Type
+        description: The mime type to determine the schema resolver implementation that should perform the operation.
+        type: string
+        default: "application/json"
+        example: "application/json"
       schema:
         title: Schema
-        description: The Avro schema to use during serialization (as single-line, using JSON format)
+        description: Optional schema content (as single-line, using JSON format).
+        type: string
+      contentClass:
+        title: Content Class
+        description: Type information of the content object. Fully qualified class name.
+        type: string
+        example: "org.apache.camel.content.Foo"
+      targetMimeType:
+        title: Target Mime Type
+        description: Additional mime type information used to determine the schema resolver. Usually only used in combination with mime type "application/x-java-object"
         type: string
-        example: '{"type": "record", "namespace": "com.example", "name": "FullName", "fields": [{"name": "first", "type": "string"},{"name": "last", "type": "string"}]}'
-      validate:
-        title: Validate
-        description: Indicates if the content must be validated against the schema
-        type: boolean
-        default: true
-        x-descriptors:
-        - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+        example: "application/json"
   dependencies:
-  - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.0.0-SNAPSHOT"
-  - "camel:kamelet"
-  - "camel:core"
-  - "camel:jackson-avro"
+    - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.0.0-SNAPSHOT"
+    - "camel:kamelet"
+    - "camel:core"
+    - "camel:jackson-avro"
+    - "camel:jackson-protobuf"
   template:
+    beans:
+      - name: schemaResolver
+        type: "#class:org.apache.camel.kamelets.utils.format.schema.DelegatingSchemaResolver"
+        properties:
+          mimeType: '{{mimeType}}'
+          schema: '{{schema:}}'
+          contentClass: '{{contentClass:}}'
+          targetMimeType: '{{targetMimeType:}}'
     from:
-      uri: kamelet:source
+      uri: "kamelet:source"
       steps:
-      - set-property:
-          name: schema
-          constant: "{{schema}}"
-      - set-property:
-          name: validate
-          constant: "{{validate}}"
-      - unmarshal:
-          avro:
-            library: Jackson
-            unmarshalType: com.fasterxml.jackson.databind.JsonNode
-            schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightAvroSchemaResolver"
-      - remove-property:
-          name: schema
-      - remove-property:
-          name: validate
-      - remove-header:
-          name: "Content-Type"
+        - process:
+            ref: "{{schemaResolver}}"
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java
index 24c77b70..cb01358a 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java
@@ -26,9 +26,8 @@ import java.util.Optional;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.kamelets.utils.format.converter.standard.BinaryDataType;
-import org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType;
-import org.apache.camel.kamelets.utils.format.converter.standard.StringDataType;
+import org.apache.camel.kamelets.utils.format.converter.bytes.ByteArrayDataType;
+import org.apache.camel.kamelets.utils.format.converter.text.StringDataType;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeConverterResolver;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeLoader;
@@ -105,9 +104,8 @@ public class DefaultDataTypeRegistry extends ServiceSupport implements DataTypeR
         if (classpathScan) {
             dataTypeLoaders.add(new AnnotationDataTypeLoader());
         } else if (useDefaultConverters) {
-            addDataTypeConverter(new BinaryDataType());
+            addDataTypeConverter(new ByteArrayDataType());
             addDataTypeConverter(new StringDataType());
-            addDataTypeConverter(new JsonModelDataType());
         }
 
         for (DataTypeLoader loader : dataTypeLoaders) {
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/MimeType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/MimeType.java
new file mode 100644
index 00000000..7c205439
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/MimeType.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format;
+
+import java.util.Objects;
+
+public enum MimeType {
+    JSON("application/json"),
+    PROTOBUF("application/protobuf"),
+    AVRO("application/avro"),
+    AVRO_BINARY("avro/binary"),
+    AVRO_STRUCT("avro/x-struct"),
+    BINARY("application/octet-stream"),
+    TEXT("text/plain"),
+    JAVA_OBJECT("application/x-java-object"),
+    STRUCT("application/x-struct");
+
+    private static final MimeType[] VALUES = values();
+    private final String type;
+
+    MimeType(String type) {
+        this.type = type;
+    }
+
+    public String type() {
+        return type;
+    }
+
+    public static MimeType of(String type) {
+        for (MimeType mt : VALUES) {
+            if (Objects.equals(type, mt.type)) {
+                return mt;
+            }
+        }
+
+        throw new IllegalArgumentException("Unsupported type: " + type);
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/SchemaType.java
similarity index 51%
copy from library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataType.java
copy to library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/SchemaType.java
index 532e998b..10fa2c58 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataType.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/SchemaType.java
@@ -15,24 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.camel.kamelets.utils.format.converter.standard;
+package org.apache.camel.kamelets.utils.format;
 
-import org.apache.camel.Exchange;
-import org.apache.camel.kamelets.utils.format.DefaultDataTypeConverter;
-import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
-import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+import java.util.Arrays;
+import java.util.Objects;
 
 /**
- * Binary data type.
+ * Supported schema type for Java object serialization/deserialization
  */
-@DataType(name = "binary", mediaType = "application/octet-stream")
-public class BinaryDataType implements DataTypeConverter {
+public enum SchemaType {
+    PROTOBUF("protobuf"),
+    AVRO("avsc"),
+    JSON("json");
 
-    private static final DataTypeConverter DELEGATE =
-            new DefaultDataTypeConverter(DataType.DEFAULT_SCHEME, "binary", "application/octet-stream", byte[].class);
+    private static final SchemaType[] VALUES = values();
 
-    @Override
-    public void convert(Exchange exchange) {
-        DELEGATE.convert(exchange);
+    private final String schemaType;
+
+    SchemaType(String type) {
+        this.schemaType = type;
+    }
+
+    public String type() {
+        return schemaType;
+    }
+
+    public static SchemaType of(String type) {
+        return Arrays.stream(VALUES)
+                .filter(s -> Objects.equals(s.schemaType, type))
+                .findFirst()
+                .orElseThrow(() -> new IllegalArgumentException(String.format("Unsupported schema type '%s'", type)));
     }
 }
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/InflightAvroSchemaResolver.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/Avro.java
similarity index 52%
rename from library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/InflightAvroSchemaResolver.java
rename to library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/Avro.java
index a75df4d1..cfadf972 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/InflightAvroSchemaResolver.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/Avro.java
@@ -14,24 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.kamelets.utils.serialization;
 
-import com.fasterxml.jackson.core.FormatSchema;
-import com.fasterxml.jackson.dataformat.avro.AvroSchema;
+package org.apache.camel.kamelets.utils.format.converter.avro;
 
-import org.apache.avro.Schema;
-import org.apache.camel.Exchange;
-import org.apache.camel.component.jackson.SchemaResolver;
+import com.fasterxml.jackson.dataformat.avro.AvroMapper;
 
-public class InflightAvroSchemaResolver implements SchemaResolver {
+public final class Avro {
 
-    @Override
-    public FormatSchema resolve(Exchange exchange) {
-        String schemaJson = (String) exchange.getProperty("schema");
-        Boolean validate = Boolean.valueOf((String) exchange.getProperty("validate"));
-        Schema raw = new Schema.Parser().setValidate(validate).parse(schemaJson);
-        AvroSchema schema = new AvroSchema(raw);
-        return schema;
-    }
+    public static final AvroMapper MAPPER = new AvroMapper();
 
+    private Avro() {
+        // prevent instantiation of utility class
+    }
 }
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroBinaryDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroBinaryDataType.java
new file mode 100644
index 00000000..465da568
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroBinaryDataType.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.avro;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.dataformat.avro.AvroSchema;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.kamelets.utils.format.MimeType;
+import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+
+/**
+ * Data type uses Jackson Avro data format to marshal given JsonNode in Exchange body to a binary (byte array) representation.
+ * Uses given Avro schema from the Exchange properties when marshalling the payload (usually already resolved via schema
+ * resolver Kamelet action).
+ */
+@DataType(name = "avro-binary", mediaType = "avro/binary")
+public class AvroBinaryDataType implements DataTypeConverter {
+
+    @Override
+    public void convert(Exchange exchange) {
+        AvroSchema schema = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, AvroSchema.class);
+
+        if (schema == null) {
+            throw new CamelExecutionException("Missing proper avro schema for data type processing", exchange);
+        }
+
+        try {
+            byte[] marshalled = Avro.MAPPER.writer().forType(JsonNode.class).with(schema)
+                    .writeValueAsBytes(getBodyAsJsonNode(exchange, schema));
+            exchange.getMessage().setBody(marshalled);
+
+            exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.AVRO_BINARY.type());
+            exchange.getMessage().setHeader(SchemaHelper.CONTENT_SCHEMA,
+                    exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, "", String.class));
+        } catch (InvalidPayloadException | IOException e) {
+            throw new CamelExecutionException("Failed to apply Avro binary data type on exchange", exchange, e);
+        }
+    }
+
+    private JsonNode getBodyAsJsonNode(Exchange exchange, AvroSchema schema) throws InvalidPayloadException, IOException {
+        if (exchange.getMessage().getBody() instanceof  JsonNode) {
+            return (JsonNode) exchange.getMessage().getBody();
+        }
+
+        return Avro.MAPPER.reader().forType(JsonNode.class).with(schema)
+                .readValue(getBodyAsStream(exchange));
+    }
+
+    private InputStream getBodyAsStream(Exchange exchange) throws InvalidPayloadException {
+        InputStream bodyStream = exchange.getMessage().getBody(InputStream.class);
+
+        if (bodyStream == null) {
+            bodyStream = new ByteArrayInputStream(exchange.getMessage().getMandatoryBody(byte[].class));
+        }
+
+        return bodyStream;
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroSchemaResolver.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroSchemaResolver.java
new file mode 100644
index 00000000..e6a71ad0
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroSchemaResolver.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.avro;
+
+import java.io.InputStream;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.fasterxml.jackson.core.FormatSchema;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.dataformat.avro.AvroSchema;
+import org.apache.avro.Schema;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.jackson.SchemaResolver;
+import org.apache.camel.kamelets.utils.format.SchemaType;
+import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper;
+import org.apache.camel.spi.Resource;
+import org.apache.camel.support.PluginHelper;
+import org.apache.camel.util.ObjectHelper;
+
+public class AvroSchemaResolver implements SchemaResolver, Processor {
+    private final ConcurrentMap<String, AvroSchema> schemes;
+
+    private AvroSchema schema;
+    private String contentClass;
+
+    private boolean validate = true;
+
+    public AvroSchemaResolver() {
+        this.schemes = new ConcurrentHashMap<>();
+    }
+
+    public String getSchema() {
+        if (this.schema != null) {
+            return this.schema.getAvroSchema().toString();
+        }
+
+        return null;
+    }
+
+    public void setSchema(String schema) {
+        if (ObjectHelper.isNotEmpty(schema)) {
+            this.schema = new AvroSchema(new Schema.Parser().setValidate(validate).parse(schema));
+        } else {
+            this.schema = null;
+        }
+    }
+
+    public boolean isValidate() {
+        return validate;
+    }
+
+    public void setValidate(boolean validate) {
+        this.validate = validate;
+    }
+
+    public String getContentClass() {
+        return contentClass;
+    }
+
+    public void setContentClass(String contentClass) {
+        if (ObjectHelper.isNotEmpty(contentClass)) {
+            this.contentClass = contentClass;
+        } else {
+            this.contentClass = null;
+        }
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        Object payload = exchange.getMessage().getBody();
+        if (payload == null) {
+            return;
+        }
+
+        AvroSchema answer = computeIfAbsent(exchange);
+
+        if (answer != null) {
+            exchange.setProperty(SchemaHelper.CONTENT_SCHEMA, answer);
+            exchange.setProperty(SchemaHelper.CONTENT_SCHEMA_TYPE, SchemaType.AVRO.type());
+            exchange.setProperty(SchemaHelper.CONTENT_CLASS, SchemaHelper.resolveContentClass(exchange, this.contentClass));
+        }
+    }
+
+    @Override
+    public FormatSchema resolve(Exchange exchange) {
+        AvroSchema answer = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, AvroSchema.class);
+        if (answer == null) {
+            answer = computeIfAbsent(exchange);
+        }
+
+        return answer;
+    }
+
+    private AvroSchema computeIfAbsent(Exchange exchange) {
+         if (this.schema != null) {
+            return this.schema;
+         }
+
+        AvroSchema answer = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, AvroSchema.class);
+
+        if (answer == null && exchange.getProperties().containsKey(SchemaHelper.SCHEMA)) {
+            String schemaJson = exchange.getProperty(SchemaHelper.SCHEMA, String.class);
+            Schema raw = new Schema.Parser().setValidate(validate).parse(schemaJson);
+            answer = new AvroSchema(raw);
+        }
+
+        if (answer == null) {
+            String contentClass = SchemaHelper.resolveContentClass(exchange, this.contentClass);
+            if (contentClass != null) {
+                answer = this.schemes.computeIfAbsent(contentClass, t -> {
+                    Resource res = PluginHelper.getResourceLoader(exchange.getContext())
+                            .resolveResource("classpath:schemas/" + SchemaType.AVRO.type() + "/" + t + "." + SchemaType.AVRO.type());
+
+                    try {
+                        if (res.exists()) {
+                            try (InputStream is = res.getInputStream()) {
+                                if (is != null) {
+                                    return Avro.MAPPER.schemaFrom(is);
+                                }
+                            }
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(
+                                "Unable to load Avro schema for type: " + t + ", resource: " + res.getLocation(), e);
+                    }
+
+                    try {
+                        return Avro.MAPPER.schemaFor(Class.forName(contentClass));
+                    } catch (JsonMappingException | ClassNotFoundException e) {
+                        throw new RuntimeException(
+                                "Unable to compute Avro schema for type: " + t, e);
+                    }
+                });
+            }
+        }
+
+        if (answer != null) {
+            this.schema = answer;
+        }
+
+        return answer;
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroStructDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroStructDataType.java
new file mode 100644
index 00000000..c1997b04
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroStructDataType.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.avro;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.dataformat.avro.AvroSchema;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.kamelets.utils.format.MimeType;
+import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+
+/**
+ * Data type uses Avro Jackson data format to unmarshal Exchange body to generic JsonNode.
+ * Uses given Avro schema from the Exchange properties when unmarshalling the payload (usually already resolved via schema
+ * resolver Kamelet action).
+ */
+@DataType(name = "avro-x-struct", mediaType = "application/x-struct")
+public class AvroStructDataType implements DataTypeConverter {
+
+    @Override
+    public void convert(Exchange exchange) {
+        AvroSchema schema = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, AvroSchema.class);
+
+        if (schema == null) {
+            throw new CamelExecutionException("Missing proper avro schema for data type processing", exchange);
+        }
+
+        try {
+            Object unmarshalled = Avro.MAPPER.reader().forType(JsonNode.class).with(schema)
+                    .readValue(getBodyAsStream(exchange));
+            exchange.getMessage().setBody(unmarshalled);
+
+            exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.STRUCT.type());
+        } catch (InvalidPayloadException | IOException e) {
+            throw new CamelExecutionException("Failed to apply Avro x-struct data type on exchange", exchange, e);
+        }
+    }
+
+    private InputStream getBodyAsStream(Exchange exchange) throws InvalidPayloadException {
+        InputStream bodyStream = exchange.getMessage().getBody(InputStream.class);
+
+        if (bodyStream == null) {
+            bodyStream = new ByteArrayInputStream(exchange.getMessage().getMandatoryBody(byte[].class));
+        }
+
+        return bodyStream;
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java
index 471e569f..0f6e979e 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java
@@ -27,12 +27,12 @@ import java.util.stream.Stream;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.camel.CamelExecutionException;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.aws2.ddb.Ddb2Constants;
 import org.apache.camel.component.aws2.ddb.Ddb2Operations;
 import org.apache.camel.component.jackson.JacksonDataFormat;
+import org.apache.camel.kamelets.utils.format.converter.json.Json;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
 import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
 import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
@@ -77,10 +77,10 @@ import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
  * In case key and item attribute value maps are identical you can omit the special top level properties completely. The
  * converter will map the whole Json body as is then and use it as source for the attribute value map.
  */
-@DataType(scheme = "aws2-ddb", name = "json", mediaType = "application/json")
+@DataType(scheme = "aws2-ddb", name = "application-json", mediaType = "application/json")
 public class Ddb2JsonInputType implements DataTypeConverter {
 
-    private final JacksonDataFormat dataFormat = new JacksonDataFormat(new ObjectMapper(), JsonNode.class);
+    private final JacksonDataFormat dataFormat = new JacksonDataFormat(Json.MAPPER, JsonNode.class);
 
     @Override
     public void convert(Exchange exchange) {
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java
index d8847886..241840e8 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java
@@ -29,7 +29,7 @@ import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
  * Output data type represents AWS S3 get object response as CloudEvent V1. The data type sets Camel specific
  * CloudEvent headers on the exchange.
  */
-@DataType(scheme = "aws2-s3", name = "cloudevents", mediaType = "application/octet-stream")
+@DataType(scheme = "aws2-s3", name = "application-cloudevents", mediaType = "application/octet-stream")
 public class AWS2S3CloudEventOutputType implements DataTypeConverter {
 
     @Override
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/bytes/ByteArrayDataType.java
similarity index 64%
copy from library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java
copy to library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/bytes/ByteArrayDataType.java
index d60b2aaa..cdacabbd 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/bytes/ByteArrayDataType.java
@@ -15,24 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.camel.kamelets.utils.format.converter.standard;
+package org.apache.camel.kamelets.utils.format.converter.bytes;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.kamelets.utils.format.DefaultDataTypeConverter;
+import org.apache.camel.kamelets.utils.format.MimeType;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
 import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
 
 /**
- * String data type.
+ * Generic binary data type uses Camel message body converter mechanism to convert content to byte array representation.
  */
-@DataType(name = "string", mediaType = "text/plain")
-public class StringDataType implements DataTypeConverter {
+@DataType(name = "application-octet-stream", mediaType = "application/octet-stream")
+public class ByteArrayDataType implements DataTypeConverter {
 
-    private static final DataTypeConverter DELEGATE =
-            new DefaultDataTypeConverter(DataType.DEFAULT_SCHEME, "string", "text/plain", String.class);
+    private static final DataTypeConverter DELEGATE = new DefaultDataTypeConverter(DataType.DEFAULT_SCHEME, "binary",
+            MimeType.BINARY.type(), byte[].class);
 
     @Override
     public void convert(Exchange exchange) {
         DELEGATE.convert(exchange);
+
+        exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.BINARY.type());
     }
 }
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputType.java
index bbad4637..05e937ff 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputType.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputType.java
@@ -30,7 +30,7 @@ import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
  *
  * By default, sets the Http content type header to application/json when not set explicitly.
  */
-@DataType(scheme = "http", name = "cloudevents", mediaType = "application/json")
+@DataType(scheme = "http", name = "application-cloudevents", mediaType = "application/json")
 public class HttpCloudEventOutputType implements DataTypeConverter {
 
     @Override
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/Json.java
similarity index 53%
copy from library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java
copy to library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/Json.java
index d60b2aaa..cbcbe57c 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/Json.java
@@ -15,24 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.camel.kamelets.utils.format.converter.standard;
+package org.apache.camel.kamelets.utils.format.converter.json;
 
-import org.apache.camel.Exchange;
-import org.apache.camel.kamelets.utils.format.DefaultDataTypeConverter;
-import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
-import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
-/**
- * String data type.
- */
-@DataType(name = "string", mediaType = "text/plain")
-public class StringDataType implements DataTypeConverter {
+public final class Json {
 
-    private static final DataTypeConverter DELEGATE =
-            new DefaultDataTypeConverter(DataType.DEFAULT_SCHEME, "string", "text/plain", String.class);
+    public static final ObjectMapper MAPPER = new ObjectMapper();
 
-    @Override
-    public void convert(Exchange exchange) {
-        DELEGATE.convert(exchange);
+    private Json() {
+        // prevent instantiation of utility class
     }
 }
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonDataType.java
new file mode 100644
index 00000000..42a20a2a
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonDataType.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.json;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.kamelets.utils.format.MimeType;
+import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+
+/**
+ * Data type uses Jackson data format to marshal given Exchange payload to a Json (binary byte array representation).
+ * Requires Exchange payload as JsonNode representation.
+ */
+@DataType(name = "application-json", mediaType = "application/json")
+public class JsonDataType implements DataTypeConverter {
+
+    @Override
+    public void convert(Exchange exchange) {
+        try {
+            byte[] marshalled = Json.MAPPER.writer().forType(JsonNode.class).writeValueAsBytes(exchange.getMessage().getBody());
+            exchange.getMessage().setBody(marshalled);
+
+            exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.JSON.type());
+            exchange.getMessage().setHeader(SchemaHelper.CONTENT_SCHEMA,
+                    exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, "", String.class));
+        } catch (JsonProcessingException e) {
+            throw new CamelExecutionException("Failed to apply Json output data type on exchange", exchange, e);
+        }
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonFormatSchema.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonFormatSchema.java
new file mode 100644
index 00000000..cea76aa6
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonFormatSchema.java
@@ -0,0 +1,22 @@
+package org.apache.camel.kamelets.utils.format.converter.json;
+
+import com.fasterxml.jackson.core.FormatSchema;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.camel.kamelets.utils.format.SchemaType;
+
+public class JsonFormatSchema implements FormatSchema {
+    private final JsonNode schema;
+
+    public JsonFormatSchema(JsonNode schema) {
+        this.schema = schema;
+    }
+
+    @Override
+    public String getSchemaType() {
+        return SchemaType.JSON.type();
+    }
+
+    public JsonNode getSchema() {
+        return schema;
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonSchemaResolver.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonSchemaResolver.java
new file mode 100644
index 00000000..74a42a33
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonSchemaResolver.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.json;
+
+import java.io.InputStream;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.fasterxml.jackson.core.FormatSchema;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.jackson.SchemaResolver;
+import org.apache.camel.kamelets.utils.format.SchemaType;
+import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper;
+import org.apache.camel.spi.Resource;
+import org.apache.camel.support.PluginHelper;
+import org.apache.camel.util.ObjectHelper;
+
+public class JsonSchemaResolver implements SchemaResolver, Processor {
+    private final ConcurrentMap<String, JsonNode> schemes;
+
+    private JsonNode schema;
+    private String contentClass;
+
+    public JsonSchemaResolver() {
+        this.schemes = new ConcurrentHashMap<>();
+    }
+
+    public String getSchema() {
+        if (this.schema != null) {
+            try {
+                return Json.MAPPER.writeValueAsString(this.schema);
+            } catch (JsonProcessingException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        return null;
+    }
+
+    public void setSchema(String schema) {
+        if (ObjectHelper.isNotEmpty(schema)) {
+            try {
+                this.schema = Json.MAPPER.readTree(schema);
+            } catch (JsonProcessingException e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            this.schema = null;
+        }
+    }
+
+    public String getContentClass() {
+        return contentClass;
+    }
+
+    public void setContentClass(String contentClass) {
+        if (ObjectHelper.isNotEmpty(contentClass)) {
+            this.contentClass = contentClass;
+        } else {
+            this.contentClass = null;
+        }
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        Object payload = exchange.getMessage().getBody();
+        if (payload == null) {
+            return;
+        }
+
+        JsonNode answer = computeIfAbsent(exchange);
+
+        if (answer != null) {
+            exchange.setProperty(SchemaHelper.CONTENT_SCHEMA, answer);
+            exchange.setProperty(SchemaHelper.CONTENT_SCHEMA_TYPE, SchemaType.JSON.type());
+            exchange.setProperty(SchemaHelper.CONTENT_CLASS, SchemaHelper.resolveContentClass(exchange, this.contentClass));
+        }
+    }
+
+    private JsonNode computeIfAbsent(Exchange exchange) {
+        if (this.schema != null) {
+            return this.schema;
+        }
+
+        JsonNode answer = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, JsonNode.class);
+
+        if (answer == null && exchange.getProperties().containsKey(SchemaHelper.SCHEMA)) {
+            String schemaJson = exchange.getProperty(SchemaHelper.SCHEMA, String.class);
+            try {
+                answer = Json.MAPPER.readTree(schemaJson);
+            } catch (JsonProcessingException e) {
+                throw new RuntimeException("Unable to load Json schema", e);
+            }
+        }
+
+        if (answer == null) {
+            String contentClass = SchemaHelper.resolveContentClass(exchange, this.contentClass);
+            if (contentClass != null) {
+                answer = this.schemes.computeIfAbsent(contentClass, t -> {
+                    Resource res = PluginHelper.getResourceLoader(exchange.getContext())
+                            .resolveResource("classpath:schemas/" + SchemaType.JSON.type() + "/" + t + "." + SchemaType.JSON.type());
+
+                    try {
+                        if (res.exists()) {
+                            try (InputStream is = res.getInputStream()) {
+                                if (is != null) {
+                                    return Json.MAPPER.readTree(is);
+                                }
+                            }
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(
+                                "Unable to load Json schema for type: " + t + ", resource: " + res.getLocation(), e);
+                    }
+
+                    return null;
+                });
+            }
+        }
+
+        if (answer != null) {
+            this.schema = answer;
+        }
+
+        return answer;
+    }
+
+    @Override
+    public FormatSchema resolve(Exchange exchange) {
+        JsonNode answer = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, JsonNode.class);
+        if (answer == null) {
+            answer = computeIfAbsent(exchange);
+        }
+
+        return new JsonFormatSchema(answer);
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonStructDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonStructDataType.java
new file mode 100644
index 00000000..56543746
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonStructDataType.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.json;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.kamelets.utils.format.MimeType;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+
+/**
+ * Data type uses Jackson data format to unmarshal Exchange body to generic JsonNode representation.
+ */
+@DataType(name = "application-x-struct", mediaType = "application/x-struct")
+public class JsonStructDataType implements DataTypeConverter {
+
+    @Override
+    public void convert(Exchange exchange) {
+        try {
+            Object unmarshalled = Json.MAPPER.reader().forType(JsonNode.class).readValue(getBodyAsStream(exchange));
+            exchange.getMessage().setBody(unmarshalled);
+
+            exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.STRUCT.type());
+        } catch (InvalidPayloadException | IOException e) {
+            throw new CamelExecutionException("Failed to apply Json input data type on exchange", exchange, e);
+        }
+    }
+
+    private InputStream getBodyAsStream(Exchange exchange) throws InvalidPayloadException {
+        InputStream bodyStream = exchange.getMessage().getBody(InputStream.class);
+
+        if (bodyStream == null) {
+            bodyStream = new ByteArrayInputStream(exchange.getMessage().getMandatoryBody(byte[].class));
+        }
+
+        return bodyStream;
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/pojo/JavaObjectDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/pojo/JavaObjectDataType.java
new file mode 100644
index 00000000..1fcbc869
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/pojo/JavaObjectDataType.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.pojo;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.fasterxml.jackson.core.FormatSchema;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.kamelets.utils.format.MimeType;
+import org.apache.camel.kamelets.utils.format.SchemaType;
+import org.apache.camel.kamelets.utils.format.converter.avro.Avro;
+import org.apache.camel.kamelets.utils.format.converter.json.Json;
+import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Data type able to unmarshal Exchange body to Java object. Supports both Avro and Json schema types and uses respective
+ * Jackson implementation for the unmarshal operation.
+ * Requires proper setting of content schema, class and schema type in Exchange properties
+ * (usually resolved via Avro or Json schema resolver Kamelet action).
+ */
+@DataType(name = "application-x-java-object", mediaType = "application/x-java-object")
+public class JavaObjectDataType implements DataTypeConverter, CamelContextAware {
+
+    private CamelContext camelContext;
+
+    @Override
+    public void convert(Exchange exchange) {
+        ObjectHelper.notNull(camelContext, "camelContext");
+
+        FormatSchema schema = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, FormatSchema.class);
+        if (schema == null) {
+            throw new CamelExecutionException("Missing proper schema for Java object data type processing", exchange);
+        }
+
+        String contentClass = SchemaHelper.resolveContentClass(exchange, null);
+        if (contentClass == null) {
+            throw new CamelExecutionException("Missing content class information for Java object data type processing",
+                    exchange);
+        }
+
+        SchemaType schemaType = SchemaType.of(exchange.getProperty(SchemaHelper.CONTENT_SCHEMA_TYPE, SchemaType.JSON.type(), String.class));
+
+        try {
+            Class<?> contentType = camelContext.getClassResolver().resolveMandatoryClass(contentClass);
+            Object unmarshalled;
+
+            if (schemaType == SchemaType.AVRO) {
+                unmarshalled = Avro.MAPPER.reader().forType(contentType).with(schema).readValue(getBodyAsStream(exchange));
+            } else if (schemaType == SchemaType.JSON) {
+                unmarshalled = Json.MAPPER.reader().forType(contentType).with(schema).readValue(getBodyAsStream(exchange));
+            } else {
+                throw new CamelExecutionException(String.format("Unsupported schema type '%s'", schemaType), exchange);
+            }
+
+            exchange.getMessage().setBody(unmarshalled);
+
+            exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.STRUCT.type());
+        } catch (InvalidPayloadException | IOException | ClassNotFoundException e) {
+            throw new CamelExecutionException("Failed to apply Java object data type on exchange", exchange, e);
+        }
+    }
+
+    private InputStream getBodyAsStream(Exchange exchange) throws InvalidPayloadException {
+        InputStream bodyStream = exchange.getMessage().getBody(InputStream.class);
+
+        if (bodyStream == null) {
+            bodyStream = new ByteArrayInputStream(exchange.getMessage().getMandatoryBody(byte[].class));
+        }
+
+        return bodyStream;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/protobuf/Protobuf.java
similarity index 53%
rename from library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java
rename to library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/protobuf/Protobuf.java
index d60b2aaa..1bb5d1c1 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/protobuf/Protobuf.java
@@ -15,24 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.camel.kamelets.utils.format.converter.standard;
+package org.apache.camel.kamelets.utils.format.converter.protobuf;
 
-import org.apache.camel.Exchange;
-import org.apache.camel.kamelets.utils.format.DefaultDataTypeConverter;
-import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
-import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+import com.fasterxml.jackson.dataformat.protobuf.ProtobufMapper;
 
-/**
- * String data type.
- */
-@DataType(name = "string", mediaType = "text/plain")
-public class StringDataType implements DataTypeConverter {
+public final class Protobuf {
 
-    private static final DataTypeConverter DELEGATE =
-            new DefaultDataTypeConverter(DataType.DEFAULT_SCHEME, "string", "text/plain", String.class);
+    public static final ProtobufMapper MAPPER = new ProtobufMapper();
 
-    @Override
-    public void convert(Exchange exchange) {
-        DELEGATE.convert(exchange);
+    private Protobuf() {
+        // prevent instantiation of utility class
     }
 }
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/protobuf/ProtobufSchemaResolver.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/protobuf/ProtobufSchemaResolver.java
new file mode 100644
index 00000000..9fba1091
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/protobuf/ProtobufSchemaResolver.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.protobuf;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.fasterxml.jackson.core.FormatSchema;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.dataformat.protobuf.schema.ProtobufSchema;
+import com.fasterxml.jackson.dataformat.protobuf.schema.ProtobufSchemaLoader;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.jackson.SchemaResolver;
+import org.apache.camel.kamelets.utils.format.SchemaType;
+import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper;
+import org.apache.camel.spi.Resource;
+import org.apache.camel.support.PluginHelper;
+import org.apache.camel.util.ObjectHelper;
+
+public class ProtobufSchemaResolver implements SchemaResolver, Processor {
+    private final ConcurrentMap<String, ProtobufSchema> schemes;
+
+    private ProtobufSchema schema;
+    private String contentClass;
+
+    public ProtobufSchemaResolver() {
+        this.schemes = new ConcurrentHashMap<>();
+    }
+
+    public String getSchema() {
+        if (this.schema != null) {
+            return this.schema.getSource().toString();
+        }
+
+        return null;
+    }
+
+    public void setSchema(String schema) {
+        if (ObjectHelper.isNotEmpty(schema)) {
+            try {
+                this.schema = ProtobufSchemaLoader.std.parse(schema);
+            } catch (IOException e) {
+                throw new RuntimeCamelException("Cannot parse protobuf schema", e);
+            }
+        } else {
+            this.schema = null;
+        }
+    }
+
+    public String getContentClass() {
+        return contentClass;
+    }
+
+    public void setContentClass(String contentClass) {
+        if (ObjectHelper.isNotEmpty(contentClass)) {
+            this.contentClass = contentClass;
+        } else {
+            this.contentClass = null;
+        }
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        Object payload = exchange.getMessage().getBody();
+        if (payload == null) {
+            return;
+        }
+
+        ProtobufSchema answer = computeIfAbsent(exchange);
+
+        if (answer != null) {
+            exchange.setProperty(SchemaHelper.CONTENT_SCHEMA, answer);
+            exchange.setProperty(SchemaHelper.CONTENT_SCHEMA_TYPE, SchemaType.PROTOBUF.type());
+            exchange.setProperty(SchemaHelper.CONTENT_CLASS, SchemaHelper.resolveContentClass(exchange, this.contentClass));
+        }
+    }
+
+    @Override
+    public FormatSchema resolve(Exchange exchange) {
+        ProtobufSchema answer = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, ProtobufSchema.class);
+        if (answer == null) {
+            answer = computeIfAbsent(exchange);
+        }
+
+        return answer;
+    }
+
+    private ProtobufSchema computeIfAbsent(Exchange exchange) {
+         if (this.schema != null) {
+            return this.schema;
+         }
+
+        ProtobufSchema answer = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, ProtobufSchema.class);
+
+        if (answer == null && exchange.getProperties().containsKey(SchemaHelper.SCHEMA)) {
+            String schemaJson = exchange.getProperty(SchemaHelper.SCHEMA, String.class);
+            try {
+                answer = ProtobufSchemaLoader.std.parse(schemaJson);
+            } catch (IOException e) {
+                throw new RuntimeException("Unable to parse Protobuf schema", e);
+            }
+        }
+
+        if (answer == null) {
+            String contentClass = SchemaHelper.resolveContentClass(exchange, this.contentClass);
+            if (contentClass != null) {
+                answer = this.schemes.computeIfAbsent(contentClass, t -> {
+                    Resource res = PluginHelper.getResourceLoader(exchange.getContext())
+                            .resolveResource("classpath:schemas/" + SchemaType.AVRO.type() + "/" + t + "." + SchemaType.AVRO.type());
+
+                    try {
+                        if (res.exists()) {
+                            try (InputStream is = res.getInputStream()) {
+                                if (is != null) {
+                                    return Protobuf.MAPPER.schemaLoader().load(is);
+                                }
+                            }
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(
+                                "Unable to load Protobuf schema for type: " + t + ", resource: " + res.getLocation(), e);
+                    }
+
+                    try {
+                        return Protobuf.MAPPER.generateSchemaFor(Class.forName(contentClass));
+                    } catch (JsonMappingException | ClassNotFoundException e) {
+                        throw new RuntimeException(
+                                "Unable to compute Protobuf schema for type: " + t, e);
+                    }
+                });
+            }
+        }
+
+        if (answer != null) {
+            this.schema = answer;
+        }
+
+        return answer;
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java
deleted file mode 100644
index 183f1112..00000000
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kamelets.utils.format.converter.standard;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
-import org.apache.camel.CamelExecutionException;
-import org.apache.camel.Exchange;
-import org.apache.camel.InvalidPayloadException;
-import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
-import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
-import org.apache.camel.util.ObjectHelper;
-
-/**
- * Data type converter able to unmarshal to given unmarshalType using jackson data format.
- * <p/>
- * Unmarshal type should be given as a fully qualified class name in the exchange properties.
- */
-@DataType(name = "jsonObject", mediaType = "application/json")
-public class JsonModelDataType implements DataTypeConverter, CamelContextAware {
-
-    public static final String DATA_TYPE_MODEL_PROPERTY = "CamelDataTypeModel";
-
-    private String model;
-
-    private CamelContext camelContext;
-
-    private static final ObjectMapper mapper = new ObjectMapper();
-
-    @Override
-    public void convert(Exchange exchange) {
-        String type;
-        if (exchange.hasProperties() && exchange.getProperties().containsKey(DATA_TYPE_MODEL_PROPERTY)) {
-            type = exchange.getProperty(DATA_TYPE_MODEL_PROPERTY, String.class);
-        } else {
-            type = model;
-        }
-
-        if (type == null) {
-            // neither model property nor exchange property defines proper type - do nothing
-            return;
-        }
-
-        ObjectHelper.notNull(camelContext, "camelContext");
-
-        try {
-            Object unmarshalled = mapper.reader().forType(camelContext.getClassResolver().resolveMandatoryClass(type)).readValue(getBodyAsStream(exchange));
-            exchange.getMessage().setBody(unmarshalled);
-        } catch (Exception e) {
-            throw new CamelExecutionException(
-                    String.format("Failed to load Json unmarshalling type '%s'", type), exchange, e);
-        }
-    }
-
-    private InputStream getBodyAsStream(Exchange exchange) throws InvalidPayloadException {
-        InputStream bodyStream = exchange.getMessage().getBody(InputStream.class);
-
-        if (bodyStream == null) {
-            bodyStream = new ByteArrayInputStream(exchange.getMessage().getMandatoryBody(byte[].class));
-        }
-
-        return bodyStream;
-    }
-
-    @Override
-    public CamelContext getCamelContext() {
-        return camelContext;
-    }
-
-    public void setModel(String model) {
-        this.model = model;
-    }
-
-    @Override
-    public void setCamelContext(CamelContext camelContext) {
-        this.camelContext = camelContext;
-    }
-}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/text/StringDataType.java
similarity index 57%
rename from library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataType.java
rename to library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/text/StringDataType.java
index 532e998b..9569e1d1 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataType.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/text/StringDataType.java
@@ -15,24 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.camel.kamelets.utils.format.converter.standard;
+package org.apache.camel.kamelets.utils.format.converter.text;
+
+import java.nio.charset.StandardCharsets;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.kamelets.utils.format.DefaultDataTypeConverter;
+import org.apache.camel.kamelets.utils.format.MimeType;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
 import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
 
 /**
- * Binary data type.
+ * Generic String data type converts Exchange payload to String representation using the Camel message body converter mechanism.
+ * By default, uses UTF-8 charset as encoding.
  */
-@DataType(name = "binary", mediaType = "application/octet-stream")
-public class BinaryDataType implements DataTypeConverter {
+@DataType(name = "text-plain", mediaType = "text/plain")
+public class StringDataType implements DataTypeConverter {
 
-    private static final DataTypeConverter DELEGATE =
-            new DefaultDataTypeConverter(DataType.DEFAULT_SCHEME, "binary", "application/octet-stream", byte[].class);
+    private static final DataTypeConverter DELEGATE = new DefaultDataTypeConverter(DataType.DEFAULT_SCHEME, "string",
+            MimeType.TEXT.type(), String.class);
 
     @Override
     public void convert(Exchange exchange) {
+        exchange.setProperty(ExchangePropertyKey.CHARSET_NAME, StandardCharsets.UTF_8.name());
+
         DELEGATE.convert(exchange);
+
+        exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.TEXT.type());
     }
 }
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/PojoHelper.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/PojoHelper.java
new file mode 100644
index 00000000..17f22280
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/PojoHelper.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.utils;
+
+import java.util.Objects;
+
+import org.apache.camel.Exchange;
+
+public final class PojoHelper {
+    private PojoHelper() {
+    }
+
+    public static boolean isString(Class<?> type) {
+        return String.class.isAssignableFrom(type);
+    }
+
+    public static boolean isNumber(Class<?> type) {
+        return Number.class.isAssignableFrom(type)
+                || int.class.isAssignableFrom(type)
+                || long.class.isAssignableFrom(type)
+                || short.class.isAssignableFrom(type)
+                || char.class.isAssignableFrom(type)
+                || float.class.isAssignableFrom(type)
+                || double.class.isAssignableFrom(type);
+    }
+
+    public static boolean isPrimitive(Class<?> type) {
+        return type.isPrimitive()
+                || (type.isArray() && type.getComponentType().isPrimitive())
+                || char.class.isAssignableFrom(type) || Character.class.isAssignableFrom(type)
+                || byte.class.isAssignableFrom(type) || Byte.class.isAssignableFrom(type)
+                || boolean.class.isAssignableFrom(type) || Boolean.class.isAssignableFrom(type);
+    }
+
+    public static boolean isPojo(Class<?> type) {
+        Package pkg = type.getPackage();
+        if (pkg != null) {
+            if (pkg.getName().startsWith("java")
+                    || pkg.getName().startsWith("javax")
+                    || pkg.getName().startsWith("com.sun")
+                    || pkg.getName().startsWith("com.oracle")) {
+                return false;
+            }
+        }
+
+        if (isNumber(type)) {
+            return false;
+        }
+        if (isPrimitive(type)) {
+            return false;
+        }
+        if (isString(type)) {
+            return false;
+        }
+
+        return true;
+    }
+
+    public static boolean hasProperty(Exchange exchange, String name) {
+        return exchange.getProperties().containsKey(name);
+    }
+
+    public static boolean hasProperty(Exchange exchange, String name, Object value) {
+        return Objects.equals(
+                value,
+                exchange.getProperty(name, value.getClass()));
+    }
+
+    public static boolean hasHeader(Exchange exchange, String name, Object value) {
+        return Objects.equals(
+                value,
+                exchange.getMessage().getHeader(name, value.getClass()));
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/SchemaHelper.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/SchemaHelper.java
new file mode 100644
index 00000000..87dea3f6
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/SchemaHelper.java
@@ -0,0 +1,34 @@
+package org.apache.camel.kamelets.utils.format.converter.utils;
+
+import org.apache.camel.Exchange;
+
+public class SchemaHelper {
+
+    public static final String SCHEMA = "schema";
+    public static final String VALIDATE = "validate";
+    public static final String CONTENT_SCHEMA = "X-Content-Schema";
+    public static final String CONTENT_SCHEMA_TYPE = "X-Content-Schema-Type";
+    public static final String CONTENT_CLASS = "X-Content-Class";
+
+    private SchemaHelper() {
+    }
+
+    /**
+     * Helper resolves content class from exchange properties and as a fallback tries to retrieve the content class
+     * from the payload body type.
+     * @param exchange the Camel exchange eventually holding content class information in its properties.
+     * @param fallback the fallback content class information when no exchange property is set.
+     * @return the content class as String representation.
+     */
+    public static String resolveContentClass(Exchange exchange, String fallback) {
+        String contentClass = exchange.getProperty(CONTENT_CLASS, fallback, String.class);
+        if (contentClass == null) {
+            Object payload = exchange.getMessage().getBody();
+            if (payload != null && PojoHelper.isPojo(payload.getClass())) {
+                contentClass = payload.getClass().getName();
+            }
+        }
+
+        return contentClass;
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/schema/DelegatingSchemaResolver.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/schema/DelegatingSchemaResolver.java
new file mode 100644
index 00000000..26b541ee
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/schema/DelegatingSchemaResolver.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.schema;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.kamelets.utils.format.MimeType;
+import org.apache.camel.kamelets.utils.format.converter.avro.AvroSchemaResolver;
+import org.apache.camel.kamelets.utils.format.converter.json.JsonSchemaResolver;
+import org.apache.camel.kamelets.utils.format.converter.protobuf.ProtobufSchemaResolver;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Schema resolver processor delegates to either Avro or Json schema resolver based on the given mimetype property.
+ * When mimetype is of type application/x-java-object uses additional target mimetype (usually the produces mimetype) to
+ * determine the schema resolver (Avro or Json).
+ * Delegates to schema resolver and sets proper content class and schema properties on the delegate.
+ */
+public class DelegatingSchemaResolver implements Processor {
+    private String mimeType;
+    private String targetMimeType;
+
+    private String schema;
+    private String contentClass;
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        if (ObjectHelper.isEmpty(mimeType)) {
+            return;
+        }
+
+        MimeType mimeType = MimeType.of(this.mimeType);
+        Processor resolver;
+        if (mimeType.equals(MimeType.JAVA_OBJECT)) {
+            if (ObjectHelper.isEmpty(targetMimeType)) {
+                return;
+            }
+            resolver = fromMimeType(MimeType.of(targetMimeType));
+        } else {
+            resolver = fromMimeType(mimeType);
+        }
+
+        if (resolver != null) {
+            resolver.process(exchange);
+        }
+    }
+
+    private Processor fromMimeType(MimeType mimeType) {
+        switch (mimeType) {
+            case PROTOBUF:
+                ProtobufSchemaResolver protobufSchemaResolver = new ProtobufSchemaResolver();
+                protobufSchemaResolver.setSchema(this.schema);
+                protobufSchemaResolver.setContentClass(this.contentClass);
+                return protobufSchemaResolver;
+            case AVRO:
+            case AVRO_BINARY:
+            case AVRO_STRUCT:
+                AvroSchemaResolver avroSchemaResolver = new AvroSchemaResolver();
+                avroSchemaResolver.setSchema(this.schema);
+                avroSchemaResolver.setContentClass(this.contentClass);
+                return avroSchemaResolver;
+            case JSON:
+            case STRUCT:
+                JsonSchemaResolver jsonSchemaResolver = new JsonSchemaResolver();
+                jsonSchemaResolver.setSchema(this.schema);
+                jsonSchemaResolver.setContentClass(this.contentClass);
+                return jsonSchemaResolver;
+            default:
+                return null;
+        }
+    }
+
+    public String getMimeType() {
+        return mimeType;
+    }
+
+    public void setMimeType(String mimeType) {
+        this.mimeType = mimeType;
+    }
+
+    public String getSchema() {
+        return schema;
+    }
+
+    public void setSchema(String schema) {
+        this.schema = schema;
+    }
+
+    public String getContentClass() {
+        return contentClass;
+    }
+
+    public void setContentClass(String contentClass) {
+        this.contentClass = contentClass;
+    }
+
+    public String getTargetMimeType() {
+        return targetMimeType;
+    }
+
+    public void setTargetMimeType(String targetMimeType) {
+        this.targetMimeType = targetMimeType;
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/InflightProtobufSchemaResolver.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/InflightProtobufSchemaResolver.java
deleted file mode 100644
index 4d09f9d5..00000000
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/InflightProtobufSchemaResolver.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.kamelets.utils.serialization;
-
-import java.io.IOException;
-
-import com.fasterxml.jackson.core.FormatSchema;
-import com.fasterxml.jackson.dataformat.protobuf.schema.ProtobufSchema;
-import com.fasterxml.jackson.dataformat.protobuf.schema.ProtobufSchemaLoader;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.component.jackson.SchemaResolver;
-
-public class InflightProtobufSchemaResolver implements SchemaResolver {
-
-    @Override
-    public FormatSchema resolve(Exchange exchange) {
-        String schemaStr = (String) exchange.getProperty("schema");
-        try {
-            ProtobufSchema schema = ProtobufSchemaLoader.std.parse(schemaStr);
-            return schema;
-        } catch(IOException e) {
-            throw new RuntimeCamelException("Cannot parse protobuf schema", e);
-        }
-    }
-}
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter
index 1cd0ed78..96cc4b09 100644
--- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter
+++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter
@@ -15,7 +15,11 @@
 # limitations under the License.
 #
 
-org.apache.camel.kamelets.utils.format.converter.standard
+org.apache.camel.kamelets.utils.format.converter.bytes
+org.apache.camel.kamelets.utils.format.converter.text
+org.apache.camel.kamelets.utils.format.converter.pojo
+org.apache.camel.kamelets.utils.format.converter.avro
+org.apache.camel.kamelets.utils.format.converter.json
 org.apache.camel.kamelets.utils.format.converter.aws2.s3
 org.apache.camel.kamelets.utils.format.converter.aws2.ddb
 org.apache.camel.kamelets.utils.format.converter.http
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-ddb-json b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-ddb-application-json
similarity index 100%
rename from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-ddb-json
rename to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-ddb-application-json
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-cloudevents b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-application-cloudevents
similarity index 100%
rename from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-cloudevents
rename to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-application-cloudevents
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-json
similarity index 90%
copy from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary
copy to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-json
index edf9a4ca..18aebea0 100644
--- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary
+++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-json
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-class=org.apache.camel.kamelets.utils.format.converter.standard.BinaryDataType
\ No newline at end of file
+class=org.apache.camel.kamelets.utils.format.converter.json.JsonDataType
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-octet-stream
similarity index 90%
rename from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject
rename to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-octet-stream
index 2f725f6a..520b0341 100644
--- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject
+++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-octet-stream
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-class=org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType
\ No newline at end of file
+class=org.apache.camel.kamelets.utils.format.converter.bytes.ByteArrayDataType
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-x-java-object
similarity index 90%
copy from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary
copy to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-x-java-object
index edf9a4ca..3f807983 100644
--- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary
+++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-x-java-object
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-class=org.apache.camel.kamelets.utils.format.converter.standard.BinaryDataType
\ No newline at end of file
+class=org.apache.camel.kamelets.utils.format.converter.pojo.JavaObjectDataType
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-x-struct
similarity index 90%
copy from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary
copy to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-x-struct
index edf9a4ca..fef2aab8 100644
--- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary
+++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-x-struct
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-class=org.apache.camel.kamelets.utils.format.converter.standard.BinaryDataType
\ No newline at end of file
+class=org.apache.camel.kamelets.utils.format.converter.json.JsonStructDataType
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-avro-binary
similarity index 90%
copy from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary
copy to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-avro-binary
index edf9a4ca..69fc5309 100644
--- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary
+++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-avro-binary
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-class=org.apache.camel.kamelets.utils.format.converter.standard.BinaryDataType
\ No newline at end of file
+class=org.apache.camel.kamelets.utils.format.converter.avro.AvroBinaryDataType
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-avro-x-struct
similarity index 90%
rename from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary
rename to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-avro-x-struct
index edf9a4ca..070e3147 100644
--- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary
+++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-avro-x-struct
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-class=org.apache.camel.kamelets.utils.format.converter.standard.BinaryDataType
\ No newline at end of file
+class=org.apache.camel.kamelets.utils.format.converter.avro.AvroStructDataType
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-string b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-plain-text
similarity index 90%
rename from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-string
rename to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-plain-text
index 8ef25725..3233f427 100644
--- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-string
+++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-plain-text
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-class=org.apache.camel.kamelets.utils.format.converter.standard.StringDataType
\ No newline at end of file
+class=org.apache.camel.kamelets.utils.format.converter.text.StringDataType
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/http-cloudevents b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/http-application-cloudevents
similarity index 100%
rename from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/http-cloudevents
rename to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/http-application-cloudevents
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java
index b281f314..341ad05d 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java
@@ -21,7 +21,7 @@ import java.util.Optional;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType;
+import org.apache.camel.kamelets.utils.format.converter.json.JsonStructDataType;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -49,9 +49,9 @@ class DefaultDataTypeConverterResolverTest {
 
     @Test
     public void shouldResolveDataTypeConverters() throws Exception {
-        Optional<DataTypeConverter> converter = resolver.resolve("jsonObject", camelContext);
+        Optional<DataTypeConverter> converter = resolver.resolve("application-x-struct", camelContext);
         Assertions.assertTrue(converter.isPresent());
-        Assertions.assertEquals(JsonModelDataType.class, converter.get().getClass());
+        Assertions.assertEquals(JsonStructDataType.class, converter.get().getClass());
 
         converter = resolver.resolve("foo", "json", camelContext);
         Assertions.assertTrue(converter.isPresent());
@@ -73,4 +73,4 @@ class DefaultDataTypeConverterResolverTest {
             return "foo";
         }
     }
-}
\ No newline at end of file
+}
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
index d83c474b..9061c294 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
@@ -21,9 +21,8 @@ import java.util.Optional;
 
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.kamelets.utils.format.converter.standard.BinaryDataType;
-import org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType;
-import org.apache.camel.kamelets.utils.format.converter.standard.StringDataType;
+import org.apache.camel.kamelets.utils.format.converter.bytes.ByteArrayDataType;
+import org.apache.camel.kamelets.utils.format.converter.text.StringDataType;
 import org.apache.camel.kamelets.utils.format.converter.test.UppercaseDataType;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
 import org.junit.jupiter.api.Assertions;
@@ -41,15 +40,12 @@ class DefaultDataTypeRegistryTest {
 
     @Test
     public void shouldLookupDefaultDataTypeConverters() throws Exception {
-        Optional<DataTypeConverter> converter = dataTypeRegistry.lookup( "jsonObject");
-        Assertions.assertTrue(converter.isPresent());
-        Assertions.assertEquals(JsonModelDataType.class, converter.get().getClass());
-        converter = dataTypeRegistry.lookup( "string");
+        Optional<DataTypeConverter> converter = dataTypeRegistry.lookup( "plain-text");
         Assertions.assertTrue(converter.isPresent());
         Assertions.assertEquals(StringDataType.class, converter.get().getClass());
-        converter = dataTypeRegistry.lookup( "binary");
+        converter = dataTypeRegistry.lookup( "application-octet-stream");
         Assertions.assertTrue(converter.isPresent());
-        Assertions.assertEquals(BinaryDataType.class, converter.get().getClass());
+        Assertions.assertEquals(ByteArrayDataType.class, converter.get().getClass());
         converter = dataTypeRegistry.lookup( "lowercase");
         Assertions.assertTrue(converter.isPresent());
         converter = dataTypeRegistry.lookup( "uppercase");
@@ -57,4 +53,4 @@ class DefaultDataTypeRegistryTest {
         Assertions.assertEquals(UppercaseDataType.class, converter.get().getClass());
     }
 
-}
\ No newline at end of file
+}
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputTypeTest.java
index 7f1f9e9f..6d2fee23 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputTypeTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputTypeTest.java
@@ -20,7 +20,6 @@ package org.apache.camel.kamelets.utils.format.converter.aws2.ddb;
 import java.util.Map;
 import java.util.Optional;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.CamelExecutionException;
 import org.apache.camel.Exchange;
@@ -28,6 +27,7 @@ import org.apache.camel.component.aws2.ddb.Ddb2Constants;
 import org.apache.camel.component.aws2.ddb.Ddb2Operations;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry;
+import org.apache.camel.kamelets.utils.format.converter.json.Json;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
 import org.apache.camel.support.DefaultExchange;
 import org.junit.jupiter.api.Assertions;
@@ -42,8 +42,6 @@ public class Ddb2JsonInputTypeTest {
 
     private DefaultCamelContext camelContext;
 
-    private final ObjectMapper mapper = new ObjectMapper();
-
     private final Ddb2JsonInputType inputType = new Ddb2JsonInputType();
 
     private final String keyJson = "{" +
@@ -69,7 +67,7 @@ public class Ddb2JsonInputTypeTest {
     void shouldMapPutItemHeaders() throws Exception {
         Exchange exchange = new DefaultExchange(camelContext);
 
-        exchange.getMessage().setBody(mapper.readTree(itemJson));
+        exchange.getMessage().setBody(Json.MAPPER.readTree(itemJson));
         exchange.setProperty("operation", Ddb2Operations.PutItem.name());
         inputType.convert(exchange);
 
@@ -85,7 +83,7 @@ public class Ddb2JsonInputTypeTest {
     void shouldMapUpdateItemHeaders() throws Exception {
         Exchange exchange = new DefaultExchange(camelContext);
 
-        exchange.getMessage().setBody(mapper.readTree("{\"operation\": \"" + Ddb2Operations.UpdateItem.name() + "\", \"key\": "
+        exchange.getMessage().setBody(Json.MAPPER.readTree("{\"operation\": \"" + Ddb2Operations.UpdateItem.name() + "\", \"key\": "
                 + keyJson + ", \"item\": " + itemJson + "}"));
 
         inputType.convert(exchange);
@@ -106,7 +104,7 @@ public class Ddb2JsonInputTypeTest {
     void shouldMapDeleteItemHeaders() throws Exception {
         Exchange exchange = new DefaultExchange(camelContext);
 
-        exchange.getMessage().setBody(mapper.readTree("{\"key\": " + keyJson + "}"));
+        exchange.getMessage().setBody(Json.MAPPER.readTree("{\"key\": " + keyJson + "}"));
         exchange.setProperty("operation", Ddb2Operations.DeleteItem.name());
 
         inputType.convert(exchange);
@@ -125,7 +123,7 @@ public class Ddb2JsonInputTypeTest {
     void shouldMapNestedObjects() throws Exception {
         Exchange exchange = new DefaultExchange(camelContext);
 
-        exchange.getMessage().setBody(mapper.readTree("{\"user\":" + itemJson + "}"));
+        exchange.getMessage().setBody(Json.MAPPER.readTree("{\"user\":" + itemJson + "}"));
         exchange.setProperty("operation", Ddb2Operations.PutItem.name());
         inputType.convert(exchange);
 
@@ -176,7 +174,7 @@ public class Ddb2JsonInputTypeTest {
     void shouldFailForUnsupportedOperation() throws Exception {
         Exchange exchange = new DefaultExchange(camelContext);
 
-        exchange.getMessage().setBody(mapper.readTree("{}"));
+        exchange.getMessage().setBody(Json.MAPPER.readTree("{}"));
         exchange.setProperty("operation", Ddb2Operations.BatchGetItems.name());
 
         Assertions.assertThrows(UnsupportedOperationException.class, () -> inputType.convert(exchange));
@@ -186,7 +184,7 @@ public class Ddb2JsonInputTypeTest {
     public void shouldLookupDataType() throws Exception {
         DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry();
         CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
-        Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("aws2-ddb", "json");
+        Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("aws2-ddb", "application-json");
         Assertions.assertTrue(converter.isPresent());
     }
 
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java
index 53570dd9..f5d8a082 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java
@@ -62,7 +62,7 @@ class AWS2S3CloudEventOutputTypeTest {
     public void shouldLookupDataType() throws Exception {
         DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry();
         CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
-        Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("aws2-s3", "cloudevents");
+        Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("aws2-s3", "application-cloudevents");
         Assertions.assertTrue(converter.isPresent());
     }
 }
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputTypeTest.java
index 01e331e6..0e73cb0a 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputTypeTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputTypeTest.java
@@ -67,7 +67,7 @@ class HttpCloudEventOutputTypeTest {
     public void shouldLookupDataType() throws Exception {
         DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry();
         CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
-        Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("http", "cloudevents");
+        Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("http", "application-cloudevents");
         Assertions.assertTrue(converter.isPresent());
     }
 }
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java
deleted file mode 100644
index 7785017c..00000000
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kamelets.utils.format.converter.standard;
-
-import java.util.Optional;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.camel.CamelContextAware;
-import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry;
-import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
-import org.apache.camel.support.DefaultExchange;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-public class JsonModelDataTypeTest {
-
-    private final DefaultCamelContext camelContext = new DefaultCamelContext();
-
-    private final JsonModelDataType dataType = new JsonModelDataType();
-
-    @BeforeEach
-    public void setup() {
-        dataType.setCamelContext(camelContext);
-    }
-
-    @Test
-    void shouldMapStringToJsonModelWithModelProperty() throws Exception {
-        Exchange exchange = new DefaultExchange(camelContext);
-
-        exchange.getMessage().setBody("{ \"name\": \"Rajesh\", \"age\": 28}");
-        dataType.setModel(Person.class.getName());
-        dataType.convert(exchange);
-
-        assertEquals(Person.class, exchange.getMessage().getBody().getClass());
-        assertEquals("Rajesh", exchange.getMessage().getBody(Person.class).getName());
-    }
-
-    @Test
-    void shouldMapStringToJsonModelWithExchangeProperty() throws Exception {
-        Exchange exchange = new DefaultExchange(camelContext);
-
-        exchange.setProperty(JsonModelDataType.DATA_TYPE_MODEL_PROPERTY, Person.class.getName());
-        exchange.getMessage().setBody("{ \"name\": \"Sheldon\", \"age\": 29}");
-        dataType.convert(exchange);
-
-        assertEquals(Person.class, exchange.getMessage().getBody().getClass());
-        assertEquals("Sheldon", exchange.getMessage().getBody(Person.class).getName());
-    }
-
-    @Test
-    public void shouldLookupDataType() throws Exception {
-        DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry();
-        CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
-        Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("jsonObject");
-        Assertions.assertTrue(converter.isPresent());
-    }
-
-    public static class Person {
-        @JsonProperty
-        private String name;
-
-        @JsonProperty
-        private Long age;
-
-        public String getName() {
-            return name;
-        }
-
-        public void setName(String name) {
-            this.name = name;
-        }
-
-        public Long getAge() {
-            return age;
-        }
-
-        public void setAge(Long age) {
-            this.age = age;
-        }
-    }
-
-}
\ No newline at end of file
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/bytes/ByteArrayDataTypeTest.java
similarity index 94%
rename from library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataTypeTest.java
rename to library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/bytes/ByteArrayDataTypeTest.java
index d2dd616a..09ba2b93 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataTypeTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/bytes/ByteArrayDataTypeTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.kamelets.utils.format.converter.standard;
+package org.apache.camel.kamelets.utils.format.converter.standard.bytes;
 
 import java.io.ByteArrayInputStream;
 import java.nio.charset.StandardCharsets;
@@ -24,6 +24,7 @@ import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry;
+import org.apache.camel.kamelets.utils.format.converter.bytes.ByteArrayDataType;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
 import org.apache.camel.support.DefaultExchange;
 import org.junit.jupiter.api.Assertions;
@@ -31,11 +32,11 @@ import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-public class BinaryDataTypeTest {
+public class ByteArrayDataTypeTest {
 
     private final DefaultCamelContext camelContext = new DefaultCamelContext();
 
-    private final BinaryDataType dataType = new BinaryDataType();
+    private final ByteArrayDataType dataType = new ByteArrayDataType();
 
     @Test
     void shouldRetainBytesModel() throws Exception {
@@ -89,7 +90,7 @@ public class BinaryDataTypeTest {
     public void shouldLookupDataType() throws Exception {
         DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry();
         CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
-        Optional<DataTypeConverter> converter = dataTypeRegistry.lookup( "binary");
+        Optional<DataTypeConverter> converter = dataTypeRegistry.lookup( "application-octet-stream");
         Assertions.assertTrue(converter.isPresent());
     }
 
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/text/StringDataTypeTest.java
similarity index 97%
rename from library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataTypeTest.java
rename to library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/text/StringDataTypeTest.java
index 8ee19cba..32a9b569 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataTypeTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/text/StringDataTypeTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.kamelets.utils.format.converter.standard;
+package org.apache.camel.kamelets.utils.format.converter.standard.text;
 
 import java.io.ByteArrayInputStream;
 import java.nio.charset.StandardCharsets;
@@ -24,6 +24,7 @@ import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry;
+import org.apache.camel.kamelets.utils.format.converter.text.StringDataType;
 import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
 import org.apache.camel.support.DefaultExchange;
 import org.junit.jupiter.api.Assertions;
@@ -77,7 +78,7 @@ public class StringDataTypeTest {
     public void shouldLookupDataType() throws Exception {
         DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry();
         CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
-        Optional<DataTypeConverter> converter = dataTypeRegistry.lookup( "string");
+        Optional<DataTypeConverter> converter = dataTypeRegistry.lookup( "plain-text");
         Assertions.assertTrue(converter.isPresent());
     }
 
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DeDuplicateHeadersTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DeDuplicateHeadersTest.java
index af6c92f2..ca675a7e 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DeDuplicateHeadersTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DeDuplicateHeadersTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.kamelets.utils.headers;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.support.DefaultExchange;
@@ -28,15 +27,13 @@ class DeDuplicateHeadersTest {
 
     private DefaultCamelContext camelContext;
 
-    private final ObjectMapper mapper = new ObjectMapper();
-
     private DeDuplicateNamingHeaders processor;
 
     @BeforeEach
     void setup() {
         camelContext = new DefaultCamelContext();
     }
-    
+
     @Test
     void shouldDuplicateHeaders() throws Exception {
         Exchange exchange = new DefaultExchange(camelContext);
@@ -56,7 +53,7 @@ class DeDuplicateHeadersTest {
         Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("kafka.OVERRIDE_TOPIC"));
         Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("my-header"));
     }
-    
+
     @Test
     void shouldDuplicateSelectedHeaders() throws Exception {
         Exchange exchange = new DefaultExchange(camelContext);
@@ -78,7 +75,7 @@ class DeDuplicateHeadersTest {
         Assertions.assertFalse(exchange.getMessage().getHeaders().containsKey("kafka.OVERRIDE_TOPIC"));
         Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("my-header"));
     }
-    
+
     @Test
     void shouldDeDuplicateSelectedHeaders() throws Exception {
         Exchange exchange = new DefaultExchange(camelContext);
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DuplicateHeadersTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DuplicateHeadersTest.java
index fdff9401..3cb3ca87 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DuplicateHeadersTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DuplicateHeadersTest.java
@@ -16,10 +16,6 @@
  */
 package org.apache.camel.kamelets.utils.headers;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.util.HashMap;
-
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.support.DefaultExchange;
@@ -31,15 +27,13 @@ class DuplicateHeadersTest {
 
     private DefaultCamelContext camelContext;
 
-    private final ObjectMapper mapper = new ObjectMapper();
-
     private DuplicateNamingHeaders processor;
 
     @BeforeEach
     void setup() {
         camelContext = new DefaultCamelContext();
     }
-    
+
     @Test
     void shouldDuplicateHeaders() throws Exception {
         Exchange exchange = new DefaultExchange(camelContext);
@@ -57,7 +51,7 @@ class DuplicateHeadersTest {
         Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("aws.s3.bucket.name"));
         Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("my-header"));
     }
-    
+
     @Test
     void shouldDuplicateSelectedHeaders() throws Exception {
         Exchange exchange = new DefaultExchange(camelContext);
diff --git a/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject b/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject
deleted file mode 100644
index 2f725f6a..00000000
--- a/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-class=org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType
\ No newline at end of file
diff --git a/library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml
index 3bec55f7..ebff6605 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml
@@ -33,8 +33,6 @@ spec:
     title: "Avro Deserialize Action"
     description: "Deserialize payload to Avro"
     type: object
-    required:
-    - schema
     properties:
       schema:
         title: Schema
@@ -54,23 +52,21 @@ spec:
   - "camel:core"
   - "camel:jackson-avro"
   template:
+    beans:
+      - name: schemaResolver
+        type: "#class:org.apache.camel.kamelets.utils.format.converter.avro.AvroSchemaResolver"
+        property:
+          - key: validate
+            value: '{{validate}}'
+          - key: schema
+            value: '{{schema:}}'
     from:
       uri: kamelet:source
       steps:
-      - set-property:
-          name: schema
-          constant: "{{schema}}"
-      - set-property:
-          name: validate
-          constant: "{{validate}}"
       - unmarshal:
           avro:
             library: Jackson
             unmarshalType: com.fasterxml.jackson.databind.JsonNode
-            schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightAvroSchemaResolver"
-      - remove-property:
-          name: schema
-      - remove-property:
-          name: validate
+            schemaResolver: "#bean:{{schemaResolver}}"
       - remove-header:
           name: "Content-Type"
diff --git a/library/camel-kamelets/src/main/resources/kamelets/avro-serialize-action.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/avro-serialize-action.kamelet.yaml
index 06830b27..93a50270 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/avro-serialize-action.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/avro-serialize-action.kamelet.yaml
@@ -33,8 +33,6 @@ spec:
     title: "Avro Serialize Action"
     description: "Serialize payload to Avro"
     type: object
-    required:
-    - schema
     properties:
       schema:
         title: Schema
@@ -54,24 +52,22 @@ spec:
   - "camel:core"
   - "camel:jackson-avro"
   template:
+    beans:
+      - name: schemaResolver
+        type: "#class:org.apache.camel.kamelets.utils.format.converter.avro.AvroSchemaResolver"
+        property:
+          - key: validate
+            value: '{{validate}}'
+          - key: schema
+            value: '{{schema:}}'
     from:
       uri: kamelet:source
       steps:
-      - set-property:
-          name: schema
-          constant: "{{schema}}"
-      - set-property:
-          name: validate
-          constant: "{{validate}}"
       - marshal:
           avro:
             library: Jackson
             unmarshalType: com.fasterxml.jackson.databind.JsonNode
-            schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightAvroSchemaResolver"
-      - remove-property:
-          name: schema
-      - remove-property:
-          name: validate
+            schemaResolver: "#bean:{{schemaResolver}}"
       - set-header:
           name: "Content-Type"
           constant: "application/avro"
diff --git a/library/camel-kamelets/src/main/resources/kamelets/protobuf-deserialize-action.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/protobuf-deserialize-action.kamelet.yaml
index bf06dc4f..c0727a5f 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/protobuf-deserialize-action.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/protobuf-deserialize-action.kamelet.yaml
@@ -32,8 +32,6 @@ spec:
     title: "Protobuf Deserialize Action"
     description: "Deserialize payload to Protobuf"
     type: object
-    required:
-    - schema
     properties:
       schema:
         title: Schema
@@ -46,18 +44,19 @@ spec:
   - "camel:core"
   - "camel:jackson-protobuf"
   template:
+    beans:
+      - name: schemaResolver
+        type: "#class:org.apache.camel.kamelets.utils.format.converter.protobuf.ProtobufSchemaResolver"
+        property:
+          - key: schema
+            value: '{{schema:}}'
     from:
       uri: kamelet:source
       steps:
-      - set-property:
-          name: schema
-          constant: "{{schema}}"
       - unmarshal:
           protobuf:
             library: Jackson
             unmarshalType: com.fasterxml.jackson.databind.JsonNode
-            schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightProtobufSchemaResolver"
-      - remove-property:
-          name: schema
+            schemaResolver: "#bean:{{schemaResolver}}"
       - remove-header:
           name: "Content-Type"
diff --git a/library/camel-kamelets/src/main/resources/kamelets/protobuf-serialize-action.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/protobuf-serialize-action.kamelet.yaml
index 56f321db..36218aec 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/protobuf-serialize-action.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/protobuf-serialize-action.kamelet.yaml
@@ -32,8 +32,6 @@ spec:
     title: "Protobuf Serialize Action"
     description: "Serialize payload to Protobuf"
     type: object
-    required:
-    - schema
     properties:
       schema:
         title: Schema
@@ -46,19 +44,20 @@ spec:
   - "camel:core"
   - "camel:jackson-protobuf"
   template:
+    beans:
+      - name: schemaResolver
+        type: "#class:org.apache.camel.kamelets.utils.format.converter.protobuf.ProtobufSchemaResolver"
+        property:
+          - key: schema
+            value: '{{schema:}}'
     from:
       uri: kamelet:source
       steps:
-      - set-property:
-          name: schema
-          constant: "{{schema}}"
       - marshal:
           protobuf:
             library: Jackson
             unmarshalType: com.fasterxml.jackson.databind.JsonNode
-            schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightProtobufSchemaResolver"
-      - remove-property:
-          name: schema
+            schemaResolver: "#bean:{{schemaResolver}}"
       - set-header:
           name: "Content-Type"
           constant: "application/protobuf"
diff --git a/library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/resolve-pojo-schema-action.kamelet.yaml
similarity index 75%
copy from library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml
copy to library/camel-kamelets/src/main/resources/kamelets/resolve-pojo-schema-action.kamelet.yaml
index 3bec55f7..3bafc090 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/resolve-pojo-schema-action.kamelet.yaml
@@ -18,7 +18,7 @@
 apiVersion: camel.apache.org/v1alpha1
 kind: Kamelet
 metadata:
-  name: avro-deserialize-action
+  name: resolve-pojo-schema-action
   annotations:
     camel.apache.org/kamelet.support.level: "Stable"
     camel.apache.org/catalog.version: "4.0.0-SNAPSHOT"
@@ -30,47 +30,47 @@ metadata:
     camel.apache.org/kamelet.type: "action"
 spec:
   definition:
-    title: "Avro Deserialize Action"
-    description: "Deserialize payload to Avro"
+    title: "Resolve Schema Action"
+    description: "Resolves schema from given mime type and payload. Sets the resolved schema, the schema type and its content class as properties for later reference."
     type: object
-    required:
-    - schema
     properties:
+      mimeType:
+        title: Mime Type
+        description: The mime type to determine the schema resolver implementation that should perform the operation.
+        type: string
+        default: "application/json"
+        example: "application/json"
       schema:
         title: Schema
-        description: The Avro schema to use during serialization (as single-line, using JSON format)
+        description: Optional schema content (as single-line, using JSON format).
+        type: string
+      contentClass:
+        title: Content Class
+        description: Type information of the content object. Fully qualified class name.
+        type: string
+        example: "org.apache.camel.content.Foo"
+      targetMimeType:
+        title: Target Mime Type
+        description: Additional mime type information used to determine the schema resolver. Usually only used in combination with mime type "application/x-java-object"
         type: string
-        example: '{"type": "record", "namespace": "com.example", "name": "FullName", "fields": [{"name": "first", "type": "string"},{"name": "last", "type": "string"}]}'
-      validate:
-        title: Validate
-        description: Indicates if the content must be validated against the schema
-        type: boolean
-        default: true
-        x-descriptors:
-        - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+        example: "application/json"
   dependencies:
-  - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.0.0-SNAPSHOT"
-  - "camel:kamelet"
-  - "camel:core"
-  - "camel:jackson-avro"
+    - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.0.0-SNAPSHOT"
+    - "camel:kamelet"
+    - "camel:core"
+    - "camel:jackson-avro"
+    - "camel:jackson-protobuf"
   template:
+    beans:
+      - name: schemaResolver
+        type: "#class:org.apache.camel.kamelets.utils.format.schema.DelegatingSchemaResolver"
+        properties:
+          mimeType: '{{mimeType}}'
+          schema: '{{schema:}}'
+          contentClass: '{{contentClass:}}'
+          targetMimeType: '{{targetMimeType:}}'
     from:
-      uri: kamelet:source
+      uri: "kamelet:source"
       steps:
-      - set-property:
-          name: schema
-          constant: "{{schema}}"
-      - set-property:
-          name: validate
-          constant: "{{validate}}"
-      - unmarshal:
-          avro:
-            library: Jackson
-            unmarshalType: com.fasterxml.jackson.databind.JsonNode
-            schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightAvroSchemaResolver"
-      - remove-property:
-          name: schema
-      - remove-property:
-          name: validate
-      - remove-header:
-          name: "Content-Type"
+        - process:
+            ref: "{{schemaResolver}}"
diff --git a/test/avro-data-type/README.md b/test/avro-data-type/README.md
new file mode 100644
index 00000000..dcdc29e4
--- /dev/null
+++ b/test/avro-data-type/README.md
@@ -0,0 +1,42 @@
+# Avro data type
+
+This test verifies the Avro data type serialization/deserialization
+
+## Objectives
+
+The test verifies the proper serialization and deserialization of Avro data types `avro/binary` and `avro/x-struct`.
+
+The test uses two KameletBindings that interact with each other. The first binding `json-to-avro` periodically creates a test data event as Json and applies the `avro/binary` data type using the schema in [User.avsc](User.avsc). 
+
+The binary Avro data is then sent to a Http webhook sink that references an Http endpoint that is provided by the 2nd binding `avro-to-log`. The `avro-to-log` binding provides the Http service and deserializes the binary Avro data using the same User schema. The deserialized data is printed to the log output.
+
+The test starts both KameletBindings and is able to verify the proper log output as an expected outcome.
+
+### YAKS Test
+
+The test performs the following high level steps:
+
+*Avro data type feature*
+- Create test data based on the User.avsc Avro schema
+- Load and run the `avro-to-log` KameletBinding
+- Load and run the `json-to-avro` KameletBinding
+- Verify that the bindings do interact with each other and the proper test data is logged in the binding output
+
+## Installation
+
+The test assumes that you have [JBang](https://www.jbang.dev/) installed and the YAKS CLI setup locally.
+
+You can review the installation steps for the tooling in the documentation:
+
+- [JBang](https://www.jbang.dev/documentation/guide/latest/installation.html)
+- [Install YAKS CLI](https://github.com/citrusframework/yaks#installation)
+
+## Run the tests with JBang
+
+To run tests with URI based configuration: 
+
+```shell script
+$ yaks run --local test/avro-data-type/avro-serdes-action.feature
+```
+
+You will be provided with the test log output and the test results.
diff --git a/test/avro-data-type/User.avsc b/test/avro-data-type/User.avsc
new file mode 100644
index 00000000..a2d21f3b
--- /dev/null
+++ b/test/avro-data-type/User.avsc
@@ -0,0 +1,23 @@
+{
+  "name": "User",
+  "type": "record",
+  "namespace": "demo.kamelets",
+  "fields": [
+    {
+      "name": "id",
+      "type": "string"
+    },
+    {
+      "name": "firstname",
+      "type": "string"
+    },
+    {
+      "name": "lastname",
+      "type": "string"
+    },
+    {
+      "name": "age",
+      "type": "int"
+    },
+  ]
+}
diff --git a/test/avro-data-type/avro-data-type.feature b/test/avro-data-type/avro-data-type.feature
new file mode 100644
index 00000000..28765a9d
--- /dev/null
+++ b/test/avro-data-type/avro-data-type.feature
@@ -0,0 +1,22 @@
+Feature: Avro data type
+
+  Scenario: Create Kamelet bindings
+    Given variable uuid is "citrus:randomUUID()"
+    Given variable user is
+    """
+    { "id": "${uuid}", "firstname": "Sheldon", "lastname": "Cooper", "age": 28 }
+    """
+    # Create avro-to-log binding
+    When load KameletBinding avro-to-log-binding.yaml
+    Then Camel K integration avro-to-log-binding should be running
+
+    # Create json-to-avro binding
+    When load KameletBinding json-to-avro-binding.yaml
+    Then Camel K integration json-to-avro-binding should be running
+
+    # Verify output message sent
+    Then Camel K integration avro-to-log-binding should print Body: {  "id" : "${uuid}",  "firstname" : "Sheldon",  "lastname" : "Cooper",  "age" : 28}
+
+  Scenario: Remove resources
+    Given delete KameletBinding avro-to-log-binding
+    Given delete KameletBinding json-to-avro-binding
diff --git a/test/data-type-action/data-type-action-binding.yaml b/test/avro-data-type/avro-to-log-binding.yaml
similarity index 69%
copy from test/data-type-action/data-type-action-binding.yaml
copy to test/avro-data-type/avro-to-log-binding.yaml
index da02745c..e84a816d 100644
--- a/test/data-type-action/data-type-action-binding.yaml
+++ b/test/avro-data-type/avro-to-log-binding.yaml
@@ -18,31 +18,35 @@
 apiVersion: camel.apache.org/v1alpha1
 kind: KameletBinding
 metadata:
-  name: data-type-action-binding
+  name: avro-to-log-binding
 spec:
   source:
     ref:
       kind: Kamelet
       apiVersion: camel.apache.org/v1alpha1
-      name: timer-source
+      name: webhook-source
     properties:
-      period: 5000
-      contentType: application/json
-      message: >
-        ${input}
+      subpath: user
   steps:
     - ref:
         kind: Kamelet
         apiVersion: camel.apache.org/v1alpha1
-        name: data-type-action
+        name: resolve-pojo-schema-action
       properties:
-        scheme: "http"
-        format: "cloudevents"
+        mimeType: "avro/binary"
+        schema: >
+          { "name": "User", "type": "record", "namespace": "demo.kamelets", "fields": [{ "name": "id", "type": "string" }, { "name": "firstname", "type": "string" }, { "name": "lastname", "type": "string" }, { "name": "age", "type": "int" }] }
     - ref:
         kind: Kamelet
         apiVersion: camel.apache.org/v1alpha1
-        name: log-action
+        name: data-type-action
       properties:
-        showHeaders: true
+        scheme: "camel"
+        format: "avro-x-struct"
   sink:
-    uri: yaks:resolveURL('test-service')/result
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1alpha1
+      name: log-action
+    properties:
+      showHeaders: true
diff --git a/test/data-type-action/data-type-action-binding.yaml b/test/avro-data-type/json-to-avro-binding.yaml
similarity index 71%
copy from test/data-type-action/data-type-action-binding.yaml
copy to test/avro-data-type/json-to-avro-binding.yaml
index da02745c..c57c692c 100644
--- a/test/data-type-action/data-type-action-binding.yaml
+++ b/test/avro-data-type/json-to-avro-binding.yaml
@@ -18,7 +18,7 @@
 apiVersion: camel.apache.org/v1alpha1
 kind: KameletBinding
 metadata:
-  name: data-type-action-binding
+  name: json-to-avro-binding
 spec:
   source:
     ref:
@@ -29,20 +29,26 @@ spec:
       period: 5000
       contentType: application/json
       message: >
-        ${input}
+        ${user}
   steps:
     - ref:
         kind: Kamelet
         apiVersion: camel.apache.org/v1alpha1
-        name: data-type-action
+        name: json-deserialize-action
+    - ref:
+        kind: Kamelet
+        apiVersion: camel.apache.org/v1alpha1
+        name: resolve-pojo-schema-action
       properties:
-        scheme: "http"
-        format: "cloudevents"
+        mimeType: "avro/binary"
+        schema: >
+          { "name": "User", "type": "record", "namespace": "demo.kamelets", "fields": [{ "name": "id", "type": "string" }, { "name": "firstname", "type": "string" }, { "name": "lastname", "type": "string" }, { "name": "age", "type": "int" }] }
     - ref:
         kind: Kamelet
         apiVersion: camel.apache.org/v1alpha1
-        name: log-action
+        name: data-type-action
       properties:
-        showHeaders: true
+        scheme: "camel"
+        format: "avro-binary"
   sink:
-    uri: yaks:resolveURL('test-service')/result
+    uri: http://localhost:8080/user
diff --git a/test/data-type-action/data-type-action-binding.yaml b/test/avro-data-type/yaks-config.yaml
similarity index 55%
copy from test/data-type-action/data-type-action-binding.yaml
copy to test/avro-data-type/yaks-config.yaml
index da02745c..b5739446 100644
--- a/test/data-type-action/data-type-action-binding.yaml
+++ b/test/avro-data-type/yaks-config.yaml
@@ -15,34 +15,33 @@
 # limitations under the License.
 # ---------------------------------------------------------------------------
 
-apiVersion: camel.apache.org/v1alpha1
-kind: KameletBinding
-metadata:
-  name: data-type-action-binding
-spec:
-  source:
-    ref:
-      kind: Kamelet
-      apiVersion: camel.apache.org/v1alpha1
-      name: timer-source
-    properties:
-      period: 5000
-      contentType: application/json
-      message: >
-        ${input}
-  steps:
-    - ref:
-        kind: Kamelet
-        apiVersion: camel.apache.org/v1alpha1
-        name: data-type-action
-      properties:
-        scheme: "http"
-        format: "cloudevents"
-    - ref:
-        kind: Kamelet
-        apiVersion: camel.apache.org/v1alpha1
-        name: log-action
-      properties:
-        showHeaders: true
-  sink:
-    uri: yaks:resolveURL('test-service')/result
+config:
+  namespace:
+    temporary: false
+  runtime:
+    env:
+      - name: YAKS_JBANG_KAMELETS_LOCAL_DIR
+        value: "../../../kamelets"
+      - name: YAKS_CAMELK_KAMELET_API_VERSION
+        value: v1alpha1
+      - name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: YAKS_KUBERNETES_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: YAKS_JBANG_CAMEL_DUMP_INTEGRATION_OUTPUT
+        value: true
+    settings:
+      loggers:
+        - name: INTEGRATION_STATUS
+          level: INFO
+        - name: INTEGRATION_LOGS
+          level: INFO
+    resources:
+      - json-to-avro-binding.yaml
+      - avro-to-log-binding.yaml
+      - User.avsc
+  dump:
+    enabled: true
+    failedOnly: true
+    includes:
+      - app=camel-k
diff --git a/test/avro-serdes-action/README.md b/test/avro-serdes-action/README.md
new file mode 100644
index 00000000..1a8b1b29
--- /dev/null
+++ b/test/avro-serdes-action/README.md
@@ -0,0 +1,42 @@
+# Avro serialization/deserialization
+
+This test verifies the Avro serialization/deserialization actions
+
+## Objectives
+
+The test verifies the proper Avro serialization and deserialization of Avro.
+
+The test uses two KameletBindings that interact with each other. The first binding `json-to-avro` periodically creates a test data event as Json and applies the `avro/binary` data type using the schema in [User.avsc](User.avsc). 
+
+The binary Avro data is then sent to a Http webhook sink that references an Http endpoint that is provided by the 2nd binding `avro-to-log`. The `avro-to-log` binding provides the Http service and deserializes the binary Avro data using the same User schema. The deserialized data is printed to the log output.
+
+The test starts both KameletBindings and is able to verify the proper log output as an expected outcome.
+
+### YAKS Test
+
+The test performs the following high level steps:
+
+*Avro data type feature*
+- Create test data based on the User.avsc Avro schema
+- Load and run the `avro-to-log` KameletBinding
+- Load and run the `json-to-avro` KameletBinding
+- Verify that the bindings do interact with each other and the proper test data is logged in the binding output
+
+## Installation
+
+The test assumes that you have [JBang](https://www.jbang.dev/) installed and the YAKS CLI setup locally.
+
+You can review the installation steps for the tooling in the documentation:
+
+- [JBang](https://www.jbang.dev/documentation/guide/latest/installation.html)
+- [Install YAKS CLI](https://github.com/citrusframework/yaks#installation)
+
+## Run the tests with JBang
+
+To run tests with URI based configuration: 
+
+```shell script
+$ yaks run --local test/avro-serdes-action/avro-serdes-action.feature
+```
+
+You will be provided with the test log output and the test results.
diff --git a/test/avro-serdes-action/User.avsc b/test/avro-serdes-action/User.avsc
new file mode 100644
index 00000000..a2d21f3b
--- /dev/null
+++ b/test/avro-serdes-action/User.avsc
@@ -0,0 +1,23 @@
+{
+  "name": "User",
+  "type": "record",
+  "namespace": "demo.kamelets",
+  "fields": [
+    {
+      "name": "id",
+      "type": "string"
+    },
+    {
+      "name": "firstname",
+      "type": "string"
+    },
+    {
+      "name": "lastname",
+      "type": "string"
+    },
+    {
+      "name": "age",
+      "type": "int"
+    },
+  ]
+}
diff --git a/test/avro-serdes-action/avro-serdes-action.feature b/test/avro-serdes-action/avro-serdes-action.feature
new file mode 100644
index 00000000..d09e2450
--- /dev/null
+++ b/test/avro-serdes-action/avro-serdes-action.feature
@@ -0,0 +1,22 @@
+Feature: Avro serialize/deserialize action
+
+  Scenario: Create Kamelet bindings
+    Given variable uuid is "citrus:randomUUID()"
+    Given variable user is
+    """
+    { "id": "${uuid}", "firstname": "Sheldon", "lastname": "Cooper", "age": 28 }
+    """
+    # Create avro-to-log binding
+    When load KameletBinding avro-to-log-binding.yaml
+    Then Camel K integration avro-to-log-binding should be running
+
+    # Create json-to-avro binding
+    When load KameletBinding json-to-avro-binding.yaml
+    Then Camel K integration json-to-avro-binding should be running
+
+    # Verify output message sent
+    Then Camel K integration avro-to-log-binding should print Body: {  "id" : "${uuid}",  "firstname" : "Sheldon",  "lastname" : "Cooper",  "age" : 28}
+
+  Scenario: Remove resources
+    Given delete KameletBinding avro-to-log-binding
+    Given delete KameletBinding json-to-avro-binding
diff --git a/test/data-type-action/data-type-action-binding.yaml b/test/avro-serdes-action/avro-to-log-binding.yaml
similarity index 70%
copy from test/data-type-action/data-type-action-binding.yaml
copy to test/avro-serdes-action/avro-to-log-binding.yaml
index da02745c..72539376 100644
--- a/test/data-type-action/data-type-action-binding.yaml
+++ b/test/avro-serdes-action/avro-to-log-binding.yaml
@@ -18,31 +18,27 @@
 apiVersion: camel.apache.org/v1alpha1
 kind: KameletBinding
 metadata:
-  name: data-type-action-binding
+  name: avro-to-log-binding
 spec:
   source:
     ref:
       kind: Kamelet
       apiVersion: camel.apache.org/v1alpha1
-      name: timer-source
+      name: webhook-source
     properties:
-      period: 5000
-      contentType: application/json
-      message: >
-        ${input}
+      subpath: user
   steps:
     - ref:
         kind: Kamelet
         apiVersion: camel.apache.org/v1alpha1
-        name: data-type-action
+        name: avro-deserialize-action
       properties:
-        scheme: "http"
-        format: "cloudevents"
-    - ref:
-        kind: Kamelet
-        apiVersion: camel.apache.org/v1alpha1
-        name: log-action
-      properties:
-        showHeaders: true
+        schema: >
+          { "name": "User", "type": "record", "namespace": "demo.kamelets", "fields": [{ "name": "id", "type": "string" }, { "name": "firstname", "type": "string" }, { "name": "lastname", "type": "string" }, { "name": "age", "type": "int" }] }
   sink:
-    uri: yaks:resolveURL('test-service')/result
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1alpha1
+      name: log-action
+    properties:
+      showHeaders: true
diff --git a/test/data-type-action/data-type-action-binding.yaml b/test/avro-serdes-action/json-to-avro-binding.yaml
similarity index 74%
copy from test/data-type-action/data-type-action-binding.yaml
copy to test/avro-serdes-action/json-to-avro-binding.yaml
index da02745c..be438840 100644
--- a/test/data-type-action/data-type-action-binding.yaml
+++ b/test/avro-serdes-action/json-to-avro-binding.yaml
@@ -18,7 +18,7 @@
 apiVersion: camel.apache.org/v1alpha1
 kind: KameletBinding
 metadata:
-  name: data-type-action-binding
+  name: json-to-avro-binding
 spec:
   source:
     ref:
@@ -29,15 +29,19 @@ spec:
       period: 5000
       contentType: application/json
       message: >
-        ${input}
+        ${user}
   steps:
     - ref:
         kind: Kamelet
         apiVersion: camel.apache.org/v1alpha1
-        name: data-type-action
+        name: json-deserialize-action
+    - ref:
+        kind: Kamelet
+        apiVersion: camel.apache.org/v1alpha1
+        name: avro-serialize-action
       properties:
-        scheme: "http"
-        format: "cloudevents"
+        schema: >
+          { "name": "User", "type": "record", "namespace": "demo.kamelets", "fields": [{ "name": "id", "type": "string" }, { "name": "firstname", "type": "string" }, { "name": "lastname", "type": "string" }, { "name": "age", "type": "int" }] }
     - ref:
         kind: Kamelet
         apiVersion: camel.apache.org/v1alpha1
@@ -45,4 +49,4 @@ spec:
       properties:
         showHeaders: true
   sink:
-    uri: yaks:resolveURL('test-service')/result
+    uri: http://localhost:8080/user
diff --git a/test/data-type-action/data-type-action-binding.yaml b/test/avro-serdes-action/yaks-config.yaml
similarity index 55%
copy from test/data-type-action/data-type-action-binding.yaml
copy to test/avro-serdes-action/yaks-config.yaml
index da02745c..b5739446 100644
--- a/test/data-type-action/data-type-action-binding.yaml
+++ b/test/avro-serdes-action/yaks-config.yaml
@@ -15,34 +15,33 @@
 # limitations under the License.
 # ---------------------------------------------------------------------------
 
-apiVersion: camel.apache.org/v1alpha1
-kind: KameletBinding
-metadata:
-  name: data-type-action-binding
-spec:
-  source:
-    ref:
-      kind: Kamelet
-      apiVersion: camel.apache.org/v1alpha1
-      name: timer-source
-    properties:
-      period: 5000
-      contentType: application/json
-      message: >
-        ${input}
-  steps:
-    - ref:
-        kind: Kamelet
-        apiVersion: camel.apache.org/v1alpha1
-        name: data-type-action
-      properties:
-        scheme: "http"
-        format: "cloudevents"
-    - ref:
-        kind: Kamelet
-        apiVersion: camel.apache.org/v1alpha1
-        name: log-action
-      properties:
-        showHeaders: true
-  sink:
-    uri: yaks:resolveURL('test-service')/result
+config:
+  namespace:
+    temporary: false
+  runtime:
+    env:
+      - name: YAKS_JBANG_KAMELETS_LOCAL_DIR
+        value: "../../../kamelets"
+      - name: YAKS_CAMELK_KAMELET_API_VERSION
+        value: v1alpha1
+      - name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: YAKS_KUBERNETES_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: YAKS_JBANG_CAMEL_DUMP_INTEGRATION_OUTPUT
+        value: true
+    settings:
+      loggers:
+        - name: INTEGRATION_STATUS
+          level: INFO
+        - name: INTEGRATION_LOGS
+          level: INFO
+    resources:
+      - json-to-avro-binding.yaml
+      - avro-to-log-binding.yaml
+      - User.avsc
+  dump:
+    enabled: true
+    failedOnly: true
+    includes:
+      - app=camel-k
diff --git a/test/aws-s3/aws-s3-knative-broker.feature b/test/aws-s3/aws-s3-knative-broker.feature
index fe935dc7..719f91d2 100644
--- a/test/aws-s3/aws-s3-knative-broker.feature
+++ b/test/aws-s3/aws-s3-knative-broker.feature
@@ -5,7 +5,7 @@ Feature: AWS S3 Kamelet - Knative broker binding
     Given Knative event consumer timeout is 20000 ms
     Given variables
       | aws.s3.scheme | camel |
-      | aws.s3.format | string |
+      | aws.s3.format | plain-text |
       | aws.s3.bucketNameOrArn | mybucket |
       | aws.s3.message | Hello from S3 Kamelet |
       | aws.s3.key | hello.txt |
diff --git a/test/aws-s3/aws-s3-knative-cloudevents.feature b/test/aws-s3/aws-s3-knative-cloudevents.feature
index 647c8717..0af41639 100644
--- a/test/aws-s3/aws-s3-knative-cloudevents.feature
+++ b/test/aws-s3/aws-s3-knative-cloudevents.feature
@@ -5,7 +5,7 @@ Feature: AWS S3 Kamelet - cloud events data type
     Given Knative event consumer timeout is 20000 ms
     Given variables
       | aws.s3.scheme | aws2-s3 |
-      | aws.s3.format | cloudevents |
+      | aws.s3.format | application-cloudevents |
       | aws.s3.bucketNameOrArn | mybucket |
       | aws.s3.message | Hello from S3 Kamelet |
       | aws.s3.key | hello.txt |
diff --git a/test/aws-s3/aws-s3-to-http.yaml b/test/aws-s3/aws-s3-to-http.yaml
index d0e2bd41..1a7b931d 100644
--- a/test/aws-s3/aws-s3-to-http.yaml
+++ b/test/aws-s3/aws-s3-to-http.yaml
@@ -39,14 +39,14 @@ spec:
         name: data-type-action
       properties:
         scheme: "aws2-s3"
-        format: "cloudevents"
+        format: "application-cloudevents"
     - ref:
         kind: Kamelet
         apiVersion: camel.apache.org/v1alpha1
         name: data-type-action
       properties:
         scheme: "http"
-        format: "cloudevents"
+        format: "application-cloudevents"
     - ref:
         kind: Kamelet
         apiVersion: camel.apache.org/v1alpha1
diff --git a/test/data-type-action/data-type-action-binding.yaml b/test/data-type-action/data-type-action-binding.yaml
index da02745c..09719574 100644
--- a/test/data-type-action/data-type-action-binding.yaml
+++ b/test/data-type-action/data-type-action-binding.yaml
@@ -37,7 +37,7 @@ spec:
         name: data-type-action
       properties:
         scheme: "http"
-        format: "cloudevents"
+        format: "application-cloudevents"
     - ref:
         kind: Kamelet
         apiVersion: camel.apache.org/v1alpha1