Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/09 20:53:49 UTC

[camel-kafka-connector] branch master updated: Added PojoToSchemaAndStructTransform to infer a Kafka Connect schema and Struct from a POJO.

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 430b438  Added PojoToSchemaAndStructTransform to infer a Kafka Connect schema and Struct from a POJO.
430b438 is described below

commit 430b43891d195346ff9e0ce05d23d3ef5d5338a1
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Sun Nov 8 00:04:33 2020 +0100

    Added PojoToSchemaAndStructTransform to infer a Kafka Connect schema and Struct from a POJO.
---
 core/pom.xml                                       |  17 +++
 .../transforms/PojoToSchemaAndStructTransform.java |  96 +++++++++++++++++
 .../PojoToSchemaAndStructTransformTest.java        | 117 +++++++++++++++++++++
 3 files changed, 230 insertions(+)
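
Once released, the transform can be enabled on a connector like any other Kafka Connect single message transform. A minimal sketch of the relevant connector properties (the "pojo" alias and the connector it is attached to are illustrative, not part of this commit):

    transforms=pojo
    transforms.pojo.type=org.apache.camel.kafkaconnector.transforms.PojoToSchemaAndStructTransform

Because config() returns AvroDataConfig.baseConfigDef(), any Apicurio AvroData options the transform supports would be supplied with the same transforms.pojo. prefix.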

diff --git a/core/pom.xml b/core/pom.xml
index 843fd37..c7ea38d 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -64,6 +64,18 @@
             <scope>provided</scope>
         </dependency>
 
+        <!-- Registry -->
+        <dependency>
+            <groupId>io.apicurio</groupId>
+            <artifactId>apicurio-registry-utils-converter</artifactId>
+            <version>${apicurio.registry.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-avro</artifactId>
+            <version>${jackson2-version}</version>
+        </dependency>
+
         <!-- Test -->
         <dependency>
             <groupId>org.apache.camel</groupId>
@@ -105,6 +117,11 @@
             <artifactId>camel-log</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-slack</artifactId>
+            <scope>test</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.junit.jupiter</groupId>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java
new file mode 100644
index 0000000..d458636
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.avro.AvroFactory;
+import com.fasterxml.jackson.dataformat.avro.AvroSchema;
+import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaGenerator;
+import io.apicurio.registry.utils.converter.avro.AvroData;
+import io.apicurio.registry.utils.converter.avro.AvroDataConfig;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PojoToSchemaAndStructTransform<R extends ConnectRecord<R>> implements Transformation<R> {
+    private static final Logger LOG = LoggerFactory.getLogger(PojoToSchemaAndStructTransform.class);
+    private static final ObjectMapper MAPPER = new ObjectMapper(new AvroFactory());
+
+    private AvroData avroData;
+
+    @Override
+    public R apply(R r) {
+        LOG.debug("Incoming record: " + r);
+
+        AvroSchemaGenerator gen = new AvroSchemaGenerator();
+
+        try {
+            MAPPER.acceptJsonFormatVisitor(r.value().getClass(), gen);
+        } catch (JsonMappingException e) {
+            throw new ConnectException("Error in generating POJO schema.", e);
+        }
+
+        AvroSchema schemaWrapper = gen.getGeneratedSchema();
+        org.apache.avro.Schema avroSchema = schemaWrapper.getAvroSchema();
+        LOG.debug("Generated avro schema: " + avroSchema.toString(true));
+
+        SchemaAndValue connectSchemaAndData = null;
+        try {
+            byte[] avroDataByte = MAPPER.writer(schemaWrapper).writeValueAsBytes(r.value());
+            Decoder decoder = DecoderFactory.get().binaryDecoder(avroDataByte, null);
+            DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
+            GenericRecord genericAvroData = datumReader.read(null, decoder);
+
+            connectSchemaAndData = this.avroData.toConnectData(avroSchema, genericAvroData);
+        } catch (IOException e) {
+            throw new ConnectException("Error in generating POJO Struct.", e);
+        }
+
+        LOG.debug("Generate kafka connect schema: " + connectSchemaAndData.schema());
+        LOG.debug("Generate kafka connect value (as Struct): " + connectSchemaAndData.value());
+        return r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(),
+                connectSchemaAndData.schema(), connectSchemaAndData.value(), r.timestamp());
+    }
+
+    @Override
+    public ConfigDef config() {
+        return AvroDataConfig.baseConfigDef();
+    }
+
+    @Override
+    public void close() {
+        //NOOP
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        this.avroData = new AvroData(new AvroDataConfig(configs));
+    }
+}
\ No newline at end of file
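
For reference, the conversion happens in two hops: Jackson's AvroSchemaGenerator infers an Avro schema from the value's class, the POJO is round-tripped through Avro binary into a GenericRecord, and Apicurio's AvroData maps that record onto a Connect schema and Struct. A minimal sketch of a POJO the transform could handle (the Order class is hypothetical, not part of this commit):

    // Hypothetical bean: Jackson's AvroSchemaGenerator derives an Avro record
    // schema from its bean properties (name: string, quantity: int).
    public class Order {
        private String name;
        private int quantity;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getQuantity() {
            return quantity;
        }

        public void setQuantity(int quantity) {
            this.quantity = quantity;
        }
    }

    // After the transform runs, the record value is a Connect Struct keyed by
    // the bean property names, e.g.:
    //   Struct value = (Struct) transformedRecord.value();
    //   String name = value.getString("name");
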
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java
new file mode 100644
index 0000000..8712145
--- /dev/null
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.component.slack.helper.SlackMessage;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class PojoToSchemaAndStructTransformTest {
+
+    @Test
+    public void testRecordValueConversion() {
+        PojoToSchemaAndStructTransform pojoToSchemaAndStructTransform = new PojoToSchemaAndStructTransform();
+        pojoToSchemaAndStructTransform.configure(Collections.emptyMap());
+
+        SlackMessage sm = new SlackMessage();
+
+        SlackMessage.Attachment at1 = sm.new Attachment();
+        SlackMessage.Attachment.Field at1f1 = at1.new Field();
+        at1f1.setTitle("ciao");
+        at1f1.setShortValue(true);
+        at1.setFields(new ArrayList<SlackMessage.Attachment.Field>(Collections.singleton(at1f1)));
+        at1.setAuthorName("Andrea");
+
+        SlackMessage.Attachment at2 = sm.new Attachment();
+        at2.setColor("green");
+
+        ArrayList<SlackMessage.Attachment> attachments = new ArrayList<>();
+        attachments.add(at1);
+        attachments.add(at2);
+
+        sm.setAttachments(attachments);
+
+        ConnectRecord cr = new SourceRecord(null, null, "testTopic",
+                Schema.STRING_SCHEMA, "testKeyValue",
+                Schema.BYTES_SCHEMA, sm);
+
+        ConnectRecord transformedCr = pojoToSchemaAndStructTransform.apply(cr);
+
+        assertEquals("testTopic", transformedCr.topic());
+        assertEquals(Schema.STRING_SCHEMA, transformedCr.keySchema());
+        assertEquals("testKeyValue", transformedCr.key());
+        Schema transformedSchema = transformedCr.valueSchema();
+        assertEquals(Schema.Type.STRUCT, transformedSchema.type());
+        assertEquals(Schema.Type.ARRAY, transformedSchema.field("attachments").schema().type());
+        assertEquals(Schema.STRING_SCHEMA.type(), transformedSchema.field("attachments").schema().valueSchema().field("title").schema().type());
+        assertEquals(Struct.class, transformedCr.value().getClass());
+        Struct transformedValue = (Struct)transformedCr.value();
+        assertTrue(ArrayList.class.isAssignableFrom(transformedValue.get("attachments").getClass()));
+    }
+
+    @Test
+    public void testMapValueConversion() {
+        PojoToSchemaAndStructTransform pojoToSchemaAndStructTransform = new PojoToSchemaAndStructTransform();
+        pojoToSchemaAndStructTransform.configure(Collections.emptyMap());
+
+        PojoWithMap pwm = new PojoWithMap();
+        pwm.addToMap("ciao", 9);
+
+        ConnectRecord cr = new SourceRecord(null, null, "testTopic",
+                Schema.STRING_SCHEMA, "testKeyValue",
+                Schema.BYTES_SCHEMA, pwm);
+
+        ConnectRecord transformedCr = pojoToSchemaAndStructTransform.apply(cr);
+
+        assertEquals("testTopic", transformedCr.topic());
+        assertEquals(Schema.STRING_SCHEMA, transformedCr.keySchema());
+        assertEquals("testKeyValue", transformedCr.key());
+        Schema transformedSchema = transformedCr.valueSchema();
+        assertEquals(Schema.Type.STRUCT, transformedSchema.type());
+        assertEquals(Schema.Type.MAP, transformedSchema.field("map").schema().type());
+        assertEquals(Struct.class, transformedCr.value().getClass());
+        Struct transformedValue = (Struct)transformedCr.value();
+        assertTrue(Map.class.isAssignableFrom(transformedValue.get("map").getClass()));
+    }
+
+    public class PojoWithMap {
+        private Map<String, Integer> map = new HashMap<>();
+
+        public Map<String, Integer> getMap() {
+            return map;
+        }
+
+        public void setMap(Map<String, Integer> map) {
+            this.map = map;
+        }
+
+        public void addToMap(String key, Integer value) {
+            map.put(key, value);
+        }
+    }
+}