You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/09/01 06:40:12 UTC

[pulsar] branch master updated: KCA: handle kafka's logical schemas (#16485)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fe68a8e872f KCA: handle kafka's logical schemas (#16485)
fe68a8e872f is described below

commit fe68a8e872ff39369d5d401c8fc68da866c9b2dd
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Wed Aug 31 23:40:03 2022 -0700

    KCA: handle kafka's logical schemas (#16485)
---
 .../io/kafka/connect/schema/KafkaConnectData.java  |  58 ++++++
 .../connect/schema/PulsarSchemaToKafkaSchema.java  |  82 ++++++--
 .../io/kafka/connect/KafkaConnectSinkTest.java     | 219 ++++++++++++++++-----
 3 files changed, 290 insertions(+), 69 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
index 671495c6df6..557cfbb9dd8 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -30,9 +30,13 @@ import java.util.List;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericData;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 
@@ -122,6 +126,34 @@ public class KafkaConnectData {
     }
 
     public static Object castToKafkaSchema(Object nativeObject, Schema kafkaSchema) {
+        // special case for a few classes defined in org.apache.kafka.connect.data
+        // and listed as LOGICAL_TYPE_CLASSES in org.apache.kafka.connect.data.ConnectSchema
+        if (PulsarSchemaToKafkaSchema.matchesToKafkaLogicalSchema(kafkaSchema)) {
+            if (Timestamp.LOGICAL_NAME.equals(kafkaSchema.name())) {
+                if (nativeObject instanceof java.util.Date) {
+                    return nativeObject;
+                }
+                return Timestamp.toLogical(kafkaSchema, ((Number) nativeObject).longValue());
+            } else if (Date.LOGICAL_NAME.equals(kafkaSchema.name())) {
+                if (nativeObject instanceof java.util.Date) {
+                    return nativeObject;
+                }
+                return Date.toLogical(kafkaSchema, ((Number) nativeObject).intValue());
+            } else if (Time.LOGICAL_NAME.equals(kafkaSchema.name())) {
+                if (nativeObject instanceof java.util.Date) {
+                    return nativeObject;
+                }
+                return Time.toLogical(kafkaSchema, ((Number) nativeObject).intValue());
+            } else if (Decimal.LOGICAL_NAME.equals(kafkaSchema.name())) {
+                if (nativeObject instanceof java.math.BigDecimal) {
+                    return nativeObject;
+                }
+                return Decimal.toLogical(kafkaSchema, (byte[]) nativeObject);
+            }
+            throw new IllegalStateException("Unsupported Kafka Logical Schema " + kafkaSchema.name()
+                    + " for value " + nativeObject);
+        }
+
         if (nativeObject instanceof Number) {
             // This is needed in case
             // jackson decided to fit value into some other type internally
@@ -242,6 +274,32 @@ public class KafkaConnectData {
             return defaultOrThrow(kafkaSchema);
         }
 
+        // Special case for the logical types defined in org.apache.kafka.connect.data
+        // and listed as LOGICAL_TYPE_CLASSES in org.apache.kafka.connect.data.ConnectSchema.
+        // Time/date as String is not supported as the format to parse is not clear
+        // (add it as a config param?). Non-numeric nodes are therefore rejected up front:
+        // JsonNode#longValue()/intValue() would otherwise silently read them as 0.
+        if (PulsarSchemaToKafkaSchema.matchesToKafkaLogicalSchema(kafkaSchema)) {
+            if (Timestamp.LOGICAL_NAME.equals(kafkaSchema.name())) {
+                Preconditions.checkArgument(jsonNode.isNumber());
+                return Timestamp.toLogical(kafkaSchema, jsonNode.longValue());
+            } else if (Date.LOGICAL_NAME.equals(kafkaSchema.name())) {
+                Preconditions.checkArgument(jsonNode.isNumber());
+                return Date.toLogical(kafkaSchema, jsonNode.intValue());
+            } else if (Time.LOGICAL_NAME.equals(kafkaSchema.name())) {
+                Preconditions.checkArgument(jsonNode.isNumber());
+                return Time.toLogical(kafkaSchema, jsonNode.intValue());
+            } else if (Decimal.LOGICAL_NAME.equals(kafkaSchema.name())) {
+                // a JSON number is taken as the decimal value itself,
+                // anything else is read as the binary form of the decimal
+                if (jsonNode.isNumber()) {
+                    return jsonNode.decimalValue();
+                }
+                try {
+                    return Decimal.toLogical(kafkaSchema, jsonNode.binaryValue());
+                } catch (IOException e) {
+                    // keep the IOException as the cause so the decoding failure stays diagnosable
+                    throw new IllegalStateException("Could not convert Kafka Logical Schema " + kafkaSchema.name()
+                            + " for jsonNode " + jsonNode + " into Decimal", e);
+                }
+            }
+            throw new IllegalStateException("Unsupported Kafka Logical Schema " + kafkaSchema.name()
+                    + " for jsonNode " + jsonNode);
+        }
+
         switch (kafkaSchema.type()) {
             case INT8:
                 Preconditions.checkArgument(jsonNode.isNumber());
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
index 72d68610bdb..d1834e2f9dd 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
@@ -22,6 +22,7 @@ package org.apache.pulsar.io.kafka.connect.schema;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.ExecutionError;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.nio.charset.StandardCharsets;
@@ -29,8 +30,12 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
+import org.apache.kafka.connect.errors.DataException;
 import org.apache.pulsar.client.api.schema.KeyValueSchema;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
@@ -38,6 +43,7 @@ import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
 @Slf4j
 public class PulsarSchemaToKafkaSchema {
     private static final ImmutableMap<SchemaType, Schema> pulsarSchemaTypeToKafkaSchema;
+    private static final ImmutableSet<String> kafkaLogicalSchemas;
     private static final AvroData avroData = new AvroData(1000);
     private static final Cache<byte[], Schema> schemaCache =
             CacheBuilder.newBuilder().maximumSize(10000)
@@ -56,6 +62,16 @@ public class PulsarSchemaToKafkaSchema {
                 .put(SchemaType.BYTES, Schema.BYTES_SCHEMA)
                 .put(SchemaType.DATE, Date.SCHEMA)
                 .build();
+        kafkaLogicalSchemas = ImmutableSet.<String>builder()
+                .add(Timestamp.LOGICAL_NAME)
+                .add(Date.LOGICAL_NAME)
+                .add(Time.LOGICAL_NAME)
+                .add(Decimal.LOGICAL_NAME)
+                .build();
+    }
+
+    /**
+     * Tells whether the name of the given Kafka schema is one of the logical type names
+     * collected in {@code kafkaLogicalSchemas}.
+     *
+     * @param kafkaSchema Kafka Connect schema whose name is looked up
+     * @return true when the schema name is a known logical type name
+     */
+    public static boolean matchesToKafkaLogicalSchema(Schema kafkaSchema) {
+        final String schemaName = kafkaSchema.name();
+        return kafkaLogicalSchemas.contains(schemaName);
     }
 
     // Parse json to shaded schema
@@ -67,30 +83,58 @@ public class PulsarSchemaToKafkaSchema {
     }
 
     public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
-        if (pulsarSchema != null && pulsarSchema.getSchemaInfo() != null) {
-            if (pulsarSchemaTypeToKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) {
-                return pulsarSchemaTypeToKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
-            }
+        if (pulsarSchema == null || pulsarSchema.getSchemaInfo() == null) {
+            throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Schema is required.", null);
+        }
 
-            try {
-                return schemaCache.get(pulsarSchema.getSchemaInfo().getSchema(), () -> {
-                    if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
-                        KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema;
-                        return SchemaBuilder.map(getKafkaConnectSchema(kvSchema.getKeySchema()),
-                                                 getKafkaConnectSchema(kvSchema.getValueSchema()))
-                                    .build();
+        String logicalSchemaName = pulsarSchema.getSchemaInfo().getName();
+        if (kafkaLogicalSchemas.contains(logicalSchemaName)) {
+            if (Timestamp.LOGICAL_NAME.equals(logicalSchemaName)) {
+                return Timestamp.SCHEMA;
+            } else if (Date.LOGICAL_NAME.equals(logicalSchemaName)) {
+                return Date.SCHEMA;
+            } else if (Time.LOGICAL_NAME.equals(logicalSchemaName)) {
+                return Time.SCHEMA;
+            } else if (Decimal.LOGICAL_NAME.equals(logicalSchemaName)) {
+                String scaleString = null;
+                final int scale;
+                if (pulsarSchema.getSchemaInfo().getProperties() != null) {
+                    scaleString = pulsarSchema.getSchemaInfo().getProperties().get("scale");
+                }
+                if (scaleString == null) {
+                    throw new DataException("Invalid Decimal schema: scale parameter not found.");
+                } else {
+                    try {
+                        scale = Integer.parseInt(scaleString);
+                    } catch (NumberFormatException nfe) {
+                        throw new DataException("Invalid scale parameter found in Decimal schema: ", nfe);
                     }
-                    org.apache.pulsar.kafka.shade.avro.Schema avroSchema =
-                            parseAvroSchema(new String(pulsarSchema.getSchemaInfo().getSchema(),
-                                    StandardCharsets.UTF_8));
-                    return avroData.toConnectSchema(avroSchema);
-                });
-            } catch (ExecutionException | UncheckedExecutionException | ExecutionError ee) {
-                throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Failed to convert to Kafka Schema.", ee);
+                }
+                return Decimal.schema(scale);
             }
+            throw new IllegalStateException("Unsupported Kafka Logical Schema " + logicalSchemaName);
+        }
+
+        if (pulsarSchemaTypeToKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) {
+            return pulsarSchemaTypeToKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
         }
 
-        throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Schema is required.", null);
+        try {
+            return schemaCache.get(pulsarSchema.getSchemaInfo().getSchema(), () -> {
+                if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
+                    KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema;
+                    return SchemaBuilder.map(getKafkaConnectSchema(kvSchema.getKeySchema()),
+                                             getKafkaConnectSchema(kvSchema.getValueSchema()))
+                                .build();
+                }
+                org.apache.pulsar.kafka.shade.avro.Schema avroSchema =
+                        parseAvroSchema(new String(pulsarSchema.getSchemaInfo().getSchema(),
+                                StandardCharsets.UTF_8));
+                return avroData.toConnectSchema(avroSchema);
+            });
+        } catch (ExecutionException | UncheckedExecutionException | ExecutionError ee) {
+            throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Failed to convert to Kafka Schema.", ee);
+        }
     }
 
     private static IllegalStateException logAndThrowOnUnsupportedSchema(
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 7661e5fc98d..4f3996c8fde 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
@@ -35,7 +36,11 @@ import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -52,9 +57,12 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
 import org.apache.pulsar.client.util.MessageIdUtils;
 import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.io.core.SinkContext;
@@ -71,10 +79,13 @@ import org.testng.collections.Maps;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.Serializable;
+import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
 import java.util.AbstractMap;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -83,6 +94,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -104,10 +116,35 @@ import static org.testng.Assert.fail;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 @Slf4j
-public class KafkaConnectSinkTest extends ProducerConsumerBase  {
+public class KafkaConnectSinkTest extends ProducerConsumerBase {
+
+    /**
+     * Minimal pass-through {@code Schema<byte[]>} for tests: it carries only the
+     * {@link SchemaInfo} handed to the constructor and encodes bytes as themselves.
+     * Declared static so that the Serializable schema does not hold a hidden reference
+     * to the enclosing test instance.
+     */
+    public static class TestSchema implements Schema<byte[]>, Serializable {
+
+        private final SchemaInfo schemaInfo;
+
+        public TestSchema(SchemaInfo schemaInfo) {
+            this.schemaInfo = schemaInfo;
+        }
+
+        @Override
+        public byte[] encode(byte[] data) {
+            return data;
+        }
+
+        @Override
+        public SchemaInfo getSchemaInfo() {
+            return schemaInfo;
+        }
+
+        @Override
+        public Schema<byte[]> clone() {
+            // return a usable copy rather than null so callers that clone the schema do not hit an NPE
+            return new TestSchema(schemaInfo);
+        }
+    }
 
     public class ResultCaptor<T> implements Answer {
         private T result = null;
+
         public T getResult() {
             return result;
         }
@@ -119,7 +156,7 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         }
     }
 
-    private String offsetTopicName =  "persistent://my-property/my-ns/kafka-connect-sink-offset";
+    private String offsetTopicName = "persistent://my-property/my-ns/kafka-connect-sink-offset";
 
     private Path file;
     private Map<String, Object> props;
@@ -319,11 +356,11 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
     }
 
+    // Convenience overload: runs the six-argument recordSchemaTest with the expected key "key"
+    // and the expected key schema "STRING".
     private SinkRecord recordSchemaTest(Object value, Schema schema, Object expected, String expectedSchema) throws Exception {
-        return recordSchemaTest(value, schema, "key",  "STRING", expected, expectedSchema);
+        return recordSchemaTest(value, schema, "key", "STRING", expected, expectedSchema);
     }
 
     private SinkRecord recordSchemaTest(Object value, Schema schema, Object expectedKey, String expectedKeySchema,
-                                  Object expected, String expectedSchema) throws Exception {
+                                        Object expected, String expectedSchema) throws Exception {
         props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
 
         KafkaConnectSink sink = new KafkaConnectSink();
@@ -354,7 +391,8 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
 
         List<String> lines = Files.readAllLines(file, StandardCharsets.US_ASCII);
         ObjectMapper om = new ObjectMapper();
-        Map<String, Object> result = om.readValue(lines.get(0), new TypeReference<Map<String, Object>>(){});
+        Map<String, Object> result = om.readValue(lines.get(0), new TypeReference<Map<String, Object>>() {
+        });
 
         assertEquals(expectedKey, result.get("key"));
         assertEquals(expected, result.get("value"));
@@ -398,12 +436,12 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
                     .map(f -> new Field(f.name(), f.pos()))
                     .collect(Collectors.toList());
 
-            return new GenericAvroRecord(new byte[]{ 1 }, avroSchema, fields, avroRecord);
+            return new GenericAvroRecord(new byte[]{1}, avroSchema, fields, avroRecord);
         } else {
             rec = MockGenericObjectWrapper.builder()
                     .nativeObject(value)
                     .schemaType(schema != null ? schema.getSchemaInfo().getType() : null)
-                    .schemaVersion(new byte[]{ 1 }).build();
+                    .schemaVersion(new byte[]{1}).build();
         }
         return rec;
     }
@@ -421,11 +459,11 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
 
         final GenericData.Record obj = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
         // schema type INT32
-        obj.put("field1", (byte)10);
+        obj.put("field1", (byte) 10);
         // schema type STRING
         obj.put("field2", "test");
         // schema type INT64
-        obj.put("field3", (short)100);
+        obj.put("field3", (short) 100);
 
         final GenericRecord rec = getGenericRecord(obj, pulsarAvroSchema);
         Message msg = mock(MessageImpl.class);
@@ -482,17 +520,17 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
+    // Sends a byte with Schema.INT8: the sink output is expected to report 1 with schema "INT8",
+    // and the SinkRecord value must be a Byte equal to 1.
     @Test
     public void byteRecordSchemaTest() throws Exception {
         // int 1 is coming back from ObjectMapper
-        SinkRecord sinkRecord = recordSchemaTest((byte)1, Schema.INT8, 1, "INT8");
+        SinkRecord sinkRecord = recordSchemaTest((byte) 1, Schema.INT8, 1, "INT8");
         Assert.assertEquals(sinkRecord.value().getClass(), Byte.class);
-        Assert.assertEquals(sinkRecord.value(), (byte)1);
+        Assert.assertEquals(sinkRecord.value(), (byte) 1);
     }
 
+    // Sends a short with Schema.INT16: the sink output is expected to report 1 with schema "INT16",
+    // and the SinkRecord value must be a Short equal to 1.
     @Test
     public void shortRecordSchemaTest() throws Exception {
         // int 1 is coming back from ObjectMapper
-        SinkRecord sinkRecord = recordSchemaTest((short)1, Schema.INT16, 1, "INT16");
+        SinkRecord sinkRecord = recordSchemaTest((short) 1, Schema.INT16, 1, "INT16");
         Assert.assertEquals(sinkRecord.value().getClass(), Short.class);
-        Assert.assertEquals(sinkRecord.value(), (short)1);
+        Assert.assertEquals(sinkRecord.value(), (short) 1);
     }
 
     @Test
@@ -650,6 +688,77 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         Assert.assertEquals(key, 11);
     }
 
+    /**
+     * A Pulsar INT64 schema named after Kafka's Timestamp logical type is converted to a Kafka
+     * schema, and a timestamp pushed through KafkaConnectData must validate against it.
+     */
+    @Test
+    public void kafkaLogicalTypesTimestampTest() {
+        SchemaInfo timestampInfo = new SchemaInfoImpl()
+                .setName(Timestamp.LOGICAL_NAME)
+                .setType(SchemaType.INT64)
+                .setSchema(new byte[0]);
+        Schema pulsarSchema = new TestSchema(timestampInfo);
+        org.apache.kafka.connect.data.Schema connectSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarSchema);
+
+        // feed the converter the raw (non-logical) form of the date
+        java.util.Date sample = getDateFromString("12/30/1999 11:12:13");
+        long rawTimestamp = Timestamp.fromLogical(connectSchema, sample);
+        Object converted = KafkaConnectData.getKafkaConnectData(rawTimestamp, connectSchema);
+
+        org.apache.kafka.connect.data.ConnectSchema.validateValue(connectSchema, converted);
+    }
+
+    /**
+     * A Pulsar INT32 schema named after Kafka's Time logical type is converted to a Kafka
+     * schema, and a time-of-day pushed through KafkaConnectData must validate against it.
+     */
+    @Test
+    public void kafkaLogicalTypesTimeTest() {
+        SchemaInfo timeInfo = new SchemaInfoImpl()
+                .setName(Time.LOGICAL_NAME)
+                .setType(SchemaType.INT32)
+                .setSchema(new byte[0]);
+        Schema pulsarSchema = new TestSchema(timeInfo);
+        org.apache.kafka.connect.data.Schema connectSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarSchema);
+
+        // feed the converter the raw (non-logical) form of the time
+        java.util.Date sample = getDateFromString("01/01/1970 11:12:13");
+        int rawTime = Time.fromLogical(connectSchema, sample);
+        Object converted = KafkaConnectData.getKafkaConnectData(rawTime, connectSchema);
+
+        org.apache.kafka.connect.data.ConnectSchema.validateValue(connectSchema, converted);
+    }
+
+    /**
+     * A Pulsar INT32 schema named after Kafka's Date logical type is converted to a Kafka
+     * schema, and a date pushed through KafkaConnectData must validate against it.
+     */
+    @Test
+    public void kafkaLogicalTypesDateTest() {
+        SchemaInfo dateInfo = new SchemaInfoImpl()
+                .setName(Date.LOGICAL_NAME)
+                .setType(SchemaType.INT32)
+                .setSchema(new byte[0]);
+        Schema pulsarSchema = new TestSchema(dateInfo);
+        org.apache.kafka.connect.data.Schema connectSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarSchema);
+
+        // feed the converter the raw (non-logical) form of the date
+        java.util.Date sample = getDateFromString("12/31/2022 00:00:00");
+        int rawDate = Date.fromLogical(connectSchema, sample);
+        Object converted = KafkaConnectData.getKafkaConnectData(rawDate, connectSchema);
+
+        org.apache.kafka.connect.data.ConnectSchema.validateValue(connectSchema, converted);
+    }
+
+    /**
+     * A Pulsar BYTES schema named after Kafka's Decimal logical type and carrying a "scale"
+     * property is converted to a Kafka schema, and a decimal pushed through KafkaConnectData
+     * must validate against it.
+     */
+    @Test
+    public void kafkaLogicalTypesDecimalTest() {
+        // the Decimal schema takes its scale from the schema properties
+        Map<String, String> schemaProps = new HashMap<>();
+        schemaProps.put("scale", "10");
+        SchemaInfo decimalInfo = new SchemaInfoImpl()
+                .setName(Decimal.LOGICAL_NAME)
+                .setType(SchemaType.BYTES)
+                .setProperties(schemaProps)
+                .setSchema(new byte[0]);
+        Schema pulsarSchema = new TestSchema(decimalInfo);
+        org.apache.kafka.connect.data.Schema connectSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarSchema);
+
+        // feed the converter the raw (non-logical) form of the decimal
+        byte[] rawDecimal = Decimal.fromLogical(connectSchema, BigDecimal.valueOf(100L, 10));
+        Object converted = KafkaConnectData.getKafkaConnectData(rawDecimal, connectSchema);
+
+        org.apache.kafka.connect.data.ConnectSchema.validateValue(connectSchema, converted);
+    }
+
+
     @Test
     public void connectDataComplexAvroSchemaGenericRecordTest() {
         AvroSchema<PulsarSchemaToKafkaSchemaTest.ComplexStruct> pulsarAvroSchema
@@ -744,28 +853,28 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
 
+    // Runs testPojoAsAvroAndJsonConversionToConnectData over String, char, byte, short, int, long,
+    // float and double arrays, each numeric/char type in both its primitive and boxed form,
+    // using the MIN_VALUE/MAX_VALUE pair as elements for the numeric types.
     @Test
     public void connectDataPrimitiveArraysTest() throws Exception {
-        testPojoAsAvroAndJsonConversionToConnectData(new String[] {"test", "test2"});
+        testPojoAsAvroAndJsonConversionToConnectData(new String[]{"test", "test2"});
 
-        testPojoAsAvroAndJsonConversionToConnectData(new char[] {'a', 'b', 'c'});
-        testPojoAsAvroAndJsonConversionToConnectData(new Character[] {'a', 'b', 'c'});
+        testPojoAsAvroAndJsonConversionToConnectData(new char[]{'a', 'b', 'c'});
+        testPojoAsAvroAndJsonConversionToConnectData(new Character[]{'a', 'b', 'c'});
 
-        testPojoAsAvroAndJsonConversionToConnectData(new byte[] {Byte.MIN_VALUE, Byte.MAX_VALUE});
-        testPojoAsAvroAndJsonConversionToConnectData(new Byte[] {Byte.MIN_VALUE, Byte.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new byte[]{Byte.MIN_VALUE, Byte.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Byte[]{Byte.MIN_VALUE, Byte.MAX_VALUE});
 
-        testPojoAsAvroAndJsonConversionToConnectData(new short[] {Short.MIN_VALUE, Short.MAX_VALUE});
-        testPojoAsAvroAndJsonConversionToConnectData(new Short[] {Short.MIN_VALUE, Short.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new short[]{Short.MIN_VALUE, Short.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Short[]{Short.MIN_VALUE, Short.MAX_VALUE});
 
-        testPojoAsAvroAndJsonConversionToConnectData(new int[] {Integer.MIN_VALUE, Integer.MAX_VALUE});
-        testPojoAsAvroAndJsonConversionToConnectData(new Integer[] {Integer.MIN_VALUE, Integer.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new int[]{Integer.MIN_VALUE, Integer.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Integer[]{Integer.MIN_VALUE, Integer.MAX_VALUE});
 
-        testPojoAsAvroAndJsonConversionToConnectData(new long[] {Long.MIN_VALUE, Long.MAX_VALUE});
-        testPojoAsAvroAndJsonConversionToConnectData(new Long[] {Long.MIN_VALUE, Long.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new long[]{Long.MIN_VALUE, Long.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Long[]{Long.MIN_VALUE, Long.MAX_VALUE});
 
-        testPojoAsAvroAndJsonConversionToConnectData(new float[] {Float.MIN_VALUE, Float.MAX_VALUE});
-        testPojoAsAvroAndJsonConversionToConnectData(new Float[] {Float.MIN_VALUE, Float.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new float[]{Float.MIN_VALUE, Float.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Float[]{Float.MIN_VALUE, Float.MAX_VALUE});
 
-        testPojoAsAvroAndJsonConversionToConnectData(new double[] {Double.MIN_VALUE, Double.MAX_VALUE});
-        testPojoAsAvroAndJsonConversionToConnectData(new Double[] {Double.MIN_VALUE, Double.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new double[]{Double.MIN_VALUE, Double.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Double[]{Double.MIN_VALUE, Double.MAX_VALUE});
     }
 
     private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo) throws IOException {
@@ -853,20 +962,20 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         expectedValue.put("doubleField", 0.0d);
 
         KeyValue<GenericRecord, GenericRecord> kv = new KeyValue<>(getGenericRecord(key, pulsarAvroSchema),
-                            getGenericRecord(value, pulsarAvroSchema));
+                getGenericRecord(value, pulsarAvroSchema));
 
         SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema),
                 expectedKey, "STRUCT", expectedValue, "STRUCT");
 
         Struct outValue = (Struct) sinkRecord.value();
-        Assert.assertEquals((int)outValue.get("field1"), 10);
-        Assert.assertEquals((String)outValue.get("field2"), "value");
-        Assert.assertEquals((long)outValue.get("field3"), 100L);
+        Assert.assertEquals((int) outValue.get("field1"), 10);
+        Assert.assertEquals((String) outValue.get("field2"), "value");
+        Assert.assertEquals((long) outValue.get("field3"), 100L);
 
         Struct outKey = (Struct) sinkRecord.key();
-        Assert.assertEquals((int)outKey.get("field1"), 11);
-        Assert.assertEquals((String)outKey.get("field2"), "key");
-        Assert.assertEquals((long)outKey.get("field3"), 101L);
+        Assert.assertEquals((int) outKey.get("field1"), 11);
+        Assert.assertEquals((String) outKey.get("field2"), "key");
+        Assert.assertEquals((long) outKey.get("field3"), 101L);
     }
 
     @Test
@@ -1186,7 +1295,7 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
             when(msg.getMessageId()).then(x -> new MessageIdImpl(ledgerId.get(), entryId.get(), 0));
             when(msg.hasIndex()).thenReturn(false);
 
-            final int partition = (int)(i % numPartitions);
+            final int partition = (int) (i % numPartitions);
             final AtomicInteger status = new AtomicInteger(0);
             Record<GenericObject> record = PulsarRecord.<String>builder()
                     .topicName(topicName)
@@ -1257,14 +1366,14 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
                 .setCharField('c')
                 .setStringField("some text")
 
-                .setByteArr(new byte[] {1 ,2})
-                .setShortArr(new short[] {3, 4})
-                .setIntArr(new int[] {5, 6})
-                .setLongArr(new long[] {7, 8})
-                .setFloatArr(new float[] {9.0f, 10.0f})
-                .setDoubleArr(new double[] {11.0d, 12.0d})
+                .setByteArr(new byte[]{1, 2})
+                .setShortArr(new short[]{3, 4})
+                .setIntArr(new int[]{5, 6})
+                .setLongArr(new long[]{7, 8})
+                .setFloatArr(new float[]{9.0f, 10.0f})
+                .setDoubleArr(new double[]{11.0d, 12.0d})
                 .setCharArr(new char[]{'a', 'b'})
-                .setStringArr(new String[] {"abc", "def"});
+                .setStringArr(new String[]{"abc", "def"});
     }
 
     private static GenericData.Record getStructRecord() {
@@ -1306,13 +1415,13 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         rec.put("doubleField", 6.1d);
         rec.put("charField", 'c');
         rec.put("stringField", "some string");
-        rec.put("byteArr", new byte[] {(byte) 1, (byte) 2});
-        rec.put("shortArr", new short[] {(short) 3, (short) 4});
-        rec.put("intArr", new int[] {5, 6});
-        rec.put("longArr", new long[] {7L, 8L});
-        rec.put("floatArr", new float[] {9.0f, 10.0f});
-        rec.put("doubleArr", new double[] {11.0d, 12.0d});
-        rec.put("charArr", new char[] {'a', 'b', 'c'});
+        rec.put("byteArr", new byte[]{(byte) 1, (byte) 2});
+        rec.put("shortArr", new short[]{(short) 3, (short) 4});
+        rec.put("intArr", new int[]{5, 6});
+        rec.put("longArr", new long[]{7L, 8L});
+        rec.put("floatArr", new float[]{9.0f, 10.0f});
+        rec.put("doubleArr", new double[]{11.0d, 12.0d});
+        rec.put("charArr", new char[]{'a', 'b', 'c'});
 
         Map<String, GenericData.Record> map = new HashMap<>();
         map.put("key1", getStructRecord());
@@ -1322,4 +1431,14 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
 
         return rec;
     }
+
+    // Parses a "MM/dd/yyyy HH:mm:ss" string (month first, 24-hour clock) as a GMT date.
+    // Parsing is strict, so a string that does not fit the pattern fails instead of being
+    // silently rolled over into a different date.
+    @SneakyThrows
+    private java.util.Date getDateFromString(String dateInString) {
+        SimpleDateFormat formatter = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
+        formatter.setTimeZone(TimeZone.getTimeZone("GMT"));
+        formatter.setLenient(false);
+
+        return formatter.parse(dateInString);
+    }
+
 }