Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/08 20:20:57 UTC

[GitHub] [pulsar] dlg99 opened a new pull request, #15988: [Fix][pulsar-io] KCA Sink: org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY} + tests

dlg99 opened a new pull request, #15988:
URL: https://github.com/apache/pulsar/pull/15988

   ### Motivation
   
   The KCA (Kafka Connect Adaptor) sink fails on some complex JSON objects with `org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY}` or a similar error.
   
   ### Modifications
   
   Improved the handling of all expected Kafka Connect schema types (as far as I can tell), including their mapping and type conversion.
   Improved test coverage.
   
   ### Verifying this change
   
   
   This change added tests.
   
   ### Does this pull request potentially affect one of the following parts:
   
   No
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli merged pull request #15988: [Fix][pulsar-io] KCA Sink: org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY} + tests

Posted by GitBox <gi...@apache.org>.
eolivelli merged PR #15988:
URL: https://github.com/apache/pulsar/pull/15988




[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #15988: [Fix][pulsar-io] KCA Sink: org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY} + tests

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #15988:
URL: https://github.com/apache/pulsar/pull/15988#discussion_r892903123


##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java:
##########
@@ -37,31 +39,124 @@
 
 @Slf4j
 public class KafkaConnectData {
+
+    @SuppressWarnings("unchecked")
     public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
         if (kafkaSchema == null) {
             return nativeObject;
         }
 
-        if (nativeObject instanceof JsonNode) {
-            JsonNode node = (JsonNode) nativeObject;
-            return jsonAsConnectData(node, kafkaSchema);
-        } else if (nativeObject instanceof GenericData.Record) {
-            GenericData.Record avroRecord = (GenericData.Record) nativeObject;
-            return avroAsConnectData(avroRecord, kafkaSchema);
-        } else if (nativeObject instanceof GenericRecord) {
-            // Pulsar's GenericRecord
-            GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
-            return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
+        if (nativeObject == null) {
+            return defaultOrThrow(kafkaSchema);
         }
 
-        return castToKafkaSchema(nativeObject, kafkaSchema);
+        switch (kafkaSchema.type()) {
+            case ARRAY:
+                if (nativeObject instanceof JsonNode node) {
+                    return jsonAsConnectData(node, kafkaSchema);
+                } else if (nativeObject instanceof List arr) {

Review Comment:
   please keep it JDK8 compatible so we can cherry-pick it to release branches
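For reference, the JDK 8 incompatibilities in this hunk are pattern-matching `instanceof` (e.g. `instanceof List arr`, final since JDK 16) and `Stream.toList()` (added in JDK 16). Below is a minimal JDK8-compatible sketch of the ARRAY branch. It assumes a `Function<Object, Object>` stands in for the recursive `getKafkaConnectData(x, kafkaSchema.valueSchema())` call; the class and method names are hypothetical, and this is not the code that was merged. The `JsonNode` branch is omitted because it depends on Jackson.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public final class Jdk8ArrayConversion {

    private Jdk8ArrayConversion() {
    }

    // Hypothetical helper: converts a List or Object[] into a List, applying
    // elementConverter (a stand-in for the recursive getKafkaConnectData call).
    @SuppressWarnings("unchecked")
    public static List<Object> toConnectList(Object nativeObject,
                                             Function<Object, Object> elementConverter) {
        if (nativeObject instanceof List) {
            // Explicit cast instead of `instanceof List arr` (pattern matching needs JDK 16+).
            List<Object> arr = (List<Object>) nativeObject;
            // Collectors.toList() instead of Stream.toList() (added in JDK 16).
            return arr.stream().map(elementConverter).collect(Collectors.toList());
        } else if (nativeObject instanceof Object[]) {
            Object[] arr = (Object[]) nativeObject;
            return Arrays.stream(arr).map(elementConverter).collect(Collectors.toList());
        }
        throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
                + " into kafka ARRAY");
    }
}
```

Note that `Stream.toList()` returns an unmodifiable list, while `Collectors.toList()` makes no guarantee about mutability, so callers should not rely on either behavior.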



##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java:
##########
@@ -37,31 +39,124 @@
 
+                } else if (nativeObject instanceof List arr) {
+                    return arr.stream()
+                            .map(x -> getKafkaConnectData(x, kafkaSchema.valueSchema()))
+                            .toList();
+                } else if (nativeObject.getClass().isArray()) {
+                    if (nativeObject instanceof byte[]) {
+                        byte[] arr = (byte[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))

Review Comment:
   Can we create a method to avoid duplication?
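One way to address the duplication, sketched under assumptions: `java.lang.reflect.Array.get` boxes the elements of any array type, so a single hypothetical helper can replace the per-type branches (`byte[]`, `short[]`, `int[]`, ...). The `elementConverter` parameter stands in for whichever per-element call applies (`castToKafkaSchema` for primitive arrays, `getKafkaConnectData` for `Object[]` in the hunk). This is illustrative, not the merged implementation.

```java
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public final class ArrayConversionHelper {

    private ArrayConversionHelper() {
    }

    // Hypothetical helper: converts any Java array (primitive or Object[]) into a List,
    // boxing each element and applying elementConverter to it.
    public static List<Object> arrayToList(Object array, Function<Object, Object> elementConverter) {
        if (array == null || !array.getClass().isArray()) {
            throw new IllegalArgumentException("Expected an array but got: "
                    + (array == null ? "null" : array.getClass()));
        }
        int length = Array.getLength(array);
        List<Object> result = new ArrayList<>(length);
        for (int i = 0; i < length; i++) {
            // Array.get wraps primitives automatically (int -> Integer, char -> Character, ...).
            result.add(elementConverter.apply(Array.get(array, i)));
        }
        return result;
    }
}
```

Reflective element access is generally slower than the explicit `ArrayUtils.toObject` branches, so the explicit form may still be worth keeping on hot paths.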



##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java:
##########
@@ -37,31 +39,124 @@
 
+                } else if (nativeObject.getClass().isArray()) {
+                    if (nativeObject instanceof byte[]) {
+                        byte[] arr = (byte[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof short[]) {
+                        short[] arr = (short[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof int[]) {
+                        int[] arr = (int[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof long[]) {
+                        long[] arr = (long[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof float[]) {
+                        float[] arr = (float[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof double[]) {
+                        double[] arr = (double[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof boolean[]) {
+                        boolean[] arr = (boolean[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else if (nativeObject instanceof char[]) {
+                        char[] arr = (char[]) nativeObject;
+                        return Arrays.stream(ArrayUtils.toObject(arr))
+                                .map(x -> castToKafkaSchema(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    } else {
+                        Object[] arr = (Object[]) nativeObject;
+                        return Arrays.stream(arr)
+                                .map(x -> getKafkaConnectData(x, kafkaSchema.valueSchema()))
+                                .toList();
+                    }
+                }
+                throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+                        + " into kafka ARRAY");
+            case MAP:
+                if (nativeObject instanceof Map) {
+                    Map<Object, Object> map = (Map<Object, Object>) nativeObject;
+                    Map<Object, Object> responseMap = new HashMap<>(map.size());
+                    for (Map.Entry<Object, Object> kv : map.entrySet()) {
+                        Object key = getKafkaConnectData(kv.getKey(), kafkaSchema.keySchema());
+                        Object val = getKafkaConnectData(kv.getValue(), kafkaSchema.valueSchema());
+                        responseMap.put(key, val);
+                    }
+                    return responseMap;
+                } else if (nativeObject instanceof org.apache.pulsar.common.schema.KeyValue kv) {
+                    Map<Object, Object> responseMap = new HashMap<>();
+                    Object key = getKafkaConnectData(kv.getKey(), kafkaSchema.keySchema());
+                    Object val = getKafkaConnectData(kv.getValue(), kafkaSchema.valueSchema());
+                    responseMap.put(key, val);
+                    return responseMap;
+                } else if (nativeObject instanceof JsonNode node) {
+                    return jsonAsConnectData(node, kafkaSchema);
+                }
+                throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+                        + " into kafka MAP");
+            case STRUCT:
+                if (nativeObject instanceof JsonNode) {
+                    JsonNode node = (JsonNode) nativeObject;
+                    return jsonAsConnectData(node, kafkaSchema);
+                } else if (nativeObject instanceof GenericData.Record) {
+                    GenericData.Record avroRecord = (GenericData.Record) nativeObject;
+                    return avroAsConnectData(avroRecord, kafkaSchema);
+                } else if (nativeObject instanceof GenericRecord pulsarGenericRecord) {
+                    // Pulsar's GenericRecord
+                    if (pulsarGenericRecord.getNativeObject() instanceof JsonNode
+                            || pulsarGenericRecord.getNativeObject() instanceof GenericData.Record) {
+                        return getKafkaConnectData(pulsarGenericRecord.getNativeObject(), kafkaSchema);
+                    }
+                    return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
+                }
+                throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+                        + "into kafka STRUCT");
+            default:
+                Preconditions.checkArgument(kafkaSchema.type().isPrimitive(), "Expected primitive schema");

Review Comment:
   "Expected primitive schema but found type: ..."
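A sketch of the suggested message. To compile without Kafka on the classpath, it uses a hypothetical stand-in enum for Kafka Connect's `Schema.Type`, modeling only the `isPrimitive()` flag.

```java
public final class PrimitiveSchemaCheck {

    private PrimitiveSchemaCheck() {
    }

    // Stand-in for org.apache.kafka.connect.data.Schema.Type; only isPrimitive() is modeled.
    public enum Type {
        INT8(true), INT16(true), INT32(true), INT64(true), FLOAT32(true), FLOAT64(true),
        BOOLEAN(true), STRING(true), BYTES(true),
        ARRAY(false), MAP(false), STRUCT(false);

        private final boolean primitive;

        Type(boolean primitive) {
            this.primitive = primitive;
        }

        public boolean isPrimitive() {
            return primitive;
        }
    }

    // Fails with a message that names the offending type, as suggested in the review.
    public static void requirePrimitive(Type type) {
        if (!type.isPrimitive()) {
            throw new IllegalArgumentException("Expected primitive schema but found type: " + type);
        }
    }
}
```

With Guava, which the hunk already uses, the template overload builds the same message only on failure: `Preconditions.checkArgument(kafkaSchema.type().isPrimitive(), "Expected primitive schema but found type: %s", kafkaSchema.type())`.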



##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java:
##########
@@ -37,31 +39,124 @@
 
+                } else if (nativeObject instanceof org.apache.pulsar.common.schema.KeyValue kv) {

Review Comment:
   please keep it JDK8 compatible so we can cherry-pick it to release branches
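A minimal JDK8-compatible sketch of the MAP branch, replacing the `instanceof ... kv` pattern with an explicit cast. `KeyValue` here is a hypothetical stand-in exposing only `getKey()`/`getValue()`, used in place of `org.apache.pulsar.common.schema.KeyValue`. The two converter functions stand in for the recursive `getKafkaConnectData` calls with `keySchema()` and `valueSchema()`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public final class Jdk8MapConversion {

    private Jdk8MapConversion() {
    }

    // Stand-in for org.apache.pulsar.common.schema.KeyValue (only getKey/getValue are modeled).
    public static final class KeyValue<K, V> {
        private final K key;
        private final V value;

        public KeyValue(K key, V value) {
            this.key = key;
            this.value = value;
        }

        public K getKey() {
            return key;
        }

        public V getValue() {
            return value;
        }
    }

    @SuppressWarnings("unchecked")
    public static Map<Object, Object> toConnectMap(Object nativeObject,
                                                   Function<Object, Object> keyConverter,
                                                   Function<Object, Object> valueConverter) {
        if (nativeObject instanceof Map) {
            Map<Object, Object> map = (Map<Object, Object>) nativeObject;
            Map<Object, Object> responseMap = new HashMap<>(map.size());
            for (Map.Entry<Object, Object> entry : map.entrySet()) {
                responseMap.put(keyConverter.apply(entry.getKey()), valueConverter.apply(entry.getValue()));
            }
            return responseMap;
        } else if (nativeObject instanceof KeyValue) {
            // Explicit cast replaces `instanceof KeyValue kv`, which needs JDK 16+.
            KeyValue<Object, Object> kv = (KeyValue<Object, Object>) nativeObject;
            Map<Object, Object> responseMap = new HashMap<>();
            responseMap.put(keyConverter.apply(kv.getKey()), valueConverter.apply(kv.getValue()));
            return responseMap;
        }
        throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
                + " into kafka MAP");
    }
}
```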



##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java:
##########
@@ -37,31 +39,124 @@
 
+                } else if (nativeObject instanceof JsonNode node) {

Review Comment:
   please keep it JDK8 compatible so we can cherry-pick it to release branches



##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java:
##########
@@ -37,31 +39,124 @@
 
+                } else if (nativeObject instanceof GenericRecord pulsarGenericRecord) {

Review Comment:
   please keep it JDK8 compatible so we can cherry-pick it to release branches





[GitHub] [pulsar] dlg99 commented on a diff in pull request #15988: [Fix][pulsar-io] KCA Sink: org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY} + tests

Posted by GitBox <gi...@apache.org>.
dlg99 commented on code in PR #15988:
URL: https://github.com/apache/pulsar/pull/15988#discussion_r892924136


##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java:
##########
@@ -37,31 +39,124 @@
 
+                } else if (nativeObject instanceof List arr) {

Review Comment:
   will do, IntelliJ "helped" me with this ;)


