You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/23 07:40:35 UTC

[camel-kafka-connector] branch master updated (2384af5 -> d99e233)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


    from 2384af5  Updated CHANGELOG.md
     new 58e0d6e  related to #715: renamed PojoToSchemaAndStructTransform to SourcePojoToSchemaAndStructTransform
     new d99e233  fixed #715: Create the sink counterpart of the PojoToSchemaAndStructTransform.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../SinkPojoToSchemaAndStructTransform.java        | 118 ++++++++++
 ...a => SourcePojoToSchemaAndStructTransform.java} |   6 +-
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |   2 +-
 .../kafkaconnector/transforms/PojoWithMap.java     |  21 +-
 .../SinkPojoToSchemaAndStructTransformTest.java    | 129 ++++++++++
 .../kafkaconnector/transforms/SlackMessage.java    | 259 +++++++++++++++++++++
 ... SourcePojoToSchemaAndStructTransformTest.java} |  59 ++---
 7 files changed, 542 insertions(+), 52 deletions(-)
 create mode 100644 core/src/main/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransform.java
 rename core/src/main/java/org/apache/camel/kafkaconnector/transforms/{PojoToSchemaAndStructTransform.java => SourcePojoToSchemaAndStructTransform.java} (96%)
 copy tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/services/RemoteMongoDBService.java => core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoWithMap.java (67%)
 create mode 100644 core/src/test/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransformTest.java
 create mode 100644 core/src/test/java/org/apache/camel/kafkaconnector/transforms/SlackMessage.java
 rename core/src/test/java/org/apache/camel/kafkaconnector/transforms/{PojoToSchemaAndStructTransformTest.java => SourcePojoToSchemaAndStructTransformTest.java} (73%)


[camel-kafka-connector] 01/02: related to #715: renamed PojoToSchemaAndStructTransform to SourcePojoToSchemaAndStructTransform

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 58e0d6ed4f7f551c87da07116bccdb41ffbace00
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Tue Nov 17 17:13:35 2020 +0100

    related to #715: renamed PojoToSchemaAndStructTransform to SourcePojoToSchemaAndStructTransform
---
 ...a => SourcePojoToSchemaAndStructTransform.java} |  4 +--
 ... SourcePojoToSchemaAndStructTransformTest.java} | 42 +++++++++++-----------
 2 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransform.java
