You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/08 22:47:27 UTC

[GitHub] [pulsar] dlg99 commented on a diff in pull request #15988: [Fix][pulsar-io] KCA Sink: org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY} + tests

dlg99 commented on code in PR #15988:
URL: https://github.com/apache/pulsar/pull/15988#discussion_r892924136


##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java:
##########
@@ -37,31 +39,124 @@
 
 @Slf4j
 public class KafkaConnectData {
+
+    @SuppressWarnings("unchecked")
     public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
         if (kafkaSchema == null) {
             return nativeObject;
         }
 
-        if (nativeObject instanceof JsonNode) {
-            JsonNode node = (JsonNode) nativeObject;
-            return jsonAsConnectData(node, kafkaSchema);
-        } else if (nativeObject instanceof GenericData.Record) {
-            GenericData.Record avroRecord = (GenericData.Record) nativeObject;
-            return avroAsConnectData(avroRecord, kafkaSchema);
-        } else if (nativeObject instanceof GenericRecord) {
-            // Pulsar's GenericRecord
-            GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
-            return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
+        if (nativeObject == null) {
+            return defaultOrThrow(kafkaSchema);
         }
 
-        return castToKafkaSchema(nativeObject, kafkaSchema);
+        switch (kafkaSchema.type()) {
+            case ARRAY:
+                if (nativeObject instanceof JsonNode node) {
+                    return jsonAsConnectData(node, kafkaSchema);
+                } else if (nativeObject instanceof List arr) {

Review Comment:
   Will do; IntelliJ "helped" me with this ;)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org