You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/06/09 07:15:49 UTC

[pulsar] 03/04: Fix: org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY} + tests (#15988)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fe570032a0aa8fe6f79fc753bd7a8266765c1461
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Wed Jun 8 23:49:58 2022 -0700

    Fix: org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY} + tests (#15988)
    
    (cherry picked from commit 9e7aa0f43add7be841cf6b1791ec34c2ced43f1f)
---
 .../io/kafka/connect/schema/KafkaConnectData.java  | 148 +++++++---
 .../io/kafka/connect/KafkaConnectSinkTest.java     | 299 +++++++++++++++++++++
 .../connect/PulsarSchemaToKafkaSchemaTest.java     | 109 +++++++-
 3 files changed, 522 insertions(+), 34 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
index 8374dd24bf7..671495c6df6 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -22,6 +22,7 @@ package org.apache.pulsar.io.kafka.connect.schema;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
+import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -37,31 +38,90 @@ import org.apache.pulsar.client.api.schema.GenericRecord;
 
 @Slf4j
 public class KafkaConnectData {
+
+    private static List<Object> arrayToList(Object nativeObject, Schema kafkaValueSchema) {
+        Preconditions.checkArgument(nativeObject.getClass().isArray());
+        int length = Array.getLength(nativeObject);
+        List<Object> out = new ArrayList<>(length);
+        for (int i = 0; i < length; i++) {
+            // this handles primitive values too
+            Object elem = Array.get(nativeObject, i);
+            out.add(getKafkaConnectData(elem, kafkaValueSchema));
+        }
+        return out;
+    }
+
+    @SuppressWarnings("unchecked")
     public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
         if (kafkaSchema == null) {
             return nativeObject;
         }
 
+        if (nativeObject == null) {
+            return defaultOrThrow(kafkaSchema);
+        }
+
         if (nativeObject instanceof JsonNode) {
             JsonNode node = (JsonNode) nativeObject;
             return jsonAsConnectData(node, kafkaSchema);
-        } else if (nativeObject instanceof GenericData.Record) {
-            GenericData.Record avroRecord = (GenericData.Record) nativeObject;
-            return avroAsConnectData(avroRecord, kafkaSchema);
-        } else if (nativeObject instanceof GenericRecord) {
-            // Pulsar's GenericRecord
-            GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
-            return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
         }
 
-        return castToKafkaSchema(nativeObject, kafkaSchema);
+        switch (kafkaSchema.type()) {
+            case ARRAY:
+                 if (nativeObject instanceof List) {
+                    List arr = (List) nativeObject;
+                    return arr.stream()
+                            .map(x -> getKafkaConnectData(x, kafkaSchema.valueSchema()))
+                            .toList();
+                } else if (nativeObject.getClass().isArray()) {
+                    return arrayToList(nativeObject, kafkaSchema.valueSchema());
+                }
+                throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+                        + " into kafka ARRAY");
+            case MAP:
+                if (nativeObject instanceof Map) {
+                    Map<Object, Object> map = (Map<Object, Object>) nativeObject;
+                    Map<Object, Object> responseMap = new HashMap<>(map.size());
+                    for (Map.Entry<Object, Object> kv : map.entrySet()) {
+                        Object key = getKafkaConnectData(kv.getKey(), kafkaSchema.keySchema());
+                        Object val = getKafkaConnectData(kv.getValue(), kafkaSchema.valueSchema());
+                        responseMap.put(key, val);
+                    }
+                    return responseMap;
+                } else if (nativeObject instanceof org.apache.pulsar.common.schema.KeyValue) {
+                    org.apache.pulsar.common.schema.KeyValue kv =
+                            (org.apache.pulsar.common.schema.KeyValue) nativeObject;
+                    Map<Object, Object> responseMap = new HashMap<>();
+                    Object key = getKafkaConnectData(kv.getKey(), kafkaSchema.keySchema());
+                    Object val = getKafkaConnectData(kv.getValue(), kafkaSchema.valueSchema());
+                    responseMap.put(key, val);
+                    return responseMap;
+                }
+                throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+                        + " into kafka MAP");
+            case STRUCT:
+                if (nativeObject instanceof GenericData.Record) {
+                    GenericData.Record avroRecord = (GenericData.Record) nativeObject;
+                    return avroAsConnectData(avroRecord, kafkaSchema);
+                } else if (nativeObject instanceof GenericRecord) {
+                    GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
+                    // Pulsar's GenericRecord
+                    if (pulsarGenericRecord.getNativeObject() instanceof JsonNode
+                            || pulsarGenericRecord.getNativeObject() instanceof GenericData.Record) {
+                        return getKafkaConnectData(pulsarGenericRecord.getNativeObject(), kafkaSchema);
+                    }
+                    return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
+                }
+                throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+                        + " into kafka STRUCT");
+            default:
+                Preconditions.checkArgument(kafkaSchema.type().isPrimitive(),
+                        "Expected primitive schema but got " + kafkaSchema.type());
+                return castToKafkaSchema(nativeObject, kafkaSchema);
+        }
     }
 
     public static Object castToKafkaSchema(Object nativeObject, Schema kafkaSchema) {
-        if (nativeObject == null) {
-            return defaultOrThrow(kafkaSchema);
-        }
-
         if (nativeObject instanceof Number) {
             // This is needed in case
             // jackson decided to fit value into some other type internally
@@ -121,6 +181,19 @@ public class KafkaConnectData {
             }
         }
 
+        if (nativeObject instanceof Character) {
+            Character ch = (Character) nativeObject;
+            if (kafkaSchema.type() == Schema.Type.STRING) {
+                return ch.toString();
+            }
+            return castToKafkaSchema(Character.getNumericValue(ch), kafkaSchema);
+        }
+
+        if (kafkaSchema.type() == Schema.Type.STRING && nativeObject instanceof CharSequence) {
+            // e.g. org.apache.avro.util.Utf8
+            return nativeObject.toString();
+        }
+
         return nativeObject;
     }
 
