Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/06/09 07:15:46 UTC

[pulsar] branch branch-2.10 updated (efc36b31f51 -> 5df120a7398)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from efc36b31f51 [refactor][test] Move assert equals and retry to a base class (#14815)
     new 2b16111b332 fix: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type... (#15598)
     new 515a9bfd46e [pulsar-io] KCA: properly handle KeyValue that getNativeObject() returns: corrected type + support for KeyValue<GenericRecord, GenericRecord> (#15025)
     new fe570032a0a Fix: org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY} + tests (#15988)
     new 5df120a7398 Fix cherry-pick compatibility with JDK8

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  |  18 +-
 .../io/kafka/connect/schema/KafkaConnectData.java  | 216 +++++++-
 .../io/kafka/connect/KafkaConnectSinkTest.java     | 579 +++++++++++++++++++--
 .../connect/PulsarSchemaToKafkaSchemaTest.java     | 142 ++++-
 .../kafka/connect/SchemaedFileStreamSinkTask.java  |  47 +-
 5 files changed, 921 insertions(+), 81 deletions(-)
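
For orientation (not part of the patches themselves): the adaptor is exercised in the new tests by instantiating KafkaConnectSink directly. A minimal sketch of that setup follows; "kafkaConnectorSinkClass" is taken verbatim from KafkaConnectSinkTest, while the other property names and values are assumptions about the sink's configuration, and the SinkContext is mocked just as the tests do.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pulsar.io.core.SinkContext;
    import org.apache.pulsar.io.kafka.connect.KafkaConnectSink;
    import static org.mockito.Mockito.mock;

    public class KcaSinkOpenSketch {
        public static void main(String[] args) throws Exception {
            Map<String, Object> props = new HashMap<>();
            // Assumed property names; only kafkaConnectorSinkClass appears in the tests below.
            props.put("topic", "test-topic");
            props.put("offsetStorageTopic", "kafka-connect-sink-offsets");
            props.put("kafkaConnectorSinkClass",
                    "org.apache.kafka.connect.file.FileStreamSinkConnector");

            KafkaConnectSink sink = new KafkaConnectSink();
            sink.open(props, mock(SinkContext.class)); // the tests pass a mocked SinkContext
            // ... sink.write(record); sink.flush(); ...
            sink.close();
        }
    }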


[pulsar] 02/04: [pulsar-io] KCA: properly handle KeyValue that getNativeObject() returns: corrected type + support for KeyValue&lt;GenericRecord, GenericRecord&gt; (#15025)

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 515a9bfd46ec0f2ffa3bb3d2d9cd646575574565
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Thu Apr 14 08:55:23 2022 -0700

    [pulsar-io] KCA: properly handle KeyValue that getNativeObject() returns: corrected type + support for KeyValue<GenericRecord, GenericRecord> (#15025)
    
    (cherry picked from commit d76b5d40da2c9055102b3ecf3e5f6b358ac52732)
---
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  |  18 ++-
 .../io/kafka/connect/schema/KafkaConnectData.java  |  20 +++
 .../io/kafka/connect/KafkaConnectSinkTest.java     | 136 +++++++++++++++++++--
 .../kafka/connect/SchemaedFileStreamSinkTask.java  |  47 ++++---
 4 files changed, 193 insertions(+), 28 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 502154065d9..31f7cbf6399 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -51,9 +51,9 @@ import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
@@ -255,11 +255,21 @@ public class KafkaConnectSink implements Sink<GenericObject> {
                 && sourceRecord.getSchema().getSchemaInfo() != null
                 && sourceRecord.getSchema().getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
             KeyValueSchema kvSchema = (KeyValueSchema) sourceRecord.getSchema();
-            KeyValue kv = (KeyValue) sourceRecord.getValue().getNativeObject();
             keySchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getKeySchema());
             valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getValueSchema());
-            key = kv.getKey();
-            value = kv.getValue();
+
+            Object nativeObject = sourceRecord.getValue().getNativeObject();
+
+            if (nativeObject instanceof KeyValue) {
+                KeyValue kv = (KeyValue) nativeObject;
+                key = KafkaConnectData.getKafkaConnectData(kv.getKey(), keySchema);
+                value = KafkaConnectData.getKafkaConnectData(kv.getValue(), valueSchema);
+            } else if (nativeObject != null) {
+                throw new IllegalStateException("Cannot extract KeyValue data from " + nativeObject.getClass());
+            } else {
+                key = null;
+                value = null;
+            }
         } else {
             if (sourceRecord.getMessage().get().hasBase64EncodedKey()) {
                 key = sourceRecord.getMessage().get().getKeyBytes();
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
index e39ce086a53..8374dd24bf7 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -33,6 +33,7 @@ import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.DataException;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 
 @Slf4j
 public class KafkaConnectData {
@@ -47,6 +48,10 @@ public class KafkaConnectData {
         } else if (nativeObject instanceof GenericData.Record) {
             GenericData.Record avroRecord = (GenericData.Record) nativeObject;
             return avroAsConnectData(avroRecord, kafkaSchema);
+        } else if (nativeObject instanceof GenericRecord) {
+            // Pulsar's GenericRecord
+            GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
+            return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
         }
 
         return castToKafkaSchema(nativeObject, kafkaSchema);
@@ -134,6 +139,21 @@ public class KafkaConnectData {
         return struct;
     }
 
+    static Object pulsarGenericRecordAsConnectData(GenericRecord genericRecord, Schema kafkaSchema) {
+        if (kafkaSchema == null) {
+            if (genericRecord == null) {
+                return null;
+            }
+            throw new DataException("Don't know how to convert " + genericRecord + " to Connect data (schema is null).");
+        }
+
+        Struct struct = new Struct(kafkaSchema);
+        for (Field field : kafkaSchema.fields()) {
+            struct.put(field, getKafkaConnectData(genericRecord.getField(field.name()), field.schema()));
+        }
+        return struct;
+    }
+
     // with some help of
     // https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
     static Object jsonAsConnectData(JsonNode jsonNode, Schema kafkaSchema) {
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 14c7dcd7ef8..23d9f1b5ce2 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.schema.Field;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
@@ -40,10 +41,11 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
 import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.SinkContext;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -58,12 +60,14 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.AbstractMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -331,10 +335,10 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         ObjectMapper om = new ObjectMapper();
         Map<String, Object> result = om.readValue(lines.get(0), new TypeReference<Map<String, Object>>(){});
 
-        assertEquals(result.get("key"), expectedKey);
-        assertEquals(result.get("value"), expected);
-        assertEquals(result.get("keySchema"), expectedKeySchema);
-        assertEquals(result.get("valueSchema"), expectedSchema);
+        assertEquals(expectedKey, result.get("key"));
+        assertEquals(expected, result.get("value"));
+        assertEquals(expectedKeySchema, result.get("keySchema"));
+        assertEquals(expectedSchema, result.get("valueSchema"));
 
         if (schema.getSchemaInfo().getType().isPrimitive()) {
             // to test cast of primitive values
@@ -362,8 +366,18 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
 
     private GenericRecord getGenericRecord(Object value, Schema schema) {
         final GenericRecord rec;
-        if(value instanceof GenericRecord) {
+        if (value instanceof GenericRecord) {
             rec = (GenericRecord) value;
+        } else if (value instanceof org.apache.avro.generic.GenericRecord) {
+            org.apache.avro.generic.GenericRecord avroRecord =
+                    (org.apache.avro.generic.GenericRecord) value;
+            org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) schema.getNativeSchema().get();
+            List<Field> fields = avroSchema.getFields()
+                    .stream()
+                    .map(f -> new Field(f.name(), f.pos()))
+                    .collect(Collectors.toList());
+
+            return new GenericAvroRecord(new byte[]{ 1 }, avroSchema, fields, avroRecord);
         } else {
             rec = MockGenericObjectWrapper.builder()
                     .nativeObject(value)
@@ -592,7 +606,7 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
     }
 
     @Test
-    public void KeyValueSchemaTest() throws Exception {
+    public void schemaKeyValueSchemaTest() throws Exception {
         KeyValue<Integer, String> kv = new KeyValue<>(11, "value");
         SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, Schema.STRING), 11, "INT32", "value", "STRING");
         String val = (String) sinkRecord.value();
@@ -601,6 +615,114 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         Assert.assertEquals(key, 11);
     }
 
+    @Test
+    public void schemaKeyValueAvroSchemaTest() throws Exception {
+        AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
+                = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
+
+        final GenericData.Record key = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+        key.put("field1", 11);
+        key.put("field2", "key");
+        key.put("field3", 101L);
+
+        final GenericData.Record value = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+        value.put("field1", 10);
+        value.put("field2", "value");
+        value.put("field3", 100L);
+
+        Map<String, Object> expectedKey = new LinkedHashMap<>();
+        expectedKey.put("field1", 11);
+        expectedKey.put("field2", "key");
+        // integer is coming back from ObjectMapper
+        expectedKey.put("field3", 101);
+
+        Map<String, Object> expectedValue = new LinkedHashMap<>();
+        expectedValue.put("field1", 10);
+        expectedValue.put("field2", "value");
+        // integer is coming back from ObjectMapper
+        expectedValue.put("field3", 100);
+
+        KeyValue<GenericRecord, GenericRecord> kv = new KeyValue<>(getGenericRecord(key, pulsarAvroSchema),
+                            getGenericRecord(value, pulsarAvroSchema));
+
+        SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema),
+                expectedKey, "STRUCT", expectedValue, "STRUCT");
+
+        Struct outValue = (Struct) sinkRecord.value();
+        Assert.assertEquals((int)outValue.get("field1"), 10);
+        Assert.assertEquals((String)outValue.get("field2"), "value");
+        Assert.assertEquals((long)outValue.get("field3"), 100L);
+
+        Struct outKey = (Struct) sinkRecord.key();
+        Assert.assertEquals((int)outKey.get("field1"), 11);
+        Assert.assertEquals((String)outKey.get("field2"), "key");
+        Assert.assertEquals((long)outKey.get("field3"), 101L);
+    }
+
+    @Test
+    public void nullKeyValueSchemaTest() throws Exception {
+        props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
+
+        KafkaConnectSink sink = new KafkaConnectSink();
+        sink.open(props, context);
+
+        Message msg = mock(MessageImpl.class);
+        // value is null
+        when(msg.getValue()).thenReturn(null);
+        when(msg.getKey()).thenReturn("key");
+        when(msg.hasKey()).thenReturn(true);
+        when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
+
+        final AtomicInteger status = new AtomicInteger(0);
+        Record<GenericObject> record = PulsarRecord.<String>builder()
+                .topicName("fake-topic")
+                .message(msg)
+                .schema(Schema.KeyValue(Schema.INT32, Schema.STRING))
+                .ackFunction(status::incrementAndGet)
+                .failFunction(status::decrementAndGet)
+                .build();
+
+        sink.write(record);
+        sink.flush();
+
+        // expect fail
+        assertEquals(status.get(), -1);
+
+        sink.close();
+    }
+
+    @Test
+    public void wrongKeyValueSchemaTest() throws Exception {
+        props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
+
+        KafkaConnectSink sink = new KafkaConnectSink();
+        sink.open(props, context);
+
+        Message msg = mock(MessageImpl.class);
+        // value is of a wrong/unsupported type
+        when(msg.getValue()).thenReturn(new AbstractMap.SimpleEntry<>(11, "value"));
+        when(msg.getKey()).thenReturn("key");
+        when(msg.hasKey()).thenReturn(true);
+        when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
+
+        final AtomicInteger status = new AtomicInteger(0);
+        Record<GenericObject> record = PulsarRecord.<String>builder()
+                .topicName("fake-topic")
+                .message(msg)
+                .schema(Schema.KeyValue(Schema.INT32, Schema.STRING))
+                .ackFunction(status::incrementAndGet)
+                .failFunction(status::decrementAndGet)
+                .build();
+
+        sink.write(record);
+        sink.flush();
+
+        // expect fail
+        assertEquals(status.get(), -1);
+
+        sink.close();
+    }
+
     @Test
     public void offsetTest() throws Exception {
         final AtomicLong entryId = new AtomicLong(0L);
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java
index 9821a58eb22..07b9117d2e4 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.io.kafka.connect;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
@@ -37,6 +38,7 @@ import java.util.Map;
  * A FileStreamSinkTask for testing that writes data other than just a value, i.e.:
  * key, value, key and value schemas.
  */
+@Slf4j
 public class SchemaedFileStreamSinkTask extends FileStreamSinkTask {
 
     @Override
@@ -49,32 +51,28 @@ public class SchemaedFileStreamSinkTask extends FileStreamSinkTask {
                     ? new String((byte[]) record.value(), StandardCharsets.US_ASCII)
                     : record.value();
 
+            Object key = record.keySchema() == Schema.BYTES_SCHEMA
+                    ? new String((byte[]) record.key(), StandardCharsets.US_ASCII)
+                    : record.key();
+
             Map<String, Object> recOut = Maps.newHashMap();
             recOut.put("keySchema", record.keySchema().type().toString());
             recOut.put("valueSchema", record.valueSchema().type().toString());
-            recOut.put("key", record.key());
-            if (val instanceof Struct) {
-                Map<String, Object> map = Maps.newHashMap();
-                Struct struct = (Struct)val;
-
-                // no recursion needed for tests
-                for (Field f: struct.schema().fields()) {
-                    map.put(f.name(), struct.get(f));
-                }
-
-                recOut.put("value", map);
-            } else {
-                recOut.put("value", val);
-            }
+            recOut.put("key", toWritableValue(key));
+            recOut.put("value", toWritableValue(val));
 
             ObjectMapper om = new ObjectMapper();
             try {
+                String valueAsString = om.writeValueAsString(recOut);
+
+                log.info("FileSink writing {}", valueAsString);
+
                 SinkRecord toSink = new SinkRecord(record.topic(),
                         record.kafkaPartition(),
-                        record.keySchema(),
-                        record.key(),
                         Schema.STRING_SCHEMA,
-                        om.writeValueAsString(recOut),
+                        "", // blank key, real one is serialized with recOut
+                        Schema.STRING_SCHEMA,
+                        valueAsString,
                         record.kafkaOffset(),
                         record.timestamp(),
                         record.timestampType());
@@ -87,4 +85,19 @@ public class SchemaedFileStreamSinkTask extends FileStreamSinkTask {
         super.put(out);
     }
 
+    private Object toWritableValue(Object val) {
+        if (val instanceof Struct) {
+            Map<String, Object> map = Maps.newHashMap();
+            Struct struct = (Struct) val;
+
+            // no recursion needed for tests
+            for (Field f: struct.schema().fields()) {
+                map.put(f.name(), struct.get(f));
+            }
+            return map;
+        } else {
+            return val;
+        }
+    }
+
 }
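
As a usage illustration for the change above (a hedged sketch, not part of the commit): publishing with a KeyValue schema over two AVRO struct schemas is what makes getNativeObject() on the sink side return a KeyValue whose key and value are Pulsar GenericRecords, i.e. the case the new branch in KafkaConnectSink handles. The service URL, topic name, and the User POJO are placeholders.

    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.common.schema.KeyValue;

    public class KeyValueProducerSketch {
        public static class User {
            public int field1;
            public String field2;
        }

        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650") // placeholder broker address
                    .build();

            // KeyValue schema over two AVRO struct schemas.
            Schema<KeyValue<User, User>> kvSchema =
                    Schema.KeyValue(Schema.AVRO(User.class), Schema.AVRO(User.class));

            Producer<KeyValue<User, User>> producer = client.newProducer(kvSchema)
                    .topic("persistent://public/default/kv-topic") // placeholder topic
                    .create();

            User key = new User();
            key.field1 = 11;
            key.field2 = "key";
            User value = new User();
            value.field1 = 10;
            value.field2 = "value";

            producer.send(new KeyValue<>(key, value));

            producer.close();
            client.close();
        }
    }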


[pulsar] 03/04: Fix: org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY} + tests (#15988)

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fe570032a0aa8fe6f79fc753bd7a8266765c1461
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Wed Jun 8 23:49:58 2022 -0700

    Fix: org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY} + tests (#15988)
    
    (cherry picked from commit 9e7aa0f43add7be841cf6b1791ec34c2ced43f1f)
---
 .../io/kafka/connect/schema/KafkaConnectData.java  | 148 +++++++---
 .../io/kafka/connect/KafkaConnectSinkTest.java     | 299 +++++++++++++++++++++
 .../connect/PulsarSchemaToKafkaSchemaTest.java     | 109 +++++++-
 3 files changed, 522 insertions(+), 34 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
index 8374dd24bf7..671495c6df6 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -22,6 +22,7 @@ package org.apache.pulsar.io.kafka.connect.schema;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
+import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -37,31 +38,90 @@ import org.apache.pulsar.client.api.schema.GenericRecord;
 
 @Slf4j
 public class KafkaConnectData {
+
+    private static List<Object> arrayToList(Object nativeObject, Schema kafkaValueSchema) {
+        Preconditions.checkArgument(nativeObject.getClass().isArray());
+        int length = Array.getLength(nativeObject);
+        List<Object> out = new ArrayList<>(length);
+        for (int i = 0; i < length; i++) {
+            // this handles primitive values too
+            Object elem = Array.get(nativeObject, i);
+            out.add(getKafkaConnectData(elem, kafkaValueSchema));
+        }
+        return out;
+    }
+
+    @SuppressWarnings("unchecked")
     public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
         if (kafkaSchema == null) {
             return nativeObject;
         }
 
+        if (nativeObject == null) {
+            return defaultOrThrow(kafkaSchema);
+        }
+
         if (nativeObject instanceof JsonNode) {
             JsonNode node = (JsonNode) nativeObject;
             return jsonAsConnectData(node, kafkaSchema);
-        } else if (nativeObject instanceof GenericData.Record) {
-            GenericData.Record avroRecord = (GenericData.Record) nativeObject;
-            return avroAsConnectData(avroRecord, kafkaSchema);
-        } else if (nativeObject instanceof GenericRecord) {
-            // Pulsar's GenericRecord
-            GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
-            return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
         }
 
-        return castToKafkaSchema(nativeObject, kafkaSchema);
+        switch (kafkaSchema.type()) {
+            case ARRAY:
+                 if (nativeObject instanceof List) {
+                    List arr = (List) nativeObject;
+                    return arr.stream()
+                            .map(x -> getKafkaConnectData(x, kafkaSchema.valueSchema()))
+                            .toList();
+                } else if (nativeObject.getClass().isArray()) {
+                    return arrayToList(nativeObject, kafkaSchema.valueSchema());
+                }
+                throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+                        + " into kafka ARRAY");
+            case MAP:
+                if (nativeObject instanceof Map) {
+                    Map<Object, Object> map = (Map<Object, Object>) nativeObject;
+                    Map<Object, Object> responseMap = new HashMap<>(map.size());
+                    for (Map.Entry<Object, Object> kv : map.entrySet()) {
+                        Object key = getKafkaConnectData(kv.getKey(), kafkaSchema.keySchema());
+                        Object val = getKafkaConnectData(kv.getValue(), kafkaSchema.valueSchema());
+                        responseMap.put(key, val);
+                    }
+                    return responseMap;
+                } else if (nativeObject instanceof org.apache.pulsar.common.schema.KeyValue) {
+                    org.apache.pulsar.common.schema.KeyValue kv =
+                            (org.apache.pulsar.common.schema.KeyValue) nativeObject;
+                    Map<Object, Object> responseMap = new HashMap<>();
+                    Object key = getKafkaConnectData(kv.getKey(), kafkaSchema.keySchema());
+                    Object val = getKafkaConnectData(kv.getValue(), kafkaSchema.valueSchema());
+                    responseMap.put(key, val);
+                    return responseMap;
+                }
+                throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+                        + " into kafka MAP");
+            case STRUCT:
+                if (nativeObject instanceof GenericData.Record) {
+                    GenericData.Record avroRecord = (GenericData.Record) nativeObject;
+                    return avroAsConnectData(avroRecord, kafkaSchema);
+                } else if (nativeObject instanceof GenericRecord) {
+                    GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
+                    // Pulsar's GenericRecord
+                    if (pulsarGenericRecord.getNativeObject() instanceof JsonNode
+                            || pulsarGenericRecord.getNativeObject() instanceof GenericData.Record) {
+                        return getKafkaConnectData(pulsarGenericRecord.getNativeObject(), kafkaSchema);
+                    }
+                    return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
+                }
+                throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+                        + " into kafka STRUCT");
+            default:
+                Preconditions.checkArgument(kafkaSchema.type().isPrimitive(),
+                        "Expected primitive schema but got " + kafkaSchema.type());
+                return castToKafkaSchema(nativeObject, kafkaSchema);
+        }
     }
 
     public static Object castToKafkaSchema(Object nativeObject, Schema kafkaSchema) {
-        if (nativeObject == null) {
-            return defaultOrThrow(kafkaSchema);
-        }
-
         if (nativeObject instanceof Number) {
             // This is needed in case
             // jackson decided to fit value into some other type internally
@@ -121,6 +181,19 @@ public class KafkaConnectData {
             }
         }
 
+        if (nativeObject instanceof Character) {
+            Character ch = (Character) nativeObject;
+            if (kafkaSchema.type() == Schema.Type.STRING) {
+                return ch.toString();
+            }
+            return castToKafkaSchema(Character.getNumericValue(ch), kafkaSchema);
+        }
+
+        if (kafkaSchema.type() == Schema.Type.STRING && nativeObject instanceof CharSequence) {
+            // e.g. org.apache.avro.util.Utf8
+            return nativeObject.toString();
+        }
+
         return nativeObject;
     }
 
@@ -161,23 +234,8 @@ public class KafkaConnectData {
             if (jsonNode == null || jsonNode.isNull()) {
                 return null;
             }
-            switch (jsonNode.getNodeType()) {
-                case BINARY:
-                    try {
-                        return jsonNode.binaryValue();
-                    } catch (IOException e) {
-                        throw new DataException("Cannot get binary value for " + jsonNode);
-                    }
-                case BOOLEAN:
-                    return jsonNode.booleanValue();
-                case NUMBER:
-                    return jsonNode.doubleValue();
-                case STRING:
-                    return jsonNode.textValue();
-                default:
-                    throw new DataException("Don't know how to convert " + jsonNode
-                            + " to Connect data (schema is null).");
-            }
+            throw new DataException("Don't know how to convert " + jsonNode
+                + " to Connect data (schema is null).");
         }
 
         if (jsonNode == null || jsonNode.isNull()) {
@@ -186,39 +244,65 @@ public class KafkaConnectData {
 
         switch (kafkaSchema.type()) {
             case INT8:
+                Preconditions.checkArgument(jsonNode.isNumber());
                 return (byte) jsonNode.shortValue();
             case INT16:
+                Preconditions.checkArgument(jsonNode.isNumber());
                 return jsonNode.shortValue();
             case INT32:
+                if (jsonNode.isTextual() && jsonNode.textValue().length() == 1) {
+                    // char encoded as String instead of Integer
+                    return Character.getNumericValue(jsonNode.textValue().charAt(0));
+                }
+                Preconditions.checkArgument(jsonNode.isNumber());
                 return jsonNode.intValue();
             case INT64:
+                Preconditions.checkArgument(jsonNode.isNumber());
                 return  jsonNode.longValue();
             case FLOAT32:
+                Preconditions.checkArgument(jsonNode.isNumber());
                 return jsonNode.floatValue();
             case FLOAT64:
+                Preconditions.checkArgument(jsonNode.isNumber());
                 return jsonNode.doubleValue();
             case BOOLEAN:
+                Preconditions.checkArgument(jsonNode.isBoolean());
                 return jsonNode.booleanValue();
             case STRING:
+                Preconditions.checkArgument(jsonNode.isTextual());
                 return jsonNode.textValue();
             case BYTES:
+                Preconditions.checkArgument(jsonNode.isBinary());
                 try {
                     return jsonNode.binaryValue();
                 } catch (IOException e) {
                     throw new DataException("Cannot get binary value for " + jsonNode + " with schema " + kafkaSchema);
                 }
             case ARRAY:
-                List<Object> list = new ArrayList<>();
+                if (jsonNode.isTextual() && kafkaSchema.valueSchema().type() == Schema.Type.INT32) {
+                    // char[] encoded as String in json
+                    List<Object> list = new ArrayList<>();
+                    for (char ch: jsonNode.textValue().toCharArray()) {
+                        list.add(Character.getNumericValue(ch));
+                    }
+                    return list;
+                }
+
                 Preconditions.checkArgument(jsonNode.isArray(), "jsonNode has to be an array");
-                for (Iterator<JsonNode> it = jsonNode.elements(); it.hasNext(); ) {
+                List<Object> list = new ArrayList<>();
+                for (Iterator<JsonNode> it = jsonNode.elements(); it.hasNext();) {
                     list.add(jsonAsConnectData(it.next(), kafkaSchema.valueSchema()));
                 }
                 return list;
             case MAP:
+                Preconditions.checkArgument(jsonNode.isObject(), "jsonNode has to be an Object node");
+                Preconditions.checkArgument(kafkaSchema.keySchema().type() == Schema.Type.STRING,
+                        "kafka schema for json map is expected to be STRING");
                 Map<String, Object> map = new HashMap<>();
                 for (Iterator<Map.Entry<String, JsonNode>> it = jsonNode.fields(); it.hasNext(); ) {
                     Map.Entry<String, JsonNode> elem = it.next();
-                    map.put(elem.getKey(), jsonAsConnectData(elem.getValue(), kafkaSchema.valueSchema()));
+                    map.put(elem.getKey(),
+                            jsonAsConnectData(elem.getValue(), kafkaSchema.valueSchema()));
                 }
                 return map;
             case STRUCT:
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 23d9f1b5ce2..2c0ea31ac4c 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -22,8 +22,17 @@ package org.apache.pulsar.io.kafka.connect;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -47,6 +56,8 @@ import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
+import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -56,11 +67,14 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.AbstractMap;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -538,6 +552,13 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         expected.put("field2", "test");
         // integer is coming back from ObjectMapper
         expected.put("field3", 100);
+        expected.put("byteField", 0);
+        expected.put("shortField", 0);
+        expected.put("intField", 0);
+        expected.put("longField", 0);
+        // double is coming back from ObjectMapper
+        expected.put("floatField", 0.0d);
+        expected.put("doubleField", 0.0d);
 
         SinkRecord sinkRecord = recordSchemaTest(jsonNode, jsonSchema, expected, "STRUCT");
 
@@ -565,6 +586,13 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         expected.put("field2", "test");
         // integer is coming back from ObjectMapper
         expected.put("field3", 100);
+        expected.put("byteField", 0);
+        expected.put("shortField", 0);
+        expected.put("intField", 0);
+        expected.put("longField", 0);
+        // double is coming back from ObjectMapper
+        expected.put("floatField", 0.0d);
+        expected.put("doubleField", 0.0d);
 
         SinkRecord sinkRecord = recordSchemaTest(obj, pulsarAvroSchema, expected, "STRUCT");
 
@@ -615,6 +643,167 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         Assert.assertEquals(key, 11);
     }
 
+    @Test
+    public void connectDataComplexAvroSchemaGenericRecordTest() {
+        AvroSchema<PulsarSchemaToKafkaSchemaTest.ComplexStruct> pulsarAvroSchema
+                = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.ComplexStruct.class);
+
+        final GenericData.Record key = getComplexStructRecord();
+        final GenericData.Record value = getComplexStructRecord();
+        KeyValue<GenericRecord, GenericRecord> kv = new KeyValue<>(getGenericRecord(key, pulsarAvroSchema),
+                getGenericRecord(value, pulsarAvroSchema));
+
+        org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
+                .getKafkaConnectSchema(Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema));
+
+        Object connectData = KafkaConnectData.getKafkaConnectData(kv, kafkaSchema);
+
+        org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+    }
+
+    @Test
+    public void connectDataPojoArrTest() throws Exception {
+        PulsarSchemaToKafkaSchemaTest.ComplexStruct[] pojo =
+                new PulsarSchemaToKafkaSchemaTest.ComplexStruct[]{
+                        getPojoComplexStruct(),
+                        getPojoComplexStruct(),
+                        getPojoComplexStruct()
+                };
+
+        testPojoAsAvroAndJsonConversionToConnectData(pojo);
+    }
+
+    @Test
+    public void connectDataPojoListTest() throws Exception {
+        List<PulsarSchemaToKafkaSchemaTest.ComplexStruct> pojo =
+                Lists.newArrayList(
+                        getPojoComplexStruct(),
+                        getPojoComplexStruct(),
+                        getPojoComplexStruct()
+                );
+
+        /*
+        Need this because of (AFAICT)
+        https://issues.apache.org/jira/browse/AVRO-1183
+        https://github.com/apache/pulsar/issues/4851
+        to generate proper schema
+        */
+        PulsarSchemaToKafkaSchemaTest.ComplexStruct[] pojoForSchema =
+                new PulsarSchemaToKafkaSchemaTest.ComplexStruct[]{
+                        getPojoComplexStruct(),
+                        getPojoComplexStruct(),
+                        getPojoComplexStruct()
+                };
+
+        AvroSchema pulsarAvroSchema = AvroSchema.of(pojoForSchema.getClass());
+
+        testPojoAsAvroAndJsonConversionToConnectData(pojo, pulsarAvroSchema);
+    }
+
+    @Test
+    public void connectDataPojoMapTest() throws Exception {
+        Map<String, PulsarSchemaToKafkaSchemaTest.ComplexStruct> pojo =
+                Maps.newHashMap();
+        pojo.put("key1", getPojoComplexStruct());
+        pojo.put("key2", getPojoComplexStruct());
+
+        testPojoAsAvroAndJsonConversionToConnectData(pojo);
+    }
+
+    @Test
+    public void connectDataPrimitivesTest() throws Exception {
+        testPojoAsAvroAndJsonConversionToConnectData("test");
+
+        testPojoAsAvroAndJsonConversionToConnectData('a');
+
+        testPojoAsAvroAndJsonConversionToConnectData(Byte.MIN_VALUE);
+        testPojoAsAvroAndJsonConversionToConnectData(Byte.MAX_VALUE);
+
+        testPojoAsAvroAndJsonConversionToConnectData(Short.MIN_VALUE);
+        testPojoAsAvroAndJsonConversionToConnectData(Short.MAX_VALUE);
+
+        testPojoAsAvroAndJsonConversionToConnectData(Integer.MIN_VALUE);
+        testPojoAsAvroAndJsonConversionToConnectData(Integer.MAX_VALUE);
+
+        testPojoAsAvroAndJsonConversionToConnectData(Long.MIN_VALUE);
+        testPojoAsAvroAndJsonConversionToConnectData(Long.MAX_VALUE);
+
+        testPojoAsAvroAndJsonConversionToConnectData(Float.MIN_VALUE);
+        testPojoAsAvroAndJsonConversionToConnectData(Float.MAX_VALUE);
+
+        testPojoAsAvroAndJsonConversionToConnectData(Double.MIN_VALUE);
+        testPojoAsAvroAndJsonConversionToConnectData(Double.MAX_VALUE);
+    }
+
+    @Test
+    public void connectDataPrimitiveArraysTest() throws Exception {
+        testPojoAsAvroAndJsonConversionToConnectData(new String[] {"test", "test2"});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new char[] {'a', 'b', 'c'});
+        testPojoAsAvroAndJsonConversionToConnectData(new Character[] {'a', 'b', 'c'});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new byte[] {Byte.MIN_VALUE, Byte.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Byte[] {Byte.MIN_VALUE, Byte.MAX_VALUE});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new short[] {Short.MIN_VALUE, Short.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Short[] {Short.MIN_VALUE, Short.MAX_VALUE});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new int[] {Integer.MIN_VALUE, Integer.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Integer[] {Integer.MIN_VALUE, Integer.MAX_VALUE});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new long[] {Long.MIN_VALUE, Long.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Long[] {Long.MIN_VALUE, Long.MAX_VALUE});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new float[] {Float.MIN_VALUE, Float.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Float[] {Float.MIN_VALUE, Float.MAX_VALUE});
+
+        testPojoAsAvroAndJsonConversionToConnectData(new double[] {Double.MIN_VALUE, Double.MAX_VALUE});
+        testPojoAsAvroAndJsonConversionToConnectData(new Double[] {Double.MIN_VALUE, Double.MAX_VALUE});
+    }
+
+    private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo) throws IOException {
+        AvroSchema pulsarAvroSchema = AvroSchema.of(pojo.getClass());
+        testPojoAsAvroAndJsonConversionToConnectData(pojo, pulsarAvroSchema);
+    }
+
+    private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo, AvroSchema pulsarAvroSchema) throws IOException {
+        Object value = pojoAsAvroRecord(pojo, pulsarAvroSchema);
+
+        org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
+                .getKafkaConnectSchema(pulsarAvroSchema);
+
+        Object connectData = KafkaConnectData.getKafkaConnectData(value, kafkaSchema);
+
+        org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+
+        Object jsonNode = pojoAsJsonNode(pojo);
+        connectData = KafkaConnectData.getKafkaConnectData(jsonNode, kafkaSchema);
+        org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+    }
+
+    private JsonNode pojoAsJsonNode(Object pojo) {
+        ObjectMapper om = new ObjectMapper();
+        JsonNode json = om.valueToTree(pojo);
+        return json;
+    }
+
+    private Object pojoAsAvroRecord(Object pojo, AvroSchema pulsarAvroSchema) throws IOException {
+        DatumWriter writer = new ReflectDatumWriter<>();
+
+        writer.setSchema(pulsarAvroSchema.getAvroSchema());
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        Encoder enc = new EncoderFactory().directBinaryEncoder(out, null);
+        writer.write(pojo, enc);
+        enc.flush();
+        byte[] data = out.toByteArray();
+
+        DatumReader<GenericRecord> reader = new GenericDatumReader<>(pulsarAvroSchema.getAvroSchema());
+        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
+        Object value = reader.read(null, decoder);
+        return value;
+    }
+
     @Test
     public void schemaKeyValueAvroSchemaTest() throws Exception {
         AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
@@ -635,12 +824,26 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         expectedKey.put("field2", "key");
         // integer is coming back from ObjectMapper
         expectedKey.put("field3", 101);
+        expectedKey.put("byteField", 0);
+        expectedKey.put("shortField", 0);
+        expectedKey.put("intField", 0);
+        expectedKey.put("longField", 0);
+        // double is coming back from ObjectMapper
+        expectedKey.put("floatField", 0.0d);
+        expectedKey.put("doubleField", 0.0d);
 
         Map<String, Object> expectedValue = new LinkedHashMap<>();
         expectedValue.put("field1", 10);
         expectedValue.put("field2", "value");
         // integer is coming back from ObjectMapper
         expectedValue.put("field3", 100);
+        expectedValue.put("byteField", 0);
+        expectedValue.put("shortField", 0);
+        expectedValue.put("intField", 0);
+        expectedValue.put("longField", 0);
+        // double is coming back from ObjectMapper
+        expectedValue.put("floatField", 0.0d);
+        expectedValue.put("doubleField", 0.0d);
 
         KeyValue<GenericRecord, GenericRecord> kv = new KeyValue<>(getGenericRecord(key, pulsarAvroSchema),
                             getGenericRecord(value, pulsarAvroSchema));
@@ -781,4 +984,100 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         sink.close();
     }
 
+    private static PulsarSchemaToKafkaSchemaTest.StructWithAnnotations getPojoStructWithAnnotations() {
+        return new PulsarSchemaToKafkaSchemaTest.StructWithAnnotations()
+                .setField1(1)
+                .setField2("field2")
+                .setField3(100L)
+                .setByteField((byte) 1)
+                .setShortField((short) 2)
+                .setIntField(3)
+                .setLongField(4)
+                .setFloatField(5.0f)
+                .setDoubleField(6.0d);
+    }
+
+    private static PulsarSchemaToKafkaSchemaTest.ComplexStruct getPojoComplexStruct() {
+        return new PulsarSchemaToKafkaSchemaTest.ComplexStruct()
+                .setStringList(Lists.newArrayList("str11", "str22"))
+                .setStructArr(new PulsarSchemaToKafkaSchemaTest.StructWithAnnotations[]{getPojoStructWithAnnotations()})
+                .setStructList(Lists.newArrayList(getPojoStructWithAnnotations()))
+                .setStruct(getPojoStructWithAnnotations())
+                .setStructMap(Map.of("key1", getPojoStructWithAnnotations(),
+                        "key2", getPojoStructWithAnnotations()))
+
+                .setByteField((byte) 1)
+                .setShortField((short) 2)
+                .setIntField(3)
+                .setLongField(4)
+                .setFloatField(5.0f)
+                .setDoubleField(6.0d)
+                .setCharField('c')
+                .setStringField("some text")
+
+                .setByteArr(new byte[] {1, 2})
+                .setShortArr(new short[] {3, 4})
+                .setIntArr(new int[] {5, 6})
+                .setLongArr(new long[] {7, 8})
+                .setFloatArr(new float[] {9.0f, 10.0f})
+                .setDoubleArr(new double[] {11.0d, 12.0d})
+                .setCharArr(new char[]{'a', 'b'})
+                .setStringArr(new String[] {"abc", "def"});
+    }
+
+    private static GenericData.Record getStructRecord() {
+        AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
+                = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
+
+        final GenericData.Record rec = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+
+        rec.put("field1", 11);
+        rec.put("field2", "str99");
+        rec.put("field3", 101L);
+        rec.put("byteField", (byte) 1);
+        rec.put("shortField", (short) 2);
+        rec.put("intField", 3);
+        rec.put("longField", 4L);
+        rec.put("floatField", 5.0f);
+        rec.put("doubleField", 6.0d);
+
+        return rec;
+    }
+
+    private static GenericData.Record getComplexStructRecord() {
+        AvroSchema<PulsarSchemaToKafkaSchemaTest.ComplexStruct> pulsarAvroSchema
+                = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.ComplexStruct.class);
+
+        final GenericData.Record rec = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+
+        rec.put("stringArr", new String[]{"str1", "str2"});
+        rec.put("stringList", Lists.newArrayList("str11", "str22"));
+        rec.put("structArr", new GenericData.Record[]{getStructRecord(), getStructRecord()});
+        rec.put("structList", Lists.newArrayList(getStructRecord(), getStructRecord()));
+
+        rec.put("struct", getStructRecord());
+        rec.put("byteField", (byte) 1);
+        rec.put("shortField", (short) 2);
+        rec.put("intField", 3);
+        rec.put("longField", 4L);
+        rec.put("floatField", 5.1f);
+        rec.put("doubleField", 6.1d);
+        rec.put("charField", 'c');
+        rec.put("stringField", "some string");
+        rec.put("byteArr", new byte[] {(byte) 1, (byte) 2});
+        rec.put("shortArr", new short[] {(short) 3, (short) 4});
+        rec.put("intArr", new int[] {5, 6});
+        rec.put("longArr", new long[] {7L, 8L});
+        rec.put("floatArr", new float[] {9.0f, 10.0f});
+        rec.put("doubleArr", new double[] {11.0d, 12.0d});
+        rec.put("charArr", new char[] {'a', 'b', 'c'});
+
+        Map<String, GenericData.Record> map = new HashMap<>();
+        map.put("key1", getStructRecord());
+        map.put("key2", getStructRecord());
+
+        rec.put("structMap", map);
+
+        return rec;
+    }
 }
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
index 60caa2bbe81..ecf0633f588 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.io.kafka.connect;
 
 import com.google.common.collect.Lists;
 import lombok.Data;
+import lombok.experimental.Accessors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.reflect.AvroDefault;
 import org.apache.avro.reflect.Nullable;
@@ -35,6 +36,7 @@ import org.testng.annotations.Test;
 
 import java.math.BigInteger;
 import java.util.List;
+import java.util.Map;
 
 import static org.testng.Assert.assertEquals;
 
@@ -44,15 +46,90 @@ import static org.testng.Assert.assertEquals;
 @Slf4j
 public class PulsarSchemaToKafkaSchemaTest {
 
-    static final List<String> STRUCT_FIELDS = Lists.newArrayList("field1", "field2", "field3");
+    static final List<String> STRUCT_FIELDS = Lists.newArrayList(
+            "field1",
+            "field2",
+            "field3",
+            "byteField",
+            "shortField",
+            "intField",
+            "longField",
+            "floatField",
+            "doubleField"
+        );
+    static final List<String> COMPLEX_STRUCT_FIELDS = Lists.newArrayList(
+            "stringArr",
+            "stringList",
+            "structArr",
+            "structList",
+            "structMap",
+            "struct",
+            "byteField",
+            "shortField",
+            "intField",
+            "longField",
+            "floatField",
+            "doubleField",
+            "charField",
+            "stringField",
+            "byteArr",
+            "shortArr",
+            "intArr",
+            "longArr",
+            "floatArr",
+            "doubleArr",
+            "charArr"
+        );
 
     @Data
+    @Accessors(chain = true)
     static class StructWithAnnotations {
         int field1;
         @Nullable
         String field2;
-        @AvroDefault("\"1000\"")
+        @AvroDefault("1000")
         Long field3;
+
+        @AvroDefault("0")
+        byte byteField;
+        @AvroDefault("0")
+        short shortField;
+        @AvroDefault("0")
+        int intField;
+        @AvroDefault("0")
+        long longField;
+        @AvroDefault("0")
+        float floatField;
+        @AvroDefault("0")
+        double doubleField;
+    }
+
+    @Data
+    @Accessors(chain = true)
+    static class ComplexStruct {
+        List<String> stringList;
+        StructWithAnnotations[] structArr;
+        List<StructWithAnnotations> structList;
+        Map<String, StructWithAnnotations> structMap;
+        StructWithAnnotations struct;
+
+        byte byteField;
+        short shortField;
+        int intField;
+        long longField;
+        float floatField;
+        double doubleField;
+        char charField;
+        String stringField;
+
+        byte[] byteArr;
+        short[] shortArr;
+        int[] intArr;
+        long[] longArr;
+        float[] floatArr;
+        double[] doubleArr;
+        char[] charArr;
+        String[] stringArr;
     }
 
     @Test
@@ -153,6 +230,18 @@ public class PulsarSchemaToKafkaSchemaTest {
         }
     }
 
+    @Test
+    public void avroComplexSchemaTest() {
+        AvroSchema<ComplexStruct> pulsarAvroSchema = AvroSchema.of(ComplexStruct.class);
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT);
+        assertEquals(kafkaSchema.fields().size(), COMPLEX_STRUCT_FIELDS.size());
+        for (String name: COMPLEX_STRUCT_FIELDS) {
+            assertEquals(kafkaSchema.field(name).name(), name);
+        }
+    }
+
     @Test
     public void jsonSchemaTest() {
         JSONSchema<StructWithAnnotations> jsonSchema = JSONSchema
@@ -169,6 +258,22 @@ public class PulsarSchemaToKafkaSchemaTest {
         }
     }
 
+    @Test
+    public void jsonComplexSchemaTest() {
+        JSONSchema<ComplexStruct> jsonSchema = JSONSchema
+                .of(SchemaDefinition.<ComplexStruct>builder()
+                        .withPojo(ComplexStruct.class)
+                        .withAlwaysAllowNull(false)
+                        .build());
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT);
+        assertEquals(kafkaSchema.fields().size(), COMPLEX_STRUCT_FIELDS.size());
+        for (String name: COMPLEX_STRUCT_FIELDS) {
+            assertEquals(kafkaSchema.field(name).name(), name);
+        }
+    }
+
     @Test
     public void castToKafkaSchemaTest() {
         assertEquals(Byte.class,

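A compact sketch of the contract the new ARRAY handling above establishes (an illustration, not part of the patch): native Java arrays, including primitive ones, convert element by element into a List that passes Kafka Connect's own schema validation. The stream pipeline here stands in for the reflective Array.get() loop in arrayToList, written JDK8-style (compare the Stream.toList() call above with the JDK8 compatibility fix that follows in this batch).

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.kafka.connect.data.ConnectSchema;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;

    public class ArrayToConnectSketch {
        public static void main(String[] args) {
            // Connect schema for an array of 32-bit integers.
            Schema arraySchema = SchemaBuilder.array(Schema.INT32_SCHEMA).build();

            // Element-by-element conversion; the real code goes through
            // java.lang.reflect.Array so primitive arrays like int[] work too.
            int[] nativeArr = {5, 6};
            List<Integer> connectValue = Arrays.stream(nativeArr)
                    .boxed()
                    .collect(Collectors.toList()); // JDK8-friendly, unlike Stream.toList()

            // Throws DataException if the value does not match the schema.
            ConnectSchema.validateValue(arraySchema, connectValue);
            System.out.println("validated: " + connectValue);
        }
    }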

[pulsar] 01/04: fix: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type... (#15598)

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2b16111b332468929314592494832118734b7c61
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Mon May 16 07:05:05 2022 -0700

    fix: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type... (#15598)
    
    (cherry picked from commit f90ef9c6ad88c4f94ce1fcc682bbf3f3189cbf2a)
---
 .../io/kafka/connect/schema/KafkaConnectData.java  |  77 ++++++++++-
 .../io/kafka/connect/KafkaConnectSinkTest.java     | 141 +++++++++++++++++----
 .../connect/PulsarSchemaToKafkaSchemaTest.java     |  33 +++++
 3 files changed, 218 insertions(+), 33 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
index b1a370ddde4..e39ce086a53 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -27,22 +27,20 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericData;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.DataException;
 
+@Slf4j
 public class KafkaConnectData {
     public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
         if (kafkaSchema == null) {
             return nativeObject;
         }
 
-        if (nativeObject == null) {
-            return defaultOrThrow(kafkaSchema);
-        }
-
         if (nativeObject instanceof JsonNode) {
             JsonNode node = (JsonNode) nativeObject;
             return jsonAsConnectData(node, kafkaSchema);
@@ -51,6 +49,73 @@ public class KafkaConnectData {
             return avroAsConnectData(avroRecord, kafkaSchema);
         }
 
+        return castToKafkaSchema(nativeObject, kafkaSchema);
+    }
+
+    public static Object castToKafkaSchema(Object nativeObject, Schema kafkaSchema) {
+        if (nativeObject == null) {
+            return defaultOrThrow(kafkaSchema);
+        }
+
+        if (nativeObject instanceof Number) {
+            // This is needed in case
+            // Jackson decided to fit the value into some other type internally
+            // (e.g. Double instead of Float).
+            // Kafka's ConnectSchema expects the exact boxed type:
+            // https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L47-L71
+            Number num = (Number) nativeObject;
+            switch (kafkaSchema.type()) {
+                case INT8:
+                    if (!(nativeObject instanceof Byte)) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("nativeObject of type {} converted to Byte", nativeObject.getClass());
+                        }
+                        return num.byteValue();
+                    }
+                    break;
+                case INT16:
+                    if (!(nativeObject instanceof Short)) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("nativeObject of type {} converted to Short", nativeObject.getClass());
+                        }
+                        return num.shortValue();
+                    }
+                    break;
+                case INT32:
+                    if (!(nativeObject instanceof Integer)) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("nativeObject of type {} converted to Integer", nativeObject.getClass());
+                        }
+                        return num.intValue();
+                    }
+                    break;
+                case INT64:
+                    if (!(nativeObject instanceof Long)) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("nativeObject of type {} converted to Long", nativeObject.getClass());
+                        }
+                        return num.longValue();
+                    }
+                    break;
+                case FLOAT32:
+                    if (!(nativeObject instanceof Float)) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("nativeObject of type {} converted to Float", nativeObject.getClass());
+                        }
+                        return num.floatValue();
+                    }
+                    break;
+                case FLOAT64:
+                    if (!(nativeObject instanceof Double)) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("nativeObject of type {} converted to Double", nativeObject.getClass());
+                        }
+                        return num.doubleValue();
+                    }
+                    break;
+            }
+        }
+
         return nativeObject;
     }
 
@@ -86,9 +151,9 @@ public class KafkaConnectData {
                 case BOOLEAN:
                     return jsonNode.booleanValue();
                 case NUMBER:
-                    jsonNode.doubleValue();
+                    return jsonNode.doubleValue();
                 case STRING:
-                    jsonNode.textValue();
+                    return jsonNode.textValue();
                 default:
                     throw new DataException("Don't know how to convert " + jsonNode
                             + " to Connect data (schema is null).");
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 1fba098a228..14c7dcd7ef8 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -336,8 +336,28 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         assertEquals(result.get("keySchema"), expectedKeySchema);
         assertEquals(result.get("valueSchema"), expectedSchema);
 
-        SinkRecord sinkRecord = sink.toSinkRecord(record);
-        return sinkRecord;
+        if (schema.getSchemaInfo().getType().isPrimitive()) {
+            // to test cast of primitive values
+            Message msgOut = mock(MessageImpl.class);
+            when(msgOut.getValue()).thenReturn(getGenericRecord(result.get("value"), schema));
+            when(msgOut.getKey()).thenReturn(result.get("key").toString());
+            when(msgOut.hasKey()).thenReturn(true);
+            when(msgOut.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
+
+            Record<GenericObject> recordOut = PulsarRecord.<String>builder()
+                    .topicName("fake-topic")
+                    .message(msgOut)
+                    .schema(schema)
+                    .ackFunction(status::incrementAndGet)
+                    .failFunction(status::decrementAndGet)
+                    .build();
+
+            SinkRecord sinkRecord = sink.toSinkRecord(recordOut);
+            return sinkRecord;
+        } else {
+            SinkRecord sinkRecord = sink.toSinkRecord(record);
+            return sinkRecord;
+        }
     }
 
     private GenericRecord getGenericRecord(Object value, Schema schema) {
@@ -353,71 +373,135 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         return rec;
     }
 
+
+    @Test
+    public void genericRecordCastTest() throws Exception {
+        props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
+
+        KafkaConnectSink sink = new KafkaConnectSink();
+        sink.open(props, context);
+
+        AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
+                = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
+
+        final GenericData.Record obj = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+        // schema type INT32
+        obj.put("field1", (byte)10);
+        // schema type STRING
+        obj.put("field2", "test");
+        // schema type INT64
+        obj.put("field3", (short)100);
+
+        final GenericRecord rec = getGenericRecord(obj, pulsarAvroSchema);
+        Message msg = mock(MessageImpl.class);
+        when(msg.getValue()).thenReturn(rec);
+        when(msg.getKey()).thenReturn("key");
+        when(msg.hasKey()).thenReturn(true);
+        when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
+
+        final AtomicInteger status = new AtomicInteger(0);
+        Record<GenericObject> record = PulsarRecord.<String>builder()
+                .topicName("fake-topic")
+                .message(msg)
+                .schema(pulsarAvroSchema)
+                .ackFunction(status::incrementAndGet)
+                .failFunction(status::decrementAndGet)
+                .build();
+
+        SinkRecord sinkRecord = sink.toSinkRecord(record);
+
+        Struct out = (Struct) sinkRecord.value();
+        Assert.assertEquals(out.get("field1").getClass(), Integer.class);
+        Assert.assertEquals(out.get("field2").getClass(), String.class);
+        Assert.assertEquals(out.get("field3").getClass(), Long.class);
+
+        Assert.assertEquals(out.get("field1"), 10);
+        Assert.assertEquals(out.get("field2"), "test");
+        Assert.assertEquals(out.get("field3"), 100L);
+
+        sink.close();
+    }
+
     @Test
     public void bytesRecordSchemaTest() throws Exception {
         byte[] in = "val".getBytes(StandardCharsets.US_ASCII);
         SinkRecord sinkRecord = recordSchemaTest(in, Schema.BYTES, "val", "BYTES");
-        byte[] out = (byte[]) sinkRecord.value();
-        Assert.assertEquals(out, in);
+        // the test mock writes the value out as a string
+        Assert.assertEquals(sinkRecord.value(), "val");
     }
 
     @Test
     public void stringRecordSchemaTest() throws Exception {
         SinkRecord sinkRecord = recordSchemaTest("val", Schema.STRING, "val", "STRING");
-        String out = (String) sinkRecord.value();
-        Assert.assertEquals(out, "val");
+        Assert.assertEquals(sinkRecord.value().getClass(), String.class);
+        Assert.assertEquals(sinkRecord.value(), "val");
     }
 
     @Test
     public void booleanRecordSchemaTest() throws Exception {
         SinkRecord sinkRecord = recordSchemaTest(true, Schema.BOOL, true, "BOOLEAN");
-        boolean out = (boolean) sinkRecord.value();
-        Assert.assertEquals(out, true);
+        Assert.assertEquals(sinkRecord.value().getClass(), Boolean.class);
+        Assert.assertEquals(sinkRecord.value(), true);
     }
 
     @Test
     public void byteRecordSchemaTest() throws Exception {
         // int 1 is coming back from ObjectMapper
         SinkRecord sinkRecord = recordSchemaTest((byte)1, Schema.INT8, 1, "INT8");
-        byte out = (byte) sinkRecord.value();
-        Assert.assertEquals(out, 1);
+        Assert.assertEquals(sinkRecord.value().getClass(), Byte.class);
+        Assert.assertEquals(sinkRecord.value(), (byte)1);
     }
 
     @Test
     public void shortRecordSchemaTest() throws Exception {
         // int 1 is coming back from ObjectMapper
         SinkRecord sinkRecord = recordSchemaTest((short)1, Schema.INT16, 1, "INT16");
-        short out = (short) sinkRecord.value();
-        Assert.assertEquals(out, 1);
+        Assert.assertEquals(sinkRecord.value().getClass(), Short.class);
+        Assert.assertEquals(sinkRecord.value(), (short)1);
     }
 
     @Test
     public void integerRecordSchemaTest() throws Exception {
         SinkRecord sinkRecord = recordSchemaTest(Integer.MAX_VALUE, Schema.INT32, Integer.MAX_VALUE, "INT32");
-        int out = (int) sinkRecord.value();
-        Assert.assertEquals(out, Integer.MAX_VALUE);
+        Assert.assertEquals(sinkRecord.value().getClass(), Integer.class);
+        Assert.assertEquals(sinkRecord.value(), Integer.MAX_VALUE);
     }
 
     @Test
     public void longRecordSchemaTest() throws Exception {
         SinkRecord sinkRecord = recordSchemaTest(Long.MAX_VALUE, Schema.INT64, Long.MAX_VALUE, "INT64");
-        long out = (long) sinkRecord.value();
-        Assert.assertEquals(out, Long.MAX_VALUE);
+        Assert.assertEquals(sinkRecord.value().getClass(), Long.class);
+        Assert.assertEquals(sinkRecord.value(), Long.MAX_VALUE);
+    }
+
+    @Test
+    public void longRecordSchemaTestCast() throws Exception {
+        // int 1 comes back from ObjectMapper; expect a Long (as in the schema) from sinkRecord
+        SinkRecord sinkRecord = recordSchemaTest(1L, Schema.INT64, 1, "INT64");
+        Assert.assertEquals(sinkRecord.value().getClass(), Long.class);
+        Assert.assertEquals(sinkRecord.value(), 1L);
     }
 
     @Test
     public void floatRecordSchemaTest() throws Exception {
-        // 1.0d is coming back from ObjectMapper
+        // 1.0d comes back from ObjectMapper; expect a Float (as in the schema) from sinkRecord
         SinkRecord sinkRecord = recordSchemaTest(1.0f, Schema.FLOAT, 1.0d, "FLOAT32");
-        float out = (float) sinkRecord.value();
-        Assert.assertEquals(out, 1.0d);
+        Assert.assertEquals(sinkRecord.value().getClass(), Float.class);
+        Assert.assertEquals(sinkRecord.value(), 1.0f);
     }
 
     @Test
     public void doubleRecordSchemaTest() throws Exception {
         SinkRecord sinkRecord = recordSchemaTest(Double.MAX_VALUE, Schema.DOUBLE, Double.MAX_VALUE, "FLOAT64");
-        double out = (double) sinkRecord.value();
-        Assert.assertEquals(out, Double.MAX_VALUE);
+        Assert.assertEquals(sinkRecord.value().getClass(), Double.class);
+        Assert.assertEquals(sinkRecord.value(), Double.MAX_VALUE);
+    }
+
+    @Test
+    public void doubleRecordSchemaTestCast() throws Exception {
+        SinkRecord sinkRecord = recordSchemaTest(1.0d, Schema.DOUBLE, 1.0d, "FLOAT64");
+        Assert.assertEquals(sinkRecord.value().getClass(), Double.class);
+        Assert.assertEquals(sinkRecord.value(), 1.0d);
     }
 
     @Test
@@ -444,9 +528,12 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         SinkRecord sinkRecord = recordSchemaTest(jsonNode, jsonSchema, expected, "STRUCT");
 
         Struct out = (Struct) sinkRecord.value();
-        Assert.assertEquals((int)out.get("field1"), 10);
-        Assert.assertEquals((String)out.get("field2"), "test");
-        Assert.assertEquals((long)out.get("field3"), 100L);
+        Assert.assertEquals(out.get("field1").getClass(), Integer.class);
+        Assert.assertEquals(out.get("field1"), 10);
+        Assert.assertEquals(out.get("field2").getClass(), String.class);
+        Assert.assertEquals(out.get("field2"), "test");
+        Assert.assertEquals(out.get("field3").getClass(), Long.class);
+        Assert.assertEquals(out.get("field3"), 100L);
     }
 
     @Test
@@ -468,9 +555,9 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         SinkRecord sinkRecord = recordSchemaTest(obj, pulsarAvroSchema, expected, "STRUCT");
 
         Struct out = (Struct) sinkRecord.value();
-        Assert.assertEquals((int)out.get("field1"), 10);
-        Assert.assertEquals((String)out.get("field2"), "test");
-        Assert.assertEquals((long)out.get("field3"), 100L);
+        Assert.assertEquals(out.get("field1"), 10);
+        Assert.assertEquals(out.get("field2"), "test");
+        Assert.assertEquals(out.get("field3"), 100L);
     }
 
     @Test
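
The widened-type assertions above stem from how Jackson deserializes untyped JSON in the mocked file-stream task. A minimal, Pulsar-free demonstration of that widening (standard jackson-databind only):

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JacksonWideningSketch {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // A float written out as JSON ("1.0") comes back as Double when read
            // into Object -- exactly the widening the sink's cast has to undo.
            Object v = mapper.readValue(mapper.writeValueAsString(1.0f), Object.class);
            System.out.println(v.getClass()); // class java.lang.Double
        }
    }
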
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
index 9075dd9c3d3..60caa2bbe81 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
@@ -29,9 +29,11 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
 import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
 import org.testng.annotations.Test;
 
+import java.math.BigInteger;
 import java.util.List;
 
 import static org.testng.Assert.assertEquals;
@@ -167,6 +169,37 @@ public class PulsarSchemaToKafkaSchemaTest {
         }
     }
 
+    @Test
+    public void castToKafkaSchemaTest() {
+        assertEquals(Byte.class,
+                KafkaConnectData.castToKafkaSchema(100L,
+                        org.apache.kafka.connect.data.Schema.INT8_SCHEMA).getClass());
+
+        assertEquals(Short.class,
+                KafkaConnectData.castToKafkaSchema(100.0d,
+                        org.apache.kafka.connect.data.Schema.INT16_SCHEMA).getClass());
+
+        assertEquals(Integer.class,
+                KafkaConnectData.castToKafkaSchema((byte)5,
+                        org.apache.kafka.connect.data.Schema.INT32_SCHEMA).getClass());
+
+        assertEquals(Long.class,
+                KafkaConnectData.castToKafkaSchema((short)5,
+                        org.apache.kafka.connect.data.Schema.INT64_SCHEMA).getClass());
+
+        assertEquals(Float.class,
+                KafkaConnectData.castToKafkaSchema(1.0d,
+                        org.apache.kafka.connect.data.Schema.FLOAT32_SCHEMA).getClass());
+
+        assertEquals(Double.class,
+                KafkaConnectData.castToKafkaSchema(1.5f,
+                        org.apache.kafka.connect.data.Schema.FLOAT64_SCHEMA).getClass());
+
+        assertEquals(Double.class,
+                KafkaConnectData.castToKafkaSchema(new BigInteger("100"),
+                        org.apache.kafka.connect.data.Schema.FLOAT64_SCHEMA).getClass());
+    }
+
     @Test
     public void dateSchemaTest() {
         org.apache.kafka.connect.data.Schema kafkaSchema =

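One edge case implied by the last assertion above: every Number, BigInteger included, is funneled through Number.doubleValue() when the schema says FLOAT64, so integers above 2^53 silently lose precision. A plain-JDK illustration:

    import java.math.BigInteger;

    public class DoublePrecisionSketch {
        public static void main(String[] args) {
            BigInteger big = BigInteger.valueOf(1L << 53).add(BigInteger.ONE); // 2^53 + 1
            // 2^53 + 1 has no exact double representation; the low bit is dropped.
            System.out.println(big);                      // 9007199254740993
            System.out.println((long) big.doubleValue()); // 9007199254740992
        }
    }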

[pulsar] 04/04: Fix cherry-pick compatibility with JDK8

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5df120a73980650fb6dbfd8e8048db1efcb71a98
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Thu Jun 9 09:15:32 2022 +0200

    Fix cherry-pick compatibility with JDK8
---
 .../apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java    | 3 ++-
 .../org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java   | 7 ++++---
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
index 671495c6df6..d9a756f07d6 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericData;
 import org.apache.kafka.connect.data.Field;
@@ -72,7 +73,7 @@ public class KafkaConnectData {
                     List arr = (List) nativeObject;
                     return arr.stream()
                             .map(x -> getKafkaConnectData(x, kafkaSchema.valueSchema()))
-                            .toList();
+                            .collect(Collectors.toList());
                 } else if (nativeObject.getClass().isArray()) {
                     return arrayToList(nativeObject, kafkaSchema.valueSchema());
                 }
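
For context on the JDK8 fixes: Stream.toList() only exists since Java 16, and Map.of(...) (replaced in the hunk below) since Java 9, so neither compiles on a branch that still targets JDK 8. A small before/after sketch of the stream change, with illustrative values:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class Jdk8StreamSketch {
        public static void main(String[] args) {
            List<Integer> in = Arrays.asList(1, 2, 3);
            // JDK 16+ only:  in.stream().map(x -> x * 2).toList();
            // JDK 8-compatible equivalent, as used by the cherry-pick:
            List<Integer> out = in.stream()
                    .map(x -> x * 2)
                    .collect(Collectors.toList());
            System.out.println(out); // [2, 4, 6]
        }
    }
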
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 2c0ea31ac4c..d07b0f3979d 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -998,14 +998,15 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
     }
 
     private static PulsarSchemaToKafkaSchemaTest.ComplexStruct getPojoComplexStruct() {
+        Map<String, PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> map = new HashMap<>();
+        map.put("key1", getPojoStructWithAnnotations());
+        map.put("key2", getPojoStructWithAnnotations());
         return new PulsarSchemaToKafkaSchemaTest.ComplexStruct()
                 .setStringList(Lists.newArrayList("str11", "str22"))
                 .setStructArr(new PulsarSchemaToKafkaSchemaTest.StructWithAnnotations[]{getPojoStructWithAnnotations()})
                 .setStructList(Lists.newArrayList(getPojoStructWithAnnotations()))
                 .setStruct(getPojoStructWithAnnotations())
-                .setStructMap(Map.of("key1", getPojoStructWithAnnotations(),
-                        "key2", getPojoStructWithAnnotations()))
-
+                .setStructMap(map)
                 .setByteField((byte) 1)
                 .setShortField((short) 2)
                 .setIntField(3)