You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by oa...@apache.org on 2019/12/09 17:52:11 UTC

[camel-kafka-connector] branch support-camel-type-converter created (now f15b53b)

This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a change to branch support-camel-type-converter
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


      at f15b53b  Add the basis for Camel Type Converter transformations

This branch includes the following new commits:

     new f15b53b  Add the basis for Camel Type Converter transformations

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel-kafka-connector] 01/01: Add the basis for Camel Type Converter transformations

Posted by oa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a commit to branch support-camel-type-converter
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit f15b53b9beef270736c25a633decb51cc4dbec9b
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Mon Dec 9 18:51:45 2019 +0100

    Add the basis for Camel Type Converter transformations
---
 core/pom.xml                                       |   5 +
 .../transforms/CamelTransformSupport.java          |  15 +++
 .../transforms/CamelTypeConverterTransform.java    | 114 +++++++++++++++++++++
 .../camel/kafkaconnector/utils/SchemaHelper.java   |  59 +++++++++++
 .../CamelTypeConverterTransformTest.java           |  35 +++++++
 parent/pom.xml                                     |   6 ++
 6 files changed, 234 insertions(+)

diff --git a/core/pom.xml b/core/pom.xml
index fe67cf3..e803fb0 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -97,6 +97,11 @@
             <artifactId>connect-api</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-transforms</artifactId>
+            <scope>provided</scope>
+        </dependency>
 
         <!-- Test -->
         <dependency>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java