similarity index 96%
rename from core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java
rename to core/src/main/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransform.java
index 1ecf48f..6f0f850 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransform.java
@@ -43,8 +43,8 @@ import org.apache.kafka.connect.transforms.Transformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PojoToSchemaAndStructTransform <R extends ConnectRecord<R>> implements Transformation<R> {
-    private static final Logger LOG = LoggerFactory.getLogger(PojoToSchemaAndStructTransform.class);
+public class SourcePojoToSchemaAndStructTransform<R extends ConnectRecord<R>> implements Transformation<R> {
+    private static final Logger LOG = LoggerFactory.getLogger(SourcePojoToSchemaAndStructTransform.class);
     private static final ObjectMapper MAPPER = new ObjectMapper(new AvroFactory());
 
     private AvroData avroData;
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransformTest.java
similarity index 76%
rename from core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java
rename to core/src/test/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransformTest.java
index e378eab..529c45f 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransformTest.java
@@ -34,12 +34,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class PojoToSchemaAndStructTransformTest {
+public class SourcePojoToSchemaAndStructTransformTest {
 
     @Test
     public void testRecordValueConversion() {
-        PojoToSchemaAndStructTransform pojoToSchemaAndStructTransform = new PojoToSchemaAndStructTransform();
-        pojoToSchemaAndStructTransform.configure(Collections.emptyMap());
+        SourcePojoToSchemaAndStructTransform sourcePojoToSchemaAndStructTransform = new SourcePojoToSchemaAndStructTransform();
+        sourcePojoToSchemaAndStructTransform.configure(Collections.emptyMap());
 
         SlackMessage sm = new SlackMessage();
 
@@ -63,7 +63,7 @@ public class PojoToSchemaAndStructTransformTest {
                 Schema.STRING_SCHEMA, "testKeyValue",
                 Schema.BYTES_SCHEMA, sm);
 
-        ConnectRecord transformedCr = pojoToSchemaAndStructTransform.apply(cr);
+        ConnectRecord transformedCr = sourcePojoToSchemaAndStructTransform.apply(cr);
 
         assertEquals("testTopic", transformedCr.topic());
         assertEquals(Schema.STRING_SCHEMA, transformedCr.keySchema());
@@ -85,8 +85,8 @@ public class PojoToSchemaAndStructTransformTest {
 
     @Test
     public void testMapValueConversion() {
-        PojoToSchemaAndStructTransform pojoToSchemaAndStructTransform = new PojoToSchemaAndStructTransform();
-        pojoToSchemaAndStructTransform.configure(Collections.emptyMap());
+        SourcePojoToSchemaAndStructTransform sourcePojoToSchemaAndStructTransform = new SourcePojoToSchemaAndStructTransform();
+        sourcePojoToSchemaAndStructTransform.configure(Collections.emptyMap());
 
         PojoWithMap pwm = new PojoWithMap();
         pwm.addToMap("ciao", 9);
@@ -95,7 +95,7 @@ public class PojoToSchemaAndStructTransformTest {
                 Schema.STRING_SCHEMA, "testKeyValue",
                 Schema.BYTES_SCHEMA, pwm);
 
-        ConnectRecord transformedCr = pojoToSchemaAndStructTransform.apply(cr);
+        ConnectRecord transformedCr = sourcePojoToSchemaAndStructTransform.apply(cr);
 
         assertEquals("testTopic", transformedCr.topic());
         assertEquals(Schema.STRING_SCHEMA, transformedCr.keySchema());
@@ -113,8 +113,8 @@ public class PojoToSchemaAndStructTransformTest {
 
     @Test()
     public void testNotPojoConversion() {
-        PojoToSchemaAndStructTransform pojoToSchemaAndStructTransform = new PojoToSchemaAndStructTransform();
-        pojoToSchemaAndStructTransform.configure(Collections.emptyMap());
+        SourcePojoToSchemaAndStructTransform sourcePojoToSchemaAndStructTransform = new SourcePojoToSchemaAndStructTransform();
+        sourcePojoToSchemaAndStructTransform.configure(Collections.emptyMap());
 
         Map map = Collections.singletonMap("ciao", 9);
 
@@ -123,27 +123,27 @@ public class PojoToSchemaAndStructTransformTest {
                 Schema.BYTES_SCHEMA, map);
 
         assertThrows(ConnectException.class, () -> {
-            pojoToSchemaAndStructTransform.apply(cr);
+            sourcePojoToSchemaAndStructTransform.apply(cr);
         });
     }
 
     @Test()
     public void testNullValueConversion() {
-        PojoToSchemaAndStructTransform pojoToSchemaAndStructTransform = new PojoToSchemaAndStructTransform();
-        pojoToSchemaAndStructTransform.configure(Collections.emptyMap());
+        SourcePojoToSchemaAndStructTransform sourcePojoToSchemaAndStructTransform = new SourcePojoToSchemaAndStructTransform();
+        sourcePojoToSchemaAndStructTransform.configure(Collections.emptyMap());
 
         ConnectRecord cr = new SourceRecord(null, null, "testTopic",
                 Schema.STRING_SCHEMA, "testKeyValue",
                 Schema.BYTES_SCHEMA, null);
 
-        ConnectRecord transformedCr = pojoToSchemaAndStructTransform.apply(cr);
+        ConnectRecord transformedCr = sourcePojoToSchemaAndStructTransform.apply(cr);
         assertEquals(cr, transformedCr);
     }
 
     @Test()
     public void testConversionCache() {
-        PojoToSchemaAndStructTransform pojoToSchemaAndStructTransform = new PojoToSchemaAndStructTransform();
-        pojoToSchemaAndStructTransform.configure(Collections.emptyMap());
+        SourcePojoToSchemaAndStructTransform sourcePojoToSchemaAndStructTransform = new SourcePojoToSchemaAndStructTransform();
+        sourcePojoToSchemaAndStructTransform.configure(Collections.emptyMap());
 
         PojoWithMap pwm = new PojoWithMap();
         pwm.addToMap("ciao", 9);
@@ -152,12 +152,12 @@ public class PojoToSchemaAndStructTransformTest {
                 Schema.STRING_SCHEMA, "testKeyValue",
                 Schema.BYTES_SCHEMA, pwm);
 
-        assertEquals(0, pojoToSchemaAndStructTransform.getCache().keySet().size());
-        pojoToSchemaAndStructTransform.apply(cr);
-        assertEquals(1, pojoToSchemaAndStructTransform.getCache().keySet().size());
-        ConnectRecord transformedCr = pojoToSchemaAndStructTransform.apply(cr);
-        assertEquals(1, pojoToSchemaAndStructTransform.getCache().keySet().size());
-        assertTrue(pojoToSchemaAndStructTransform.getCache().keySet().contains(PojoWithMap.class.getCanonicalName()));
+        assertEquals(0, sourcePojoToSchemaAndStructTransform.getCache().keySet().size());
+        sourcePojoToSchemaAndStructTransform.apply(cr);
+        assertEquals(1, sourcePojoToSchemaAndStructTransform.getCache().keySet().size());
+        ConnectRecord transformedCr = sourcePojoToSchemaAndStructTransform.apply(cr);
+        assertEquals(1, sourcePojoToSchemaAndStructTransform.getCache().keySet().size());
+        assertTrue(sourcePojoToSchemaAndStructTransform.getCache().keySet().contains(PojoWithMap.class.getCanonicalName()));
     }
 
     private void atLeastOneFieldWithGivenValueExists(List structs, String fieldName, String fieldExpectedValue) {


[camel-kafka-connector] 02/02: fixed #715: Create the sink counterpart of the PojoToSchemaAndStructTransform.

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit d99e233e2ab466ae3c776e7c6dfbfcbf184da26a
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Sun Nov 22 23:27:44 2020 +0100

    fixed #715: Create the sink counterpart of the PojoToSchemaAndStructTransform.
---
 .../SinkPojoToSchemaAndStructTransform.java        | 118 ++++++++++
 .../SourcePojoToSchemaAndStructTransform.java      |   2 +-
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |   2 +-
 .../kafkaconnector/transforms/PojoWithMap.java     |  36 +++
 .../SinkPojoToSchemaAndStructTransformTest.java    | 129 ++++++++++
 .../kafkaconnector/transforms/SlackMessage.java    | 259 +++++++++++++++++++++
 .../SourcePojoToSchemaAndStructTransformTest.java  |  19 +-
 7 files changed, 545 insertions(+), 20 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransform.java
new file mode 100644
index 0000000..30ef9ca
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransform.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.dataformat.avro.AvroFactory;
+import com.fasterxml.jackson.dataformat.avro.AvroSchema;
+import io.apicurio.registry.utils.converter.avro.AvroData;
+import io.apicurio.registry.utils.converter.avro.AvroDataConfig;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/** Single message transform turning a STRUCT record value into an instance of the configured POJO class. */
+public class SinkPojoToSchemaAndStructTransform<R extends ConnectRecord<R>> implements Transformation<R> {
+    public static final String CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY = "camel.transformer.sink.pojo.class";
+
+    private static final Logger LOG = LoggerFactory.getLogger(SinkPojoToSchemaAndStructTransform.class);
+
+    private static final ObjectMapper MAPPER = new ObjectMapper(new AvroFactory());
+    private static final String CAMEL_TRANSFORMER_SINK_POJO_CLASS_DOC = "Full qualified class name of the pojo you want your record value converted to";
+    private static final Object CAMEL_TRANSFORMER_SINK_POJO_CLASS_DEFAULT = ConfigDef.NO_DEFAULT_VALUE;
+    private static final ConfigDef CONFIG_DEF = (new ConfigDef()).define(CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY, ConfigDef.Type.STRING, CAMEL_TRANSFORMER_SINK_POJO_CLASS_DEFAULT, ConfigDef.Importance.HIGH, CAMEL_TRANSFORMER_SINK_POJO_CLASS_DOC);
+
+    private String pojoClass;
+    private ObjectReader objectReader;
+    private AvroData avroData;
+
+    /**
+     * Converts a STRUCT value into the configured POJO (Struct -> Avro record -> Avro binary -> POJO).
+     *
+     * @return a record with a null value schema carrying the POJO, or {@code r} itself when its value is null or not a STRUCT
+     * @throws ConnectException if the struct cannot be converted
+     */
+    @Override
+    public R apply(R r) {
+        LOG.debug("Incoming record: {}", r);
+        if (r.value() == null || r.valueSchema() == null || !Schema.Type.STRUCT.equals(r.valueSchema().type())) {
+            LOG.debug("Incoming record with a null value or a value schema != Schema.Type.STRUCT, nothing to be done.");
+            return r;
+        }
+
+        GenericRecord avroGenericRecord = (GenericRecord)avroData.fromConnectData(r.valueSchema(), r.value());
+        if (avroGenericRecord == null) {
+            // Without a record there is no schema to encode with: fail explicitly rather than with a NullPointerException.
+            throw new ConnectException("Error in generating POJO from Struct: no Avro record could be created.");
+        }
+        LOG.debug("GenericRecord created: {} \nwith schema: {}", avroGenericRecord, avroGenericRecord.getSchema());
+
+        Object pojo;
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(avroGenericRecord.getSchema());
+            Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+            writer.write(avroGenericRecord, encoder);
+            encoder.flush();
+            // Not named avroData, which would shadow the field of the same name.
+            byte[] avroBytes = out.toByteArray();
+            pojo = objectReader.with(new AvroSchema(avroGenericRecord.getSchema())).readValue(avroBytes);
+            LOG.debug("Pojo of class {} created: {}", pojo.getClass(), pojo);
+        } catch (IOException e) {
+            throw new ConnectException("Error in generating POJO from Struct.", e);
+        }
+        LOG.debug("Generate pojo: {}", pojo);
+        return r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(), null, pojo, r.timestamp());
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+        //NOOP
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
+        this.pojoClass = config.getString(CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY);
+        this.avroData = new AvroData(new AvroDataConfig(configs));
+        try {
+            this.objectReader = MAPPER.readerFor(Class.forName(pojoClass));
+        } catch (ClassNotFoundException e) {
+            throw new ConnectException("Unable to initialize SinkPojoToSchemaAndStructTransform ", e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransform.java
index 6f0f850..128de09 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransform.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransform.java
@@ -55,7 +55,7 @@ public class SourcePojoToSchemaAndStructTransform<R extends ConnectRecord<R>> im
         LOG.debug("Incoming record: {}", r);
 
         if (r.value() != null) {
-            String recordClassCanonicalName = r.value().getClass().getCanonicalName();
+            String recordClassCanonicalName = r.value().getClass().getName();
             CacheEntry cacheEntry = avroSchemaWrapperCache.computeIfAbsent(recordClassCanonicalName, new Function<String, CacheEntry>() {
                 @Override
                 public CacheEntry apply(String s) {
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index c6fa7eb..49bf878 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -295,7 +295,7 @@ public class CamelSourceTaskTest {
         assertEquals(1, results.size());
         Header bigDecimalHeader = results.get(0).headers().allWithName(CamelSourceTask.HEADER_CAMEL_PREFIX + "bigdecimal").next();
         assertEquals("[B", bigDecimalHeader.value().getClass().getName());
-        assertEquals(Decimal.class.getCanonicalName(), bigDecimalHeader.schema().name());
+        assertEquals(Decimal.class.getName(), bigDecimalHeader.schema().name());
         assertEquals(Schema.Type.BYTES, bigDecimalHeader.schema().type());
 
         sourceTask.stop();
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoWithMap.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoWithMap.java
new file mode 100644
index 0000000..9511a55
--- /dev/null
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoWithMap.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class PojoWithMap {
+    private Map<String, Integer> map = new HashMap<>();
+    /** Returns the map held by this pojo (the instance itself, not a copy). */
+    public Map<String, Integer> getMap() {
+        return this.map;
+    }
+    /** Replaces the held map with the given one. */
+    public void setMap(Map<String, Integer> map) {
+        this.map = map;
+    }
+    /** Puts a single entry into the held map. */
+    public void addToMap(String key, Integer value) {
+        this.map.put(key, value);
+    }
+}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransformTest.java
new file mode 100644
index 0000000..5f1655a
--- /dev/null
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransformTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.camel.kafkaconnector.transforms.SlackMessage.Attachment;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests {@link SinkPojoToSchemaAndStructTransform}; input records are first passed through the source transform. */
+public class SinkPojoToSchemaAndStructTransformTest {
+
+    /** A SlackMessage sent through both transforms comes back with the same text and number of attachments. */
+    @Test
+    public void testRecordValueConversion() {
+        SourcePojoToSchemaAndStructTransform<SourceRecord> sourcePojoToSchemaAndStructTransform = new SourcePojoToSchemaAndStructTransform<>();
+        sourcePojoToSchemaAndStructTransform.configure(Collections.emptyMap());
+
+        SlackMessage sm = new SlackMessage();
+        Attachment at1 = new Attachment();
+        Attachment.Field at1f1 = new Attachment.Field();
+        at1f1.setTitle("ciao");
+        at1f1.setShortValue(true);
+        at1.setFields(new ArrayList<Attachment.Field>(Collections.singleton(at1f1)));
+        at1.setAuthorName("Andrea");
+
+        Attachment at2 = new Attachment();
+        at2.setColor("green");
+
+        ArrayList<Attachment> attachments = new ArrayList<>();
+        attachments.add(at1);
+        attachments.add(at2);
+
+        sm.setText("text");
+        sm.setAttachments(attachments);
+
+        SourceRecord cr = sourcePojoToSchemaAndStructTransform.apply(new SourceRecord(null, null, "testTopic",
+                Schema.STRING_SCHEMA, "testKeyValue",
+                Schema.BYTES_SCHEMA, sm));
+
+        SinkPojoToSchemaAndStructTransform<SourceRecord> sinkPojoToSchemaAndStructTransform = new SinkPojoToSchemaAndStructTransform<>();
+        sinkPojoToSchemaAndStructTransform.configure(Collections.singletonMap(SinkPojoToSchemaAndStructTransform.CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY, SlackMessage.class.getName()));
+
+        SourceRecord transformedCr = sinkPojoToSchemaAndStructTransform.apply(cr);
+
+        assertEquals("testTopic", transformedCr.topic());
+        assertEquals(Schema.STRING_SCHEMA, transformedCr.keySchema());
+        assertEquals("testKeyValue", transformedCr.key());
+        assertEquals(SlackMessage.class.getName(), transformedCr.value().getClass().getName());
+        SlackMessage transformedSM = (SlackMessage)transformedCr.value();
+        assertEquals(sm.getText(), transformedSM.getText());
+        assertEquals(sm.getAttachments().size(), transformedSM.getAttachments().size());
+    }
+
+    /** A PojoWithMap sent through both transforms comes back with a map of the same size and keys. */
+    @Test
+    public void testMapValueConversion() {
+        SourcePojoToSchemaAndStructTransform<SourceRecord> sourcePojoToSchemaAndStructTransform = new SourcePojoToSchemaAndStructTransform<>();
+        sourcePojoToSchemaAndStructTransform.configure(Collections.emptyMap());
+
+        PojoWithMap pwm = new PojoWithMap();
+        pwm.addToMap("ciao", 9);
+
+        SourceRecord cr = sourcePojoToSchemaAndStructTransform.apply(new SourceRecord(null, null, "testTopic",
+                Schema.STRING_SCHEMA, "testKeyValue",
+                Schema.BYTES_SCHEMA, pwm));
+
+        SinkPojoToSchemaAndStructTransform<SourceRecord> sinkPojoToSchemaAndStructTransform = new SinkPojoToSchemaAndStructTransform<>();
+        sinkPojoToSchemaAndStructTransform.configure(Collections.singletonMap(SinkPojoToSchemaAndStructTransform.CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY, PojoWithMap.class.getName()));
+
+        SourceRecord transformedCr = sinkPojoToSchemaAndStructTransform.apply(cr);
+
+        assertEquals("testTopic", transformedCr.topic());
+        assertEquals(Schema.STRING_SCHEMA, transformedCr.keySchema());
+        assertEquals("testKeyValue", transformedCr.key());
+        assertEquals(PojoWithMap.class.getName(), transformedCr.value().getClass().getName());
+        PojoWithMap transformedPWM = (PojoWithMap)transformedCr.value();
+        assertEquals(pwm.getMap().size(), transformedPWM.getMap().size());
+        assertEquals(pwm.getMap().keySet(), transformedPWM.getMap().keySet());
+    }
+
+    /** A record without a STRUCT value schema (here: no value schema at all) is returned untouched. */
+    @Test()
+    public void testNotStructSchemaConversion() {
+        SinkPojoToSchemaAndStructTransform<SourceRecord> sinkPojoToSchemaAndStructTransform = new SinkPojoToSchemaAndStructTransform<>();
+        sinkPojoToSchemaAndStructTransform.configure(Collections.singletonMap(SinkPojoToSchemaAndStructTransform.CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY, PojoWithMap.class.getName()));
+
+        Map<String, Integer> map = Collections.singletonMap("ciao", 9);
+        SourceRecord cr = new SourceRecord(null, null, "testTopic",
+                Schema.STRING_SCHEMA, "testKeyValue",
+                null, map);
+
+        assertEquals(cr, sinkPojoToSchemaAndStructTransform.apply(cr));
+    }
+
+    /** A record with a null value is returned untouched. */
+    @Test()
+    public void testNullValueConversion() {
+        SinkPojoToSchemaAndStructTransform<SourceRecord> sinkPojoToSchemaAndStructTransform = new SinkPojoToSchemaAndStructTransform<>();
+        sinkPojoToSchemaAndStructTransform.configure(Collections.singletonMap(SinkPojoToSchemaAndStructTransform.CAMEL_TRANSFORMER_SINK_POJO_CLASS_PROPERTY, PojoWithMap.class.getName()));
+
+        SourceRecord cr = new SourceRecord(null, null, "testTopic",
+                Schema.STRING_SCHEMA, "testKeyValue",
+                Schema.BYTES_SCHEMA, null);
+        SourceRecord transformedCr = sinkPojoToSchemaAndStructTransform.apply(cr);
+        assertEquals(cr, transformedCr);
+    }
+}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SlackMessage.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SlackMessage.java
new file mode 100644
index 0000000..24fa125
--- /dev/null
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SlackMessage.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.transforms;
+
+import java.util.List;
+
+//XXX: this class can be removed, and the tests updated accordingly, once Camel is updated to 3.7.0
+public class SlackMessage {
+    // Mutable bean: each field below is exposed through a plain getter/setter pair, with no validation and no defaults.
+    private String text;
+    private String channel;
+    private String username;
+    private String user;
+    private String iconUrl;
+    private String iconEmoji;
+    private List<Attachment> attachments;
+
+    public String getText() {
+        return text;
+    }
+
+    public void setText(String text) {
+        this.text = text;
+    }
+
+    public String getChannel() {
+        return channel;
+    }
+
+    public void setChannel(String channel) {
+        this.channel = channel;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    public String getIconUrl() {
+        return iconUrl;
+    }
+
+    public void setIconUrl(String iconUrl) {
+        this.iconUrl = iconUrl;
+    }
+
+    public String getIconEmoji() {
+        return iconEmoji;
+    }
+
+    public void setIconEmoji(String iconEmoji) {
+        this.iconEmoji = iconEmoji;
+    }
+
+    public List<Attachment> getAttachments() {
+        return attachments;
+    }
+
+    public void setAttachments(List<Attachment> attachments) {
+        this.attachments = attachments;
+    }
+
+    public static class Attachment {
+        // Element type of SlackMessage.attachments; same getter/setter bean shape, plus a list of Field entries.
+        private String fallback;
+        private String color;
+        private String pretext;
+        private String authorName;
+        private String authorLink;
+        private String authorIcon;
+        private String title;
+        private String titleLink;
+        private String text;
+        private String imageUrl;
+        private String thumbUrl;
+        private String footer;
+        private String footerIcon;
+        private Long ts;
+        private List<Attachment.Field> fields;
+
+        public String getFallback() {
+            return fallback;
+        }
+
+        public void setFallback(String fallback) {
+            this.fallback = fallback;
+        }
+
+        public String getColor() {
+            return color;
+        }
+
+        public void setColor(String color) {
+            this.color = color;
+        }
+
+        public String getPretext() {
+            return pretext;
+        }
+
+        public void setPretext(String pretext) {
+            this.pretext = pretext;
+        }
+
+        public String getAuthorName() {
+            return authorName;
+        }
+
+        public void setAuthorName(String authorName) {
+            this.authorName = authorName;
+        }
+
+        public String getAuthorLink() {
+            return authorLink;
+        }
+
+        public void setAuthorLink(String authorLink) {
+            this.authorLink = authorLink;
+        }
+
+        public String getAuthorIcon() {
+            return authorIcon;
+        }
+
+        public void setAuthorIcon(String authorIcon) {
+            this.authorIcon = authorIcon;
+        }
+
+        public String getTitle() {
+            return title;
+        }
+
+        public void setTitle(String title) {
+            this.title = title;
+        }
+
+        public String getTitleLink() {
+            return titleLink;
+        }
+
+        public void setTitleLink(String titleLink) {
+            this.titleLink = titleLink;
+        }
+
+        public String getText() {
+            return text;
+        }
+
+        public void setText(String text) {
+            this.text = text;
+        }
+
+        public String getImageUrl() {
+            return imageUrl;
+        }
+
+        public void setImageUrl(String imageUrl) {
+            this.imageUrl = imageUrl;
+        }
+
+        public String getThumbUrl() {
+            return thumbUrl;
+        }
+
+        public void setThumbUrl(String thumbUrl) {
+            this.thumbUrl = thumbUrl;
+        }
+
+        public String getFooter() {
+            return footer;
+        }
+
+        public void setFooter(String footer) {
+            this.footer = footer;
+        }
+
+        public String getFooterIcon() {
+            return footerIcon;
+        }
+
+        public void setFooterIcon(String footerIcon) {
+            this.footerIcon = footerIcon;
+        }
+
+        public Long getTs() {
+            return ts;
+        }
+
+        public void setTs(Long ts) {
+            this.ts = ts;
+        }
+
+        public List<Attachment.Field> getFields() {
+            return fields;
+        }
+
+        public void setFields(List<Attachment.Field> fields) {
+            this.fields = fields;
+        }
+
+        public static class Field {
+            // Element type of Attachment.fields: a title, a value and a Boolean shortValue flag, all null until set.
+            private String title;
+            private String value;
+            private Boolean shortValue;
+
+            public String getTitle() {
+                return title;
+            }
+
+            public void setTitle(String title) {
+                this.title = title;
+            }
+
+            public String getValue() {
+                return value;
+            }
+
+            public void setValue(String value) {
+                this.value = value;
+            }
+            // NOTE(review): "is" accessor returning the Boolean wrapper; bean introspection normally expects getShortValue() for wrappers - confirm before renaming.
+            public Boolean isShortValue() {
+                return shortValue;
+            }
+
+            public void setShortValue(Boolean shortValue) {
+                this.shortValue = shortValue;
+            }
+        }
+    }
+
+}
+
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransformTest.java
index 529c45f..dfe1ed6 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransformTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransformTest.java
@@ -18,7 +18,6 @@ package org.apache.camel.kafkaconnector.transforms;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -157,7 +156,7 @@ public class SourcePojoToSchemaAndStructTransformTest {
         assertEquals(1, sourcePojoToSchemaAndStructTransform.getCache().keySet().size());
         ConnectRecord transformedCr = sourcePojoToSchemaAndStructTransform.apply(cr);
         assertEquals(1, sourcePojoToSchemaAndStructTransform.getCache().keySet().size());
-        assertTrue(sourcePojoToSchemaAndStructTransform.getCache().keySet().contains(PojoWithMap.class.getCanonicalName()));
+        assertTrue(sourcePojoToSchemaAndStructTransform.getCache().keySet().contains(PojoWithMap.class.getName()));
     }
 
     private void atLeastOneFieldWithGivenValueExists(List structs, String fieldName, String fieldExpectedValue) {
@@ -167,20 +166,4 @@ public class SourcePojoToSchemaAndStructTransformTest {
             struct -> assertEquals(fieldExpectedValue, ((Struct) struct).getString(fieldName))
         );
     }
-
-    public class PojoWithMap {
-        private Map<String, Integer> map = new HashMap<>();
-
-        public Map<String, Integer> getMap() {
-            return map;
-        }
-
-        public void setMap(Map<String, Integer> map) {
-            this.map = map;
-        }
-
-        public void addToMap(String key, Integer value) {
-            map.put(key, value);
-        }
-    }
 }