@@ -161,23 +234,8 @@ public class KafkaConnectData {
             if (jsonNode == null || jsonNode.isNull()) {
                 return null;
             }
-            switch (jsonNode.getNodeType()) {
-                case BINARY:
-                    try {
-                        return jsonNode.binaryValue();
-                    } catch (IOException e) {
-                        throw new DataException("Cannot get binary value for " + jsonNode);
-                    }
-                case BOOLEAN:
-                    return jsonNode.booleanValue();
-                case NUMBER:
-                    return jsonNode.doubleValue();
-                case STRING:
-                    return jsonNode.textValue();
-                default:
-                    throw new DataException("Don't know how to convert " + jsonNode
-                            + " to Connect data (schema is null).");
-            }
+            throw new DataException("Don't know how to convert " + jsonNode
+                + " to Connect data (schema is null).");
         }
 
         if (jsonNode == null || jsonNode.isNull()) {
@@ -186,39 +244,65 @@ public class KafkaConnectData {
 
         switch (kafkaSchema.type()) {
             case INT8:
+                Preconditions.checkArgument(jsonNode.isNumber());
                 return (byte) jsonNode.shortValue();
             case INT16:
+                Preconditions.checkArgument(jsonNode.isNumber());
                 return jsonNode.shortValue();
             case INT32:
+                if (jsonNode.isTextual() && jsonNode.textValue().length() == 1) {
+                    // char encoded as String instead of Integer
+                    return Character.getNumericValue(jsonNode.textValue().charAt(0));
+                }
+                Preconditions.checkArgument(jsonNode.isNumber());
                 return jsonNode.intValue();
             case INT64:
+                Preconditions.checkArgument(jsonNode.isNumber());
                 return  jsonNode.longValue();
             case FLOAT32:
+                Preconditions.checkArgument(jsonNode.isNumber());
                 return jsonNode.floatValue();
             case FLOAT64:
+                Preconditions.checkArgument(jsonNode.isNumber());
                 return jsonNode.doubleValue();
             case BOOLEAN:
+                Preconditions.checkArgument(jsonNode.isBoolean());
                 return jsonNode.booleanValue();
             case STRING:
+                Preconditions.checkArgument(jsonNode.isTextual());
                 return jsonNode.textValue();
             case BYTES:
+                Preconditions.checkArgument(jsonNode.isBinary());
                 try {
                     return jsonNode.binaryValue();
                 } catch (IOException e) {
                     throw new DataException("Cannot get binary value for " + jsonNode + " with schema " + kafkaSchema);
                 }
             case ARRAY:
-                List<Object> list = new ArrayList<>();
+                if (jsonNode.isTextual() && kafkaSchema.valueSchema().type() == Schema.Type.INT32) {
+                    // char[] encoded as String in json
+                    List<Object> list = new ArrayList<>();
+                    for (char ch: jsonNode.textValue().toCharArray()) {
+                        list.add(Character.getNumericValue(ch));
+                    }
+                    return list;
+                }
+
                 Preconditions.checkArgument(jsonNode.isArray(), "jsonNode has to be an array");
-                for (Iterator<JsonNode> it = jsonNode.elements(); it.hasNext(); ) {
+                List<Object> list = new ArrayList<>();
+                for (Iterator<JsonNode> it = jsonNode.elements(); it.hasNext();) {
                     list.add(jsonAsConnectData(it.next(), kafkaSchema.valueSchema()));
                 }
                 return list;
             case MAP:
+                Preconditions.checkArgument(jsonNode.isObject(), "jsonNode has to be an Object node");
+                Preconditions.checkArgument(kafkaSchema.keySchema().type() == Schema.Type.STRING,
+                        "kafka schema for json map is expected to be STRING");
                 Map<String, Object> map = new HashMap<>();
                 for (Iterator<Map.Entry<String, JsonNode>> it = jsonNode.fields(); it.hasNext(); ) {
                     Map.Entry<String, JsonNode> elem = it.next();
-                    map.put(elem.getKey(), jsonAsConnectData(elem.getValue(), kafkaSchema.valueSchema()));
+                    map.put(elem.getKey(),
+                            jsonAsConnectData(elem.getValue(), kafkaSchema.valueSchema()));
                 }
                 return map;
             case STRUCT:
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 23d9f1b5ce2..2c0ea31ac4c 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -22,8 +22,17 @@ package org.apache.pulsar.io.kafka.connect;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -47,6 +56,8 @@ import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
+import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -56,11 +67,14 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.AbstractMap;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -538,6 +552,13 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         expected.put("field2", "test");
         // integer is coming back from ObjectMapper
         expected.put("field3", 100);
+        expected.put("byteField", 0);
+        expected.put("shortField", 0);
+        expected.put("intField", 0);
+        expected.put("longField", 0);
+        // double is coming back from ObjectMapper
+        expected.put("floatField", 0.0d);
+        expected.put("doubleField", 0.0d);
 
         SinkRecord sinkRecord = recordSchemaTest(jsonNode, jsonSchema, expected, "STRUCT");
 
@@ -565,6 +586,13 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         expected.put("field2", "test");
         // integer is coming back from ObjectMapper
         expected.put("field3", 100);
+        expected.put("byteField", 0);
+        expected.put("shortField", 0);
+        expected.put("intField", 0);
+        expected.put("longField", 0);
+        // double is coming back from ObjectMapper
+        expected.put("floatField", 0.0d);
+        expected.put("doubleField", 0.0d);
 
         SinkRecord sinkRecord = recordSchemaTest(obj, pulsarAvroSchema, expected, "STRUCT");
 
@@ -615,6 +643,167 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         Assert.assertEquals(key, 11);
     }
 
+    @Test
+    public void connectDataComplexAvroSchemaGenericRecordTest() {
+        AvroSchema<PulsarSchemaToKafkaSchemaTest.ComplexStruct> pulsarAvroSchema
+                = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.ComplexStruct.class);
+
+        final GenericData.Record key = getComplexStructRecord();
+        final GenericData.Record value = getComplexStructRecord();
+        KeyValue<GenericRecord, GenericRecord> kv = new KeyValue<>(getGenericRecord(key, pulsarAvroSchema),
+                getGenericRecord(value, pulsarAvroSchema));
+
+        org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
+                .getKafkaConnectSchema(Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema));
+
+        Object connectData = KafkaConnectData.getKafkaConnectData(kv, kafkaSchema);
+
+        org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+    }
+
+    @Test
+    public void connectDataPojoArrTest() throws Exception {
+        PulsarSchemaToKafkaSchemaTest.ComplexStruct[] pojo =
+                new PulsarSchemaToKafkaSchemaTest.ComplexStruct[]{
+                        getPojoComplexStruct(),
+                        getPojoComplexStruct(),
+                        getPojoComplexStruct()
+                };
+
+        testPojoAsAvroAndJsonConversionToConnectData(pojo);
+    }
+
+    @Test
+    public void connectDataPojoListTest() throws Exception {
+        List<PulsarSchemaToKafkaSchemaTest.ComplexStruct> pojo =
+                Lists.newArrayList(
+                        getPojoComplexStruct(),
+                        getPojoComplexStruct(),
+                        getPojoComplexStruct()
+                );
+
+        /*
+        Need this because of (AFAICT)
+        https://issues.apache.org/jira/browse/AVRO-1183
+        https://github.com/apache/pulsar/issues/4851
+        to generate proper schema
+        */
+        PulsarSchemaToKafkaSchemaTest.ComplexStruct[] pojoForSchema =
+                new PulsarSchemaToKafkaSchemaTest.ComplexStruct[]{
+                        getPojoComplexStruct(),
+                        getPojoComplexStruct(),
+                        getPojoComplexStruct()
+                };
+
+        AvroSchema pulsarAvroSchema = AvroSchema.of(pojoForSchema.getClass());
+
+        testPojoAsAvroAndJsonConversionToConnectData(pojo, pulsarAvroSchema);
+    }
+
+    @Test
+    public void connectDataPojoMapTest() throws Exception {
+        Map<String, PulsarSchemaToKafkaSchemaTest.ComplexStruct> pojo =
+                Maps.newHashMap();
+        pojo.put("key1", getPojoComplexStruct());
+        pojo.put("key2", getPojoComplexStruct());
+
+        testPojoAsAvroAndJsonConversionToConnectData(pojo);
+    }
+
+    @Test
+    public void connectDataPrimitivesTest() throws Exception {
+        testPojoAsAvroAndJsonConversionToConnectData("test");
+
+        testPojoAsAvroAndJsonConversionToConnectData('a');
+
+        testPojoAsAvroAndJsonConversionToConnectData(Byte.MIN_VALUE);
+        testPojoAsAvroAndJsonConversionToConnectData(Byte.MAX_VALUE);
+
+        testPojoAsAvroAndJsonConversionToConnectData(Short.MIN_VALUE);
+        testPojoAsAvroAndJsonConversionToConnectData(Short.MAX_VALUE);
+
+        testPojoAsAvroAndJsonConversionToConnectData(Integer.MIN_VALUE);
+        testPojoAsAvroAndJsonConversionToConnectData(Integer.MAX_VALUE);
+
+        testPojoAsAvroAndJsonConversionToConnectData(Long.MIN_VALUE);
+        testPojoAsAvroAndJsonConversionToConnectData(Long.MAX_VALUE);
+
+        testPojoAsAvroAndJsonConversionToConnectData(Float.MIN_VALUE);
+        testPojoAsAvroAndJsonConversionToConnectData(Float.MAX_VALUE);
+
+        testPojoAsAvroAndJsonConversionToConnectData(Double.MIN_VALUE);
+        testPojoAsAvroAndJsonConversionToConnectData(Double.MAX_VALUE);
+    }
+
+    @Test
+    public void connectDataPrimitiveArraysTest() throws Exception {
+        testPojoAsAvroAndJsonConversionToConnectData(new String[] {"test", "test2"});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new char[] {'a', 'b', 'c'});
+        testPojoAsAvroAndJsonConversionToConnectData(new Character[] {'a', 'b', 'c'});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new byte[] {Byte.MIN_VALUE, Byte.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Byte[] {Byte.MIN_VALUE, Byte.MAX_VALUE});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new short[] {Short.MIN_VALUE, Short.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Short[] {Short.MIN_VALUE, Short.MAX_VALUE});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new int[] {Integer.MIN_VALUE, Integer.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Integer[] {Integer.MIN_VALUE, Integer.MAX_VALUE});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new long[] {Long.MIN_VALUE, Long.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Long[] {Long.MIN_VALUE, Long.MAX_VALUE});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new float[] {Float.MIN_VALUE, Float.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Float[] {Float.MIN_VALUE, Float.MAX_VALUE});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new double[] {Double.MIN_VALUE, Double.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Double[] {Double.MIN_VALUE, Double.MAX_VALUE});
+    }
+
+    private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo) throws IOException {
+        AvroSchema pulsarAvroSchema = AvroSchema.of(pojo.getClass());
+        testPojoAsAvroAndJsonConversionToConnectData(pojo, pulsarAvroSchema);
+    }
+
+    private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo, AvroSchema pulsarAvroSchema) throws IOException {
+        Object value = pojoAsAvroRecord(pojo, pulsarAvroSchema);
+
+        org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
+                .getKafkaConnectSchema(pulsarAvroSchema);
+
+        Object connectData = KafkaConnectData.getKafkaConnectData(value, kafkaSchema);
+
+        org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+
+        Object jsonNode = pojoAsJsonNode(pojo);
+        connectData = KafkaConnectData.getKafkaConnectData(jsonNode, kafkaSchema);
+        org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+    }
+
+    private JsonNode pojoAsJsonNode(Object pojo) {
+        ObjectMapper om = new ObjectMapper();
+        JsonNode json = om.valueToTree(pojo);
+        return json;
+    }
+
+    private Object pojoAsAvroRecord(Object pojo, AvroSchema pulsarAvroSchema) throws IOException {
+        DatumWriter writer = new ReflectDatumWriter<>();
+
+        writer.setSchema(pulsarAvroSchema.getAvroSchema());
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        Encoder enc = new EncoderFactory().directBinaryEncoder(out, null);
+        writer.write(pojo, enc);
+        enc.flush();
+        byte[] data = out.toByteArray();
+
+        DatumReader<GenericRecord> reader = new GenericDatumReader<>(pulsarAvroSchema.getAvroSchema());
+        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
+        Object value = reader.read(null, decoder);
+        return value;
+    }
+
     @Test
     public void schemaKeyValueAvroSchemaTest() throws Exception {
         AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
@@ -635,12 +824,26 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         expectedKey.put("field2", "key");
         // integer is coming back from ObjectMapper
         expectedKey.put("field3", 101);
+        expectedKey.put("byteField", 0);
+        expectedKey.put("shortField", 0);
+        expectedKey.put("intField", 0);
+        expectedKey.put("longField", 0);
+        // double is coming back from ObjectMapper
+        expectedKey.put("floatField", 0.0d);
+        expectedKey.put("doubleField", 0.0d);
 
         Map<String, Object> expectedValue = new LinkedHashMap<>();
         expectedValue.put("field1", 10);
         expectedValue.put("field2", "value");
         // integer is coming back from ObjectMapper
         expectedValue.put("field3", 100);
+        expectedValue.put("byteField", 0);
+        expectedValue.put("shortField", 0);
+        expectedValue.put("intField", 0);
+        expectedValue.put("longField", 0);
+        // double is coming back from ObjectMapper
+        expectedValue.put("floatField", 0.0d);
+        expectedValue.put("doubleField", 0.0d);
 
         KeyValue<GenericRecord, GenericRecord> kv = new KeyValue<>(getGenericRecord(key, pulsarAvroSchema),
                             getGenericRecord(value, pulsarAvroSchema));
@@ -781,4 +984,100 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         sink.close();
     }
 