new file mode 100644
index 0000000..19fae92
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java
@@ -0,0 +1,15 @@
+package org.apache.camel.kafkaconnector.transforms;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+
+public abstract class CamelTransformSupport<R extends ConnectRecord<R>> implements Transformation<R> {
+    // Each transformation instance creates and keeps its own Camel context.
+    private final CamelContext context = new DefaultCamelContext();
+    /** Returns the Camel context created together with this transformation instance. */
+    protected CamelContext getCamelContext() {
+        return this.context;
+    }
+}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java
new file mode 100644
index 0000000..07351e7
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java
@@ -0,0 +1,114 @@
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.util.Map;
+
+import org.apache.camel.TypeConverter;
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+public abstract class CamelTypeConverterTransform<R extends ConnectRecord<R>> extends CamelTransformSupport<R> {
+    private static final String FIELD_TARGET_TYPE = "target.type";
+    // NO_DEFAULT_VALUE makes target.type mandatory, so a missing type is rejected by configure() instead of failing in apply().
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELD_TARGET_TYPE, ConfigDef.Type.CLASS, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH,
+                    "The target type to convert the value to, this is a fully qualified Java class name, e.g: java.util.Map");
+
+    private Class<?> fieldTargetType;
+
+    /** Converts the operated part of the record and returns a copy carrying the converted value and its schema. */
+    @Override
+    public R apply(R record) {
+        final Object convertedValue = convertValueWithCamelTypeConverter(operatingValue(record));
+        return newRecord(record, getOrBuildRecordSchema(operatingSchema(record), convertedValue), convertedValue);
+    }
+
+    /** Converts to the configured target type; throws {@link DataException} when Camel returns no result. */
+    private Object convertValueWithCamelTypeConverter(final Object originalValue) {
+        final TypeConverter converter = getCamelContext().getTypeConverter();
+        final Object convertedValue = converter.tryConvertTo(fieldTargetType, originalValue);
+        if (convertedValue == null) {
+            throw new DataException(String.format("CamelTypeConverter was not able to convert value `%s` to target type of `%s`", originalValue, fieldTargetType.getSimpleName()));
+        }
+        return convertedValue;
+    }
+
+    /** Builds the schema for the converted value; a schemaless record (null schema) stays schemaless. */
+    private Schema getOrBuildRecordSchema(final Schema originalSchema, final Object value) {
+        if (originalSchema == null) {
+            return null;
+        }
+        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(originalSchema, SchemaHelper.buildSchemaBuilderForType(value));
+        // The fallback builder from SchemaHelper is already optional, and SchemaBuilder rejects setting optional twice.
+        if (originalSchema.isOptional() && !builder.isOptional()) {
+            builder.optional();
+        }
+        if (originalSchema.defaultValue() != null) {
+            builder.defaultValue(convertValueWithCamelTypeConverter(originalSchema.defaultValue()));
+        }
+        return builder.build();
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
+        fieldTargetType = config.getClass(FIELD_TARGET_TYPE);
+    }
+
+    /** Returns the schema of the record part (key or value) being converted. */
+    protected abstract Schema operatingSchema(R record);
+
+    /** Returns the record part (key or value) being converted. */
+    protected abstract Object operatingValue(R record);
+
+    /** Returns a copy of {@code record} with the converted part and its schema replaced. */
+    protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
+
+    public static final class Key<R extends ConnectRecord<R>> extends CamelTypeConverterTransform<R> {
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.keySchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.key();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
+        }
+    }
+
+    public static final class Value<R extends ConnectRecord<R>> extends CamelTypeConverterTransform<R> {
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.valueSchema();
+        }
+
+        @Override
+        protected Object operatingValue(R record) {
+            return record.value();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
+        }
+    }
+}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
new file mode 100644
index 0000000..6b4960d
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
@@ -0,0 +1,59 @@
+package org.apache.camel.kafkaconnector.utils;
+
+import java.util.Map;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+
+public final class SchemaHelper {
+
+    private SchemaHelper() { }
+
+    /**
+     * Builds a {@link SchemaBuilder} matching the runtime type of {@code value}. Only the types checked below are
+     * recognised; for any other value (including {@code null}) an optional BYTES schema builder is returned.
+     * @param value the value to derive the schema builder from
+     * @return a new {@link SchemaBuilder} instance
+     */
+    public static SchemaBuilder buildSchemaBuilderForType(final Object value) {
+        if (value instanceof Byte) {
+            // A single Byte is Connect's INT8; the BYTES type is reserved for byte[] / ByteBuffer payloads.
+            return SchemaBuilder.int8();
+        }
+        if (value instanceof Short) {
+            return SchemaBuilder.int16();
+        }
+        if (value instanceof Integer) {
+            return SchemaBuilder.int32();
+        }
+        if (value instanceof Long) {
+            return SchemaBuilder.int64();
+        }
+        if (value instanceof Float) {
+            return SchemaBuilder.float32();
+        }
+        if (value instanceof Double) {
+            return SchemaBuilder.float64();
+        }
+        if (value instanceof Boolean) {
+            return SchemaBuilder.bool();
+        }
+        if (value instanceof String) {
+            return SchemaBuilder.string();
+        }
+        if (value instanceof Map) {
+            // Note: optimally we should define the schema better for map, however for now we will keep it abstract
+            return new SchemaBuilder(Schema.Type.MAP);
+        }
+        if (value instanceof Iterable) {
+            // Note: optimally we should define the schema better for Iterable, however for now we will keep it abstract
+            return new SchemaBuilder(Schema.Type.ARRAY);
+        }
+        if (value instanceof Struct) {
+            return SchemaBuilder.struct();
+        }
+        // if we do not find any schema among the above, we just return an optional bytes schema
+        return SchemaBuilder.bytes().optional();
+    }
+}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
new file mode 100644
index 0000000..8973903
--- /dev/null
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
@@ -0,0 +1,35 @@
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.junit.Test;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+import static org.junit.Assert.*;
+
+public class CamelTypeConverterTransformTest {
+
+    // A string value of "TRUE" must come out as Boolean true, with the value schema switched to BOOLEAN.
+    @Test
+    public void testIfItConvertsConnectRecordCorrectly() {
+        final Map<String, Object> config = new HashMap<>();
+        config.put("target.type", "java.lang.Boolean");
+        final Transformation<SourceRecord> valueTransform = new CamelTypeConverterTransform.Value<>();
+        valueTransform.configure(config);
+
+        final SourceRecord input = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "topic", Schema.STRING_SCHEMA, "TRUE");
+        final SourceRecord output = valueTransform.apply(input);
+
+        final Object actualValue = output.value();
+        final Schema actualSchema = output.valueSchema();
+        assertEquals(true, actualValue);
+        assertEquals(Schema.BOOLEAN_SCHEMA, actualSchema);
+    }
+}
\ No newline at end of file
diff --git a/parent/pom.xml b/parent/pom.xml
index fa2a06f..b62f7f6 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -175,6 +175,12 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.kafka</groupId>
+                <artifactId>connect-transforms</artifactId>
+                <version>${kafka.version}</version>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
                 <artifactId>kafka_2.12</artifactId>
                 <version>${kafka.version}</version>
                 <scope>test</scope>