You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/09 20:53:49 UTC
[camel-kafka-connector] branch master updated: Added
PojoToSchemaAndStructTransform to infer POJO Kafka Connect schema and
Struct.
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 430b438 Added PojoToSchemaAndStructTransform to infer POJO Kafka Connect schema and Struct.
430b438 is described below
commit 430b43891d195346ff9e0ce05d23d3ef5d5338a1
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Sun Nov 8 00:04:33 2020 +0100
Added PojoToSchemaAndStructTransform to infer POJO Kafka Connect schema and Struct.
---
core/pom.xml | 17 +++
.../transforms/PojoToSchemaAndStructTransform.java | 96 +++++++++++++++++
.../PojoToSchemaAndStructTransformTest.java | 117 +++++++++++++++++++++
3 files changed, 230 insertions(+)
diff --git a/core/pom.xml b/core/pom.xml
index 843fd37..c7ea38d 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -64,6 +64,18 @@
<scope>provided</scope>
</dependency>
+ <!-- Registry -->
+ <dependency>
+ <groupId>io.apicurio</groupId>
+ <artifactId>apicurio-registry-utils-converter</artifactId>
+ <version>${apicurio.registry.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-avro</artifactId>
+ <version>${jackson2-version}</version>
+ </dependency>
+
<!-- Test -->
<dependency>
<groupId>org.apache.camel</groupId>
@@ -105,6 +117,11 @@
<artifactId>camel-log</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-slack</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java
new file mode 100644
index 0000000..d458636
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.avro.AvroFactory;
+import com.fasterxml.jackson.dataformat.avro.AvroSchema;
+import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaGenerator;
+import io.apicurio.registry.utils.converter.avro.AvroData;
+import io.apicurio.registry.utils.converter.avro.AvroDataConfig;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kafka Connect transformation that replaces a record's POJO value with a
+ * Connect schema and value derived from that POJO.
+ *
+ * <p>For every record the Avro schema of the value's runtime class is generated
+ * with Jackson's {@link AvroSchemaGenerator}, the value is serialized to Avro
+ * binary with that schema, read back as a {@link GenericRecord} and finally
+ * converted to Connect data through {@link AvroData}. Topic, partition, key,
+ * key schema and timestamp are carried over unchanged.
+ *
+ * <p>{@link #configure(Map)} must be called before {@link #apply(ConnectRecord)}:
+ * it is the only place where the {@link AvroData} converter is created.
+ *
+ * @param <R> the concrete record type handled by this transformation
+ */
+public class PojoToSchemaAndStructTransform<R extends ConnectRecord<R>> implements Transformation<R> {
+    private static final Logger LOG = LoggerFactory.getLogger(PojoToSchemaAndStructTransform.class);
+    private static final ObjectMapper MAPPER = new ObjectMapper(new AvroFactory());
+
+    private AvroData avroData;
+
+    /**
+     * Converts the value of the given record into Connect data with an inferred schema.
+     *
+     * @param r the incoming record
+     * @return a new record carrying the inferred value schema and converted value,
+     *         or {@code r} itself when its value is {@code null}
+     * @throws ConnectException if the schema cannot be generated or the value
+     *         cannot be serialized/deserialized through Avro
+     */
+    @Override
+    public R apply(R r) {
+        LOG.debug("Incoming record: {}", r);
+
+        Object value = r.value();
+        if (value == null) {
+            // No class to infer a schema from (e.g. a tombstone record): pass it through untouched.
+            LOG.debug("Incoming record with a null value, nothing to be done.");
+            return r;
+        }
+
+        AvroSchema schemaWrapper = generateAvroSchema(value.getClass());
+        if (LOG.isDebugEnabled()) {
+            // Pretty-printing the schema is costly, only do it when it is actually logged.
+            LOG.debug("Generated avro schema: {}", schemaWrapper.getAvroSchema().toString(true));
+        }
+
+        SchemaAndValue connectSchemaAndData = toConnectData(value, schemaWrapper);
+
+        LOG.debug("Generate kafka connect schema: {}", connectSchemaAndData.schema());
+        LOG.debug("Generate kafka connect value (as Struct): {}", connectSchemaAndData.value());
+        return r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(),
+                connectSchemaAndData.schema(), connectSchemaAndData.value(), r.timestamp());
+    }
+
+    /** Generates the Avro schema describing the given POJO class. */
+    private static AvroSchema generateAvroSchema(Class<?> pojoClass) {
+        AvroSchemaGenerator gen = new AvroSchemaGenerator();
+        try {
+            MAPPER.acceptJsonFormatVisitor(pojoClass, gen);
+        } catch (JsonMappingException e) {
+            throw new ConnectException("Error in generating POJO schema.", e);
+        }
+        return gen.getGeneratedSchema();
+    }
+
+    /** Round-trips the POJO through Avro binary and converts the result to Connect data. */
+    private SchemaAndValue toConnectData(Object value, AvroSchema schemaWrapper) {
+        org.apache.avro.Schema avroSchema = schemaWrapper.getAvroSchema();
+        try {
+            byte[] avroDataByte = MAPPER.writer(schemaWrapper).writeValueAsBytes(value);
+            Decoder decoder = DecoderFactory.get().binaryDecoder(avroDataByte, null);
+            DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);
+            GenericRecord genericAvroData = datumReader.read(null, decoder);
+
+            return this.avroData.toConnectData(avroSchema, genericAvroData);
+        } catch (IOException e) {
+            throw new ConnectException("Error in generating POJO Struct.", e);
+        }
+    }
+
+    /**
+     * @return the configuration definition, taken from {@link AvroDataConfig#baseConfigDef()}
+     */
+    @Override
+    public ConfigDef config() {
+        return AvroDataConfig.baseConfigDef();
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void close() {
+        //NOOP
+    }
+
+    /**
+     * Creates the {@link AvroData} converter from the supplied configuration.
+     *
+     * @param configs the transformation configuration, passed to {@link AvroDataConfig}
+     */
+    @Override
+    public void configure(Map<String, ?> configs) {
+        this.avroData = new AvroData(new AvroDataConfig(configs));
+    }
+}
\ No newline at end of file
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java
new file mode 100644
index 0000000..8712145
--- /dev/null
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.component.slack.helper.SlackMessage;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link PojoToSchemaAndStructTransform}: a POJO record value must come out
+ * as a {@link Struct} with an inferred schema while topic and key stay untouched.
+ */
+public class PojoToSchemaAndStructTransformTest {
+
+    private static final String TOPIC = "testTopic";
+    private static final String KEY = "testKeyValue";
+
+    /** A POJO with nested lists is converted to a STRUCT containing an ARRAY of structs. */
+    @Test
+    public void testRecordValueConversion() {
+        PojoToSchemaAndStructTransform<SourceRecord> transform = newConfiguredTransform();
+
+        SlackMessage sm = new SlackMessage();
+
+        SlackMessage.Attachment at1 = sm.new Attachment();
+        SlackMessage.Attachment.Field at1f1 = at1.new Field();
+        at1f1.setTitle("ciao");
+        at1f1.setShortValue(true);
+        at1.setFields(new ArrayList<>(Collections.singleton(at1f1)));
+        at1.setAuthorName("Andrea");
+
+        SlackMessage.Attachment at2 = sm.new Attachment();
+        at2.setColor("green");
+
+        ArrayList<SlackMessage.Attachment> attachments = new ArrayList<>();
+        attachments.add(at1);
+        attachments.add(at2);
+
+        sm.setAttachments(attachments);
+
+        ConnectRecord<SourceRecord> transformedCr = transform.apply(recordWithValue(sm));
+
+        assertTopicAndKeyPreserved(transformedCr);
+        Schema transformedSchema = transformedCr.valueSchema();
+        assertEquals(Schema.Type.STRUCT, transformedSchema.type());
+        assertEquals(Schema.Type.ARRAY, transformedSchema.field("attachments").schema().type());
+        assertEquals(Schema.STRING_SCHEMA.type(), transformedSchema.field("attachments").schema().valueSchema().field("title").schema().type());
+        assertEquals(Struct.class, transformedCr.value().getClass());
+        Struct transformedValue = (Struct) transformedCr.value();
+        assertTrue(ArrayList.class.isAssignableFrom(transformedValue.get("attachments").getClass()));
+    }
+
+    /** A POJO with a map property is converted to a STRUCT containing a MAP field. */
+    @Test
+    public void testMapValueConversion() {
+        PojoToSchemaAndStructTransform<SourceRecord> transform = newConfiguredTransform();
+
+        PojoWithMap pwm = new PojoWithMap();
+        pwm.addToMap("ciao", 9);
+
+        ConnectRecord<SourceRecord> transformedCr = transform.apply(recordWithValue(pwm));
+
+        assertTopicAndKeyPreserved(transformedCr);
+        Schema transformedSchema = transformedCr.valueSchema();
+        assertEquals(Schema.Type.STRUCT, transformedSchema.type());
+        assertEquals(Schema.Type.MAP, transformedSchema.field("map").schema().type());
+        assertEquals(Struct.class, transformedCr.value().getClass());
+        Struct transformedValue = (Struct) transformedCr.value();
+        assertTrue(Map.class.isAssignableFrom(transformedValue.get("map").getClass()));
+    }
+
+    /** Creates a transform configured with an empty configuration map. */
+    private static PojoToSchemaAndStructTransform<SourceRecord> newConfiguredTransform() {
+        PojoToSchemaAndStructTransform<SourceRecord> transform = new PojoToSchemaAndStructTransform<>();
+        transform.configure(Collections.emptyMap());
+        return transform;
+    }
+
+    /** Builds a source record on the test topic whose value is the given POJO. */
+    private static SourceRecord recordWithValue(Object value) {
+        return new SourceRecord(null, null, TOPIC,
+                Schema.STRING_SCHEMA, KEY,
+                Schema.BYTES_SCHEMA, value);
+    }
+
+    /** Checks that topic, key schema and key were carried over unchanged. */
+    private static void assertTopicAndKeyPreserved(ConnectRecord<?> record) {
+        assertEquals(TOPIC, record.topic());
+        assertEquals(Schema.STRING_SCHEMA, record.keySchema());
+        assertEquals(KEY, record.key());
+    }
+
+    /**
+     * Minimal POJO exposing a single map property; static because it does not
+     * need a reference to the enclosing test instance.
+     */
+    public static class PojoWithMap {
+        private Map<String, Integer> map = new HashMap<>();
+
+        public Map<String, Integer> getMap() {
+            return map;
+        }
+
+        public void setMap(Map<String, Integer> map) {
+            this.map = map;
+        }
+
+        public void addToMap(String key, Integer value) {
+            map.put(key, value);
+        }
+    }
+}