+    private static PulsarSchemaToKafkaSchemaTest.StructWithAnnotations getPojoStructWithAnnotations() {
+        return new PulsarSchemaToKafkaSchemaTest.StructWithAnnotations()
+                .setField1(1)
+                .setField2("field2")
+                .setField3(100L)
+                .setByteField((byte) 1)
+                .setShortField((short) 2)
+                .setIntField(3)
+                .setLongField(4)
+                .setFloatField(5.0f)
+                .setDoubleField(6.0d);
+    }
+
+    private static PulsarSchemaToKafkaSchemaTest.ComplexStruct getPojoComplexStruct() {
+        return new PulsarSchemaToKafkaSchemaTest.ComplexStruct()
+                .setStringList(Lists.newArrayList("str11", "str22"))
+                .setStructArr(new PulsarSchemaToKafkaSchemaTest.StructWithAnnotations[]{getPojoStructWithAnnotations()})
+                .setStructList(Lists.newArrayList(getPojoStructWithAnnotations()))
+                .setStruct(getPojoStructWithAnnotations())
+                .setStructMap(Map.of("key1", getPojoStructWithAnnotations(),
+                        "key2", getPojoStructWithAnnotations()))
+
+                .setByteField((byte) 1)
+                .setShortField((short) 2)
+                .setIntField(3)
+                .setLongField(4)
+                .setFloatField(5.0f)
+                .setDoubleField(6.0d)
+                .setCharField('c')
+                .setStringField("some text")
+
+                .setByteArr(new byte[] {1 ,2})
+                .setShortArr(new short[] {3, 4})
+                .setIntArr(new int[] {5, 6})
+                .setLongArr(new long[] {7, 8})
+                .setFloatArr(new float[] {9.0f, 10.0f})
+                .setDoubleArr(new double[] {11.0d, 12.0d})
+                .setCharArr(new char[]{'a', 'b'})
+                .setStringArr(new String[] {"abc", "def"});
+    }
+
+    private static GenericData.Record getStructRecord() {
+        AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
+                = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
+
+        final GenericData.Record rec = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+
+        rec.put("field1", 11);
+        rec.put("field2", "str99");
+        rec.put("field3", 101L);
+        rec.put("byteField", (byte) 1);
+        rec.put("shortField", (short) 2);
+        rec.put("intField", 3);
+        rec.put("longField", 4L);
+        rec.put("floatField", 5.0f);
+        rec.put("doubleField", 6.0d);
+
+        return rec;
+    }
+
+    private static GenericData.Record getComplexStructRecord() {
+        AvroSchema<PulsarSchemaToKafkaSchemaTest.ComplexStruct> pulsarAvroSchema
+                = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.ComplexStruct.class);
+
+        final GenericData.Record rec = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+
+        rec.put("stringArr", new String[]{"str1", "str2"});
+        rec.put("stringList", Lists.newArrayList("str11", "str22"));
+        rec.put("structArr", new GenericData.Record[]{getStructRecord(), getStructRecord()});
+        rec.put("structList", Lists.newArrayList(getStructRecord(), getStructRecord()));
+
+        rec.put("struct", getStructRecord());
+        rec.put("byteField", (byte) 1);
+        rec.put("shortField", (short) 2);
+        rec.put("intField", 3);
+        rec.put("longField", 4L);
+        rec.put("floatField", 5.1f);
+        rec.put("doubleField", 6.1d);
+        rec.put("charField", 'c');
+        rec.put("stringField", "some string");
+        rec.put("byteArr", new byte[] {(byte) 1, (byte) 2});
+        rec.put("shortArr", new short[] {(short) 3, (short) 4});
+        rec.put("intArr", new int[] {5, 6});
+        rec.put("longArr", new long[] {7L, 8L});
+        rec.put("floatArr", new float[] {9.0f, 10.0f});
+        rec.put("doubleArr", new double[] {11.0d, 12.0d});
+        rec.put("charArr", new char[] {'a', 'b', 'c'});
+
+        Map<String, GenericData.Record> map = new HashMap<>();
+        map.put("key1", getStructRecord());
+        map.put("key2", getStructRecord());
+
+        rec.put("structMap", map);
+
+        return rec;
+    }
 }
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
index 60caa2bbe81..ecf0633f588 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.io.kafka.connect;
 
 import com.google.common.collect.Lists;
 import lombok.Data;
