You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by oa...@apache.org on 2019/12/09 17:52:12 UTC
[camel-kafka-connector] 01/01: Add the basis for Camel Type
Converter transformations
This is an automated email from the ASF dual-hosted git repository.
oalsafi pushed a commit to branch support-camel-type-converter
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit f15b53b9beef270736c25a633decb51cc4dbec9b
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Mon Dec 9 18:51:45 2019 +0100
Add the basis for Camel Type Converter transformations
---
core/pom.xml | 5 +
.../transforms/CamelTransformSupport.java | 15 +++
.../transforms/CamelTypeConverterTransform.java | 114 +++++++++++++++++++++
.../camel/kafkaconnector/utils/SchemaHelper.java | 59 +++++++++++
.../CamelTypeConverterTransformTest.java | 35 +++++++
parent/pom.xml | 6 ++
6 files changed, 234 insertions(+)
diff --git a/core/pom.xml b/core/pom.xml
index fe67cf3..e803fb0 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -97,6 +97,11 @@
<artifactId>connect-api</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-transforms</artifactId>
+ <scope>provided</scope>
+ </dependency>
<!-- Test -->
<dependency>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java
new file mode 100644
index 0000000..19fae92
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java
@@ -0,0 +1,15 @@
+package org.apache.camel.kafkaconnector.transforms;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+
+/**
+ * Base class for Kafka Connect {@link Transformation}s that rely on an Apache Camel
+ * {@link CamelContext}, giving subclasses access to Camel facilities such as the
+ * type-converter registry.
+ */
+public abstract class CamelTransformSupport<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    // One context per transformation instance.
+    // NOTE(review): the context is never started here, and subclasses' close() does not
+    // stop it — confirm that type conversion alone is safe on an unstarted context and
+    // that no resources leak when the transformation is closed.
+    private final CamelContext camelContext = new DefaultCamelContext();
+
+    /**
+     * Returns the Camel context owned by this transformation.
+     */
+    protected CamelContext getCamelContext() {
+        return camelContext;
+    }
+}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java
new file mode 100644
index 0000000..07351e7
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java
@@ -0,0 +1,114 @@
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.util.Map;
+
+import org.apache.camel.TypeConverter;
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+/**
+ * A Kafka Connect {@link org.apache.kafka.connect.transforms.Transformation} that converts
+ * the record key ({@link Key}) or value ({@link Value}) to a configured target Java type
+ * using Camel's type-converter registry, e.g. {@code "TRUE"} (String) to {@code Boolean}.
+ *
+ * <p>Null keys/values (e.g. tombstone records) are passed through unchanged. A failed
+ * conversion raises a {@link DataException}.
+ */
+public abstract class CamelTypeConverterTransform<R extends ConnectRecord<R>> extends CamelTransformSupport<R> {
+
+    private interface ConfigName {
+        String FIELD_TARGET_TYPE = "target.type";
+    }
+
+    private Class<?> fieldTargetType;
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(ConfigName.FIELD_TARGET_TYPE, ConfigDef.Type.CLASS, null, ConfigDef.Importance.HIGH,
+                    "The target field type to convert the value to, this is a fully qualified Java class, e.g: java.util.Map");
+
+    @Override
+    public R apply(R record) {
+        final Schema schema = operatingSchema(record);
+        final Object value = operatingValue(record);
+
+        // Tombstone / null keys and values have nothing to convert; previously this
+        // fell through to tryConvertTo(null) and raised a misleading DataException.
+        if (value == null) {
+            return record;
+        }
+
+        final Object convertedValue = convertValueWithCamelTypeConverter(value);
+        final Schema updatedSchema = getOrBuildRecordSchema(schema, convertedValue);
+
+        return newRecord(record, updatedSchema, convertedValue);
+    }
+
+    /**
+     * Converts {@code originalValue} to {@link #fieldTargetType} via Camel's type converter.
+     *
+     * @throws DataException if no converter could produce a non-null result
+     */
+    private Object convertValueWithCamelTypeConverter(final Object originalValue) {
+        final TypeConverter converter = getCamelContext().getTypeConverter();
+        final Object convertedValue = converter.tryConvertTo(fieldTargetType, originalValue);
+
+        if (convertedValue == null) {
+            throw new DataException(String.format("CamelTypeConverter was not able to convert value `%s` to target type of `%s`", originalValue, fieldTargetType.getSimpleName()));
+        }
+
+        return convertedValue;
+    }
+
+    /**
+     * Builds the schema for the converted value, carrying over the basics (name, doc,
+     * optionality, converted default) from the original schema when one is present.
+     */
+    private Schema getOrBuildRecordSchema(final Schema originalSchema, final Object value) {
+        // Schemaless records carry a null schema; build one from the converted value alone
+        // instead of NPE-ing in SchemaUtil.copySchemaBasics.
+        if (originalSchema == null) {
+            return SchemaHelper.buildSchemaBuilderForType(value).build();
+        }
+
+        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(originalSchema, SchemaHelper.buildSchemaBuilderForType(value));
+
+        if (originalSchema.isOptional()) {
+            builder.optional();
+        }
+        if (originalSchema.defaultValue() != null) {
+            // The default must live in the same type space as the converted values.
+            builder.defaultValue(convertValueWithCamelTypeConverter(originalSchema.defaultValue()));
+        }
+
+        return builder.build();
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
+        fieldTargetType = config.getClass(ConfigName.FIELD_TARGET_TYPE);
+
+        // Fail fast at configuration time instead of NPE-ing on the first record.
+        if (fieldTargetType == null) {
+            throw new ConfigException(ConfigName.FIELD_TARGET_TYPE, null, "The target type is required and cannot be null.");
+        }
+    }
+
+    protected abstract Schema operatingSchema(R record);
+
+    protected abstract Object operatingValue(R record);
+
+    protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
+
+    /**
+     * Applies the type conversion to the record key.
+     */
+    public static final class Key<R extends ConnectRecord<R>> extends CamelTypeConverterTransform<R> {
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.keySchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.key();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
+        }
+    }
+
+    /**
+     * Applies the type conversion to the record value.
+     */
+    public static final class Value<R extends ConnectRecord<R>> extends CamelTypeConverterTransform<R> {
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.valueSchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.value();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
+        }
+    }
+}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
new file mode 100644
index 0000000..6b4960d
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
@@ -0,0 +1,59 @@
+package org.apache.camel.kafkaconnector.utils;
+
+import java.util.Map;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+
+/**
+ * Utility methods for mapping Java values to Kafka Connect {@link SchemaBuilder} instances.
+ */
+public final class SchemaHelper {
+
+    private SchemaHelper() {
+        // utility class, not meant to be instantiated
+    }
+
+    /**
+     * Tries to build a {@link SchemaBuilder} for a value of type {@link Object}.
+     * The schema is only resolved precisely for well-known Java types; for any other
+     * type an optional bytes schema ({@code SchemaBuilder.bytes().optional()}) is
+     * returned as a catch-all fallback.
+     *
+     * @param value the value to return the SchemaBuilder for
+     * @return a {@link SchemaBuilder} instance matching the value's type
+     */
+    public static SchemaBuilder buildSchemaBuilderForType(final Object value) {
+        // A single Byte maps to Connect's INT8, not BYTES — BYTES is reserved for
+        // byte arrays / ByteBuffers.
+        if (value instanceof Byte) {
+            return SchemaBuilder.int8();
+        }
+        if (value instanceof Short) {
+            return SchemaBuilder.int16();
+        }
+        if (value instanceof Integer) {
+            return SchemaBuilder.int32();
+        }
+        if (value instanceof Long) {
+            return SchemaBuilder.int64();
+        }
+        if (value instanceof Float) {
+            return SchemaBuilder.float32();
+        }
+        if (value instanceof Double) {
+            return SchemaBuilder.float64();
+        }
+        if (value instanceof Boolean) {
+            return SchemaBuilder.bool();
+        }
+        if (value instanceof String) {
+            return SchemaBuilder.string();
+        }
+        // NOTE(review): java.nio.ByteBuffer values would also belong here — confirm
+        // whether callers can pass ByteBuffer before adding the import.
+        if (value instanceof byte[]) {
+            return SchemaBuilder.bytes();
+        }
+        if (value instanceof Map) {
+            // Note: optimally we should define the schema better for map, however for now we will keep it abstract
+            return new SchemaBuilder(Schema.Type.MAP);
+        }
+        if (value instanceof Iterable) {
+            // Note: optimally we should define the schema better for Iterable, however for now we will keep it abstract
+            return new SchemaBuilder(Schema.Type.ARRAY);
+        }
+        if (value instanceof Struct) {
+            return SchemaBuilder.struct();
+        }
+
+        // if we do not find any schema out of the above, we just return an optional bytes schema
+        return SchemaBuilder.bytes().optional();
+    }
+}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
new file mode 100644
index 0000000..8973903
--- /dev/null
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
@@ -0,0 +1,35 @@
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CamelTypeConverterTransform}.
+ */
+public class CamelTypeConverterTransformTest {
+
+    @Test
+    public void testIfItConvertsConnectRecordCorrectly() {
+        // "TRUE" should be converted to Boolean.TRUE by Camel's String -> Boolean converter
+        final SourceRecord connectRecord = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "topic", Schema.STRING_SCHEMA, "TRUE");
+
+        final Map<String, Object> props = new HashMap<>();
+        props.put("target.type", "java.lang.Boolean");
+
+        final Transformation<SourceRecord> transformation = new CamelTypeConverterTransform.Value<>();
+        transformation.configure(props);
+
+        final SourceRecord transformedSourceRecord = transformation.apply(connectRecord);
+
+        assertEquals(true, transformedSourceRecord.value());
+        assertEquals(Schema.BOOLEAN_SCHEMA, transformedSourceRecord.valueSchema());
+    }
+}
diff --git a/parent/pom.xml b/parent/pom.xml
index fa2a06f..b62f7f6 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -175,6 +175,12 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
+ <artifactId>connect-transforms</artifactId>
+ <version>${kafka.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>