You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/08 22:11:43 UTC

[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #15988: [Fix][pulsar-io] KCA Sink: org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY} + tests

nicoloboschi commented on code in PR #15988:
URL: https://github.com/apache/pulsar/pull/15988#discussion_r892903123


##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java:
##########
@@ -37,31 +39,124 @@
 
 @Slf4j
 public class KafkaConnectData {
+
+    @SuppressWarnings("unchecked")
     public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
         if (kafkaSchema == null) {
             return nativeObject;
         }
 
-        if (nativeObject instanceof JsonNode) {
-            JsonNode node = (JsonNode) nativeObject;
-            return jsonAsConnectData(node, kafkaSchema);
-        } else if (nativeObject instanceof GenericData.Record) {
-            GenericData.Record avroRecord = (GenericData.Record) nativeObject;
-            return avroAsConnectData(avroRecord, kafkaSchema);
-        } else if (nativeObject instanceof GenericRecord) {
-            // Pulsar's GenericRecord
-            GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
-            return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
+        if (nativeObject == null) {
+            return defaultOrThrow(kafkaSchema);
         }
 
-        return castToKafkaSchema(nativeObject, kafkaSchema);
+        switch (kafkaSchema.type()) {
+            case ARRAY:
+                if (nativeObject instanceof JsonNode node) {
+                    return jsonAsConnectData(node, kafkaSchema);
+                } else if (nativeObject instanceof List arr) {

Review Comment:
   Please keep this JDK 8 compatible (pattern matching for `instanceof`, as used on this line, requires Java 16) so that we can cherry-pick it to the release branches.



##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java:
##########
@@ -37,31 +39,124 @@
 
 @Slf4j
 public class KafkaConnectData {
+
+    @SuppressWarnings("unchecked")
     public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
         if (kafkaSchema == null) {
             return nativeObject;
         }
 
-        if (nativeObject instanceof JsonNode) {
-            JsonNode node = (JsonNode) nativeObject;
-            return jsonAsConnectData(node, kafkaSchema);
-        } else if (nativeObject instanceof GenericData.Record) {
-            GenericData.Record avroRecord = (GenericData.Record) nativeObject;
-            return avroAsConnectData(avroRecord, kafkaSchema);
-        } else if (nativeObject instanceof GenericRecord) {
-            // Pulsar's GenericRecord
-            GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
-            return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
+        if (nativeObject == null) {
+            return defaultOrThrow(kafkaSchema);
         }
 
-        return castToKafkaSchema(nativeObject, kafkaSchema);
+        switch (kafkaSchema.type()) {
+            case ARRAY:
+                if (nativeObject instanceof JsonNode node) {
+                    return jsonAsConnectData(node, kafkaSchema);
+                } else if (nativeObject instanceof List arr) {
+                    return arr.stream()
+                            .map(x -> getKafkaConnectData(x, kafkaSchema.valueSchema()))
+                            .toList();
+                } else if (nativeObject.getClass().isArray()) {
+                    if (nativeObject instanceof byte[]) {
+                        byte[] arr = (byte[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))

Review Comment:
   Can we extract a helper method to avoid this duplication?



##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java:
##########
@@ -37,31 +39,124 @@
 
 @Slf4j
 public class KafkaConnectData {
+
+    @SuppressWarnings("unchecked")
     public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
         if (kafkaSchema == null) {
             return nativeObject;
         }
 
-        if (nativeObject instanceof JsonNode) {
-            JsonNode node = (JsonNode) nativeObject;
-            return jsonAsConnectData(node, kafkaSchema);
-        } else if (nativeObject instanceof GenericData.Record) {
-            GenericData.Record avroRecord = (GenericData.Record) nativeObject;
-            return avroAsConnectData(avroRecord, kafkaSchema);
-        } else if (nativeObject instanceof GenericRecord) {
-            // Pulsar's GenericRecord
-            GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
-            return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
+        if (nativeObject == null) {
+            return defaultOrThrow(kafkaSchema);
         }
 
-        return castToKafkaSchema(nativeObject, kafkaSchema);
+        switch (kafkaSchema.type()) {
+            case ARRAY:
+                if (nativeObject instanceof JsonNode node) {
+                    return jsonAsConnectData(node, kafkaSchema);
+                } else if (nativeObject instanceof List arr) {
+                    return arr.stream()
+                            .map(x -> getKafkaConnectData(x, kafkaSchema.valueSchema()))
+                            .toList();
+                } else if (nativeObject.getClass().isArray()) {
+                    if (nativeObject instanceof byte[]) {
+                        byte[] arr = (byte[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof short[]) {
+                        short[] arr = (short[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof int[]) {
+                        int[] arr = (int[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof long[]) {
+                        long[] arr = (long[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof float[]) {
+                        float[] arr = (float[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof double[]) {
+                        double[] arr = (double[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof boolean[]) {
+                        boolean[] arr = (boolean[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof char[]) {
+                        char[] arr = (char[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else {
+                        Object[] arr = (Object[]) nativeObject;
+                        return Arrays.stream(arr)
+                                .map(x -> getKafkaConnectData(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    }
+                }
+                throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+                        + " into kafka ARRAY");
+            case MAP:
+                if (nativeObject instanceof Map) {
+                    Map<Object, Object> map = (Map<Object, Object>) nativeObject;
+                    Map<Object, Object> responseMap = new HashMap<>(map.size());
+                    for (Map.Entry<Object, Object> kv : map.entrySet()) {
+                        Object key = getKafkaConnectData(kv.getKey(), kafkaSchema.keySchema());
+                        Object val = getKafkaConnectData(kv.getValue(), kafkaSchema.valueSchema());
+                        responseMap.put(key, val);
+                    }
+                    return responseMap;
+                } else if (nativeObject instanceof org.apache.pulsar.common.schema.KeyValue kv) {
+                    Map<Object, Object> responseMap = new HashMap<>();
+                    Object key = getKafkaConnectData(kv.getKey(), kafkaSchema.keySchema());
+                    Object val = getKafkaConnectData(kv.getValue(), kafkaSchema.valueSchema());
+                    responseMap.put(key, val);
+                    return responseMap;
+                } else if (nativeObject instanceof JsonNode node) {
+                    return jsonAsConnectData(node, kafkaSchema);
+                }
+                throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+                        + " into kafka MAP");
+            case STRUCT:
+                if (nativeObject instanceof JsonNode) {
+                    JsonNode node = (JsonNode) nativeObject;
+                    return jsonAsConnectData(node, kafkaSchema);
+                } else if (nativeObject instanceof GenericData.Record) {
+                    GenericData.Record avroRecord = (GenericData.Record) nativeObject;
+                    return avroAsConnectData(avroRecord, kafkaSchema);
+                } else if (nativeObject instanceof GenericRecord pulsarGenericRecord) {
+                    // Pulsar's GenericRecord
+                    if (pulsarGenericRecord.getNativeObject() instanceof JsonNode
+                            || pulsarGenericRecord.getNativeObject() instanceof GenericData.Record) {
+                        return getKafkaConnectData(pulsarGenericRecord.getNativeObject(), kafkaSchema);
+                    }
+                    return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
+                }
+                throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+                        + "into kafka STRUCT");
+            default:
+                Preconditions.checkArgument(kafkaSchema.type().isPrimitive(), "Expected primitive schema");

Review Comment:
   Please include the actual schema type in the error message, e.g. "Expected primitive schema but found type: ..."



##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java:
##########
@@ -37,31 +39,124 @@
 
 @Slf4j
 public class KafkaConnectData {
+
+    @SuppressWarnings("unchecked")
     public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
         if (kafkaSchema == null) {
             return nativeObject;
         }
 
-        if (nativeObject instanceof JsonNode) {
-            JsonNode node = (JsonNode) nativeObject;
-            return jsonAsConnectData(node, kafkaSchema);
-        } else if (nativeObject instanceof GenericData.Record) {
-            GenericData.Record avroRecord = (GenericData.Record) nativeObject;
-            return avroAsConnectData(avroRecord, kafkaSchema);
-        } else if (nativeObject instanceof GenericRecord) {
-            // Pulsar's GenericRecord
-            GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
-            return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
+        if (nativeObject == null) {
+            return defaultOrThrow(kafkaSchema);
         }
 
-        return castToKafkaSchema(nativeObject, kafkaSchema);
+        switch (kafkaSchema.type()) {
+            case ARRAY:
+                if (nativeObject instanceof JsonNode node) {
+                    return jsonAsConnectData(node, kafkaSchema);
+                } else if (nativeObject instanceof List arr) {
+                    return arr.stream()
+                            .map(x -> getKafkaConnectData(x, kafkaSchema.valueSchema()))
+                            .toList();
+                } else if (nativeObject.getClass().isArray()) {
+                    if (nativeObject instanceof byte[]) {
+                        byte[] arr = (byte[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof short[]) {
+                        short[] arr = (short[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof int[]) {
+                        int[] arr = (int[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof long[]) {
+                        long[] arr = (long[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof float[]) {
+                        float[] arr = (float[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof double[]) {
+                        double[] arr = (double[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof boolean[]) {
+                        boolean[] arr = (boolean[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof char[]) {
+                        char[] arr = (char[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else {
+                        Object[] arr = (Object[]) nativeObject;
+                        return Arrays.stream(arr)
+                                .map(x -> getKafkaConnectData(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    }
+                }
+                throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+                        + " into kafka ARRAY");
+            case MAP:
+                if (nativeObject instanceof Map) {
+                    Map<Object, Object> map = (Map<Object, Object>) nativeObject;
+                    Map<Object, Object> responseMap = new HashMap<>(map.size());
+                    for (Map.Entry<Object, Object> kv : map.entrySet()) {
+                        Object key = getKafkaConnectData(kv.getKey(), kafkaSchema.keySchema());
+                        Object val = getKafkaConnectData(kv.getValue(), kafkaSchema.valueSchema());
+                        responseMap.put(key, val);
+                    }
+                    return responseMap;
+                } else if (nativeObject instanceof org.apache.pulsar.common.schema.KeyValue kv) {

Review Comment:
   Please keep this JDK 8 compatible (pattern matching for `instanceof`, as used on this line, requires Java 16) so that we can cherry-pick it to the release branches.



##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java:
##########
@@ -37,31 +39,124 @@
 
 @Slf4j
 public class KafkaConnectData {
+
+    @SuppressWarnings("unchecked")
     public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
         if (kafkaSchema == null) {
             return nativeObject;
         }
 
-        if (nativeObject instanceof JsonNode) {
-            JsonNode node = (JsonNode) nativeObject;
-            return jsonAsConnectData(node, kafkaSchema);
-        } else if (nativeObject instanceof GenericData.Record) {
-            GenericData.Record avroRecord = (GenericData.Record) nativeObject;
-            return avroAsConnectData(avroRecord, kafkaSchema);
-        } else if (nativeObject instanceof GenericRecord) {
-            // Pulsar's GenericRecord
-            GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
-            return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
+        if (nativeObject == null) {
+            return defaultOrThrow(kafkaSchema);
         }
 
-        return castToKafkaSchema(nativeObject, kafkaSchema);
+        switch (kafkaSchema.type()) {
+            case ARRAY:
+                if (nativeObject instanceof JsonNode node) {
+                    return jsonAsConnectData(node, kafkaSchema);
+                } else if (nativeObject instanceof List arr) {
+                    return arr.stream()
+                            .map(x -> getKafkaConnectData(x, kafkaSchema.valueSchema()))
+                            .toList();
+                } else if (nativeObject.getClass().isArray()) {
+                    if (nativeObject instanceof byte[]) {
+                        byte[] arr = (byte[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof short[]) {
+                        short[] arr = (short[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof int[]) {
+                        int[] arr = (int[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof long[]) {
+                        long[] arr = (long[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof float[]) {
+                        float[] arr = (float[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof double[]) {
+                        double[] arr = (double[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof boolean[]) {
+                        boolean[] arr = (boolean[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof char[]) {
+                        char[] arr = (char[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else {
+                        Object[] arr = (Object[]) nativeObject;
+                        return Arrays.stream(arr)
+                                .map(x -> getKafkaConnectData(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    }
+                }
+                throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+                        + " into kafka ARRAY");
+            case MAP:
+                if (nativeObject instanceof Map) {
+                    Map<Object, Object> map = (Map<Object, Object>) nativeObject;
+                    Map<Object, Object> responseMap = new HashMap<>(map.size());
+                    for (Map.Entry<Object, Object> kv : map.entrySet()) {
+                        Object key = getKafkaConnectData(kv.getKey(), kafkaSchema.keySchema());
+                        Object val = getKafkaConnectData(kv.getValue(), kafkaSchema.valueSchema());
+                        responseMap.put(key, val);
+                    }
+                    return responseMap;
+                } else if (nativeObject instanceof org.apache.pulsar.common.schema.KeyValue kv) {
+                    Map<Object, Object> responseMap = new HashMap<>();
+                    Object key = getKafkaConnectData(kv.getKey(), kafkaSchema.keySchema());
+                    Object val = getKafkaConnectData(kv.getValue(), kafkaSchema.valueSchema());
+                    responseMap.put(key, val);
+                    return responseMap;
+                } else if (nativeObject instanceof JsonNode node) {

Review Comment:
   Please keep this JDK 8 compatible (pattern matching for `instanceof`, as used on this line, requires Java 16) so that we can cherry-pick it to the release branches.



##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java:
##########
@@ -37,31 +39,124 @@
 
 @Slf4j
 public class KafkaConnectData {
+
+    @SuppressWarnings("unchecked")
     public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
         if (kafkaSchema == null) {
             return nativeObject;
         }
 
-        if (nativeObject instanceof JsonNode) {
-            JsonNode node = (JsonNode) nativeObject;
-            return jsonAsConnectData(node, kafkaSchema);
-        } else if (nativeObject instanceof GenericData.Record) {
-            GenericData.Record avroRecord = (GenericData.Record) nativeObject;
-            return avroAsConnectData(avroRecord, kafkaSchema);
-        } else if (nativeObject instanceof GenericRecord) {
-            // Pulsar's GenericRecord
-            GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
-            return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
+        if (nativeObject == null) {
+            return defaultOrThrow(kafkaSchema);
         }
 
-        return castToKafkaSchema(nativeObject, kafkaSchema);
+        switch (kafkaSchema.type()) {
+            case ARRAY:
+                if (nativeObject instanceof JsonNode node) {
+                    return jsonAsConnectData(node, kafkaSchema);
+                } else if (nativeObject instanceof List arr) {
+                    return arr.stream()
+                            .map(x -> getKafkaConnectData(x, kafkaSchema.valueSchema()))
+                            .toList();
+                } else if (nativeObject.getClass().isArray()) {
+                    if (nativeObject instanceof byte[]) {
+                        byte[] arr = (byte[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof short[]) {
+                        short[] arr = (short[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof int[]) {
+                        int[] arr = (int[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof long[]) {
+                        long[] arr = (long[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof float[]) {
+                        float[] arr = (float[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof double[]) {
+                        double[] arr = (double[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof boolean[]) {
+                        boolean[] arr = (boolean[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof char[]) {
+                        char[] arr = (char[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else {
+                        Object[] arr = (Object[]) nativeObject;
+                        return Arrays.stream(arr)
+                                .map(x -> getKafkaConnectData(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    }
+                }
+                throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+                        + " into kafka ARRAY");
+            case MAP:
+                if (nativeObject instanceof Map) {
+                    Map<Object, Object> map = (Map<Object, Object>) nativeObject;
+                    Map<Object, Object> responseMap = new HashMap<>(map.size());
+                    for (Map.Entry<Object, Object> kv : map.entrySet()) {
+                        Object key = getKafkaConnectData(kv.getKey(), kafkaSchema.keySchema());
+                        Object val = getKafkaConnectData(kv.getValue(), kafkaSchema.valueSchema());
+                        responseMap.put(key, val);
+                    }
+                    return responseMap;
+                } else if (nativeObject instanceof org.apache.pulsar.common.schema.KeyValue kv) {
+                    Map<Object, Object> responseMap = new HashMap<>();
+                    Object key = getKafkaConnectData(kv.getKey(), kafkaSchema.keySchema());
+                    Object val = getKafkaConnectData(kv.getValue(), kafkaSchema.valueSchema());
+                    responseMap.put(key, val);
+                    return responseMap;
+                } else if (nativeObject instanceof JsonNode node) {
+                    return jsonAsConnectData(node, kafkaSchema);
+                }
+                throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+                        + " into kafka MAP");
+            case STRUCT:
+                if (nativeObject instanceof JsonNode) {
+                    JsonNode node = (JsonNode) nativeObject;
+                    return jsonAsConnectData(node, kafkaSchema);
+                } else if (nativeObject instanceof GenericData.Record) {
+                    GenericData.Record avroRecord = (GenericData.Record) nativeObject;
+                    return avroAsConnectData(avroRecord, kafkaSchema);
+                } else if (nativeObject instanceof GenericRecord pulsarGenericRecord) {

Review Comment:
   Please keep this JDK 8 compatible (pattern matching for `instanceof`, as used on this line, requires Java 16) so that we can cherry-pick it to the release branches.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org