You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by oa...@apache.org on 2019/12/09 17:52:12 UTC

[camel-kafka-connector] 01/01: Add the basis for Camel Type Converter transformations

This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a commit to branch support-camel-type-converter
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit f15b53b9beef270736c25a633decb51cc4dbec9b
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Mon Dec 9 18:51:45 2019 +0100

    Add the basis for Camel Type Converter transformations
---
 core/pom.xml                                       |   5 +
 .../transforms/CamelTransformSupport.java          |  15 +++
 .../transforms/CamelTypeConverterTransform.java    | 114 +++++++++++++++++++++
 .../camel/kafkaconnector/utils/SchemaHelper.java   |  59 +++++++++++
 .../CamelTypeConverterTransformTest.java           |  35 +++++++
 parent/pom.xml                                     |   6 ++
 6 files changed, 234 insertions(+)

diff --git a/core/pom.xml b/core/pom.xml
index fe67cf3..e803fb0 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -97,6 +97,11 @@
             <artifactId>connect-api</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-transforms</artifactId>
+            <scope>provided</scope>
+        </dependency>
 
         <!-- Test -->
         <dependency>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java
new file mode 100644
index 0000000..19fae92
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java
@@ -0,0 +1,15 @@
+package org.apache.camel.kafkaconnector.transforms;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+
+/**
+ * Base class for Kafka Connect {@link Transformation}s that rely on Camel.
+ * <p>
+ * Every instance creates its own {@link DefaultCamelContext} when it is constructed and exposes it to
+ * subclasses through {@link #getCamelContext()}. This class itself neither starts nor stops that context.
+ *
+ * @param <R> the concrete {@link ConnectRecord} type handled by the transformation
+ */
+public abstract class CamelTransformSupport<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    private final CamelContext context = new DefaultCamelContext();
+
+    /**
+     * Gives subclasses access to the Camel context owned by this transformation.
+     *
+     * @return the {@link CamelContext} created together with this instance
+     */
+    protected CamelContext getCamelContext() {
+        return this.context;
+    }
+}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java
new file mode 100644
index 0000000..07351e7
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java
@@ -0,0 +1,114 @@
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.util.Map;
+
+import org.apache.camel.TypeConverter;
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+/**
+ * Kafka Connect transformation that converts either the key or the value of a record to a configured
+ * Java type by means of the Camel {@link TypeConverter}.
+ * <p>
+ * The target type is read from the {@code target.type} option. Use the nested {@link Key} or
+ * {@link Value} class to choose which part of the record is converted.
+ *
+ * @param <R> the concrete {@link ConnectRecord} type handled by the transformation
+ */
+public abstract class CamelTypeConverterTransform<R extends ConnectRecord<R>> extends CamelTransformSupport<R> {
+
+    private interface ConfigName {
+        String FIELD_TARGET_TYPE = "target.type";
+    }
+
+    /** Configuration definition of this transformation; it declares the single {@code target.type} option. */
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(ConfigName.FIELD_TARGET_TYPE, ConfigDef.Type.CLASS, null, ConfigDef.Importance.HIGH,
+                    "The target field type to convert the value to, this is a fully qualified Java class name, e.g: java.util.Map");
+
+    /** Type the key or value is converted to; assigned in {@link #configure(Map)}. */
+    private Class<?> fieldTargetType;
+
+    /**
+     * Converts the operating part (key or value) of the record to the configured target type and returns a
+     * new record carrying the converted value together with a matching schema.
+     *
+     * @param record the record to transform
+     * @return a new record with the converted key or value
+     * @throws DataException if the Camel type converter yields no result for the value
+     */
+    @Override
+    public R apply(R record) {
+        final Schema schema = operatingSchema(record);
+        final Object value = operatingValue(record);
+
+        final Object convertedValue = convertValueWithCamelTypeConverter(value);
+        // A record without a schema stays schemaless; there is nothing to copy the schema basics from.
+        final Schema updatedSchema = schema == null ? null : getOrBuildRecordSchema(schema, convertedValue);
+
+        return newRecord(record, updatedSchema, convertedValue);
+    }
+
+    /**
+     * Converts the given value to the configured target type with the type converter of the Camel context.
+     *
+     * @param originalValue the value to convert
+     * @return the converted value, never {@code null}
+     * @throws DataException if the converter returns {@code null} for the value
+     */
+    private Object convertValueWithCamelTypeConverter(final Object originalValue) {
+        final TypeConverter converter = getCamelContext().getTypeConverter();
+        final Object convertedValue = converter.tryConvertTo(fieldTargetType, originalValue);
+
+        if (convertedValue == null) {
+            throw new DataException(String.format("CamelTypeConverter was not able to convert value `%s` to target type of `%s`", originalValue, fieldTargetType.getSimpleName()));
+        }
+
+        return convertedValue;
+    }
+
+    /**
+     * Builds the schema of the converted value: the schema type is derived from the converted value, the
+     * basics are copied from the original schema, and the optional flag and the (converted) default value
+     * are carried over.
+     *
+     * @param originalSchema the schema of the key or value before conversion, must not be {@code null}
+     * @param value the converted value
+     * @return the schema describing the converted value
+     */
+    private Schema getOrBuildRecordSchema(final Schema originalSchema, final Object value) {
+        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(originalSchema, SchemaHelper.buildSchemaBuilderForType(value));
+
+        // SchemaBuilder rejects setting the optional flag twice, and the helper's fallback builder is
+        // already optional, so only set the flag when it is still unset.
+        if (originalSchema.isOptional() && !builder.isOptional()) {
+            builder.optional();
+        }
+        if (originalSchema.defaultValue() != null) {
+            builder.defaultValue(convertValueWithCamelTypeConverter(originalSchema.defaultValue()));
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * @return the configuration definition of this transformation, {@link #CONFIG_DEF}
+     */
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    /**
+     * Does nothing.
+     */
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Reads the {@code target.type} option and stores the class the key or value is converted to.
+     *
+     * @param props the transformation properties
+     * @throws org.apache.kafka.common.config.ConfigException if no target type is configured
+     */
+    @Override
+    public void configure(Map<String, ?> props) {
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
+        final Class<?> targetType = config.getClass(ConfigName.FIELD_TARGET_TYPE);
+
+        // The option defaults to null; fail here instead of with a NullPointerException on the first record.
+        if (targetType == null) {
+            throw new org.apache.kafka.common.config.ConfigException(ConfigName.FIELD_TARGET_TYPE, null, "A target type must be configured");
+        }
+
+        fieldTargetType = targetType;
+    }
+
+    /**
+     * @param record the record being transformed
+     * @return the schema of the part of the record this transformation operates on
+     */
+    protected abstract Schema operatingSchema(R record);
+
+    /**
+     * @param record the record being transformed
+     * @return the part of the record (key or value) this transformation operates on
+     */
+    protected abstract Object operatingValue(R record);
+
+    /**
+     * Creates a copy of the record in which the operating part is replaced.
+     *
+     * @param record the original record
+     * @param updatedSchema the schema of the converted key or value
+     * @param updatedValue the converted key or value
+     * @return the new record
+     */
+    protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
+
+    /**
+     * Variant of the transformation that converts the record key.
+     *
+     * @param <R> the concrete {@link ConnectRecord} type handled by the transformation
+     */
+    public static final class Key<R extends ConnectRecord<R>> extends CamelTypeConverterTransform<R> {
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.keySchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.key();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
+        }
+    }
+
+    /**
+     * Variant of the transformation that converts the record value.
+     *
+     * @param <R> the concrete {@link ConnectRecord} type handled by the transformation
+     */
+    public static final class Value<R extends ConnectRecord<R>> extends CamelTypeConverterTransform<R> {
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.valueSchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.value();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
+        }
+    }
+}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
new file mode 100644
index 0000000..6b4960d
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
@@ -0,0 +1,59 @@
+package org.apache.camel.kafkaconnector.utils;
+
+import java.util.Map;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+
+/**
+ * Helper methods for deriving Kafka Connect schemas from Java values.
+ */
+public final class SchemaHelper {
+
+    private SchemaHelper() {
+        // utility class, not meant to be instantiated
+    }
+
+    /**
+     * Picks a {@link SchemaBuilder} matching the runtime type of the given value.
+     * <p>
+     * Only a fixed set of types is recognised: {@link Byte}, {@link Short}, {@link Integer}, {@link Long},
+     * {@link Float}, {@link Double}, {@link Boolean}, {@link String}, {@link Map}, {@link Iterable} and
+     * {@link Struct}. Any other value, including {@code null}, yields an optional
+     * {@link Schema.Type#BYTES} builder. The builders for maps and iterables are created without key,
+     * value or element schemas, and the struct builder without fields.
+     *
+     * @param value the value to return the SchemaBuilder for
+     * @return a new {@link SchemaBuilder} instance
+     */
+    public static SchemaBuilder buildSchemaBuilderForType(final Object value) {
+        if (value instanceof Byte) {
+            // a single byte is an INT8 in Connect; BYTES is reserved for byte arrays and buffers
+            return SchemaBuilder.int8();
+        }
+        if (value instanceof Short) {
+            return SchemaBuilder.int16();
+        }
+        if (value instanceof Integer) {
+            return SchemaBuilder.int32();
+        }
+        if (value instanceof Long) {
+            return SchemaBuilder.int64();
+        }
+        if (value instanceof Float) {
+            return SchemaBuilder.float32();
+        }
+        if (value instanceof Double) {
+            return SchemaBuilder.float64();
+        }
+        if (value instanceof Boolean) {
+            return SchemaBuilder.bool();
+        }
+        if (value instanceof String) {
+            return SchemaBuilder.string();
+        }
+        if (value instanceof Map) {
+            // Note: optimally we should define the schema better for map, however for now we will keep it abstract
+            return new SchemaBuilder(Schema.Type.MAP);
+        }
+        if (value instanceof Iterable) {
+            // Note: optimally we should define the schema better for Iterable, however for now we will keep it abstract
+            return new SchemaBuilder(Schema.Type.ARRAY);
+        }
+        if (value instanceof Struct) {
+            return SchemaBuilder.struct();
+        }
+
+        // if we do not find a matching schema above, we just return an optional bytes schema
+        return SchemaBuilder.bytes().optional();
+    }
+}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
new file mode 100644
index 0000000..8973903
--- /dev/null
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
@@ -0,0 +1,35 @@
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.junit.Test;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link CamelTypeConverterTransform}.
+ */
+public class CamelTypeConverterTransformTest {
+
+    /**
+     * Converts the string value {@code "TRUE"} of a source record to {@code java.lang.Boolean} and checks
+     * both the converted value and the resulting value schema.
+     */
+    @Test
+    public void testIfItConvertsConnectRecordCorrectly() {
+        // transformation under test, configured to target java.lang.Boolean
+        final Map<String, Object> settings = new HashMap<>();
+        settings.put("target.type", "java.lang.Boolean");
+        final Transformation<SourceRecord> transform = new CamelTypeConverterTransform.Value<>();
+        transform.configure(settings);
+
+        // record whose value is the string "TRUE", described by a string schema
+        final SourceRecord original = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "topic", Schema.STRING_SCHEMA, "TRUE");
+
+        final SourceRecord converted = transform.apply(original);
+
+        assertEquals(true, converted.value());
+        assertEquals(Schema.BOOLEAN_SCHEMA, converted.valueSchema());
+    }
+
+}
\ No newline at end of file
diff --git a/parent/pom.xml b/parent/pom.xml
index fa2a06f..b62f7f6 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -175,6 +175,12 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.kafka</groupId>
+                <artifactId>connect-transforms</artifactId>
+                <version>${kafka.version}</version>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
                 <artifactId>kafka_2.12</artifactId>
                 <version>${kafka.version}</version>
                 <scope>test</scope>