You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by oa...@apache.org on 2019/12/10 17:18:35 UTC
[camel-kafka-connector] branch support-camel-type-converter
updated: Final touches for Camel type converter transform
This is an automated email from the ASF dual-hosted git repository.
oalsafi pushed a commit to branch support-camel-type-converter
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/support-camel-type-converter by this push:
new f23fb46 Final touches for Camel type converter transform
f23fb46 is described below
commit f23fb463248f8c0cc9d6c9b78c3eebae0e358409
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Tue Dec 10 18:18:03 2019 +0100
Final touches for Camel type converter transform
---
core/pom.xml | 6 ++
.../transforms/CamelTransformSupport.java | 16 ++++
.../transforms/CamelTypeConverterTransform.java | 42 +++++++--
.../camel/kafkaconnector/utils/SchemaHelper.java | 82 +++++++++---------
.../CamelTypeConverterTransformTest.java | 99 +++++++++++++++++++---
.../kafkaconnector/utils/SchemaHelperTest.java | 83 ++++++++++++++++++
parent/pom.xml | 6 ++
7 files changed, 276 insertions(+), 58 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index e803fb0..1763235 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -134,6 +134,12 @@
<artifactId>camel-hl7</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-debezium-common</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java
index 19fae92..4f318b0 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTransformSupport.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.kafkaconnector.transforms;
import org.apache.camel.CamelContext;
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java
index 07351e7..2727d61 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.kafkaconnector.transforms;
import java.util.Map;
@@ -5,6 +21,7 @@ import java.util.Map;
import org.apache.camel.TypeConverter;
import org.apache.camel.kafkaconnector.utils.SchemaHelper;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -14,14 +31,13 @@ import org.apache.kafka.connect.transforms.util.SimpleConfig;
public abstract class CamelTypeConverterTransform<R extends ConnectRecord<R>> extends CamelTransformSupport<R> {
- private interface ConfigName {
- String FIELD_TARGET_TYPE = "target.type";
- }
+ public static final String FIELD_TARGET_TYPE_CONFIG = "target.type";
+ private static TypeConverter typeConverter;
private Class<?> fieldTargetType;
public static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(ConfigName.FIELD_TARGET_TYPE, ConfigDef.Type.CLASS, null, ConfigDef.Importance.HIGH,
+ .define(FIELD_TARGET_TYPE_CONFIG, ConfigDef.Type.CLASS, null, ConfigDef.Importance.HIGH,
"The target field type to convert the value from, this is full qualified Java class, e.g: java.util.Map");
@Override
@@ -36,8 +52,7 @@ public abstract class CamelTypeConverterTransform<R extends ConnectRecord<R>> ex
}
private Object convertValueWithCamelTypeConverter(final Object originalValue) {
- final TypeConverter converter = getCamelContext().getTypeConverter();
- final Object convertedValue = converter.tryConvertTo(fieldTargetType, originalValue);
+ final Object convertedValue = typeConverter.tryConvertTo(fieldTargetType, originalValue);
if (convertedValue == null) {
throw new DataException(String.format("CamelTypeConverter was not able to converter value `%s` to target type of `%s`", originalValue, fieldTargetType.getSimpleName()));
@@ -49,10 +64,12 @@ public abstract class CamelTypeConverterTransform<R extends ConnectRecord<R>> ex
private Schema getOrBuildRecordSchema(final Schema originalSchema, final Object value) {
final SchemaBuilder builder = SchemaUtil.copySchemaBasics(originalSchema, SchemaHelper.buildSchemaBuilderForType(value));
- if (originalSchema.isOptional())
+ if (originalSchema.isOptional()) {
builder.optional();
- if (originalSchema.defaultValue() != null)
+ }
+ if (originalSchema.defaultValue() != null) {
builder.defaultValue(convertValueWithCamelTypeConverter(originalSchema.defaultValue()));
+ }
return builder.build();
}
@@ -69,7 +86,14 @@ public abstract class CamelTypeConverterTransform<R extends ConnectRecord<R>> ex
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
- fieldTargetType = config.getClass(ConfigName.FIELD_TARGET_TYPE);
+ fieldTargetType = config.getClass(FIELD_TARGET_TYPE_CONFIG);
+
+ if (fieldTargetType == null) {
+ throw new ConfigException("Configuration 'target.type' can not be empty!");
+ }
+
+ // initialize type converter from camel context
+ typeConverter = getCamelContext().getTypeConverter();
}
protected abstract Schema operatingSchema(R record);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
index 6b4960d..5c0c295 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/SchemaHelper.java
@@ -1,10 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.kafkaconnector.utils;
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.List;
import java.util.Map;
+import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
-import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Values;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
public final class SchemaHelper {
@@ -17,43 +38,28 @@ public final class SchemaHelper {
* @return {@link SchemaBuilder} instance
*/
public static SchemaBuilder buildSchemaBuilderForType(final Object value) {
- if (value instanceof Byte) {
- return SchemaBuilder.bytes();
- }
- if (value instanceof Short) {
- return SchemaBuilder.int16();
- }
- if (value instanceof Integer) {
- return SchemaBuilder.int32();
- }
- if (value instanceof Long) {
- return SchemaBuilder.int64();
- }
- if (value instanceof Float) {
- return SchemaBuilder.float32();
- }
- if (value instanceof Double) {
- return SchemaBuilder.float64();
- }
- if (value instanceof Boolean) {
- return SchemaBuilder.bool();
- }
- if (value instanceof String) {
- return SchemaBuilder.string();
- }
- if (value instanceof Map) {
- // Note: optimally we should define the schema better for map, however for now we will keep it abstract
- return new SchemaBuilder(Schema.Type.MAP);
- }
- if (value instanceof Iterable) {
- // Note: optimally we should define the schema better for Iterable, however for now we will keep it abstract
- return new SchemaBuilder(Schema.Type.ARRAY);
- }
- if (value instanceof Struct) {
- return SchemaBuilder.struct();
- }
+ // gracefully try to infer the schema
+ final Schema knownSchema = Values.inferSchema(value);
- // if we do not fine any of schema out of the above, we just return an an optional byte schema
- return SchemaBuilder.bytes().optional();
+ if (knownSchema == null) {
+ // let's now check for other types
+ if (value instanceof Date) {
+ return org.apache.kafka.connect.data.Date.builder();
+ }
+ if (value instanceof BigDecimal) {
+ return Decimal.builder(((BigDecimal) value).scale());
+ }
+ // we re-check map and list since inferSchema function is not tolerant against map and list
+ // for now we rely on inferSchema, however it makes some point to build a specific inferSchema method only for this connector
+ if (value instanceof Map) {
+ return new SchemaBuilder(Schema.Type.MAP);
+ }
+ if (value instanceof List) {
+ return new SchemaBuilder(Schema.Type.ARRAY);
+ }
+ // if we do not fine any of schema out of the above, we just return an an optional byte schema
+ return SchemaBuilder.bytes().optional();
+ }
+ return SchemaUtil.copySchemaBasics(knownSchema);
}
}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
index 8973903..92badd0 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransformTest.java
@@ -1,35 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.kafkaconnector.transforms;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import org.apache.camel.kafkaconnector.utils.SchemaHelper;
-import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.junit.Test;
-import org.apache.kafka.connect.connector.ConnectRecord;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public class CamelTypeConverterTransformTest {
@Test
public void testIfItConvertsConnectRecordCorrectly() {
- final SourceRecord connectRecord = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "topic", Schema.STRING_SCHEMA, "TRUE");
+ final SourceRecord connectRecord = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "topic", Schema.STRING_SCHEMA, "1234", Schema.STRING_SCHEMA, "TRUE");
+
+ final Map<String, Object> propsForKeySmt = new HashMap<>();
+ propsForKeySmt.put(CamelTypeConverterTransform.FIELD_TARGET_TYPE_CONFIG, Integer.class.getName());
+
+ final Map<String, Object> propsForValueSmt = new HashMap<>();
+ propsForValueSmt.put(CamelTypeConverterTransform.FIELD_TARGET_TYPE_CONFIG, "java.lang.Boolean");
+
+ final Transformation<SourceRecord> transformationKey = new CamelTypeConverterTransform.Key<>();
+ final Transformation<SourceRecord> transformationValue = new CamelTypeConverterTransform.Value<>();
+
+ transformationKey.configure(propsForKeySmt);
+ transformationValue.configure(propsForValueSmt);
+
+ final SourceRecord transformedKeySourceRecord = transformationKey.apply(connectRecord);
+ final SourceRecord transformedValueSourceRecord = transformationValue.apply(connectRecord);
+
+ assertEquals(1234, transformedKeySourceRecord.key());
+ assertEquals(Schema.INT32_SCHEMA, transformedKeySourceRecord.keySchema());
+
+ assertEquals(true, transformedValueSourceRecord.value());
+ assertEquals(Schema.BOOLEAN_SCHEMA, transformedValueSourceRecord.valueSchema());
+ }
+
+ @Test
+ public void testIfHandlesTypeConvertersFromCamelComponents() {
+ // we know we have a type converter from struct to map in dbz component, so we use this for testing
+ final Schema schema = SchemaBuilder.struct()
+ .field("id", Schema.INT32_SCHEMA)
+ .field("name", Schema.STRING_SCHEMA)
+ .field("valid", Schema.BOOLEAN_SCHEMA)
+ .field("extra", Schema.STRING_SCHEMA)
+ .build();
+
+ final Struct value = new Struct(schema);
+ value.put("id", 12);
+ value.put("name", "test-name");
+ value.put("valid", true);
+
+ final SourceRecord connectRecord = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "topic", Schema.STRING_SCHEMA, "1234", schema, value);
+
final Map<String, Object> props = new HashMap<>();
- props.put("target.type", "java.lang.Boolean");
+ props.put(CamelTypeConverterTransform.FIELD_TARGET_TYPE_CONFIG, Map.class.getName());
+
+ final Transformation<SourceRecord> transformationValue = new CamelTypeConverterTransform.Value<>();
+
+ transformationValue.configure(props);
- final Transformation<SourceRecord> transformation = new CamelTypeConverterTransform.Value<>();
+ final SourceRecord transformedValueSourceRecord = transformationValue.apply(connectRecord);
- transformation.configure(props);
+ // assert
+ assertNotNull(transformedValueSourceRecord);
- final SourceRecord transformedSourceRecord = transformation.apply(connectRecord);
+ final Map<String, Object> outputValue = (Map<String, Object>) transformedValueSourceRecord.value();
+
+ assertEquals(12, outputValue.get("id"));
+ assertEquals("test-name", outputValue.get("name"));
+ assertNull(outputValue.get("extra"));
+ assertTrue((boolean)outputValue.get("valid"));
+ assertEquals(Schema.Type.MAP, transformedValueSourceRecord.valueSchema().type());
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testIfItCanHandleEmptyKeyProps() {
+ final Transformation<SourceRecord> transformationKey = new CamelTypeConverterTransform.Key<>();
+
+ final Map<String, Object> props = new HashMap<>();
+ props.put(CamelTypeConverterTransform.FIELD_TARGET_TYPE_CONFIG, Map.class.getName());
- assertEquals(true, transformedSourceRecord.value());
- assertEquals(Schema.BOOLEAN_SCHEMA, transformedSourceRecord.valueSchema());
+ transformationKey.configure(Collections.emptyMap());
}
}
\ No newline at end of file
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/utils/SchemaHelperTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/utils/SchemaHelperTest.java
new file mode 100644
index 0000000..b1b881b
--- /dev/null
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/utils/SchemaHelperTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.utils;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class SchemaHelperTest {
+
+ @Test
+ public void testBuildSchemaBuilderForAllKnownTypes() {
+ assertEquals(SchemaBuilder.STRING_SCHEMA, SchemaHelper.buildSchemaBuilderForType("test").build());
+ assertEquals(SchemaBuilder.BOOLEAN_SCHEMA, SchemaHelper.buildSchemaBuilderForType(false).build());
+ assertEquals(SchemaBuilder.INT8_SCHEMA, SchemaHelper.buildSchemaBuilderForType(new Byte("1")).build());
+ assertEquals(SchemaBuilder.INT16_SCHEMA, SchemaHelper.buildSchemaBuilderForType(new Short("1")).build());
+ assertEquals(SchemaBuilder.INT32_SCHEMA, SchemaHelper.buildSchemaBuilderForType(1).build());
+ assertEquals(SchemaBuilder.INT64_SCHEMA, SchemaHelper.buildSchemaBuilderForType(1L).build());
+ assertEquals(SchemaBuilder.FLOAT32_SCHEMA, SchemaHelper.buildSchemaBuilderForType(new Float("1")).build());
+ assertEquals(SchemaBuilder.FLOAT64_SCHEMA, SchemaHelper.buildSchemaBuilderForType(new Double("1")).build());
+ assertEquals(SchemaBuilder.BYTES_SCHEMA, SchemaHelper.buildSchemaBuilderForType("test".getBytes()).build());
+ }
+
+ @Test
+ public void testBuildSchemaBuilderForAllSpecialTypes() {
+ // test map
+ final Map<String, Object> inputMap = new LinkedHashMap<>();
+ inputMap.put("test", "value being tested");
+
+ assertEquals(Schema.Type.MAP, SchemaHelper.buildSchemaBuilderForType(inputMap).type());
+
+ // test list
+ final List<String> inputList = Arrays.asList("test1", "test2");
+ assertEquals(Schema.Type.ARRAY, SchemaHelper.buildSchemaBuilderForType(inputList).type());
+
+ // test date/time
+ final Date date = new Date();
+ assertEquals(org.apache.kafka.connect.data.Date.SCHEMA, SchemaHelper.buildSchemaBuilderForType(date).build());
+
+ // test big decimal
+ final BigDecimal bigDecimal = new BigDecimal("1");
+ assertEquals(Decimal.schema(bigDecimal.scale()), SchemaHelper.buildSchemaBuilderForType(bigDecimal).build());
+
+ // test struct
+ final Schema schema = SchemaBuilder.struct()
+ .field("id", Schema.INT32_SCHEMA)
+ .build();
+ final Struct structValue = new Struct(schema);
+ structValue.put("id", 1);
+ assertEquals(Schema.Type.STRUCT, SchemaHelper.buildSchemaBuilderForType(structValue).type());
+
+ // finally how to handle if we have no idea about the value
+ final S3ObjectInputStream s3ObjectInputStream = new S3ObjectInputStream(System.in, new HttpDelete());
+ assertEquals(Schema.Type.BYTES, SchemaHelper.buildSchemaBuilderForType(s3ObjectInputStream).type());
+ }
+}
\ No newline at end of file
diff --git a/parent/pom.xml b/parent/pom.xml
index b62f7f6..b719242 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -248,6 +248,12 @@
<version>${camel.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-debezium-common</artifactId>
+ <version>${camel.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</dependencyManagement>