+import lombok.experimental.Accessors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.reflect.AvroDefault;
 import org.apache.avro.reflect.Nullable;
@@ -35,6 +36,7 @@ import org.testng.annotations.Test;
 
 import java.math.BigInteger;
 import java.util.List;
+import java.util.Map;
 
 import static org.testng.Assert.assertEquals;
 
@@ -44,15 +46,90 @@ import static org.testng.Assert.assertEquals;
 @Slf4j
 public class PulsarSchemaToKafkaSchemaTest {
 
-    static final List<String> STRUCT_FIELDS = Lists.newArrayList("field1", "field2", "field3");
+    static final List<String> STRUCT_FIELDS = Lists.newArrayList(
+            "field1",
+            "field2",
+            "field3",
+            "byteField",
+            "shortField",
+            "intField",
+            "longField",
+            "floatField",
+            "doubleField"
+        );
+    static final List<String> COMPLEX_STRUCT_FIELDS = Lists.newArrayList(
+            "stringArr",
+            "stringList",
+            "structArr",
+            "structList",
+            "structMap",
+            "struct",
+            "byteField",
+            "shortField",
+            "intField",
+            "longField",
+            "floatField",
+            "doubleField",
+            "charField",
+            "stringField",
+            "byteArr",
+            "shortArr",
+            "intArr",
+            "longArr",
+            "floatArr",
+            "doubleArr",
+            "charArr"
+        );
 
     @Data
+    @Accessors(chain = true)
     static class StructWithAnnotations {
         int field1;
         @Nullable
         String field2;
-        @AvroDefault("\"1000\"")
+        @AvroDefault("1000")
         Long field3;
+
+        @AvroDefault("0")
+        byte byteField;
+        @AvroDefault("0")
+        short shortField;
+        @AvroDefault("0")
+        int intField;
+        @AvroDefault("0")
+        long longField;
+        @AvroDefault("0")
+        float floatField;
+        @AvroDefault("0")
+        double doubleField;
+    }
+
+    @Data
+    @Accessors(chain = true)
+    static class ComplexStruct {
+        List<String> stringList;
+        StructWithAnnotations[] structArr;
+        List<StructWithAnnotations> structList;
+        Map<String, StructWithAnnotations> structMap;
+        StructWithAnnotations struct;
+
+        byte byteField;
+        short shortField;
+        int intField;
+        long longField;
+        float floatField;
+        double doubleField;
+        char charField;
+        String stringField;
+
+        byte[] byteArr;
+        short[] shortArr;
+        int[] intArr;
+        long[] longArr;
+        float[] floatArr;
+        double[] doubleArr;
+        char[] charArr;
+        String[] stringArr;
     }
 
     @Test
@@ -153,6 +230,18 @@ public class PulsarSchemaToKafkaSchemaTest {
         }
     }
 
