Posted to commits@camel.apache.org by oa...@apache.org on 2019/12/10 17:18:35 UTC

[camel-kafka-connector] branch support-camel-type-converter updated: Final touches for Camel type converter transform

This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a commit to branch support-camel-type-converter
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/support-camel-type-converter by this push:
     new f23fb46  Final touches for Camel type converter transform
f23fb46 is described below

commit f23fb463248f8c0cc9d6c9b78c3eebae0e358409
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Tue Dec 10 18:18:03 2019 +0100

    Final touches for Camel type converter transform
---
 core/pom.xml                                       |  6 ++
 .../transforms/CamelTransformSupport.java          | 16 ++++
 .../transforms/CamelTypeConverterTransform.java    | 42 +++++++--
 .../camel/kafkaconnector/utils/SchemaHelper.java   | 82 +++++++++---------
 .../CamelTypeConverterTransformTest.java           | 99 +++++++++++++++++++---
 .../kafkaconnector/utils/SchemaHelperTest.java     | 83 ++++++++++++++++++
 parent/pom.xml                                     |  6 ++
 7 files changed, 276 insertions(+), 58 deletions(-)

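For orientation: the transform exposes its single option through the new FIELD_TARGET_TYPE_CONFIG constant ("target.type") and ships in Key and Value variants. A minimal sketch of programmatic use, mirroring the updated tests further down (the wrapper class and topic name are illustrative):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.transforms.Transformation;

    public class TypeConverterSmtSketch {
        public static void main(String[] args) {
            final SourceRecord record = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(),
                    "topic", Schema.STRING_SCHEMA, "TRUE");

            final Map<String, Object> props = new HashMap<>();
            props.put(CamelTypeConverterTransform.FIELD_TARGET_TYPE_CONFIG, "java.lang.Boolean");

            // the Value variant converts the record value; Key does the same for the record key
            final Transformation<SourceRecord> transform = new CamelTypeConverterTransform.Value<>();
            transform.configure(props);

            // the String "TRUE" is converted by Camel's type converter to Boolean.TRUE
            // and the value schema is rebuilt as BOOLEAN_SCHEMA
            final SourceRecord converted = transform.apply(record);
            System.out.println(converted.value() + " / " + converted.valueSchema());
        }
    }

In a connector configuration the same transform would be referenced through the standard transforms.<name>.type mechanism, with target.type as its only option.
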
diff --git a/core/pom.xml b/core/pom.xml
index e803fb0..1763235 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -134,6 +134,12 @@
             <artifactId>camel-hl7</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-debezium-common</artifactId>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java
index 19fae92..4f318b0 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.kafkaconnector.transforms;
 
 import org.apache.camel.CamelContext;
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java
index 07351e7..2727d61 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.kafkaconnector.transforms;
 
 import java.util.Map;
@@ -5,6 +21,7 @@ import java.util.Map;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.kafkaconnector.utils.SchemaHelper;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
@@ -14,14 +31,13 @@ import org.apache.kafka.connect.transforms.util.SimpleConfig;
 
 public abstract class CamelTypeConverterTransform<R extends ConnectRecord<R>> extends CamelTransformSupport<R> {
 
-    private interface ConfigName {
-        String FIELD_TARGET_TYPE = "target.type";
-    }
+    public static final String FIELD_TARGET_TYPE_CONFIG = "target.type";
 
+    private static TypeConverter typeConverter;
     private Class<?> fieldTargetType;
 
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
-            .define(ConfigName.FIELD_TARGET_TYPE, ConfigDef.Type.CLASS, null, ConfigDef.Importance.HIGH,
+            .define(FIELD_TARGET_TYPE_CONFIG, ConfigDef.Type.CLASS, null, ConfigDef.Importance.HIGH,
                     "The target field type to convert the value from, this is full qualified Java class, e.g: java.util.Map");
 
     @Override
@@ -36,8 +52,7 @@ public abstract class CamelTypeConverterTransform<R extends ConnectRecord<R>> ex
     }
 
     private Object convertValueWithCamelTypeConverter(final Object originalValue) {
-        final TypeConverter converter = getCamelContext().getTypeConverter();
-        final Object convertedValue = converter.tryConvertTo(fieldTargetType, originalValue);
+        final Object convertedValue = typeConverter.tryConvertTo(fieldTargetType, originalValue);
 
         if (convertedValue == null) {
            throw new DataException(String.format("CamelTypeConverter was not able to convert value `%s` to target type of `%s`", originalValue, fieldTargetType.getSimpleName()));
@@ -49,10 +64,12 @@ public abstract class CamelTypeConverterTransform<R extends ConnectRecord<R>> ex
     private Schema getOrBuildRecordSchema(final Schema originalSchema, final Object value) {
         final SchemaBuilder builder = SchemaUtil.copySchemaBasics(originalSchema, SchemaHelper.buildSchemaBuilderForType(value));
 
-        if (originalSchema.isOptional())
+        if (originalSchema.isOptional()) {
             builder.optional();
-        if (originalSchema.defaultValue() != null)
+        }
+        if (originalSchema.defaultValue() != null) {
             builder.defaultValue(convertValueWithCamelTypeConverter(originalSchema.defaultValue()));
+        }
 
         return builder.build();
     }
@@ -69,7 +86,14 @@ public abstract class CamelTypeConverterTransform<R extends ConnectRecord<R>> ex
     @Override
     public void configure(Map<String, ?> props) {
         final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
-        fieldTargetType = config.getClass(ConfigName.FIELD_TARGET_TYPE);
+        fieldTargetType = config.getClass(FIELD_TARGET_TYPE_CONFIG);
+
+        if (fieldTargetType == null) {
+            throw new ConfigException("Configuration 'target.type' cannot be empty!");
+        }
+
+        // initialize type converter from camel context
+        typeConverter = getCamelContext().getTypeConverter();
     }
 
     protected abstract Schema operatingSchema(R record);
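
The conversion itself is delegated to Camel's TypeConverter, which configure() now looks up once from the Camel context instead of fetching it for every record. For reference, a standalone sketch of the underlying Camel API (using a plain DefaultCamelContext rather than the transform's own context):

    import org.apache.camel.CamelContext;
    import org.apache.camel.TypeConverter;
    import org.apache.camel.impl.DefaultCamelContext;

    public class CamelTypeConverterSketch {
        public static void main(String[] args) throws Exception {
            final CamelContext context = new DefaultCamelContext();
            context.start();

            final TypeConverter converter = context.getTypeConverter();

            // tryConvertTo returns the converted value, or null when no converter applies;
            // a null result is what makes the transform throw a DataException
            final Boolean converted = converter.tryConvertTo(Boolean.class, "TRUE");
            System.out.println(converted); // true

            context.stop();
        }
    }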
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
index 6b4960d..5c0c295 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
@@ -1,10 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.kafkaconnector.utils;
 
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
-import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Values;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
 
 public final class SchemaHelper {
 
@@ -17,43 +38,28 @@ public final class SchemaHelper {
      * @return {@link SchemaBuilder} instance
      */
     public static SchemaBuilder buildSchemaBuilderForType(final Object value) {
-        if (value instanceof Byte) {
-            return SchemaBuilder.bytes();
-        }
-        if (value instanceof Short) {
-            return SchemaBuilder.int16();
-        }
-        if (value instanceof Integer) {
-            return SchemaBuilder.int32();
-        }
-        if (value instanceof Long) {
-            return SchemaBuilder.int64();
-        }
-        if (value instanceof Float) {
-            return SchemaBuilder.float32();
-        }
-        if (value instanceof Double) {
-            return SchemaBuilder.float64();
-        }
-        if (value instanceof Boolean) {
-            return SchemaBuilder.bool();
-        }
-        if (value instanceof String) {
-            return SchemaBuilder.string();
-        }
-        if (value instanceof Map) {
-            // Note: optimally we should define the schema better for map, however for now we will keep it abstract
-            return new SchemaBuilder(Schema.Type.MAP);
-        }
-        if (value instanceof Iterable) {
-            // Note: optimally we should define the schema better for Iterable, however for now we will keep it abstract
-            return new SchemaBuilder(Schema.Type.ARRAY);
-        }
-        if (value instanceof Struct) {
-            return SchemaBuilder.struct();
-        }
+        // gracefully try to infer the schema
+        final Schema knownSchema = Values.inferSchema(value);
 
-        // if we do not fine any of schema out of the above, we just return an an optional byte schema
-        return SchemaBuilder.bytes().optional();
+        if (knownSchema == null) {
+            // let's now check for other types
+            if (value instanceof Date) {
+                return org.apache.kafka.connect.data.Date.builder();
+            }
+            if (value instanceof BigDecimal) {
+                return Decimal.builder(((BigDecimal) value).scale());
+            }
+            // we re-check Map and List explicitly since inferSchema is not tolerant of them
+            // for now we rely on inferSchema, although it may make sense to build a dedicated schema inference method for this connector
+            if (value instanceof Map) {
+                return new SchemaBuilder(Schema.Type.MAP);
+            }
+            if (value instanceof List) {
+                return new SchemaBuilder(Schema.Type.ARRAY);
+            }
+            // if none of the above matches, we just return an optional bytes schema
+            return SchemaBuilder.bytes().optional();
+        }
+        return SchemaUtil.copySchemaBasics(knownSchema);
     }
 }
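
The helper now leans on Kafka Connect's Values.inferSchema for the primitive cases and only falls back to explicit checks for types it cannot infer. A rough sketch of the expected behaviour, based on the new SchemaHelperTest below (the wrapper class is illustrative):

    import java.math.BigDecimal;
    import java.util.Date;

    import org.apache.camel.kafkaconnector.utils.SchemaHelper;

    public class SchemaHelperSketch {
        public static void main(String[] args) {
            // primitives are resolved through Values.inferSchema
            System.out.println(SchemaHelper.buildSchemaBuilderForType("text").build());  // STRING
            System.out.println(SchemaHelper.buildSchemaBuilderForType(42).build());      // INT32

            // Date and BigDecimal are not inferred, so the explicit branches above take over
            System.out.println(SchemaHelper.buildSchemaBuilderForType(new Date()).build());            // logical Date
            System.out.println(SchemaHelper.buildSchemaBuilderForType(new BigDecimal("1.5")).build()); // Decimal with scale 1

            // anything the helper does not recognise ends up as an optional bytes schema
            System.out.println(SchemaHelper.buildSchemaBuilderForType(new Object()).build());          // optional BYTES
        }
    }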
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
index 8973903..92badd0 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
@@ -1,35 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.kafkaconnector.transforms;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.camel.kafkaconnector.utils.SchemaHelper;
-import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.junit.Test;
-import org.apache.kafka.connect.connector.ConnectRecord;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class CamelTypeConverterTransformTest {
 
     @Test
     public void testIfItConvertsConnectRecordCorrectly() {
-        final SourceRecord connectRecord = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "topic", Schema.STRING_SCHEMA, "TRUE");
+        final SourceRecord connectRecord = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "topic", Schema.STRING_SCHEMA, "1234", Schema.STRING_SCHEMA, "TRUE");
+
+        final Map<String, Object> propsForKeySmt = new HashMap<>();
+        propsForKeySmt.put(CamelTypeConverterTransform.FIELD_TARGET_TYPE_CONFIG, Integer.class.getName());
+
+        final Map<String, Object> propsForValueSmt = new HashMap<>();
+        propsForValueSmt.put(CamelTypeConverterTransform.FIELD_TARGET_TYPE_CONFIG, "java.lang.Boolean");
+
+        final Transformation<SourceRecord> transformationKey = new CamelTypeConverterTransform.Key<>();
+        final Transformation<SourceRecord> transformationValue = new CamelTypeConverterTransform.Value<>();
+
+        transformationKey.configure(propsForKeySmt);
+        transformationValue.configure(propsForValueSmt);
+
+        final SourceRecord transformedKeySourceRecord = transformationKey.apply(connectRecord);
+        final SourceRecord transformedValueSourceRecord = transformationValue.apply(connectRecord);
+
+        assertEquals(1234, transformedKeySourceRecord.key());
+        assertEquals(Schema.INT32_SCHEMA, transformedKeySourceRecord.keySchema());
+
+        assertEquals(true, transformedValueSourceRecord.value());
+        assertEquals(Schema.BOOLEAN_SCHEMA, transformedValueSourceRecord.valueSchema());
+    }
+
+    @Test
+    public void testIfHandlesTypeConvertersFromCamelComponents() {
+        // we know we have a type converter from Struct to Map in the Debezium component, so we use it for testing
+        final Schema schema = SchemaBuilder.struct()
+                .field("id", Schema.INT32_SCHEMA)
+                .field("name", Schema.STRING_SCHEMA)
+                .field("valid", Schema.BOOLEAN_SCHEMA)
+                .field("extra", Schema.STRING_SCHEMA)
+                .build();
+
+        final Struct value = new Struct(schema);
+        value.put("id", 12);
+        value.put("name", "test-name");
+        value.put("valid", true);
+
+        final SourceRecord connectRecord = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "topic", Schema.STRING_SCHEMA, "1234", schema, value);
+
         final Map<String, Object> props = new HashMap<>();
-        props.put("target.type", "java.lang.Boolean");
+        props.put(CamelTypeConverterTransform.FIELD_TARGET_TYPE_CONFIG, Map.class.getName());
+
+        final Transformation<SourceRecord> transformationValue = new CamelTypeConverterTransform.Value<>();
+
+        transformationValue.configure(props);
 
-        final Transformation<SourceRecord> transformation = new CamelTypeConverterTransform.Value<>();
+        final SourceRecord transformedValueSourceRecord = transformationValue.apply(connectRecord);
 
-        transformation.configure(props);
+        // assert
+        assertNotNull(transformedValueSourceRecord);
 
-        final SourceRecord transformedSourceRecord = transformation.apply(connectRecord);
+        final Map<String, Object> outputValue = (Map<String, Object>) transformedValueSourceRecord.value();
+
+        assertEquals(12, outputValue.get("id"));
+        assertEquals("test-name", outputValue.get("name"));
+        assertNull(outputValue.get("extra"));
+        assertTrue((boolean)outputValue.get("valid"));
+        assertEquals(Schema.Type.MAP, transformedValueSourceRecord.valueSchema().type());
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testIfItCanHandleEmptyKeyProps() {
+        final Transformation<SourceRecord> transformationKey = new CamelTypeConverterTransform.Key<>();
+
+        final Map<String, Object> props = new HashMap<>();
+        props.put(CamelTypeConverterTransform.FIELD_TARGET_TYPE_CONFIG, Map.class.getName());
 
-        assertEquals(true, transformedSourceRecord.value());
-        assertEquals(Schema.BOOLEAN_SCHEMA, transformedSourceRecord.valueSchema());
+        transformationKey.configure(Collections.emptyMap());
     }
 
 }
\ No newline at end of file
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/utils/SchemaHelperTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/utils/SchemaHelperTest.java
new file mode 100644
index 0000000..b1b881b
--- /dev/null
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/utils/SchemaHelperTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.utils;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class SchemaHelperTest {
+
+    @Test
+    public void testBuildSchemaBuilderForAllKnownTypes() {
+        assertEquals(SchemaBuilder.STRING_SCHEMA, SchemaHelper.buildSchemaBuilderForType("test").build());
+        assertEquals(SchemaBuilder.BOOLEAN_SCHEMA, SchemaHelper.buildSchemaBuilderForType(false).build());
+        assertEquals(SchemaBuilder.INT8_SCHEMA, SchemaHelper.buildSchemaBuilderForType(new Byte("1")).build());
+        assertEquals(SchemaBuilder.INT16_SCHEMA, SchemaHelper.buildSchemaBuilderForType(new Short("1")).build());
+        assertEquals(SchemaBuilder.INT32_SCHEMA, SchemaHelper.buildSchemaBuilderForType(1).build());
+        assertEquals(SchemaBuilder.INT64_SCHEMA, SchemaHelper.buildSchemaBuilderForType(1L).build());
+        assertEquals(SchemaBuilder.FLOAT32_SCHEMA, SchemaHelper.buildSchemaBuilderForType(new Float("1")).build());
+        assertEquals(SchemaBuilder.FLOAT64_SCHEMA, SchemaHelper.buildSchemaBuilderForType(new Double("1")).build());
+        assertEquals(SchemaBuilder.BYTES_SCHEMA, SchemaHelper.buildSchemaBuilderForType("test".getBytes()).build());
+    }
+
+    @Test
+    public void testBuildSchemaBuilderForAllSpecialTypes() {
+        // test map
+        final Map<String, Object> inputMap = new LinkedHashMap<>();
+        inputMap.put("test", "value being tested");
+
+        assertEquals(Schema.Type.MAP, SchemaHelper.buildSchemaBuilderForType(inputMap).type());
+
+        // test list
+        final List<String> inputList = Arrays.asList("test1", "test2");
+        assertEquals(Schema.Type.ARRAY, SchemaHelper.buildSchemaBuilderForType(inputList).type());
+
+        // test date/time
+        final Date date = new Date();
+        assertEquals(org.apache.kafka.connect.data.Date.SCHEMA, SchemaHelper.buildSchemaBuilderForType(date).build());
+
+        // test big decimal
+        final BigDecimal bigDecimal = new BigDecimal("1");
+        assertEquals(Decimal.schema(bigDecimal.scale()), SchemaHelper.buildSchemaBuilderForType(bigDecimal).build());
+
+        // test struct
+        final Schema schema = SchemaBuilder.struct()
+                .field("id", Schema.INT32_SCHEMA)
+                .build();
+        final Struct structValue = new Struct(schema);
+        structValue.put("id", 1);
+        assertEquals(Schema.Type.STRUCT, SchemaHelper.buildSchemaBuilderForType(structValue).type());
+
+        // finally, check the fallback when we have no idea about the value's type
+        final S3ObjectInputStream s3ObjectInputStream = new S3ObjectInputStream(System.in, new HttpDelete());
+        assertEquals(Schema.Type.BYTES, SchemaHelper.buildSchemaBuilderForType(s3ObjectInputStream).type());
+    }
+}
\ No newline at end of file
diff --git a/parent/pom.xml b/parent/pom.xml
index b62f7f6..b719242 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -248,6 +248,12 @@
                 <version>${camel.version}</version>
                 <scope>test</scope>
             </dependency>
+            <dependency>
+                <groupId>org.apache.camel</groupId>
+                <artifactId>camel-debezium-common</artifactId>
+                <version>${camel.version}</version>
+                <scope>test</scope>
+            </dependency>
 
         </dependencies>
     </dependencyManagement>