You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cd...@apache.org on 2023/05/26 08:32:09 UTC
[camel-kamelets] branch main updated: chore: Enhance Kamelet data type implementations
This is an automated email from the ASF dual-hosted git repository.
cdeppisch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git
The following commit(s) were added to refs/heads/main by this push:
new abf5c423 chore: Enhance Kamelet data type implementations
abf5c423 is described below
commit abf5c423eab77985827f8787fbb3f99ca5e101c3
Author: Christoph Deppisch <cd...@redhat.com>
AuthorDate: Wed May 24 20:14:04 2023 +0200
chore: Enhance Kamelet data type implementations
- Refactor existing data type converter names to align with respective mime type (e.g. application/json -> application-json, plain/text -> plain-text)
- Add data type converter implementations for avro-binary, avro-x-struct, application-x-struct, application-json, application-x-java-object
- Enhance Avro/Protobuf schema resolver to resolve schemas from given exchange property, content class, schema file classpath reference or explicit schema content
- Add Json schema resolver
- Add resolve-pojo-schema-action.kamelet to resolve either Json, Avro or Protobuf schemas delegating to respective schema resolver implementations
- Use new schema resolver in avro/protobuf serialize/deserialize action Kamelets
---
.github/workflows/yaks-tests.yaml | 3 +
kamelets/avro-deserialize-action.kamelet.yaml | 22 ++-
kamelets/avro-serialize-action.kamelet.yaml | 22 ++-
kamelets/protobuf-deserialize-action.kamelet.yaml | 15 +-
kamelets/protobuf-serialize-action.kamelet.yaml | 15 +-
.../resolve-pojo-schema-action.kamelet.yaml | 72 +++++-----
.../utils/format/DefaultDataTypeRegistry.java | 8 +-
.../camel/kamelets/utils/format/MimeType.java | 53 +++++++
.../BinaryDataType.java => SchemaType.java} | 37 +++--
.../converter/avro/Avro.java} | 22 +--
.../format/converter/avro/AvroBinaryDataType.java | 81 +++++++++++
.../format/converter/avro/AvroSchemaResolver.java | 160 +++++++++++++++++++++
.../format/converter/avro/AvroStructDataType.java | 70 +++++++++
.../converter/aws2/ddb/Ddb2JsonInputType.java | 6 +-
.../aws2/s3/AWS2S3CloudEventOutputType.java | 2 +-
.../ByteArrayDataType.java} | 15 +-
.../converter/http/HttpCloudEventOutputType.java | 2 +-
.../StringDataType.java => json/Json.java} | 21 +--
.../utils/format/converter/json/JsonDataType.java | 49 +++++++
.../format/converter/json/JsonFormatSchema.java | 22 +++
.../format/converter/json/JsonSchemaResolver.java | 155 ++++++++++++++++++++
.../format/converter/json/JsonStructDataType.java | 59 ++++++++
.../format/converter/pojo/JavaObjectDataType.java | 106 ++++++++++++++
.../StringDataType.java => protobuf/Protobuf.java} | 21 +--
.../converter/protobuf/ProtobufSchemaResolver.java | 159 ++++++++++++++++++++
.../converter/standard/JsonModelDataType.java | 97 -------------
.../StringDataType.java} | 21 ++-
.../utils/format/converter/utils/PojoHelper.java | 89 ++++++++++++
.../utils/format/converter/utils/SchemaHelper.java | 34 +++++
.../format/schema/DelegatingSchemaResolver.java | 119 +++++++++++++++
.../InflightProtobufSchemaResolver.java | 41 ------
.../services/org/apache/camel/DataTypeConverter | 6 +-
.../{aws2-ddb-json => aws2-ddb-application-json} | 0
...cloudevents => aws2-s3-application-cloudevents} | 0
.../{camel-binary => camel-application-json} | 2 +-
...l-jsonObject => camel-application-octet-stream} | 2 +-
...amel-binary => camel-application-x-java-object} | 2 +-
.../{camel-binary => camel-application-x-struct} | 2 +-
.../converter/{camel-binary => camel-avro-binary} | 2 +-
.../{camel-binary => camel-avro-x-struct} | 2 +-
.../converter/{camel-string => camel-plain-text} | 2 +-
...tp-cloudevents => http-application-cloudevents} | 0
.../DefaultDataTypeConverterResolverTest.java | 8 +-
.../utils/format/DefaultDataTypeRegistryTest.java | 16 +--
.../converter/aws2/ddb/Ddb2JsonInputTypeTest.java | 16 +--
.../aws2/s3/AWS2S3CloudEventOutputTypeTest.java | 2 +-
.../http/HttpCloudEventOutputTypeTest.java | 2 +-
.../converter/standard/JsonModelDataTypeTest.java | 102 -------------
.../ByteArrayDataTypeTest.java} | 9 +-
.../standard/{ => text}/StringDataTypeTest.java | 5 +-
.../utils/headers/DeDuplicateHeadersTest.java | 9 +-
.../utils/headers/DuplicateHeadersTest.java | 10 +-
.../camel/datatype/converter/camel-jsonObject | 18 ---
.../kamelets/avro-deserialize-action.kamelet.yaml | 22 ++-
.../kamelets/avro-serialize-action.kamelet.yaml | 22 ++-
.../protobuf-deserialize-action.kamelet.yaml | 15 +-
.../protobuf-serialize-action.kamelet.yaml | 15 +-
...aml => resolve-pojo-schema-action.kamelet.yaml} | 72 +++++-----
test/avro-data-type/README.md | 42 ++++++
test/avro-data-type/User.avsc | 23 +++
test/avro-data-type/avro-data-type.feature | 22 +++
.../avro-to-log-binding.yaml} | 28 ++--
.../json-to-avro-binding.yaml} | 22 +--
.../yaks-config.yaml} | 61 ++++----
test/avro-serdes-action/README.md | 42 ++++++
test/avro-serdes-action/User.avsc | 23 +++
test/avro-serdes-action/avro-serdes-action.feature | 22 +++
.../avro-to-log-binding.yaml} | 28 ++--
.../json-to-avro-binding.yaml} | 16 ++-
.../yaks-config.yaml} | 61 ++++----
test/aws-s3/aws-s3-knative-broker.feature | 2 +-
test/aws-s3/aws-s3-knative-cloudevents.feature | 2 +-
test/aws-s3/aws-s3-to-http.yaml | 4 +-
.../data-type-action/data-type-action-binding.yaml | 2 +-
74 files changed, 1705 insertions(+), 656 deletions(-)
diff --git a/.github/workflows/yaks-tests.yaml b/.github/workflows/yaks-tests.yaml
index 2d52f915..1994831a 100644
--- a/.github/workflows/yaks-tests.yaml
+++ b/.github/workflows/yaks-tests.yaml
@@ -91,6 +91,9 @@ jobs:
yaks run test/aws-ddb-sink $YAKS_RUN_OPTIONS
yaks run test/aws-s3 $YAKS_RUN_OPTIONS
+ yaks run test/avro-data-type $YAKS_RUN_OPTIONS
+ yaks run test/avro-serdes-action $YAKS_RUN_OPTIONS
+
yaks run test/extract-field-action $YAKS_RUN_OPTIONS
yaks run test/insert-field-action $YAKS_RUN_OPTIONS
diff --git a/kamelets/avro-deserialize-action.kamelet.yaml b/kamelets/avro-deserialize-action.kamelet.yaml
index 3bec55f7..ebff6605 100644
--- a/kamelets/avro-deserialize-action.kamelet.yaml
+++ b/kamelets/avro-deserialize-action.kamelet.yaml
@@ -33,8 +33,6 @@ spec:
title: "Avro Deserialize Action"
description: "Deserialize payload to Avro"
type: object
- required:
- - schema
properties:
schema:
title: Schema
@@ -54,23 +52,21 @@ spec:
- "camel:core"
- "camel:jackson-avro"
template:
+ beans:
+ - name: schemaResolver
+ type: "#class:org.apache.camel.kamelets.utils.format.converter.avro.AvroSchemaResolver"
+ property:
+ - key: validate
+ value: '{{validate}}'
+ - key: schema
+ value: '{{schema:}}'
from:
uri: kamelet:source
steps:
- - set-property:
- name: schema
- constant: "{{schema}}"
- - set-property:
- name: validate
- constant: "{{validate}}"
- unmarshal:
avro:
library: Jackson
unmarshalType: com.fasterxml.jackson.databind.JsonNode
- schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightAvroSchemaResolver"
- - remove-property:
- name: schema
- - remove-property:
- name: validate
+ schemaResolver: "#bean:{{schemaResolver}}"
- remove-header:
name: "Content-Type"
diff --git a/kamelets/avro-serialize-action.kamelet.yaml b/kamelets/avro-serialize-action.kamelet.yaml
index 06830b27..93a50270 100644
--- a/kamelets/avro-serialize-action.kamelet.yaml
+++ b/kamelets/avro-serialize-action.kamelet.yaml
@@ -33,8 +33,6 @@ spec:
title: "Avro Serialize Action"
description: "Serialize payload to Avro"
type: object
- required:
- - schema
properties:
schema:
title: Schema
@@ -54,24 +52,22 @@ spec:
- "camel:core"
- "camel:jackson-avro"
template:
+ beans:
+ - name: schemaResolver
+ type: "#class:org.apache.camel.kamelets.utils.format.converter.avro.AvroSchemaResolver"
+ property:
+ - key: validate
+ value: '{{validate}}'
+ - key: schema
+ value: '{{schema:}}'
from:
uri: kamelet:source
steps:
- - set-property:
- name: schema
- constant: "{{schema}}"
- - set-property:
- name: validate
- constant: "{{validate}}"
- marshal:
avro:
library: Jackson
unmarshalType: com.fasterxml.jackson.databind.JsonNode
- schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightAvroSchemaResolver"
- - remove-property:
- name: schema
- - remove-property:
- name: validate
+ schemaResolver: "#bean:{{schemaResolver}}"
- set-header:
name: "Content-Type"
constant: "application/avro"
diff --git a/kamelets/protobuf-deserialize-action.kamelet.yaml b/kamelets/protobuf-deserialize-action.kamelet.yaml
index bf06dc4f..c0727a5f 100644
--- a/kamelets/protobuf-deserialize-action.kamelet.yaml
+++ b/kamelets/protobuf-deserialize-action.kamelet.yaml
@@ -32,8 +32,6 @@ spec:
title: "Protobuf Deserialize Action"
description: "Deserialize payload to Protobuf"
type: object
- required:
- - schema
properties:
schema:
title: Schema
@@ -46,18 +44,19 @@ spec:
- "camel:core"
- "camel:jackson-protobuf"
template:
+ beans:
+ - name: schemaResolver
+ type: "#class:org.apache.camel.kamelets.utils.format.converter.protobuf.ProtobufSchemaResolver"
+ property:
+ - key: schema
+ value: '{{schema:}}'
from:
uri: kamelet:source
steps:
- - set-property:
- name: schema
- constant: "{{schema}}"
- unmarshal:
protobuf:
library: Jackson
unmarshalType: com.fasterxml.jackson.databind.JsonNode
- schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightProtobufSchemaResolver"
- - remove-property:
- name: schema
+ schemaResolver: "#bean:{{schemaResolver}}"
- remove-header:
name: "Content-Type"
diff --git a/kamelets/protobuf-serialize-action.kamelet.yaml b/kamelets/protobuf-serialize-action.kamelet.yaml
index 56f321db..36218aec 100644
--- a/kamelets/protobuf-serialize-action.kamelet.yaml
+++ b/kamelets/protobuf-serialize-action.kamelet.yaml
@@ -32,8 +32,6 @@ spec:
title: "Protobuf Serialize Action"
description: "Serialize payload to Protobuf"
type: object
- required:
- - schema
properties:
schema:
title: Schema
@@ -46,19 +44,20 @@ spec:
- "camel:core"
- "camel:jackson-protobuf"
template:
+ beans:
+ - name: schemaResolver
+ type: "#class:org.apache.camel.kamelets.utils.format.converter.protobuf.ProtobufSchemaResolver"
+ property:
+ - key: schema
+ value: '{{schema:}}'
from:
uri: kamelet:source
steps:
- - set-property:
- name: schema
- constant: "{{schema}}"
- marshal:
protobuf:
library: Jackson
unmarshalType: com.fasterxml.jackson.databind.JsonNode
- schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightProtobufSchemaResolver"
- - remove-property:
- name: schema
+ schemaResolver: "#bean:{{schemaResolver}}"
- set-header:
name: "Content-Type"
constant: "application/protobuf"
diff --git a/library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml b/kamelets/resolve-pojo-schema-action.kamelet.yaml
similarity index 75%
copy from library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml
copy to kamelets/resolve-pojo-schema-action.kamelet.yaml
index 3bec55f7..3bafc090 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml
+++ b/kamelets/resolve-pojo-schema-action.kamelet.yaml
@@ -18,7 +18,7 @@
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
- name: avro-deserialize-action
+ name: resolve-pojo-schema-action
annotations:
camel.apache.org/kamelet.support.level: "Stable"
camel.apache.org/catalog.version: "4.0.0-SNAPSHOT"
@@ -30,47 +30,47 @@ metadata:
camel.apache.org/kamelet.type: "action"
spec:
definition:
- title: "Avro Deserialize Action"
- description: "Deserialize payload to Avro"
+ title: "Resolve Schema Action"
+ description: "Resolves schema from given mime type and payload. Sets the resolved schema, the schema type and its content class as properties for later reference."
type: object
- required:
- - schema
properties:
+ mimeType:
+ title: Mime Type
+ description: The mime type to determine the schema resolver implementation that should perform the operation.
+ type: string
+ default: "application/json"
+ example: "application/json"
schema:
title: Schema
- description: The Avro schema to use during serialization (as single-line, using JSON format)
+ description: Optional schema content (as single-line, using JSON format).
+ type: string
+ contentClass:
+ title: Content Class
+ description: Type information of the content object. Fully qualified class name.
+ type: string
+ example: "org.apache.camel.content.Foo"
+ targetMimeType:
+ title: Target Mime Type
+ description: Additional mime type information used to determine the schema resolver. Usually only used in combination with mime type "application/x-java-object"
type: string
- example: '{"type": "record", "namespace": "com.example", "name": "FullName", "fields": [{"name": "first", "type": "string"},{"name": "last", "type": "string"}]}'
- validate:
- title: Validate
- description: Indicates if the content must be validated against the schema
- type: boolean
- default: true
- x-descriptors:
- - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+ example: "application/json"
dependencies:
- - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.0.0-SNAPSHOT"
- - "camel:kamelet"
- - "camel:core"
- - "camel:jackson-avro"
+ - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.0.0-SNAPSHOT"
+ - "camel:kamelet"
+ - "camel:core"
+ - "camel:jackson-avro"
+ - "camel:jackson-protobuf"
template:
+ beans:
+ - name: schemaResolver
+ type: "#class:org.apache.camel.kamelets.utils.format.schema.DelegatingSchemaResolver"
+ properties:
+ mimeType: '{{mimeType}}'
+ schema: '{{schema:}}'
+ contentClass: '{{contentClass:}}'
+ targetMimeType: '{{targetMimeType:}}'
from:
- uri: kamelet:source
+ uri: "kamelet:source"
steps:
- - set-property:
- name: schema
- constant: "{{schema}}"
- - set-property:
- name: validate
- constant: "{{validate}}"
- - unmarshal:
- avro:
- library: Jackson
- unmarshalType: com.fasterxml.jackson.databind.JsonNode
- schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightAvroSchemaResolver"
- - remove-property:
- name: schema
- - remove-property:
- name: validate
- - remove-header:
- name: "Content-Type"
+ - process:
+ ref: "{{schemaResolver}}"
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java
index 24c77b70..cb01358a 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java
@@ -26,9 +26,8 @@ import java.util.Optional;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.kamelets.utils.format.converter.standard.BinaryDataType;
-import org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType;
-import org.apache.camel.kamelets.utils.format.converter.standard.StringDataType;
+import org.apache.camel.kamelets.utils.format.converter.bytes.ByteArrayDataType;
+import org.apache.camel.kamelets.utils.format.converter.text.StringDataType;
import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
import org.apache.camel.kamelets.utils.format.spi.DataTypeConverterResolver;
import org.apache.camel.kamelets.utils.format.spi.DataTypeLoader;
@@ -105,9 +104,8 @@ public class DefaultDataTypeRegistry extends ServiceSupport implements DataTypeR
if (classpathScan) {
dataTypeLoaders.add(new AnnotationDataTypeLoader());
} else if (useDefaultConverters) {
- addDataTypeConverter(new BinaryDataType());
+ addDataTypeConverter(new ByteArrayDataType());
addDataTypeConverter(new StringDataType());
- addDataTypeConverter(new JsonModelDataType());
}
for (DataTypeLoader loader : dataTypeLoaders) {
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/MimeType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/MimeType.java
new file mode 100644
index 00000000..7c205439
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/MimeType.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format;
+
+import java.util.Objects;
+
+public enum MimeType { // Mime type constants; each one wraps the mime type string returned by type().
+ JSON("application/json"),
+ PROTOBUF("application/protobuf"),
+ AVRO("application/avro"),
+ AVRO_BINARY("avro/binary"),
+ AVRO_STRUCT("avro/x-struct"),
+ BINARY("application/octet-stream"),
+ TEXT("text/plain"),
+ JAVA_OBJECT("application/x-java-object"),
+ STRUCT("application/x-struct");
+
+ private static final MimeType[] VALUES = values(); // cached once; of() iterates this array instead of calling values() per lookup
+ private final String type;
+
+ MimeType(String type) {
+ this.type = type;
+ }
+
+ public String type() { // returns the mime type string of this constant
+ return type;
+ }
+
+ public static MimeType of(String type) { // returns the constant whose mime type string equals the given type
+ for (MimeType mt : VALUES) {
+ if (Objects.equals(type, mt.type)) { // null-safe, exact string comparison (no case folding or trimming)
+ return mt;
+ }
+ }
+
+ throw new IllegalArgumentException("Unsupported type: " + type); // no constant matched, which includes a null type
+ }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/SchemaType.java
similarity index 51%
copy from library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataType.java
copy to library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/SchemaType.java
index 532e998b..10fa2c58 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataType.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/SchemaType.java
@@ -15,24 +15,35 @@
* limitations under the License.
*/
-package org.apache.camel.kamelets.utils.format.converter.standard;
+package org.apache.camel.kamelets.utils.format;
-import org.apache.camel.Exchange;
-import org.apache.camel.kamelets.utils.format.DefaultDataTypeConverter;
-import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
-import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+import java.util.Arrays;
+import java.util.Objects;
/**
- * Binary data type.
+ * Supported schema type for Java object serialization/deserialization
*/
-@DataType(name = "binary", mediaType = "application/octet-stream")
-public class BinaryDataType implements DataTypeConverter {
+public enum SchemaType {
+ PROTOBUF("protobuf"),
+ AVRO("avsc"),
+ JSON("json");
- private static final DataTypeConverter DELEGATE =
- new DefaultDataTypeConverter(DataType.DEFAULT_SCHEME, "binary", "application/octet-stream", byte[].class);
+ private static final SchemaType[] VALUES = values();
- @Override
- public void convert(Exchange exchange) {
- DELEGATE.convert(exchange);
+ private final String schemaType;
+
+ SchemaType(String type) {
+ this.schemaType = type;
+ }
+
+ public String type() {
+ return schemaType;
+ }
+
+ public static SchemaType of(String type) {
+ return Arrays.stream(VALUES)
+ .filter(s -> Objects.equals(s.schemaType, type))
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException(String.format("Unsupported schema type '%s'", type)));
}
}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/InflightAvroSchemaResolver.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/Avro.java
similarity index 52%
rename from library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/InflightAvroSchemaResolver.java
rename to library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/Avro.java
index a75df4d1..cfadf972 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/InflightAvroSchemaResolver.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/Avro.java
@@ -14,24 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.kamelets.utils.serialization;
-import com.fasterxml.jackson.core.FormatSchema;
-import com.fasterxml.jackson.dataformat.avro.AvroSchema;
+package org.apache.camel.kamelets.utils.format.converter.avro;
-import org.apache.avro.Schema;
-import org.apache.camel.Exchange;
-import org.apache.camel.component.jackson.SchemaResolver;
+import com.fasterxml.jackson.dataformat.avro.AvroMapper;
-public class InflightAvroSchemaResolver implements SchemaResolver {
+public final class Avro {
- @Override
- public FormatSchema resolve(Exchange exchange) {
- String schemaJson = (String) exchange.getProperty("schema");
- Boolean validate = Boolean.valueOf((String) exchange.getProperty("validate"));
- Schema raw = new Schema.Parser().setValidate(validate).parse(schemaJson);
- AvroSchema schema = new AvroSchema(raw);
- return schema;
- }
+ public static final AvroMapper MAPPER = new AvroMapper();
+ private Avro() {
+ // prevent instantiation of utility class
+ }
}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroBinaryDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroBinaryDataType.java
new file mode 100644
index 00000000..465da568
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroBinaryDataType.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.avro;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.dataformat.avro.AvroSchema;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.kamelets.utils.format.MimeType;
+import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+
+/**
+ * Data type that uses the Jackson Avro data format to marshal the Exchange body (a JsonNode, or a payload first read as one) to a
+ * binary (byte array) representation. Reads the Avro schema from the Exchange property SchemaHelper.CONTENT_SCHEMA (usually already
+ * resolved via the schema resolver Kamelet action) and throws a CamelExecutionException when that property holds no Avro schema.
+ */
+@DataType(name = "avro-binary", mediaType = "avro/binary")
+public class AvroBinaryDataType implements DataTypeConverter {
+
+ @Override
+ public void convert(Exchange exchange) { // replaces the message body with its Avro binary form and sets the content type and schema headers
+ AvroSchema schema = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, AvroSchema.class);
+
+ if (schema == null) { // this converter does not resolve a schema itself; it only reads the exchange property
+ throw new CamelExecutionException("Missing proper avro schema for data type processing", exchange);
+ }
+
+ try {
+ byte[] marshalled = Avro.MAPPER.writer().forType(JsonNode.class).with(schema)
+ .writeValueAsBytes(getBodyAsJsonNode(exchange, schema));
+ exchange.getMessage().setBody(marshalled);
+
+ exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.AVRO_BINARY.type());
+ exchange.getMessage().setHeader(SchemaHelper.CONTENT_SCHEMA,
+ exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, "", String.class)); // NOTE(review): same property was read as AvroSchema above but is requested as String here - presumably relies on type conversion; confirm the header value
+ } catch (InvalidPayloadException | IOException e) { // both failures are rethrown as CamelExecutionException with the cause attached
+ throw new CamelExecutionException("Failed to apply Avro binary data type on exchange", exchange, e);
+ }
+ }
+
+ private JsonNode getBodyAsJsonNode(Exchange exchange, AvroSchema schema) throws InvalidPayloadException, IOException { // returns the body as JsonNode, decoding it first if needed
+ if (exchange.getMessage().getBody() instanceof JsonNode) { // already a JsonNode: use it as is
+ return (JsonNode) exchange.getMessage().getBody();
+ }
+
+ return Avro.MAPPER.reader().forType(JsonNode.class).with(schema) // NOTE(review): any other body is read with the Avro reader and the same schema - confirm this is intended for non-Avro payloads
+ .readValue(getBodyAsStream(exchange));
+ }
+
+ private InputStream getBodyAsStream(Exchange exchange) throws InvalidPayloadException { // returns the body as InputStream, falling back to a mandatory byte[] body
+ InputStream bodyStream = exchange.getMessage().getBody(InputStream.class);
+
+ if (bodyStream == null) { // no InputStream view available: getMandatoryBody declares InvalidPayloadException if byte[] is unavailable too
+ bodyStream = new ByteArrayInputStream(exchange.getMessage().getMandatoryBody(byte[].class));
+ }
+
+ return bodyStream;
+ }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroSchemaResolver.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroSchemaResolver.java
new file mode 100644
index 00000000..e6a71ad0
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroSchemaResolver.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.avro;
+
+import java.io.InputStream;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.fasterxml.jackson.core.FormatSchema;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.dataformat.avro.AvroSchema;
+import org.apache.avro.Schema;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.jackson.SchemaResolver;
+import org.apache.camel.kamelets.utils.format.SchemaType;
+import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper;
+import org.apache.camel.spi.Resource;
+import org.apache.camel.support.PluginHelper;
+import org.apache.camel.util.ObjectHelper;
+
+public class AvroSchemaResolver implements SchemaResolver, Processor { // Resolves the Avro schema for an exchange; implements both SchemaResolver (resolve) and Processor (process).
+ private final ConcurrentMap<String, AvroSchema> schemes; // schemas resolved per content class name
+
+ private AvroSchema schema; // set via setSchema(), or assigned by computeIfAbsent() once a schema has been resolved
+ private String contentClass;
+
+ private boolean validate = true; // passed to Schema.Parser.setValidate() whenever schema text is parsed
+
+ public AvroSchemaResolver() {
+ this.schemes = new ConcurrentHashMap<>();
+ }
+
+ public String getSchema() { // returns the current schema as text, or null when no schema is set
+ if (this.schema != null) {
+ return this.schema.getAvroSchema().toString();
+ }
+
+ return null;
+ }
+
+ public void setSchema(String schema) { // parses non-empty schema text; an empty or null value clears the schema
+ if (ObjectHelper.isNotEmpty(schema)) {
+ this.schema = new AvroSchema(new Schema.Parser().setValidate(validate).parse(schema)); // NOTE(review): reads the current validate value, so setValidate() only affects this parse when it is called before setSchema()
+ } else {
+ this.schema = null;
+ }
+ }
+
+ public boolean isValidate() {
+ return validate;
+ }
+
+ public void setValidate(boolean validate) {
+ this.validate = validate;
+ }
+
+ public String getContentClass() {
+ return contentClass;
+ }
+
+ public void setContentClass(String contentClass) { // empty values are normalized to null
+ if (ObjectHelper.isNotEmpty(contentClass)) {
+ this.contentClass = contentClass;
+ } else {
+ this.contentClass = null;
+ }
+ }
+
+ @Override
+ public void process(Exchange exchange) throws Exception { // stores the resolved schema, schema type and content class as exchange properties
+ Object payload = exchange.getMessage().getBody();
+ if (payload == null) { // nothing is resolved or set when the message has no body
+ return;
+ }
+
+ AvroSchema answer = computeIfAbsent(exchange);
+
+ if (answer != null) { // properties are only written when a schema could be resolved
+ exchange.setProperty(SchemaHelper.CONTENT_SCHEMA, answer);
+ exchange.setProperty(SchemaHelper.CONTENT_SCHEMA_TYPE, SchemaType.AVRO.type());
+ exchange.setProperty(SchemaHelper.CONTENT_CLASS, SchemaHelper.resolveContentClass(exchange, this.contentClass));
+ }
+ }
+
+ @Override
+ public FormatSchema resolve(Exchange exchange) { // prefers the CONTENT_SCHEMA exchange property, otherwise falls back to computeIfAbsent(); may return null
+ AvroSchema answer = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, AvroSchema.class);
+ if (answer == null) {
+ answer = computeIfAbsent(exchange);
+ }
+
+ return answer;
+ }
+
+ private AvroSchema computeIfAbsent(Exchange exchange) { // lookup order: instance schema, CONTENT_SCHEMA property, SCHEMA property text, then content class
+ if (this.schema != null) { // an instance-level schema always wins over anything on the exchange
+ return this.schema;
+ }
+
+ AvroSchema answer = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, AvroSchema.class);
+
+ if (answer == null && exchange.getProperties().containsKey(SchemaHelper.SCHEMA)) { // schema text supplied as an exchange property
+ String schemaJson = exchange.getProperty(SchemaHelper.SCHEMA, String.class);
+ Schema raw = new Schema.Parser().setValidate(validate).parse(schemaJson);
+ answer = new AvroSchema(raw);
+ }
+
+ if (answer == null) {
+ String contentClass = SchemaHelper.resolveContentClass(exchange, this.contentClass);
+ if (contentClass != null) {
+ answer = this.schemes.computeIfAbsent(contentClass, t -> { // result is cached in schemes under the content class name
+ Resource res = PluginHelper.getResourceLoader(exchange.getContext())
+ .resolveResource("classpath:schemas/" + SchemaType.AVRO.type() + "/" + t + "." + SchemaType.AVRO.type()); // schema file location built from the schema type string and the content class name
+
+ try {
+ if (res.exists()) {
+ try (InputStream is = res.getInputStream()) {
+ if (is != null) {
+ return Avro.MAPPER.schemaFrom(is);
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Unable to load Avro schema for type: " + t + ", resource: " + res.getLocation(), e);
+ }
+
+ try { // no usable schema resource: derive the schema from the class itself
+ return Avro.MAPPER.schemaFor(Class.forName(contentClass)); // contentClass holds the same value as the map key t here
+ } catch (JsonMappingException | ClassNotFoundException e) {
+ throw new RuntimeException(
+ "Unable to compute Avro schema for type: " + t, e);
+ }
+ });
+ }
+ }
+
+ if (answer != null) {
+ this.schema = answer; // NOTE(review): caches the result on the instance, so later exchanges hit the check at the top and reuse it - confirm this is intended when schemas differ per exchange; the field write is not synchronized
+ }
+
+ return answer;
+ }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroStructDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroStructDataType.java
new file mode 100644
index 00000000..c1997b04
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroStructDataType.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.avro;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.dataformat.avro.AvroSchema;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.kamelets.utils.format.MimeType;
+import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+
+/**
+ * Converter that reads the Exchange message body with the Avro Jackson mapper into a generic {@link JsonNode}.
+ * The Avro schema is taken from the {@link SchemaHelper#CONTENT_SCHEMA} Exchange property; the conversion fails
+ * when that property does not hold a schema. On success the message body is replaced with the parsed node and the
+ * content type header is set to the struct mime type.
+ */
+@DataType(name = "avro-x-struct", mediaType = "application/x-struct")
+public class AvroStructDataType implements DataTypeConverter {
+
+    @Override
+    public void convert(Exchange exchange) {
+        final AvroSchema avroSchema = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, AvroSchema.class);
+        if (avroSchema == null) {
+            throw new CamelExecutionException("Missing proper avro schema for data type processing", exchange);
+        }
+
+        final Object node;
+        try {
+            node = Avro.MAPPER.reader().forType(JsonNode.class).with(avroSchema)
+                    .readValue(getBodyAsStream(exchange));
+        } catch (InvalidPayloadException | IOException e) {
+            throw new CamelExecutionException("Failed to apply Avro x-struct data type on exchange", exchange, e);
+        }
+
+        exchange.getMessage().setBody(node);
+        exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.STRUCT.type());
+    }
+
+    /**
+     * Provides the message body as stream, converting it from a mandatory byte array when the body is not
+     * available as input stream.
+     */
+    private InputStream getBodyAsStream(Exchange exchange) throws InvalidPayloadException {
+        final InputStream stream = exchange.getMessage().getBody(InputStream.class);
+        if (stream != null) {
+            return stream;
+        }
+
+        return new ByteArrayInputStream(exchange.getMessage().getMandatoryBody(byte[].class));
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java
index 471e569f..0f6e979e 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java
@@ -27,12 +27,12 @@ import java.util.stream.Stream;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Exchange;
import org.apache.camel.component.aws2.ddb.Ddb2Constants;
import org.apache.camel.component.aws2.ddb.Ddb2Operations;
import org.apache.camel.component.jackson.JacksonDataFormat;
+import org.apache.camel.kamelets.utils.format.converter.json.Json;
import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
@@ -77,10 +77,10 @@ import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
* In case key and item attribute value maps are identical you can omit the special top level properties completely. The
* converter will map the whole Json body as is then and use it as source for the attribute value map.
*/
-@DataType(scheme = "aws2-ddb", name = "json", mediaType = "application/json")
+@DataType(scheme = "aws2-ddb", name = "application-json", mediaType = "application/json")
public class Ddb2JsonInputType implements DataTypeConverter {
- private final JacksonDataFormat dataFormat = new JacksonDataFormat(new ObjectMapper(), JsonNode.class);
+ private final JacksonDataFormat dataFormat = new JacksonDataFormat(Json.MAPPER, JsonNode.class);
@Override
public void convert(Exchange exchange) {
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java
index d8847886..241840e8 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java
@@ -29,7 +29,7 @@ import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
* Output data type represents AWS S3 get object response as CloudEvent V1. The data type sets Camel specific
* CloudEvent headers on the exchange.
*/
-@DataType(scheme = "aws2-s3", name = "cloudevents", mediaType = "application/octet-stream")
+@DataType(scheme = "aws2-s3", name = "application-cloudevents", mediaType = "application/octet-stream")
public class AWS2S3CloudEventOutputType implements DataTypeConverter {
@Override
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/bytes/ByteArrayDataType.java
similarity index 64%
copy from library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java
copy to library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/bytes/ByteArrayDataType.java
index d60b2aaa..cdacabbd 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/bytes/ByteArrayDataType.java
@@ -15,24 +15,27 @@
* limitations under the License.
*/
-package org.apache.camel.kamelets.utils.format.converter.standard;
+package org.apache.camel.kamelets.utils.format.converter.bytes;
import org.apache.camel.Exchange;
import org.apache.camel.kamelets.utils.format.DefaultDataTypeConverter;
+import org.apache.camel.kamelets.utils.format.MimeType;
import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
/**
- * String data type.
+ * Generic binary data type uses Camel message body converter mechanism to convert content to byte array representation.
*/
-@DataType(name = "string", mediaType = "text/plain")
-public class StringDataType implements DataTypeConverter {
+@DataType(name = "application-octet-stream", mediaType = "application/octet-stream")
+public class ByteArrayDataType implements DataTypeConverter {
- private static final DataTypeConverter DELEGATE =
- new DefaultDataTypeConverter(DataType.DEFAULT_SCHEME, "string", "text/plain", String.class);
+ private static final DataTypeConverter DELEGATE = new DefaultDataTypeConverter(DataType.DEFAULT_SCHEME, "binary",
+ MimeType.BINARY.type(), byte[].class);
@Override
public void convert(Exchange exchange) {
DELEGATE.convert(exchange);
+
+ exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.BINARY.type());
}
}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputType.java
index bbad4637..05e937ff 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputType.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputType.java
@@ -30,7 +30,7 @@ import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
*
* By default, sets the Http content type header to application/json when not set explicitly.
*/
-@DataType(scheme = "http", name = "cloudevents", mediaType = "application/json")
+@DataType(scheme = "http", name = "application-cloudevents", mediaType = "application/json")
public class HttpCloudEventOutputType implements DataTypeConverter {
@Override
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/Json.java
similarity index 53%
copy from library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java
copy to library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/Json.java
index d60b2aaa..cbcbe57c 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/Json.java
@@ -15,24 +15,15 @@
* limitations under the License.
*/
-package org.apache.camel.kamelets.utils.format.converter.standard;
+package org.apache.camel.kamelets.utils.format.converter.json;
-import org.apache.camel.Exchange;
-import org.apache.camel.kamelets.utils.format.DefaultDataTypeConverter;
-import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
-import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+import com.fasterxml.jackson.databind.ObjectMapper;
-/**
- * String data type.
- */
-@DataType(name = "string", mediaType = "text/plain")
-public class StringDataType implements DataTypeConverter {
+public final class Json {
- private static final DataTypeConverter DELEGATE =
- new DefaultDataTypeConverter(DataType.DEFAULT_SCHEME, "string", "text/plain", String.class);
+ public static final ObjectMapper MAPPER = new ObjectMapper();
- @Override
- public void convert(Exchange exchange) {
- DELEGATE.convert(exchange);
+ private Json() {
+ // prevent instantiation of utility class
}
}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonDataType.java
new file mode 100644
index 00000000..42a20a2a
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonDataType.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.json;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.kamelets.utils.format.MimeType;
+import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+
+/**
+ * Converter that serializes the Exchange message body with the shared Json mapper into a byte array.
+ * The body is written as {@link JsonNode} type. After conversion the content type header is set to the Json mime
+ * type and the content schema Exchange property (empty String when absent) is exposed as message header.
+ */
+@DataType(name = "application-json", mediaType = "application/json")
+public class JsonDataType implements DataTypeConverter {
+
+    @Override
+    public void convert(Exchange exchange) {
+        final byte[] json;
+        try {
+            json = Json.MAPPER.writer().forType(JsonNode.class).writeValueAsBytes(exchange.getMessage().getBody());
+        } catch (JsonProcessingException e) {
+            throw new CamelExecutionException("Failed to apply Json output data type on exchange", exchange, e);
+        }
+
+        exchange.getMessage().setBody(json);
+        exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.JSON.type());
+
+        // Propagate the schema (if any) that was resolved for this exchange as a header.
+        final String contentSchema = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, "", String.class);
+        exchange.getMessage().setHeader(SchemaHelper.CONTENT_SCHEMA, contentSchema);
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonFormatSchema.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonFormatSchema.java
new file mode 100644
index 00000000..cea76aa6
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonFormatSchema.java
@@ -0,0 +1,22 @@
+package org.apache.camel.kamelets.utils.format.converter.json;
+
+import com.fasterxml.jackson.core.FormatSchema;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.camel.kamelets.utils.format.SchemaType;
+
+/**
+ * Jackson {@link FormatSchema} implementation that carries a Json schema given as {@link JsonNode}.
+ */
+public class JsonFormatSchema implements FormatSchema {
+
+    /** The wrapped schema exactly as handed to the constructor. */
+    private final JsonNode schemaNode;
+
+    public JsonFormatSchema(JsonNode schema) {
+        schemaNode = schema;
+    }
+
+    /**
+     * @return the wrapped schema node.
+     */
+    public JsonNode getSchema() {
+        return schemaNode;
+    }
+
+    /**
+     * @return the type identifier of the Json schema type.
+     */
+    @Override
+    public String getSchemaType() {
+        return SchemaType.JSON.type();
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonSchemaResolver.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonSchemaResolver.java
new file mode 100644
index 00000000..74a42a33
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonSchemaResolver.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.json;
+
+import java.io.InputStream;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.fasterxml.jackson.core.FormatSchema;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.jackson.SchemaResolver;
+import org.apache.camel.kamelets.utils.format.SchemaType;
+import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper;
+import org.apache.camel.spi.Resource;
+import org.apache.camel.support.PluginHelper;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Resolves Json schemas for an Exchange. Usable both as Jackson {@link SchemaResolver} and as Camel
+ * {@link Processor} that stores the resolved schema, schema type and content class as Exchange properties.
+ *
+ * Lookup order: schema held by this resolver (explicitly set or previously resolved), the
+ * {@link SchemaHelper#CONTENT_SCHEMA} Exchange property, the {@link SchemaHelper#SCHEMA} Exchange property parsed
+ * as Json, and finally a classpath resource located under schemas/ and named after the content class.
+ */
+public class JsonSchemaResolver implements SchemaResolver, Processor {
+    // Schemas loaded from classpath resources, keyed by content class name.
+    private final ConcurrentMap<String, JsonNode> schemes;
+
+    private JsonNode schema;
+    private String contentClass;
+
+    public JsonSchemaResolver() {
+        this.schemes = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * @return the schema held by this resolver serialized as Json String, or null when no schema is held.
+     */
+    public String getSchema() {
+        if (this.schema != null) {
+            try {
+                return Json.MAPPER.writeValueAsString(this.schema);
+            } catch (JsonProcessingException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Sets the schema from its Json String representation. An empty value clears the schema.
+     *
+     * @param schema the Json schema content.
+     */
+    public void setSchema(String schema) {
+        if (ObjectHelper.isNotEmpty(schema)) {
+            try {
+                this.schema = Json.MAPPER.readTree(schema);
+            } catch (JsonProcessingException e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            this.schema = null;
+        }
+    }
+
+    public String getContentClass() {
+        return contentClass;
+    }
+
+    /**
+     * Sets the content class name. An empty value clears the content class.
+     *
+     * @param contentClass the content class name.
+     */
+    public void setContentClass(String contentClass) {
+        if (ObjectHelper.isNotEmpty(contentClass)) {
+            this.contentClass = contentClass;
+        } else {
+            this.contentClass = null;
+        }
+    }
+
+    /**
+     * Resolves the schema and, when one is found, stores schema, schema type and content class as Exchange
+     * properties. Does nothing when the message body is null.
+     */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        Object payload = exchange.getMessage().getBody();
+        if (payload == null) {
+            return;
+        }
+
+        JsonNode answer = computeIfAbsent(exchange);
+
+        if (answer != null) {
+            exchange.setProperty(SchemaHelper.CONTENT_SCHEMA, answer);
+            exchange.setProperty(SchemaHelper.CONTENT_SCHEMA_TYPE, SchemaType.JSON.type());
+            exchange.setProperty(SchemaHelper.CONTENT_CLASS, SchemaHelper.resolveContentClass(exchange, this.contentClass));
+        }
+    }
+
+    /**
+     * Resolves the schema following the lookup order described on this class and remembers a non-null result on
+     * this resolver.
+     *
+     * @return the resolved schema or null when no schema could be found.
+     */
+    private JsonNode computeIfAbsent(Exchange exchange) {
+        if (this.schema != null) {
+            return this.schema;
+        }
+
+        JsonNode answer = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, JsonNode.class);
+
+        if (answer == null && exchange.getProperties().containsKey(SchemaHelper.SCHEMA)) {
+            String schemaJson = exchange.getProperty(SchemaHelper.SCHEMA, String.class);
+            try {
+                answer = Json.MAPPER.readTree(schemaJson);
+            } catch (JsonProcessingException e) {
+                throw new RuntimeException("Unable to load Json schema", e);
+            }
+        }
+
+        if (answer == null) {
+            String contentClass = SchemaHelper.resolveContentClass(exchange, this.contentClass);
+            if (contentClass != null) {
+                answer = this.schemes.computeIfAbsent(contentClass, t -> {
+                    Resource res = PluginHelper.getResourceLoader(exchange.getContext())
+                            .resolveResource("classpath:schemas/" + SchemaType.JSON.type() + "/" + t + "." + SchemaType.JSON.type());
+
+                    try {
+                        if (res.exists()) {
+                            try (InputStream is = res.getInputStream()) {
+                                if (is != null) {
+                                    return Json.MAPPER.readTree(is);
+                                }
+                            }
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(
+                                "Unable to load Json schema for type: " + t + ", resource: " + res.getLocation(), e);
+                    }
+
+                    // No schema resource available: returning null records no mapping for this content class.
+                    return null;
+                });
+            }
+        }
+
+        if (answer != null) {
+            this.schema = answer;
+        }
+
+        return answer;
+    }
+
+    /**
+     * Resolves the format schema for the given exchange.
+     *
+     * @return the schema wrapped as {@link JsonFormatSchema}, or null when no schema could be resolved.
+     */
+    @Override
+    public FormatSchema resolve(Exchange exchange) {
+        JsonNode answer = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, JsonNode.class);
+        if (answer == null) {
+            answer = computeIfAbsent(exchange);
+        }
+
+        // Do not hand out a format schema that wraps nothing; signal "no schema" with null instead.
+        if (answer == null) {
+            return null;
+        }
+
+        return new JsonFormatSchema(answer);
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonStructDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonStructDataType.java
new file mode 100644
index 00000000..56543746
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonStructDataType.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.json;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.kamelets.utils.format.MimeType;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+
+/**
+ * Converter that parses the Exchange message body with the shared Json mapper into a generic {@link JsonNode}.
+ * On success the message body is replaced with the parsed node and the content type header is set to the struct
+ * mime type.
+ */
+@DataType(name = "application-x-struct", mediaType = "application/x-struct")
+public class JsonStructDataType implements DataTypeConverter {
+
+    @Override
+    public void convert(Exchange exchange) {
+        final Object node;
+        try {
+            node = Json.MAPPER.reader().forType(JsonNode.class).readValue(getBodyAsStream(exchange));
+        } catch (InvalidPayloadException | IOException e) {
+            throw new CamelExecutionException("Failed to apply Json input data type on exchange", exchange, e);
+        }
+
+        exchange.getMessage().setBody(node);
+        exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.STRUCT.type());
+    }
+
+    /**
+     * Provides the message body as stream, converting it from a mandatory byte array when the body is not
+     * available as input stream.
+     */
+    private InputStream getBodyAsStream(Exchange exchange) throws InvalidPayloadException {
+        final InputStream stream = exchange.getMessage().getBody(InputStream.class);
+        if (stream != null) {
+            return stream;
+        }
+
+        return new ByteArrayInputStream(exchange.getMessage().getMandatoryBody(byte[].class));
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/pojo/JavaObjectDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/pojo/JavaObjectDataType.java
new file mode 100644
index 00000000..1fcbc869
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/pojo/JavaObjectDataType.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.pojo;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.fasterxml.jackson.core.FormatSchema;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.kamelets.utils.format.MimeType;
+import org.apache.camel.kamelets.utils.format.SchemaType;
+import org.apache.camel.kamelets.utils.format.converter.avro.Avro;
+import org.apache.camel.kamelets.utils.format.converter.json.Json;
+import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Converter that unmarshals the Exchange message body into an instance of the content class.
+ * Reads the schema, the content class and the schema type (Json when not set) from the Exchange and uses the
+ * Avro or Json Jackson mapper accordingly. Fails when the schema or the content class is missing or when the
+ * schema type is neither Avro nor Json.
+ */
+@DataType(name = "application-x-java-object", mediaType = "application/x-java-object")
+public class JavaObjectDataType implements DataTypeConverter, CamelContextAware {
+
+    private CamelContext camelContext;
+
+    @Override
+    public void convert(Exchange exchange) {
+        ObjectHelper.notNull(camelContext, "camelContext");
+
+        final FormatSchema formatSchema = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, FormatSchema.class);
+        if (formatSchema == null) {
+            throw new CamelExecutionException("Missing proper schema for Java object data type processing", exchange);
+        }
+
+        final String className = SchemaHelper.resolveContentClass(exchange, null);
+        if (className == null) {
+            throw new CamelExecutionException("Missing content class information for Java object data type processing",
+                    exchange);
+        }
+
+        final String schemaTypeName =
+                exchange.getProperty(SchemaHelper.CONTENT_SCHEMA_TYPE, SchemaType.JSON.type(), String.class);
+        final SchemaType schemaType = SchemaType.of(schemaTypeName);
+
+        try {
+            final Class<?> targetType = camelContext.getClassResolver().resolveMandatoryClass(className);
+            final Object pojo = unmarshal(schemaType, targetType, formatSchema, exchange);
+
+            exchange.getMessage().setBody(pojo);
+
+            // NOTE(review): this converter is registered with media type application/x-java-object but sets the
+            // struct mime type as content type header - confirm this is intended.
+            exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.STRUCT.type());
+        } catch (InvalidPayloadException | IOException | ClassNotFoundException e) {
+            throw new CamelExecutionException("Failed to apply Java object data type on exchange", exchange, e);
+        }
+    }
+
+    /**
+     * Reads the message body into the given target type with the mapper that matches the schema type.
+     */
+    private Object unmarshal(SchemaType schemaType, Class<?> targetType, FormatSchema formatSchema, Exchange exchange)
+            throws InvalidPayloadException, IOException {
+        if (schemaType == SchemaType.AVRO) {
+            return Avro.MAPPER.reader().forType(targetType).with(formatSchema).readValue(openBody(exchange));
+        }
+
+        if (schemaType == SchemaType.JSON) {
+            return Json.MAPPER.reader().forType(targetType).with(formatSchema).readValue(openBody(exchange));
+        }
+
+        throw new CamelExecutionException(String.format("Unsupported schema type '%s'", schemaType), exchange);
+    }
+
+    /**
+     * Provides the message body as stream, converting it from a mandatory byte array when the body is not
+     * available as input stream.
+     */
+    private InputStream openBody(Exchange exchange) throws InvalidPayloadException {
+        final InputStream stream = exchange.getMessage().getBody(InputStream.class);
+        if (stream != null) {
+            return stream;
+        }
+
+        return new ByteArrayInputStream(exchange.getMessage().getMandatoryBody(byte[].class));
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/protobuf/Protobuf.java
similarity index 53%
rename from library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java
rename to library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/protobuf/Protobuf.java
index d60b2aaa..1bb5d1c1 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/protobuf/Protobuf.java
@@ -15,24 +15,15 @@
* limitations under the License.
*/
-package org.apache.camel.kamelets.utils.format.converter.standard;
+package org.apache.camel.kamelets.utils.format.converter.protobuf;
-import org.apache.camel.Exchange;
-import org.apache.camel.kamelets.utils.format.DefaultDataTypeConverter;
-import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
-import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
+import com.fasterxml.jackson.dataformat.protobuf.ProtobufMapper;
-/**
- * String data type.
- */
-@DataType(name = "string", mediaType = "text/plain")
-public class StringDataType implements DataTypeConverter {
+public final class Protobuf {
- private static final DataTypeConverter DELEGATE =
- new DefaultDataTypeConverter(DataType.DEFAULT_SCHEME, "string", "text/plain", String.class);
+ public static final ProtobufMapper MAPPER = new ProtobufMapper();
- @Override
- public void convert(Exchange exchange) {
- DELEGATE.convert(exchange);
+ private Protobuf() {
+ // prevent instantiation of utility class
}
}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/protobuf/ProtobufSchemaResolver.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/protobuf/ProtobufSchemaResolver.java
new file mode 100644
index 00000000..9fba1091
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/protobuf/ProtobufSchemaResolver.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.protobuf;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.fasterxml.jackson.core.FormatSchema;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.dataformat.protobuf.schema.ProtobufSchema;
+import com.fasterxml.jackson.dataformat.protobuf.schema.ProtobufSchemaLoader;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.jackson.SchemaResolver;
+import org.apache.camel.kamelets.utils.format.SchemaType;
+import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper;
+import org.apache.camel.spi.Resource;
+import org.apache.camel.support.PluginHelper;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Resolves the Protobuf schema that describes the payload of an exchange.
+ * <p>
+ * Usable as a Jackson {@link SchemaResolver} and as a Camel {@link Processor}. The schema is looked up in this order:
+ * the schema explicitly set on this resolver, the {@link SchemaHelper#CONTENT_SCHEMA} exchange property, the schema
+ * source stored in the {@link SchemaHelper#SCHEMA} exchange property, a classpath schema file named after the content
+ * class and finally a schema generated from the content class itself.
+ */
+public class ProtobufSchemaResolver implements SchemaResolver, Processor {
+    /** Schemas that were already loaded or generated, keyed by content class name. */
+    private final ConcurrentMap<String, ProtobufSchema> schemes;
+
+    private ProtobufSchema schema;
+    private String contentClass;
+
+    public ProtobufSchemaResolver() {
+        this.schemes = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * @return the source of the currently set schema as String, or {@code null} when no schema is set.
+     */
+    public String getSchema() {
+        if (this.schema != null) {
+            return this.schema.getSource().toString();
+        }
+
+        return null;
+    }
+
+    /**
+     * Parses the given Protobuf schema source and uses it as explicit schema. An empty value resets the schema.
+     *
+     * @param schema the Protobuf schema source, may be empty.
+     * @throws RuntimeCamelException when the schema source cannot be parsed.
+     */
+    public void setSchema(String schema) {
+        if (ObjectHelper.isNotEmpty(schema)) {
+            try {
+                this.schema = ProtobufSchemaLoader.std.parse(schema);
+            } catch (IOException e) {
+                throw new RuntimeCamelException("Cannot parse protobuf schema", e);
+            }
+        } else {
+            this.schema = null;
+        }
+    }
+
+    public String getContentClass() {
+        return contentClass;
+    }
+
+    /**
+     * Sets the content class name used as fallback when the exchange does not define one. An empty value is stored as
+     * {@code null}.
+     */
+    public void setContentClass(String contentClass) {
+        if (ObjectHelper.isNotEmpty(contentClass)) {
+            this.contentClass = contentClass;
+        } else {
+            this.contentClass = null;
+        }
+    }
+
+    /**
+     * Resolves the schema and publishes schema, schema type and content class as exchange properties. Does nothing
+     * when the message body is {@code null} or when no schema could be resolved.
+     */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        Object payload = exchange.getMessage().getBody();
+        if (payload == null) {
+            return;
+        }
+
+        ProtobufSchema answer = computeIfAbsent(exchange);
+
+        if (answer != null) {
+            exchange.setProperty(SchemaHelper.CONTENT_SCHEMA, answer);
+            exchange.setProperty(SchemaHelper.CONTENT_SCHEMA_TYPE, SchemaType.PROTOBUF.type());
+            exchange.setProperty(SchemaHelper.CONTENT_CLASS, SchemaHelper.resolveContentClass(exchange, this.contentClass));
+        }
+    }
+
+    /**
+     * Returns the schema stored in the {@link SchemaHelper#CONTENT_SCHEMA} exchange property or, when that property is
+     * not set, the schema computed by this resolver.
+     *
+     * @return the resolved schema, or {@code null} when none could be determined.
+     */
+    @Override
+    public FormatSchema resolve(Exchange exchange) {
+        ProtobufSchema answer = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, ProtobufSchema.class);
+        if (answer == null) {
+            answer = computeIfAbsent(exchange);
+        }
+
+        return answer;
+    }
+
+    /**
+     * Determines the schema for the given exchange following the lookup order described on the class.
+     *
+     * @return the schema, or {@code null} when neither a schema nor a content class is available.
+     */
+    private ProtobufSchema computeIfAbsent(Exchange exchange) {
+        if (this.schema != null) {
+            return this.schema;
+        }
+
+        ProtobufSchema answer = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, ProtobufSchema.class);
+
+        if (answer == null && exchange.getProperties().containsKey(SchemaHelper.SCHEMA)) {
+            String schemaSource = exchange.getProperty(SchemaHelper.SCHEMA, String.class);
+            try {
+                answer = ProtobufSchemaLoader.std.parse(schemaSource);
+            } catch (IOException e) {
+                throw new RuntimeException("Unable to parse Protobuf schema", e);
+            }
+        }
+
+        if (answer == null) {
+            String type = SchemaHelper.resolveContentClass(exchange, this.contentClass);
+            if (type != null) {
+                answer = this.schemes.computeIfAbsent(type, t -> loadSchema(exchange, t));
+            }
+        }
+
+        if (answer != null) {
+            // Remember the schema on this instance so that later calls return it directly.
+            // NOTE(review): this also short-circuits exchanges that carry a different content class - confirm intended.
+            this.schema = answer;
+        }
+
+        return answer;
+    }
+
+    /**
+     * Loads the schema for the given content class from the classpath and falls back to generating it from the class.
+     * The classpath location uses the Protobuf schema type as folder name and as file extension.
+     *
+     * @param exchange the exchange providing the Camel context used for resource loading.
+     * @param type the fully qualified content class name.
+     * @return the loaded or generated schema.
+     * @throws RuntimeException when the schema file cannot be read or the schema cannot be generated.
+     */
+    private ProtobufSchema loadSchema(Exchange exchange, String type) {
+        // Must point to the Protobuf schema location; the Avro location holds schemas this loader cannot read.
+        String schemaType = SchemaType.PROTOBUF.type();
+        Resource res = PluginHelper.getResourceLoader(exchange.getContext())
+                .resolveResource("classpath:schemas/" + schemaType + "/" + type + "." + schemaType);
+
+        try {
+            if (res.exists()) {
+                try (InputStream is = res.getInputStream()) {
+                    if (is != null) {
+                        return Protobuf.MAPPER.schemaLoader().load(is);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Unable to load Protobuf schema for type: " + type + ", resource: " + res.getLocation(), e);
+        }
+
+        try {
+            return Protobuf.MAPPER.generateSchemaFor(Class.forName(type));
+        } catch (JsonMappingException | ClassNotFoundException e) {
+            throw new RuntimeException(
+                    "Unable to compute Protobuf schema for type: " + type, e);
+        }
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java
deleted file mode 100644
index 183f1112..00000000
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kamelets.utils.format.converter.standard;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
-import org.apache.camel.CamelExecutionException;
-import org.apache.camel.Exchange;
-import org.apache.camel.InvalidPayloadException;
-import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
-import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
-import org.apache.camel.util.ObjectHelper;
-
-/**
- * Data type converter able to unmarshal to given unmarshalType using jackson data format.
- * <p/>
- * Unmarshal type should be given as a fully qualified class name in the exchange properties.
- */
-@DataType(name = "jsonObject", mediaType = "application/json")
-public class JsonModelDataType implements DataTypeConverter, CamelContextAware {
-
- public static final String DATA_TYPE_MODEL_PROPERTY = "CamelDataTypeModel";
-
- private String model;
-
- private CamelContext camelContext;
-
- private static final ObjectMapper mapper = new ObjectMapper();
-
- @Override
- public void convert(Exchange exchange) {
- String type;
- if (exchange.hasProperties() && exchange.getProperties().containsKey(DATA_TYPE_MODEL_PROPERTY)) {
- type = exchange.getProperty(DATA_TYPE_MODEL_PROPERTY, String.class);
- } else {
- type = model;
- }
-
- if (type == null) {
- // neither model property nor exchange property defines proper type - do nothing
- return;
- }
-
- ObjectHelper.notNull(camelContext, "camelContext");
-
- try {
- Object unmarshalled = mapper.reader().forType(camelContext.getClassResolver().resolveMandatoryClass(type)).readValue(getBodyAsStream(exchange));
- exchange.getMessage().setBody(unmarshalled);
- } catch (Exception e) {
- throw new CamelExecutionException(
- String.format("Failed to load Json unmarshalling type '%s'", type), exchange, e);
- }
- }
-
- private InputStream getBodyAsStream(Exchange exchange) throws InvalidPayloadException {
- InputStream bodyStream = exchange.getMessage().getBody(InputStream.class);
-
- if (bodyStream == null) {
- bodyStream = new ByteArrayInputStream(exchange.getMessage().getMandatoryBody(byte[].class));
- }
-
- return bodyStream;
- }
-
- @Override
- public CamelContext getCamelContext() {
- return camelContext;
- }
-
- public void setModel(String model) {
- this.model = model;
- }
-
- @Override
- public void setCamelContext(CamelContext camelContext) {
- this.camelContext = camelContext;
- }
-}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/text/StringDataType.java
similarity index 57%
rename from library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataType.java
rename to library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/text/StringDataType.java
index 532e998b..9569e1d1 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataType.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/text/StringDataType.java
@@ -15,24 +15,33 @@
* limitations under the License.
*/
-package org.apache.camel.kamelets.utils.format.converter.standard;
+package org.apache.camel.kamelets.utils.format.converter.text;
+
+import java.nio.charset.StandardCharsets;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.kamelets.utils.format.DefaultDataTypeConverter;
+import org.apache.camel.kamelets.utils.format.MimeType;
import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
/**
- * Binary data type.
+ * Generic String data type converts Exchange payload to String representation using the Camel message body converter mechanism.
+ * By default, uses UTF-8 charset as encoding.
+ * <p>
+ * The converter sets the {@link ExchangePropertyKey#CHARSET_NAME} exchange property to UTF-8, delegates the conversion
+ * to a {@link DefaultDataTypeConverter} targeting {@link String} and afterwards sets the {@link Exchange#CONTENT_TYPE}
+ * message header to the {@link MimeType#TEXT} type.
+ * <p>
+ * NOTE(review): the annotation registers this converter as "text-plain" while the delegate is created with the name
+ * "string" - confirm which name lookups are expected to use.
*/
-@DataType(name = "binary", mediaType = "application/octet-stream")
-public class BinaryDataType implements DataTypeConverter {
+@DataType(name = "text-plain", mediaType = "text/plain")
+public class StringDataType implements DataTypeConverter {
- private static final DataTypeConverter DELEGATE =
- new DefaultDataTypeConverter(DataType.DEFAULT_SCHEME, "binary", "application/octet-stream", byte[].class);
+ private static final DataTypeConverter DELEGATE = new DefaultDataTypeConverter(DataType.DEFAULT_SCHEME, "string",
+ MimeType.TEXT.type(), String.class);
@Override
public void convert(Exchange exchange) {
+ exchange.setProperty(ExchangePropertyKey.CHARSET_NAME, StandardCharsets.UTF_8.name());
+
DELEGATE.convert(exchange);
+
+ exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.TEXT.type());
}
}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/PojoHelper.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/PojoHelper.java
new file mode 100644
index 00000000..17f22280
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/PojoHelper.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.utils;
+
+import java.util.Objects;
+
+import org.apache.camel.Exchange;
+
+/**
+ * Static helpers that classify Java types (String, number, primitive, POJO) and compare exchange properties and
+ * message headers with expected values.
+ */
+public final class PojoHelper {
+
+    /** Root packages whose types are never treated as POJOs. */
+    private static final String[] PLATFORM_PACKAGES = {"java", "javax", "com.sun", "com.oracle"};
+
+    private PojoHelper() {
+        // prevent instantiation of utility class
+    }
+
+    /**
+     * @return {@code true} when the given type is {@link String}.
+     */
+    public static boolean isString(Class<?> type) {
+        return String.class.isAssignableFrom(type);
+    }
+
+    /**
+     * @return {@code true} when the given type is a {@link Number} subtype or one of the primitive types int, long,
+     *         short, char, float or double.
+     */
+    public static boolean isNumber(Class<?> type) {
+        return Number.class.isAssignableFrom(type)
+                || int.class.isAssignableFrom(type)
+                || long.class.isAssignableFrom(type)
+                || short.class.isAssignableFrom(type)
+                || char.class.isAssignableFrom(type)
+                || float.class.isAssignableFrom(type)
+                || double.class.isAssignableFrom(type);
+    }
+
+    /**
+     * @return {@code true} when the given type is a primitive, an array of primitives or one of the Character, Byte
+     *         or Boolean types.
+     */
+    public static boolean isPrimitive(Class<?> type) {
+        return type.isPrimitive()
+                || (type.isArray() && type.getComponentType().isPrimitive())
+                || char.class.isAssignableFrom(type) || Character.class.isAssignableFrom(type)
+                || byte.class.isAssignableFrom(type) || Byte.class.isAssignableFrom(type)
+                || boolean.class.isAssignableFrom(type) || Boolean.class.isAssignableFrom(type);
+    }
+
+    /**
+     * Checks whether the given type is a plain object, meaning it does not live in a platform package and is neither a
+     * number, a primitive nor a String.
+     *
+     * @param type the type to inspect.
+     * @return {@code true} when the type is considered a POJO.
+     */
+    public static boolean isPojo(Class<?> type) {
+        Package pkg = type.getPackage();
+        if (pkg != null && isPlatformPackage(pkg.getName())) {
+            return false;
+        }
+
+        return !isNumber(type) && !isPrimitive(type) && !isString(type);
+    }
+
+    /**
+     * Matches on package boundaries, so that "javax.xml" is a platform package while "javassist" or "com.sunshine"
+     * are not.
+     */
+    private static boolean isPlatformPackage(String packageName) {
+        for (String root : PLATFORM_PACKAGES) {
+            if (packageName.equals(root) || packageName.startsWith(root + ".")) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * @return {@code true} when the exchange properties contain the given name.
+     */
+    public static boolean hasProperty(Exchange exchange, String name) {
+        return exchange.getProperties().containsKey(name);
+    }
+
+    /**
+     * Checks whether the exchange property with the given name equals the expected value. The property is converted to
+     * the type of the expected value before comparing. A {@code null} expected value matches a property that resolves
+     * to {@code null}.
+     */
+    public static boolean hasProperty(Exchange exchange, String name, Object value) {
+        if (value == null) {
+            // no type to convert to, so compare against the raw property value
+            return exchange.getProperty(name) == null;
+        }
+
+        return Objects.equals(
+                value,
+                exchange.getProperty(name, value.getClass()));
+    }
+
+    /**
+     * Checks whether the message header with the given name equals the expected value. The header is converted to the
+     * type of the expected value before comparing. A {@code null} expected value matches a header that resolves to
+     * {@code null}.
+     */
+    public static boolean hasHeader(Exchange exchange, String name, Object value) {
+        if (value == null) {
+            // no type to convert to, so compare against the raw header value
+            return exchange.getMessage().getHeader(name) == null;
+        }
+
+        return Objects.equals(
+                value,
+                exchange.getMessage().getHeader(name, value.getClass()));
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/SchemaHelper.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/SchemaHelper.java
new file mode 100644
index 00000000..87dea3f6
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/SchemaHelper.java
@@ -0,0 +1,34 @@
+package org.apache.camel.kamelets.utils.format.converter.utils;
+
+import org.apache.camel.Exchange;
+
+/**
+ * Constants and helper methods shared by the schema resolver implementations.
+ */
+public class SchemaHelper {
+
+    public static final String SCHEMA = "schema";
+    public static final String VALIDATE = "validate";
+    public static final String CONTENT_SCHEMA = "X-Content-Schema";
+    public static final String CONTENT_SCHEMA_TYPE = "X-Content-Schema-Type";
+    public static final String CONTENT_CLASS = "X-Content-Class";
+
+    private SchemaHelper() {
+        // static access only
+    }
+
+    /**
+     * Determines the content class name for the given exchange. The {@link #CONTENT_CLASS} exchange property wins,
+     * then the given fallback. When both are missing the class name of the message body is used, provided the body
+     * is a POJO.
+     *
+     * @param exchange the Camel exchange that may carry the content class in its properties.
+     * @param fallback the content class to use when the exchange property is not set.
+     * @return the content class name, or {@code null} when it cannot be determined.
+     */
+    public static String resolveContentClass(Exchange exchange, String fallback) {
+        String configured = exchange.getProperty(CONTENT_CLASS, fallback, String.class);
+        if (configured != null) {
+            return configured;
+        }
+
+        Object body = exchange.getMessage().getBody();
+        if (body == null || !PojoHelper.isPojo(body.getClass())) {
+            return null;
+        }
+
+        return body.getClass().getName();
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/schema/DelegatingSchemaResolver.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/schema/DelegatingSchemaResolver.java
new file mode 100644
index 00000000..26b541ee
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/schema/DelegatingSchemaResolver.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.schema;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.kamelets.utils.format.MimeType;
+import org.apache.camel.kamelets.utils.format.converter.avro.AvroSchemaResolver;
+import org.apache.camel.kamelets.utils.format.converter.json.JsonSchemaResolver;
+import org.apache.camel.kamelets.utils.format.converter.protobuf.ProtobufSchemaResolver;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Processor that picks a schema resolver (Protobuf, Avro or Json) based on the configured mime type and delegates
+ * the exchange to it.
+ * When the mime type is the Java object type, the target mime type (usually the produces mime type) decides which
+ * resolver is used instead.
+ * The configured schema and content class are handed over to the chosen resolver before it processes the exchange.
+ */
+public class DelegatingSchemaResolver implements Processor {
+    private String mimeType;
+    private String targetMimeType;
+
+    private String schema;
+    private String contentClass;
+
+    /**
+     * Delegates to the resolver matching the configured mime type. Does nothing when no mime type is set, when the
+     * Java object mime type is used without a target mime type, or when no resolver exists for the mime type.
+     */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        if (ObjectHelper.isEmpty(this.mimeType)) {
+            return;
+        }
+
+        MimeType effective = MimeType.of(this.mimeType);
+        if (effective.equals(MimeType.JAVA_OBJECT)) {
+            // plain Java objects carry no format of their own, so the target format selects the resolver
+            if (ObjectHelper.isEmpty(this.targetMimeType)) {
+                return;
+            }
+            effective = MimeType.of(this.targetMimeType);
+        }
+
+        Processor delegate = fromMimeType(effective);
+        if (delegate == null) {
+            return;
+        }
+
+        delegate.process(exchange);
+    }
+
+    /**
+     * Creates a fresh resolver for the given mime type and configures it with schema and content class.
+     *
+     * @return the configured resolver, or {@code null} when the mime type has no matching resolver.
+     */
+    private Processor fromMimeType(MimeType mimeType) {
+        switch (mimeType) {
+            case PROTOBUF: {
+                ProtobufSchemaResolver protobuf = new ProtobufSchemaResolver();
+                protobuf.setSchema(this.schema);
+                protobuf.setContentClass(this.contentClass);
+                return protobuf;
+            }
+            case AVRO:
+            case AVRO_BINARY:
+            case AVRO_STRUCT: {
+                AvroSchemaResolver avro = new AvroSchemaResolver();
+                avro.setSchema(this.schema);
+                avro.setContentClass(this.contentClass);
+                return avro;
+            }
+            case JSON:
+            case STRUCT: {
+                JsonSchemaResolver json = new JsonSchemaResolver();
+                json.setSchema(this.schema);
+                json.setContentClass(this.contentClass);
+                return json;
+            }
+            default:
+                return null;
+        }
+    }
+
+    public String getMimeType() {
+        return mimeType;
+    }
+
+    public void setMimeType(String mimeType) {
+        this.mimeType = mimeType;
+    }
+
+    public String getTargetMimeType() {
+        return targetMimeType;
+    }
+
+    public void setTargetMimeType(String targetMimeType) {
+        this.targetMimeType = targetMimeType;
+    }
+
+    public String getSchema() {
+        return schema;
+    }
+
+    public void setSchema(String schema) {
+        this.schema = schema;
+    }
+
+    public String getContentClass() {
+        return contentClass;
+    }
+
+    public void setContentClass(String contentClass) {
+        this.contentClass = contentClass;
+    }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/InflightProtobufSchemaResolver.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/InflightProtobufSchemaResolver.java
deleted file mode 100644
index 4d09f9d5..00000000
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/InflightProtobufSchemaResolver.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.kamelets.utils.serialization;
-
-import java.io.IOException;
-
-import com.fasterxml.jackson.core.FormatSchema;
-import com.fasterxml.jackson.dataformat.protobuf.schema.ProtobufSchema;
-import com.fasterxml.jackson.dataformat.protobuf.schema.ProtobufSchemaLoader;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.component.jackson.SchemaResolver;
-
-public class InflightProtobufSchemaResolver implements SchemaResolver {
-
- @Override
- public FormatSchema resolve(Exchange exchange) {
- String schemaStr = (String) exchange.getProperty("schema");
- try {
- ProtobufSchema schema = ProtobufSchemaLoader.std.parse(schemaStr);
- return schema;
- } catch(IOException e) {
- throw new RuntimeCamelException("Cannot parse protobuf schema", e);
- }
- }
-}
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter
index 1cd0ed78..96cc4b09 100644
--- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter
+++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter
@@ -15,7 +15,11 @@
# limitations under the License.
#
-org.apache.camel.kamelets.utils.format.converter.standard
+org.apache.camel.kamelets.utils.format.converter.bytes
+org.apache.camel.kamelets.utils.format.converter.text
+org.apache.camel.kamelets.utils.format.converter.pojo
+org.apache.camel.kamelets.utils.format.converter.avro
+org.apache.camel.kamelets.utils.format.converter.json
org.apache.camel.kamelets.utils.format.converter.aws2.s3
org.apache.camel.kamelets.utils.format.converter.aws2.ddb
org.apache.camel.kamelets.utils.format.converter.http
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-ddb-json b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-ddb-application-json
similarity index 100%
rename from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-ddb-json
rename to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-ddb-application-json
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-cloudevents b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-application-cloudevents
similarity index 100%
rename from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-cloudevents
rename to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-application-cloudevents
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-json
similarity index 90%
copy from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary
copy to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-json
index edf9a4ca..18aebea0 100644
--- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary
+++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-json
@@ -15,4 +15,4 @@
# limitations under the License.
#
-class=org.apache.camel.kamelets.utils.format.converter.standard.BinaryDataType
\ No newline at end of file
+class=org.apache.camel.kamelets.utils.format.converter.json.JsonDataType
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-octet-stream
similarity index 90%
rename from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject
rename to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-octet-stream
index 2f725f6a..520b0341 100644
--- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject
+++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-octet-stream
@@ -15,4 +15,4 @@
# limitations under the License.
#
-class=org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType
\ No newline at end of file
+class=org.apache.camel.kamelets.utils.format.converter.bytes.ByteArrayDataType
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-x-java-object
similarity index 90%
copy from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary
copy to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-x-java-object
index edf9a4ca..3f807983 100644
--- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary
+++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-x-java-object
@@ -15,4 +15,4 @@
# limitations under the License.
#
-class=org.apache.camel.kamelets.utils.format.converter.standard.BinaryDataType
\ No newline at end of file
+class=org.apache.camel.kamelets.utils.format.converter.pojo.JavaObjectDataType
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-x-struct
similarity index 90%
copy from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary
copy to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-x-struct
index edf9a4ca..fef2aab8 100644
--- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary
+++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-x-struct
@@ -15,4 +15,4 @@
# limitations under the License.
#
-class=org.apache.camel.kamelets.utils.format.converter.standard.BinaryDataType
\ No newline at end of file
+class=org.apache.camel.kamelets.utils.format.converter.json.JsonStructDataType
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-avro-binary
similarity index 90%
copy from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary
copy to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-avro-binary
index edf9a4ca..69fc5309 100644
--- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary
+++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-avro-binary
@@ -15,4 +15,4 @@
# limitations under the License.
#
-class=org.apache.camel.kamelets.utils.format.converter.standard.BinaryDataType
\ No newline at end of file
+class=org.apache.camel.kamelets.utils.format.converter.avro.AvroBinaryDataType
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-avro-x-struct
similarity index 90%
rename from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary
rename to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-avro-x-struct
index edf9a4ca..070e3147 100644
--- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary
+++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-avro-x-struct
@@ -15,4 +15,4 @@
# limitations under the License.
#
-class=org.apache.camel.kamelets.utils.format.converter.standard.BinaryDataType
\ No newline at end of file
+class=org.apache.camel.kamelets.utils.format.converter.avro.AvroStructDataType
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-string b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-plain-text
similarity index 90%
rename from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-string
rename to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-plain-text
index 8ef25725..3233f427 100644
--- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-string
+++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-plain-text
@@ -15,4 +15,4 @@
# limitations under the License.
#
-class=org.apache.camel.kamelets.utils.format.converter.standard.StringDataType
\ No newline at end of file
+class=org.apache.camel.kamelets.utils.format.converter.text.StringDataType
diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/http-cloudevents b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/http-application-cloudevents
similarity index 100%
rename from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/http-cloudevents
rename to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/http-application-cloudevents
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java
index b281f314..341ad05d 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java
@@ -21,7 +21,7 @@ import java.util.Optional;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType;
+import org.apache.camel.kamelets.utils.format.converter.json.JsonStructDataType;
import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -49,9 +49,9 @@ class DefaultDataTypeConverterResolverTest {
@Test
public void shouldResolveDataTypeConverters() throws Exception {
- Optional<DataTypeConverter> converter = resolver.resolve("jsonObject", camelContext);
+ Optional<DataTypeConverter> converter = resolver.resolve("application-x-struct", camelContext);
Assertions.assertTrue(converter.isPresent());
- Assertions.assertEquals(JsonModelDataType.class, converter.get().getClass());
+ Assertions.assertEquals(JsonStructDataType.class, converter.get().getClass());
converter = resolver.resolve("foo", "json", camelContext);
Assertions.assertTrue(converter.isPresent());
@@ -73,4 +73,4 @@ class DefaultDataTypeConverterResolverTest {
return "foo";
}
}
-}
\ No newline at end of file
+}
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
index d83c474b..9061c294 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java
@@ -21,9 +21,8 @@ import java.util.Optional;
import org.apache.camel.CamelContextAware;
import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.kamelets.utils.format.converter.standard.BinaryDataType;
-import org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType;
-import org.apache.camel.kamelets.utils.format.converter.standard.StringDataType;
+import org.apache.camel.kamelets.utils.format.converter.bytes.ByteArrayDataType;
+import org.apache.camel.kamelets.utils.format.converter.text.StringDataType;
import org.apache.camel.kamelets.utils.format.converter.test.UppercaseDataType;
import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
import org.junit.jupiter.api.Assertions;
@@ -41,15 +40,12 @@ class DefaultDataTypeRegistryTest {
@Test
public void shouldLookupDefaultDataTypeConverters() throws Exception {
- Optional<DataTypeConverter> converter = dataTypeRegistry.lookup( "jsonObject");
- Assertions.assertTrue(converter.isPresent());
- Assertions.assertEquals(JsonModelDataType.class, converter.get().getClass());
- converter = dataTypeRegistry.lookup( "string");
+ Optional<DataTypeConverter> converter = dataTypeRegistry.lookup( "plain-text");
Assertions.assertTrue(converter.isPresent());
Assertions.assertEquals(StringDataType.class, converter.get().getClass());
- converter = dataTypeRegistry.lookup( "binary");
+ converter = dataTypeRegistry.lookup( "application-octet-stream");
Assertions.assertTrue(converter.isPresent());
- Assertions.assertEquals(BinaryDataType.class, converter.get().getClass());
+ Assertions.assertEquals(ByteArrayDataType.class, converter.get().getClass());
converter = dataTypeRegistry.lookup( "lowercase");
Assertions.assertTrue(converter.isPresent());
converter = dataTypeRegistry.lookup( "uppercase");
@@ -57,4 +53,4 @@ class DefaultDataTypeRegistryTest {
Assertions.assertEquals(UppercaseDataType.class, converter.get().getClass());
}
-}
\ No newline at end of file
+}
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputTypeTest.java
index 7f1f9e9f..6d2fee23 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputTypeTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputTypeTest.java
@@ -20,7 +20,6 @@ package org.apache.camel.kamelets.utils.format.converter.aws2.ddb;
import java.util.Map;
import java.util.Optional;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.CamelContextAware;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Exchange;
@@ -28,6 +27,7 @@ import org.apache.camel.component.aws2.ddb.Ddb2Constants;
import org.apache.camel.component.aws2.ddb.Ddb2Operations;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry;
+import org.apache.camel.kamelets.utils.format.converter.json.Json;
import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
import org.apache.camel.support.DefaultExchange;
import org.junit.jupiter.api.Assertions;
@@ -42,8 +42,6 @@ public class Ddb2JsonInputTypeTest {
private DefaultCamelContext camelContext;
- private final ObjectMapper mapper = new ObjectMapper();
-
private final Ddb2JsonInputType inputType = new Ddb2JsonInputType();
private final String keyJson = "{" +
@@ -69,7 +67,7 @@ public class Ddb2JsonInputTypeTest {
void shouldMapPutItemHeaders() throws Exception {
Exchange exchange = new DefaultExchange(camelContext);
- exchange.getMessage().setBody(mapper.readTree(itemJson));
+ exchange.getMessage().setBody(Json.MAPPER.readTree(itemJson));
exchange.setProperty("operation", Ddb2Operations.PutItem.name());
inputType.convert(exchange);
@@ -85,7 +83,7 @@ public class Ddb2JsonInputTypeTest {
void shouldMapUpdateItemHeaders() throws Exception {
Exchange exchange = new DefaultExchange(camelContext);
- exchange.getMessage().setBody(mapper.readTree("{\"operation\": \"" + Ddb2Operations.UpdateItem.name() + "\", \"key\": "
+ exchange.getMessage().setBody(Json.MAPPER.readTree("{\"operation\": \"" + Ddb2Operations.UpdateItem.name() + "\", \"key\": "
+ keyJson + ", \"item\": " + itemJson + "}"));
inputType.convert(exchange);
@@ -106,7 +104,7 @@ public class Ddb2JsonInputTypeTest {
void shouldMapDeleteItemHeaders() throws Exception {
Exchange exchange = new DefaultExchange(camelContext);
- exchange.getMessage().setBody(mapper.readTree("{\"key\": " + keyJson + "}"));
+ exchange.getMessage().setBody(Json.MAPPER.readTree("{\"key\": " + keyJson + "}"));
exchange.setProperty("operation", Ddb2Operations.DeleteItem.name());
inputType.convert(exchange);
@@ -125,7 +123,7 @@ public class Ddb2JsonInputTypeTest {
void shouldMapNestedObjects() throws Exception {
Exchange exchange = new DefaultExchange(camelContext);
- exchange.getMessage().setBody(mapper.readTree("{\"user\":" + itemJson + "}"));
+ exchange.getMessage().setBody(Json.MAPPER.readTree("{\"user\":" + itemJson + "}"));
exchange.setProperty("operation", Ddb2Operations.PutItem.name());
inputType.convert(exchange);
@@ -176,7 +174,7 @@ public class Ddb2JsonInputTypeTest {
void shouldFailForUnsupportedOperation() throws Exception {
Exchange exchange = new DefaultExchange(camelContext);
- exchange.getMessage().setBody(mapper.readTree("{}"));
+ exchange.getMessage().setBody(Json.MAPPER.readTree("{}"));
exchange.setProperty("operation", Ddb2Operations.BatchGetItems.name());
Assertions.assertThrows(UnsupportedOperationException.class, () -> inputType.convert(exchange));
@@ -186,7 +184,7 @@ public class Ddb2JsonInputTypeTest {
public void shouldLookupDataType() throws Exception {
DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry();
CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
- Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("aws2-ddb", "json");
+ Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("aws2-ddb", "application-json");
Assertions.assertTrue(converter.isPresent());
}
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java
index 53570dd9..f5d8a082 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java
@@ -62,7 +62,7 @@ class AWS2S3CloudEventOutputTypeTest {
public void shouldLookupDataType() throws Exception {
DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry();
CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
- Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("aws2-s3", "cloudevents");
+ Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("aws2-s3", "application-cloudevents");
Assertions.assertTrue(converter.isPresent());
}
}
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputTypeTest.java
index 01e331e6..0e73cb0a 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputTypeTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputTypeTest.java
@@ -67,7 +67,7 @@ class HttpCloudEventOutputTypeTest {
public void shouldLookupDataType() throws Exception {
DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry();
CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
- Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("http", "cloudevents");
+ Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("http", "application-cloudevents");
Assertions.assertTrue(converter.isPresent());
}
}
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java
deleted file mode 100644
index 7785017c..00000000
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kamelets.utils.format.converter.standard;
-
-import java.util.Optional;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.camel.CamelContextAware;
-import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry;
-import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
-import org.apache.camel.support.DefaultExchange;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-public class JsonModelDataTypeTest {
-
- private final DefaultCamelContext camelContext = new DefaultCamelContext();
-
- private final JsonModelDataType dataType = new JsonModelDataType();
-
- @BeforeEach
- public void setup() {
- dataType.setCamelContext(camelContext);
- }
-
- @Test
- void shouldMapStringToJsonModelWithModelProperty() throws Exception {
- Exchange exchange = new DefaultExchange(camelContext);
-
- exchange.getMessage().setBody("{ \"name\": \"Rajesh\", \"age\": 28}");
- dataType.setModel(Person.class.getName());
- dataType.convert(exchange);
-
- assertEquals(Person.class, exchange.getMessage().getBody().getClass());
- assertEquals("Rajesh", exchange.getMessage().getBody(Person.class).getName());
- }
-
- @Test
- void shouldMapStringToJsonModelWithExchangeProperty() throws Exception {
- Exchange exchange = new DefaultExchange(camelContext);
-
- exchange.setProperty(JsonModelDataType.DATA_TYPE_MODEL_PROPERTY, Person.class.getName());
- exchange.getMessage().setBody("{ \"name\": \"Sheldon\", \"age\": 29}");
- dataType.convert(exchange);
-
- assertEquals(Person.class, exchange.getMessage().getBody().getClass());
- assertEquals("Sheldon", exchange.getMessage().getBody(Person.class).getName());
- }
-
- @Test
- public void shouldLookupDataType() throws Exception {
- DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry();
- CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
- Optional<DataTypeConverter> converter = dataTypeRegistry.lookup("jsonObject");
- Assertions.assertTrue(converter.isPresent());
- }
-
- public static class Person {
- @JsonProperty
- private String name;
-
- @JsonProperty
- private Long age;
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public Long getAge() {
- return age;
- }
-
- public void setAge(Long age) {
- this.age = age;
- }
- }
-
-}
\ No newline at end of file
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/bytes/ByteArrayDataTypeTest.java
similarity index 94%
rename from library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataTypeTest.java
rename to library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/bytes/ByteArrayDataTypeTest.java
index d2dd616a..09ba2b93 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataTypeTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/bytes/ByteArrayDataTypeTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.kamelets.utils.format.converter.standard;
+package org.apache.camel.kamelets.utils.format.converter.standard.bytes;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
@@ -24,6 +24,7 @@ import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry;
+import org.apache.camel.kamelets.utils.format.converter.bytes.ByteArrayDataType;
import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
import org.apache.camel.support.DefaultExchange;
import org.junit.jupiter.api.Assertions;
@@ -31,11 +32,11 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class BinaryDataTypeTest {
+public class ByteArrayDataTypeTest {
private final DefaultCamelContext camelContext = new DefaultCamelContext();
- private final BinaryDataType dataType = new BinaryDataType();
+ private final ByteArrayDataType dataType = new ByteArrayDataType();
@Test
void shouldRetainBytesModel() throws Exception {
@@ -89,7 +90,7 @@ public class BinaryDataTypeTest {
public void shouldLookupDataType() throws Exception {
DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry();
CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
- Optional<DataTypeConverter> converter = dataTypeRegistry.lookup( "binary");
+ Optional<DataTypeConverter> converter = dataTypeRegistry.lookup( "application-octet-stream");
Assertions.assertTrue(converter.isPresent());
}
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/text/StringDataTypeTest.java
similarity index 97%
rename from library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataTypeTest.java
rename to library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/text/StringDataTypeTest.java
index 8ee19cba..32a9b569 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataTypeTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/text/StringDataTypeTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.kamelets.utils.format.converter.standard;
+package org.apache.camel.kamelets.utils.format.converter.standard.text;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
@@ -24,6 +24,7 @@ import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry;
+import org.apache.camel.kamelets.utils.format.converter.text.StringDataType;
import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
import org.apache.camel.support.DefaultExchange;
import org.junit.jupiter.api.Assertions;
@@ -77,7 +78,7 @@ public class StringDataTypeTest {
public void shouldLookupDataType() throws Exception {
DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry();
CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext);
- Optional<DataTypeConverter> converter = dataTypeRegistry.lookup( "string");
+ Optional<DataTypeConverter> converter = dataTypeRegistry.lookup( "plain-text");
Assertions.assertTrue(converter.isPresent());
}
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DeDuplicateHeadersTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DeDuplicateHeadersTest.java
index af6c92f2..ca675a7e 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DeDuplicateHeadersTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DeDuplicateHeadersTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.camel.kamelets.utils.headers;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.support.DefaultExchange;
@@ -28,15 +27,13 @@ class DeDuplicateHeadersTest {
private DefaultCamelContext camelContext;
- private final ObjectMapper mapper = new ObjectMapper();
-
private DeDuplicateNamingHeaders processor;
@BeforeEach
void setup() {
camelContext = new DefaultCamelContext();
}
-
+
@Test
void shouldDuplicateHeaders() throws Exception {
Exchange exchange = new DefaultExchange(camelContext);
@@ -56,7 +53,7 @@ class DeDuplicateHeadersTest {
Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("kafka.OVERRIDE_TOPIC"));
Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("my-header"));
}
-
+
@Test
void shouldDuplicateSelectedHeaders() throws Exception {
Exchange exchange = new DefaultExchange(camelContext);
@@ -78,7 +75,7 @@ class DeDuplicateHeadersTest {
Assertions.assertFalse(exchange.getMessage().getHeaders().containsKey("kafka.OVERRIDE_TOPIC"));
Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("my-header"));
}
-
+
@Test
void shouldDeDuplicateSelectedHeaders() throws Exception {
Exchange exchange = new DefaultExchange(camelContext);
diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DuplicateHeadersTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DuplicateHeadersTest.java
index fdff9401..3cb3ca87 100644
--- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DuplicateHeadersTest.java
+++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DuplicateHeadersTest.java
@@ -16,10 +16,6 @@
*/
package org.apache.camel.kamelets.utils.headers;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.util.HashMap;
-
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.support.DefaultExchange;
@@ -31,15 +27,13 @@ class DuplicateHeadersTest {
private DefaultCamelContext camelContext;
- private final ObjectMapper mapper = new ObjectMapper();
-
private DuplicateNamingHeaders processor;
@BeforeEach
void setup() {
camelContext = new DefaultCamelContext();
}
-
+
@Test
void shouldDuplicateHeaders() throws Exception {
Exchange exchange = new DefaultExchange(camelContext);
@@ -57,7 +51,7 @@ class DuplicateHeadersTest {
Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("aws.s3.bucket.name"));
Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("my-header"));
}
-
+
@Test
void shouldDuplicateSelectedHeaders() throws Exception {
Exchange exchange = new DefaultExchange(camelContext);
diff --git a/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject b/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject
deleted file mode 100644
index 2f725f6a..00000000
--- a/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-class=org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType
\ No newline at end of file
diff --git a/library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml
index 3bec55f7..ebff6605 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml
@@ -33,8 +33,6 @@ spec:
title: "Avro Deserialize Action"
description: "Deserialize payload to Avro"
type: object
- required:
- - schema
properties:
schema:
title: Schema
@@ -54,23 +52,21 @@ spec:
- "camel:core"
- "camel:jackson-avro"
template:
+ beans:
+ - name: schemaResolver
+ type: "#class:org.apache.camel.kamelets.utils.format.converter.avro.AvroSchemaResolver"
+ property:
+ - key: validate
+ value: '{{validate}}'
+ - key: schema
+ value: '{{schema:}}'
from:
uri: kamelet:source
steps:
- - set-property:
- name: schema
- constant: "{{schema}}"
- - set-property:
- name: validate
- constant: "{{validate}}"
- unmarshal:
avro:
library: Jackson
unmarshalType: com.fasterxml.jackson.databind.JsonNode
- schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightAvroSchemaResolver"
- - remove-property:
- name: schema
- - remove-property:
- name: validate
+ schemaResolver: "#bean:{{schemaResolver}}"
- remove-header:
name: "Content-Type"
diff --git a/library/camel-kamelets/src/main/resources/kamelets/avro-serialize-action.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/avro-serialize-action.kamelet.yaml
index 06830b27..93a50270 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/avro-serialize-action.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/avro-serialize-action.kamelet.yaml
@@ -33,8 +33,6 @@ spec:
title: "Avro Serialize Action"
description: "Serialize payload to Avro"
type: object
- required:
- - schema
properties:
schema:
title: Schema
@@ -54,24 +52,22 @@ spec:
- "camel:core"
- "camel:jackson-avro"
template:
+ beans:
+ - name: schemaResolver
+ type: "#class:org.apache.camel.kamelets.utils.format.converter.avro.AvroSchemaResolver"
+ property:
+ - key: validate
+ value: '{{validate}}'
+ - key: schema
+ value: '{{schema:}}'
from:
uri: kamelet:source
steps:
- - set-property:
- name: schema
- constant: "{{schema}}"
- - set-property:
- name: validate
- constant: "{{validate}}"
- marshal:
avro:
library: Jackson
unmarshalType: com.fasterxml.jackson.databind.JsonNode
- schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightAvroSchemaResolver"
- - remove-property:
- name: schema
- - remove-property:
- name: validate
+ schemaResolver: "#bean:{{schemaResolver}}"
- set-header:
name: "Content-Type"
constant: "application/avro"
diff --git a/library/camel-kamelets/src/main/resources/kamelets/protobuf-deserialize-action.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/protobuf-deserialize-action.kamelet.yaml
index bf06dc4f..c0727a5f 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/protobuf-deserialize-action.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/protobuf-deserialize-action.kamelet.yaml
@@ -32,8 +32,6 @@ spec:
title: "Protobuf Deserialize Action"
description: "Deserialize payload to Protobuf"
type: object
- required:
- - schema
properties:
schema:
title: Schema
@@ -46,18 +44,19 @@ spec:
- "camel:core"
- "camel:jackson-protobuf"
template:
+ beans:
+ - name: schemaResolver
+ type: "#class:org.apache.camel.kamelets.utils.format.converter.protobuf.ProtobufSchemaResolver"
+ property:
+ - key: schema
+ value: '{{schema:}}'
from:
uri: kamelet:source
steps:
- - set-property:
- name: schema
- constant: "{{schema}}"
- unmarshal:
protobuf:
library: Jackson
unmarshalType: com.fasterxml.jackson.databind.JsonNode
- schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightProtobufSchemaResolver"
- - remove-property:
- name: schema
+ schemaResolver: "#bean:{{schemaResolver}}"
- remove-header:
name: "Content-Type"
diff --git a/library/camel-kamelets/src/main/resources/kamelets/protobuf-serialize-action.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/protobuf-serialize-action.kamelet.yaml
index 56f321db..36218aec 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/protobuf-serialize-action.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/protobuf-serialize-action.kamelet.yaml
@@ -32,8 +32,6 @@ spec:
title: "Protobuf Serialize Action"
description: "Serialize payload to Protobuf"
type: object
- required:
- - schema
properties:
schema:
title: Schema
@@ -46,19 +44,20 @@ spec:
- "camel:core"
- "camel:jackson-protobuf"
template:
+ beans:
+ - name: schemaResolver
+ type: "#class:org.apache.camel.kamelets.utils.format.converter.protobuf.ProtobufSchemaResolver"
+ property:
+ - key: schema
+ value: '{{schema:}}'
from:
uri: kamelet:source
steps:
- - set-property:
- name: schema
- constant: "{{schema}}"
- marshal:
protobuf:
library: Jackson
unmarshalType: com.fasterxml.jackson.databind.JsonNode
- schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightProtobufSchemaResolver"
- - remove-property:
- name: schema
+ schemaResolver: "#bean:{{schemaResolver}}"
- set-header:
name: "Content-Type"
constant: "application/protobuf"
diff --git a/library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/resolve-pojo-schema-action.kamelet.yaml
similarity index 75%
copy from library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml
copy to library/camel-kamelets/src/main/resources/kamelets/resolve-pojo-schema-action.kamelet.yaml
index 3bec55f7..3bafc090 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/resolve-pojo-schema-action.kamelet.yaml
@@ -18,7 +18,7 @@
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
- name: avro-deserialize-action
+ name: resolve-pojo-schema-action
annotations:
camel.apache.org/kamelet.support.level: "Stable"
camel.apache.org/catalog.version: "4.0.0-SNAPSHOT"
@@ -30,47 +30,47 @@ metadata:
camel.apache.org/kamelet.type: "action"
spec:
definition:
- title: "Avro Deserialize Action"
- description: "Deserialize payload to Avro"
+ title: "Resolve Schema Action"
+ description: "Resolves schema from given mime type and payload. Sets the resolved schema, the schema type and its content class as properties for later reference."
type: object
- required:
- - schema
properties:
+ mimeType:
+ title: Mime Type
+ description: The mime type to determine the schema resolver implementation that should perform the operation.
+ type: string
+ default: "application/json"
+ example: "application/json"
schema:
title: Schema
- description: The Avro schema to use during serialization (as single-line, using JSON format)
+ description: Optional schema content (as single-line, using JSON format).
+ type: string
+ contentClass:
+ title: Content Class
+ description: Type information of the content object. Fully qualified class name.
+ type: string
+ example: "org.apache.camel.content.Foo"
+ targetMimeType:
+ title: Target Mime Type
+ description: Additional mime type information used to determine the schema resolver. Usually only used in combination with mime type "application/x-java-object"
type: string
- example: '{"type": "record", "namespace": "com.example", "name": "FullName", "fields": [{"name": "first", "type": "string"},{"name": "last", "type": "string"}]}'
- validate:
- title: Validate
- description: Indicates if the content must be validated against the schema
- type: boolean
- default: true
- x-descriptors:
- - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+ example: "application/json"
dependencies:
- - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.0.0-SNAPSHOT"
- - "camel:kamelet"
- - "camel:core"
- - "camel:jackson-avro"
+ - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.0.0-SNAPSHOT"
+ - "camel:kamelet"
+ - "camel:core"
+ - "camel:jackson-avro"
+ - "camel:jackson-protobuf"
template:
+ beans:
+ - name: schemaResolver
+ type: "#class:org.apache.camel.kamelets.utils.format.schema.DelegatingSchemaResolver"
+ properties:
+ mimeType: '{{mimeType}}'
+ schema: '{{schema:}}'
+ contentClass: '{{contentClass:}}'
+ targetMimeType: '{{targetMimeType:}}'
from:
- uri: kamelet:source
+ uri: "kamelet:source"
steps:
- - set-property:
- name: schema
- constant: "{{schema}}"
- - set-property:
- name: validate
- constant: "{{validate}}"
- - unmarshal:
- avro:
- library: Jackson
- unmarshalType: com.fasterxml.jackson.databind.JsonNode
- schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightAvroSchemaResolver"
- - remove-property:
- name: schema
- - remove-property:
- name: validate
- - remove-header:
- name: "Content-Type"
+ - process:
+ ref: "{{schemaResolver}}"
diff --git a/test/avro-data-type/README.md b/test/avro-data-type/README.md
new file mode 100644
index 00000000..dcdc29e4
--- /dev/null
+++ b/test/avro-data-type/README.md
@@ -0,0 +1,42 @@
+# Avro data type
+
+This test verifies the Avro data type serialization/deserialization
+
+## Objectives
+
+The test verifies the proper serialization and deserialization of Avro data types `avro/binary` and `avro/x-struct`.
+
+The test uses two KameletBindings that interact with each other. The first binding `json-to-avro` periodically creates a test data event as Json and applies the `avro/binary` data type using the schema in [User.avsc](User.avsc).
+
+The binary Avro data is then sent to an Http webhook sink that references an Http endpoint provided by the second binding `avro-to-log`. The `avro-to-log` binding provides the Http service and deserializes the binary Avro data using the same User schema. The deserialized data is printed to the log output.
+
+The test starts both KameletBindings and is able to verify the proper log output as an expected outcome.
+
+### YAKS Test
+
+The test performs the following high level steps:
+
+*Avro data type feature*
+- Create test data based on the User.avsc Avro schema
+- Load and run the `avro-to-log` KameletBinding
+- Load and run the `json-to-avro` KameletBinding
+- Verify that the bindings do interact with each other and the proper test data is logged in the binding output
+
+## Installation
+
+The test assumes that you have [JBang](https://www.jbang.dev/) installed and the YAKS CLI setup locally.
+
+You can review the installation steps for the tooling in the documentation:
+
+- [JBang](https://www.jbang.dev/documentation/guide/latest/installation.html)
+- [Install YAKS CLI](https://github.com/citrusframework/yaks#installation)
+
+## Run the tests with JBang
+
+To run tests with URI based configuration:
+
+```shell script
+$ yaks run --local test/avro-data-type/avro-data-type.feature
+```
+
+You will be provided with the test log output and the test results.
diff --git a/test/avro-data-type/User.avsc b/test/avro-data-type/User.avsc
new file mode 100644
index 00000000..a2d21f3b
--- /dev/null
+++ b/test/avro-data-type/User.avsc
@@ -0,0 +1,23 @@
+{
+ "name": "User",
+ "type": "record",
+ "namespace": "demo.kamelets",
+ "fields": [
+ {
+ "name": "id",
+ "type": "string"
+ },
+ {
+ "name": "firstname",
+ "type": "string"
+ },
+ {
+ "name": "lastname",
+ "type": "string"
+ },
+ {
+ "name": "age",
+ "type": "int"
+ }
+ ]
+}
diff --git a/test/avro-data-type/avro-data-type.feature b/test/avro-data-type/avro-data-type.feature
new file mode 100644
index 00000000..28765a9d
--- /dev/null
+++ b/test/avro-data-type/avro-data-type.feature
@@ -0,0 +1,22 @@
+Feature: Avro data type
+
+ Scenario: Create Kamelet bindings
+ Given variable uuid is "citrus:randomUUID()"
+ Given variable user is
+ """
+ { "id": "${uuid}", "firstname": "Sheldon", "lastname": "Cooper", "age": 28 }
+ """
+ # Create avro-to-log binding
+ When load KameletBinding avro-to-log-binding.yaml
+ Then Camel K integration avro-to-log-binding should be running
+
+ # Create json-to-avro binding
+ When load KameletBinding json-to-avro-binding.yaml
+ Then Camel K integration json-to-avro-binding should be running
+
+ # Verify output message sent
+ Then Camel K integration avro-to-log-binding should print Body: { "id" : "${uuid}", "firstname" : "Sheldon", "lastname" : "Cooper", "age" : 28}
+
+ Scenario: Remove resources
+ Given delete KameletBinding avro-to-log-binding
+ Given delete KameletBinding json-to-avro-binding
diff --git a/test/data-type-action/data-type-action-binding.yaml b/test/avro-data-type/avro-to-log-binding.yaml
similarity index 69%
copy from test/data-type-action/data-type-action-binding.yaml
copy to test/avro-data-type/avro-to-log-binding.yaml
index da02745c..e84a816d 100644
--- a/test/data-type-action/data-type-action-binding.yaml
+++ b/test/avro-data-type/avro-to-log-binding.yaml
@@ -18,31 +18,35 @@
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
- name: data-type-action-binding
+ name: avro-to-log-binding
spec:
source:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1alpha1
- name: timer-source
+ name: webhook-source
properties:
- period: 5000
- contentType: application/json
- message: >
- ${input}
+ subpath: user
steps:
- ref:
kind: Kamelet
apiVersion: camel.apache.org/v1alpha1
- name: data-type-action
+ name: resolve-pojo-schema-action
properties:
- scheme: "http"
- format: "cloudevents"
+ mimeType: "avro/binary"
+ schema: >
+ { "name": "User", "type": "record", "namespace": "demo.kamelets", "fields": [{ "name": "id", "type": "string" }, { "name": "firstname", "type": "string" }, { "name": "lastname", "type": "string" }, { "name": "age", "type": "int" }] }
- ref:
kind: Kamelet
apiVersion: camel.apache.org/v1alpha1
- name: log-action
+ name: data-type-action
properties:
- showHeaders: true
+ scheme: "camel"
+ format: "avro-x-struct"
sink:
- uri: yaks:resolveURL('test-service')/result
+ ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1alpha1
+ name: log-action
+ properties:
+ showHeaders: true
diff --git a/test/data-type-action/data-type-action-binding.yaml b/test/avro-data-type/json-to-avro-binding.yaml
similarity index 71%
copy from test/data-type-action/data-type-action-binding.yaml
copy to test/avro-data-type/json-to-avro-binding.yaml
index da02745c..c57c692c 100644
--- a/test/data-type-action/data-type-action-binding.yaml
+++ b/test/avro-data-type/json-to-avro-binding.yaml
@@ -18,7 +18,7 @@
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
- name: data-type-action-binding
+ name: json-to-avro-binding
spec:
source:
ref:
@@ -29,20 +29,26 @@ spec:
period: 5000
contentType: application/json
message: >
- ${input}
+ ${user}
steps:
- ref:
kind: Kamelet
apiVersion: camel.apache.org/v1alpha1
- name: data-type-action
+ name: json-deserialize-action
+ - ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1alpha1
+ name: resolve-pojo-schema-action
properties:
- scheme: "http"
- format: "cloudevents"
+ mimeType: "avro/binary"
+ schema: >
+ { "name": "User", "type": "record", "namespace": "demo.kamelets", "fields": [{ "name": "id", "type": "string" }, { "name": "firstname", "type": "string" }, { "name": "lastname", "type": "string" }, { "name": "age", "type": "int" }] }
- ref:
kind: Kamelet
apiVersion: camel.apache.org/v1alpha1
- name: log-action
+ name: data-type-action
properties:
- showHeaders: true
+ scheme: "camel"
+ format: "avro-binary"
sink:
- uri: yaks:resolveURL('test-service')/result
+ uri: http://localhost:8080/user
diff --git a/test/data-type-action/data-type-action-binding.yaml b/test/avro-data-type/yaks-config.yaml
similarity index 55%
copy from test/data-type-action/data-type-action-binding.yaml
copy to test/avro-data-type/yaks-config.yaml
index da02745c..b5739446 100644
--- a/test/data-type-action/data-type-action-binding.yaml
+++ b/test/avro-data-type/yaks-config.yaml
@@ -15,34 +15,33 @@
# limitations under the License.
# ---------------------------------------------------------------------------
-apiVersion: camel.apache.org/v1alpha1
-kind: KameletBinding
-metadata:
- name: data-type-action-binding
-spec:
- source:
- ref:
- kind: Kamelet
- apiVersion: camel.apache.org/v1alpha1
- name: timer-source
- properties:
- period: 5000
- contentType: application/json
- message: >
- ${input}
- steps:
- - ref:
- kind: Kamelet
- apiVersion: camel.apache.org/v1alpha1
- name: data-type-action
- properties:
- scheme: "http"
- format: "cloudevents"
- - ref:
- kind: Kamelet
- apiVersion: camel.apache.org/v1alpha1
- name: log-action
- properties:
- showHeaders: true
- sink:
- uri: yaks:resolveURL('test-service')/result
+config:
+ namespace:
+ temporary: false
+ runtime:
+ env:
+ - name: YAKS_JBANG_KAMELETS_LOCAL_DIR
+ value: "../../../kamelets"
+ - name: YAKS_CAMELK_KAMELET_API_VERSION
+ value: v1alpha1
+ - name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES
+ value: false
+ - name: YAKS_KUBERNETES_AUTO_REMOVE_RESOURCES
+ value: false
+ - name: YAKS_JBANG_CAMEL_DUMP_INTEGRATION_OUTPUT
+ value: true
+ settings:
+ loggers:
+ - name: INTEGRATION_STATUS
+ level: INFO
+ - name: INTEGRATION_LOGS
+ level: INFO
+ resources:
+ - json-to-avro-binding.yaml
+ - avro-to-log-binding.yaml
+ - User.avsc
+ dump:
+ enabled: true
+ failedOnly: true
+ includes:
+ - app=camel-k
diff --git a/test/avro-serdes-action/README.md b/test/avro-serdes-action/README.md
new file mode 100644
index 00000000..1a8b1b29
--- /dev/null
+++ b/test/avro-serdes-action/README.md
@@ -0,0 +1,42 @@
+# Avro serialization/deserialization
+
+This test verifies the Avro serialization/deserialization actions
+
+## Objectives
+
+The test verifies the proper serialization and deserialization of Avro data using the `avro-serialize-action` and `avro-deserialize-action` Kamelets.
+
+The test uses two KameletBindings that interact with each other. The first binding `json-to-avro` periodically creates a test data event as Json and serializes it to binary Avro with the `avro-serialize-action` Kamelet using the schema in [User.avsc](User.avsc).
+
+The binary Avro data is then sent to an Http webhook sink that references an Http endpoint provided by the second binding `avro-to-log`. The `avro-to-log` binding provides the Http service and deserializes the binary Avro data with the `avro-deserialize-action` Kamelet using the same User schema. The deserialized data is printed to the log output.
+
+The test starts both KameletBindings and is able to verify the proper log output as an expected outcome.
+
+### YAKS Test
+
+The test performs the following high level steps:
+
+*Avro serialize/deserialize action feature*
+- Create test data based on the User.avsc Avro schema
+- Load and run the `avro-to-log` KameletBinding
+- Load and run the `json-to-avro` KameletBinding
+- Verify that the bindings do interact with each other and the proper test data is logged in the binding output
+
+## Installation
+
+The test assumes that you have [JBang](https://www.jbang.dev/) installed and the YAKS CLI setup locally.
+
+You can review the installation steps for the tooling in the documentation:
+
+- [JBang](https://www.jbang.dev/documentation/guide/latest/installation.html)
+- [Install YAKS CLI](https://github.com/citrusframework/yaks#installation)
+
+## Run the tests with JBang
+
+To run tests with URI based configuration:
+
+```shell script
+$ yaks run --local test/avro-serdes-action/avro-serdes-action.feature
+```
+
+You will be provided with the test log output and the test results.
diff --git a/test/avro-serdes-action/User.avsc b/test/avro-serdes-action/User.avsc
new file mode 100644
index 00000000..a2d21f3b
--- /dev/null
+++ b/test/avro-serdes-action/User.avsc
@@ -0,0 +1,23 @@
+{
+ "name": "User",
+ "type": "record",
+ "namespace": "demo.kamelets",
+ "fields": [
+ {
+ "name": "id",
+ "type": "string"
+ },
+ {
+ "name": "firstname",
+ "type": "string"
+ },
+ {
+ "name": "lastname",
+ "type": "string"
+ },
+ {
+ "name": "age",
+ "type": "int"
+ }
+ ]
+}
diff --git a/test/avro-serdes-action/avro-serdes-action.feature b/test/avro-serdes-action/avro-serdes-action.feature
new file mode 100644
index 00000000..d09e2450
--- /dev/null
+++ b/test/avro-serdes-action/avro-serdes-action.feature
@@ -0,0 +1,22 @@
+Feature: Avro serialize/deserialize action
+
+ Scenario: Create Kamelet bindings
+ Given variable uuid is "citrus:randomUUID()"
+ Given variable user is
+ """
+ { "id": "${uuid}", "firstname": "Sheldon", "lastname": "Cooper", "age": 28 }
+ """
+ # Create avro-to-log binding
+ When load KameletBinding avro-to-log-binding.yaml
+ Then Camel K integration avro-to-log-binding should be running
+
+ # Create json-to-avro binding
+ When load KameletBinding json-to-avro-binding.yaml
+ Then Camel K integration json-to-avro-binding should be running
+
+ # Verify output message sent
+ Then Camel K integration avro-to-log-binding should print Body: { "id" : "${uuid}", "firstname" : "Sheldon", "lastname" : "Cooper", "age" : 28}
+
+ Scenario: Remove resources
+ Given delete KameletBinding avro-to-log-binding
+ Given delete KameletBinding json-to-avro-binding
diff --git a/test/data-type-action/data-type-action-binding.yaml b/test/avro-serdes-action/avro-to-log-binding.yaml
similarity index 70%
copy from test/data-type-action/data-type-action-binding.yaml
copy to test/avro-serdes-action/avro-to-log-binding.yaml
index da02745c..72539376 100644
--- a/test/data-type-action/data-type-action-binding.yaml
+++ b/test/avro-serdes-action/avro-to-log-binding.yaml
@@ -18,31 +18,27 @@
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
- name: data-type-action-binding
+ name: avro-to-log-binding
spec:
source:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1alpha1
- name: timer-source
+ name: webhook-source
properties:
- period: 5000
- contentType: application/json
- message: >
- ${input}
+ subpath: user
steps:
- ref:
kind: Kamelet
apiVersion: camel.apache.org/v1alpha1
- name: data-type-action
+ name: avro-deserialize-action
properties:
- scheme: "http"
- format: "cloudevents"
- - ref:
- kind: Kamelet
- apiVersion: camel.apache.org/v1alpha1
- name: log-action
- properties:
- showHeaders: true
+ schema: >
+ { "name": "User", "type": "record", "namespace": "demo.kamelets", "fields": [{ "name": "id", "type": "string" }, { "name": "firstname", "type": "string" }, { "name": "lastname", "type": "string" }, { "name": "age", "type": "int" }] }
sink:
- uri: yaks:resolveURL('test-service')/result
+ ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1alpha1
+ name: log-action
+ properties:
+ showHeaders: true
diff --git a/test/data-type-action/data-type-action-binding.yaml b/test/avro-serdes-action/json-to-avro-binding.yaml
similarity index 74%
copy from test/data-type-action/data-type-action-binding.yaml
copy to test/avro-serdes-action/json-to-avro-binding.yaml
index da02745c..be438840 100644
--- a/test/data-type-action/data-type-action-binding.yaml
+++ b/test/avro-serdes-action/json-to-avro-binding.yaml
@@ -18,7 +18,7 @@
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
- name: data-type-action-binding
+ name: json-to-avro-binding
spec:
source:
ref:
@@ -29,15 +29,19 @@ spec:
period: 5000
contentType: application/json
message: >
- ${input}
+ ${user}
steps:
- ref:
kind: Kamelet
apiVersion: camel.apache.org/v1alpha1
- name: data-type-action
+ name: json-deserialize-action
+ - ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1alpha1
+ name: avro-serialize-action
properties:
- scheme: "http"
- format: "cloudevents"
+ schema: >
+ { "name": "User", "type": "record", "namespace": "demo.kamelets", "fields": [{ "name": "id", "type": "string" }, { "name": "firstname", "type": "string" }, { "name": "lastname", "type": "string" }, { "name": "age", "type": "int" }] }
- ref:
kind: Kamelet
apiVersion: camel.apache.org/v1alpha1
@@ -45,4 +49,4 @@ spec:
properties:
showHeaders: true
sink:
- uri: yaks:resolveURL('test-service')/result
+ uri: http://localhost:8080/user
diff --git a/test/data-type-action/data-type-action-binding.yaml b/test/avro-serdes-action/yaks-config.yaml
similarity index 55%
copy from test/data-type-action/data-type-action-binding.yaml
copy to test/avro-serdes-action/yaks-config.yaml
index da02745c..b5739446 100644
--- a/test/data-type-action/data-type-action-binding.yaml
+++ b/test/avro-serdes-action/yaks-config.yaml
@@ -15,34 +15,33 @@
# limitations under the License.
# ---------------------------------------------------------------------------
-apiVersion: camel.apache.org/v1alpha1
-kind: KameletBinding
-metadata:
- name: data-type-action-binding
-spec:
- source:
- ref:
- kind: Kamelet
- apiVersion: camel.apache.org/v1alpha1
- name: timer-source
- properties:
- period: 5000
- contentType: application/json
- message: >
- ${input}
- steps:
- - ref:
- kind: Kamelet
- apiVersion: camel.apache.org/v1alpha1
- name: data-type-action
- properties:
- scheme: "http"
- format: "cloudevents"
- - ref:
- kind: Kamelet
- apiVersion: camel.apache.org/v1alpha1
- name: log-action
- properties:
- showHeaders: true
- sink:
- uri: yaks:resolveURL('test-service')/result
+config:
+ namespace:
+ temporary: false
+ runtime:
+ env:
+ - name: YAKS_JBANG_KAMELETS_LOCAL_DIR
+ value: "../../../kamelets"
+ - name: YAKS_CAMELK_KAMELET_API_VERSION
+ value: v1alpha1
+ - name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES
+ value: false
+ - name: YAKS_KUBERNETES_AUTO_REMOVE_RESOURCES
+ value: false
+ - name: YAKS_JBANG_CAMEL_DUMP_INTEGRATION_OUTPUT
+ value: true
+ settings:
+ loggers:
+ - name: INTEGRATION_STATUS
+ level: INFO
+ - name: INTEGRATION_LOGS
+ level: INFO
+ resources:
+ - json-to-avro-binding.yaml
+ - avro-to-log-binding.yaml
+ - User.avsc
+ dump:
+ enabled: true
+ failedOnly: true
+ includes:
+ - app=camel-k
diff --git a/test/aws-s3/aws-s3-knative-broker.feature b/test/aws-s3/aws-s3-knative-broker.feature
index fe935dc7..719f91d2 100644
--- a/test/aws-s3/aws-s3-knative-broker.feature
+++ b/test/aws-s3/aws-s3-knative-broker.feature
@@ -5,7 +5,7 @@ Feature: AWS S3 Kamelet - Knative broker binding
Given Knative event consumer timeout is 20000 ms
Given variables
| aws.s3.scheme | camel |
- | aws.s3.format | string |
+ | aws.s3.format | plain-text |
| aws.s3.bucketNameOrArn | mybucket |
| aws.s3.message | Hello from S3 Kamelet |
| aws.s3.key | hello.txt |
diff --git a/test/aws-s3/aws-s3-knative-cloudevents.feature b/test/aws-s3/aws-s3-knative-cloudevents.feature
index 647c8717..0af41639 100644
--- a/test/aws-s3/aws-s3-knative-cloudevents.feature
+++ b/test/aws-s3/aws-s3-knative-cloudevents.feature
@@ -5,7 +5,7 @@ Feature: AWS S3 Kamelet - cloud events data type
Given Knative event consumer timeout is 20000 ms
Given variables
| aws.s3.scheme | aws2-s3 |
- | aws.s3.format | cloudevents |
+ | aws.s3.format | application-cloudevents |
| aws.s3.bucketNameOrArn | mybucket |
| aws.s3.message | Hello from S3 Kamelet |
| aws.s3.key | hello.txt |
diff --git a/test/aws-s3/aws-s3-to-http.yaml b/test/aws-s3/aws-s3-to-http.yaml
index d0e2bd41..1a7b931d 100644
--- a/test/aws-s3/aws-s3-to-http.yaml
+++ b/test/aws-s3/aws-s3-to-http.yaml
@@ -39,14 +39,14 @@ spec:
name: data-type-action
properties:
scheme: "aws2-s3"
- format: "cloudevents"
+ format: "application-cloudevents"
- ref:
kind: Kamelet
apiVersion: camel.apache.org/v1alpha1
name: data-type-action
properties:
scheme: "http"
- format: "cloudevents"
+ format: "application-cloudevents"
- ref:
kind: Kamelet
apiVersion: camel.apache.org/v1alpha1
diff --git a/test/data-type-action/data-type-action-binding.yaml b/test/data-type-action/data-type-action-binding.yaml
index da02745c..09719574 100644
--- a/test/data-type-action/data-type-action-binding.yaml
+++ b/test/data-type-action/data-type-action-binding.yaml
@@ -37,7 +37,7 @@ spec:
name: data-type-action
properties:
scheme: "http"
- format: "cloudevents"
+ format: "application-cloudevents"
- ref:
kind: Kamelet
apiVersion: camel.apache.org/v1alpha1