+    @Test
+    public void avroComplexSchemaTest() {
+        AvroSchema<ComplexStruct> pulsarAvroSchema = AvroSchema.of(ComplexStruct.class);
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT);
+        assertEquals(kafkaSchema.fields().size(), COMPLEX_STRUCT_FIELDS.size());
+        for (String name: COMPLEX_STRUCT_FIELDS) {
+            assertEquals(kafkaSchema.field(name).name(), name);
+        }
+    }
+
     @Test
     public void jsonSchemaTest() {
         JSONSchema<StructWithAnnotations> jsonSchema = JSONSchema
@@ -169,6 +258,22 @@ public class PulsarSchemaToKafkaSchemaTest {
         }
     }
 
+    @Test
+    public void jsonComplexSchemaTest() {
+        JSONSchema<ComplexStruct> jsonSchema = JSONSchema
+                .of(SchemaDefinition.<ComplexStruct>builder()
+                        .withPojo(ComplexStruct.class)
+                        .withAlwaysAllowNull(false)
+                        .build());
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT);
+        assertEquals(kafkaSchema.fields().size(), COMPLEX_STRUCT_FIELDS.size());
+        for (String name: COMPLEX_STRUCT_FIELDS) {
+            assertEquals(kafkaSchema.field(name).name(), name);
+        }
+    }
+
     @Test
     public void castToKafkaSchemaTest() {
         assertEquals(Byte.class,