You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/06/09 07:15:47 UTC

[pulsar] 01/04: fix: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type... (#15598)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2b16111b332468929314592494832118734b7c61
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Mon May 16 07:05:05 2022 -0700

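    fix: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type... (#15598)
    
    Coerce numeric values to the exact boxed type required by the Kafka Connect
    schema (e.g. Integer -> Long for INT64, Double -> Float for FLOAT32) when
    converting Pulsar values to Kafka Connect data, and add the missing returns
    for NUMBER and STRING nodes when converting JSON with a null schema.
    
    (cherry picked from commit f90ef9c6ad88c4f94ce1fcc682bbf3f3189cbf2a)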
---
 .../io/kafka/connect/schema/KafkaConnectData.java  |  77 ++++++++++-
 .../io/kafka/connect/KafkaConnectSinkTest.java     | 141 +++++++++++++++++----
 .../connect/PulsarSchemaToKafkaSchemaTest.java     |  33 +++++
 3 files changed, 218 insertions(+), 33 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
index b1a370ddde4..e39ce086a53 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -27,22 +27,20 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericData;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.DataException;
 
+@Slf4j
 public class KafkaConnectData {
     public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
         if (kafkaSchema == null) {
             return nativeObject;
         }
 
-        if (nativeObject == null) {
-            return defaultOrThrow(kafkaSchema);
-        }
-
         if (nativeObject instanceof JsonNode) {
             JsonNode node = (JsonNode) nativeObject;
             return jsonAsConnectData(node, kafkaSchema);
@@ -51,6 +49,73 @@ public class KafkaConnectData {
             return avroAsConnectData(avroRecord, kafkaSchema);
         }
 
+        return castToKafkaSchema(nativeObject, kafkaSchema);
+    }
+
+    public static Object castToKafkaSchema(Object nativeObject, Schema kafkaSchema) {
+        if (nativeObject == null) {
+            return defaultOrThrow(kafkaSchema);
+        }
+        // Coerce Number values to the exact boxed type that the Kafka schema type requires.
+        if (nativeObject instanceof Number) {
+            // Kafka's ConnectSchema requires the exact boxed type for each schema type, but the
+            // decoder (e.g. Jackson's ObjectMapper) may have chosen another one (Integer for
+            // INT8/INT16/INT64, Double for FLOAT32). NOTE(review): narrowing casts here silently
+            // truncate out-of-range or fractional values instead of rejecting them. Type table:
+            // https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L47-L71
+            Number num = (Number) nativeObject;
+            switch (kafkaSchema.type()) {
+                case INT8:
+                    if (!(nativeObject instanceof Byte)) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("nativeObject of type {} converted to Byte", nativeObject.getClass());
+                        }
+                        return num.byteValue();
+                    }
+                    break;
+                case INT16:
+                    if (!(nativeObject instanceof Short)) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("nativeObject of type {} converted to Short", nativeObject.getClass());
+                        }
+                        return num.shortValue();
+                    }
+                    break;
+                case INT32:
+                    if (!(nativeObject instanceof Integer)) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("nativeObject of type {} converted to Integer", nativeObject.getClass());
+                        }
+                        return num.intValue();
+                    }
+                    break;
+                case INT64:
+                    if (!(nativeObject instanceof Long)) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("nativeObject of type {} converted to Long", nativeObject.getClass());
+                        }
+                        return num.longValue();
+                    }
+                    break;
+                case FLOAT32:
+                    if (!(nativeObject instanceof Float)) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("nativeObject of type {} converted to Float", nativeObject.getClass());
+                        }
+                        return num.floatValue();
+                    }
+                    break;
+                case FLOAT64:
+                    if (!(nativeObject instanceof Double)) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("nativeObject of type {} converted to Double", nativeObject.getClass());
+                        }
+                        return num.doubleValue();
+                    }
+                    break;
+            }
+        }
+        // Values not coerced above (non-Numbers, exact types, non-numeric schemas) pass through.
         return nativeObject;
     }
 
@@ -86,9 +151,9 @@ public class KafkaConnectData {
                 case BOOLEAN:
                     return jsonNode.booleanValue();
                 case NUMBER:
-                    jsonNode.doubleValue();
+                    return jsonNode.doubleValue();
                 case STRING:
-                    jsonNode.textValue();
+                    return jsonNode.textValue();
                 default:
                     throw new DataException("Don't know how to convert " + jsonNode
                             + " to Connect data (schema is null).");
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 1fba098a228..14c7dcd7ef8 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -336,8 +336,28 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         assertEquals(result.get("keySchema"), expectedKeySchema);
         assertEquals(result.get("valueSchema"), expectedSchema);
 
-        SinkRecord sinkRecord = sink.toSinkRecord(record);
-        return sinkRecord;
+        if (schema.getSchemaInfo().getType().isPrimitive()) {
+            // to test cast of primitive values
+            Message msgOut = mock(MessageImpl.class);
+            when(msgOut.getValue()).thenReturn(getGenericRecord(result.get("value"), schema));
+            when(msgOut.getKey()).thenReturn(result.get("key").toString());
+            when(msgOut.hasKey()).thenReturn(true);
+            when(msgOut.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
+
+            Record<GenericObject> recordOut = PulsarRecord.<String>builder()
+                    .topicName("fake-topic")
+                    .message(msgOut)
+                    .schema(schema)
+                    .ackFunction(status::incrementAndGet)
+                    .failFunction(status::decrementAndGet)
+                    .build();
+
+            SinkRecord sinkRecord = sink.toSinkRecord(recordOut);
+            return sinkRecord;
+        } else {
+            SinkRecord sinkRecord = sink.toSinkRecord(record);
+            return sinkRecord;
+        }
     }
 
     private GenericRecord getGenericRecord(Object value, Schema schema) {
@@ -353,71 +373,135 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         return rec;
     }
 
+
+    @Test
+    public void genericRecordCastTest() throws Exception {
+        props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
+
+        KafkaConnectSink sink = new KafkaConnectSink();
+        sink.open(props, context);
+
+        AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
+                = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
+        // Store narrower Java types than the schema declares so the sink has to cast them.
+        final GenericData.Record obj = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+        // schema type INT32, stored as Byte
+        obj.put("field1", (byte)10);
+        // schema type STRING
+        obj.put("field2", "test");
+        // schema type INT64, stored as Short
+        obj.put("field3", (short)100);
+
+        final GenericRecord rec = getGenericRecord(obj, pulsarAvroSchema);
+        Message msg = mock(MessageImpl.class);
+        when(msg.getValue()).thenReturn(rec);
+        when(msg.getKey()).thenReturn("key");
+        when(msg.hasKey()).thenReturn(true);
+        when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
+
+        final AtomicInteger status = new AtomicInteger(0);
+        Record<GenericObject> record = PulsarRecord.<String>builder()
+                .topicName("fake-topic")
+                .message(msg)
+                .schema(pulsarAvroSchema)
+                .ackFunction(status::incrementAndGet)
+                .failFunction(status::decrementAndGet)
+                .build();
+
+        SinkRecord sinkRecord = sink.toSinkRecord(record);
+        // Each field must come back as the exact boxed type its schema declares.
+        Struct out = (Struct) sinkRecord.value();
+        Assert.assertEquals(out.get("field1").getClass(), Integer.class);
+        Assert.assertEquals(out.get("field2").getClass(), String.class);
+        Assert.assertEquals(out.get("field3").getClass(), Long.class);
+
+        Assert.assertEquals(out.get("field1"), 10);
+        Assert.assertEquals(out.get("field2"), "test");
+        Assert.assertEquals(out.get("field3"), 100L);
+
+        sink.close();
+    }
+
     @Test
     public void bytesRecordSchemaTest() throws Exception {
         byte[] in = "val".getBytes(StandardCharsets.US_ASCII);
         SinkRecord sinkRecord = recordSchemaTest(in, Schema.BYTES, "val", "BYTES");
-        byte[] out = (byte[]) sinkRecord.value();
-        Assert.assertEquals(out, in);
+        // test/mock writes it as string
+        Assert.assertEquals(sinkRecord.value(), "val");
     }
 
     @Test
     public void stringRecordSchemaTest() throws Exception {
         SinkRecord sinkRecord = recordSchemaTest("val", Schema.STRING, "val", "STRING");
-        String out = (String) sinkRecord.value();
-        Assert.assertEquals(out, "val");
+        Assert.assertEquals(sinkRecord.value().getClass(), String.class);
+        Assert.assertEquals(sinkRecord.value(), "val");
     }
 
     @Test
     public void booleanRecordSchemaTest() throws Exception {
         SinkRecord sinkRecord = recordSchemaTest(true, Schema.BOOL, true, "BOOLEAN");
-        boolean out = (boolean) sinkRecord.value();
-        Assert.assertEquals(out, true);
+        Assert.assertEquals(sinkRecord.value().getClass(), Boolean.class);
+        Assert.assertEquals(sinkRecord.value(), true);
     }
 
     @Test
     public void byteRecordSchemaTest() throws Exception {
         // int 1 is coming back from ObjectMapper
         SinkRecord sinkRecord = recordSchemaTest((byte)1, Schema.INT8, 1, "INT8");
-        byte out = (byte) sinkRecord.value();
-        Assert.assertEquals(out, 1);
+        Assert.assertEquals(sinkRecord.value().getClass(), Byte.class);
+        Assert.assertEquals(sinkRecord.value(), (byte)1);
     }
 
     @Test
     public void shortRecordSchemaTest() throws Exception {
         // int 1 is coming back from ObjectMapper
         SinkRecord sinkRecord = recordSchemaTest((short)1, Schema.INT16, 1, "INT16");
-        short out = (short) sinkRecord.value();
-        Assert.assertEquals(out, 1);
+        Assert.assertEquals(sinkRecord.value().getClass(), Short.class);
+        Assert.assertEquals(sinkRecord.value(), (short)1);
     }
 
     @Test
     public void integerRecordSchemaTest() throws Exception {
         SinkRecord sinkRecord = recordSchemaTest(Integer.MAX_VALUE, Schema.INT32, Integer.MAX_VALUE, "INT32");
-        int out = (int) sinkRecord.value();
-        Assert.assertEquals(out, Integer.MAX_VALUE);
+        Assert.assertEquals(sinkRecord.value().getClass(), Integer.class);
+        Assert.assertEquals(sinkRecord.value(), Integer.MAX_VALUE);
     }
 
     @Test
     public void longRecordSchemaTest() throws Exception {
         SinkRecord sinkRecord = recordSchemaTest(Long.MAX_VALUE, Schema.INT64, Long.MAX_VALUE, "INT64");
-        long out = (long) sinkRecord.value();
-        Assert.assertEquals(out, Long.MAX_VALUE);
+        Assert.assertEquals(sinkRecord.value().getClass(), Long.class);
+        Assert.assertEquals(sinkRecord.value(), Long.MAX_VALUE);
+    }
+
+    @Test
+    public void longRecordSchemaTestCast() throws Exception {
+        // ObjectMapper yields Integer 1 here; the sink must still return Long, as INT64 requires
+        SinkRecord sinkRecord = recordSchemaTest(1L, Schema.INT64, 1, "INT64");
+        Assert.assertEquals(sinkRecord.value().getClass(), Long.class);
+        Assert.assertEquals(sinkRecord.value(), 1L);
     }
 
     @Test
     public void floatRecordSchemaTest() throws Exception {
-        // 1.0d is coming back from ObjectMapper
+        // 1.0d is coming back from ObjectMapper, expect Float (as in schema) from sinkRecord
         SinkRecord sinkRecord = recordSchemaTest(1.0f, Schema.FLOAT, 1.0d, "FLOAT32");
-        float out = (float) sinkRecord.value();
-        Assert.assertEquals(out, 1.0d);
+        Assert.assertEquals(sinkRecord.value().getClass(), Float.class);
+        Assert.assertEquals(sinkRecord.value(), 1.0f);
     }
 
     @Test
     public void doubleRecordSchemaTest() throws Exception {
         SinkRecord sinkRecord = recordSchemaTest(Double.MAX_VALUE, Schema.DOUBLE, Double.MAX_VALUE, "FLOAT64");
-        double out = (double) sinkRecord.value();
-        Assert.assertEquals(out, Double.MAX_VALUE);
+        Assert.assertEquals(sinkRecord.value().getClass(), Double.class);
+        Assert.assertEquals(sinkRecord.value(), Double.MAX_VALUE);
+    }
+
+    @Test
+    public void doubleRecordSchemaTestCast() throws Exception {
+        SinkRecord sinkRecord = recordSchemaTest(1.0d, Schema.DOUBLE, 1.0d, "FLOAT64");
+        Assert.assertEquals(sinkRecord.value().getClass(), Double.class); // already Double: no cast
+        Assert.assertEquals(sinkRecord.value(), 1.0d);
     }
 
     @Test
@@ -444,9 +528,12 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         SinkRecord sinkRecord = recordSchemaTest(jsonNode, jsonSchema, expected, "STRUCT");
 
         Struct out = (Struct) sinkRecord.value();
-        Assert.assertEquals((int)out.get("field1"), 10);
-        Assert.assertEquals((String)out.get("field2"), "test");
-        Assert.assertEquals((long)out.get("field3"), 100L);
+        Assert.assertEquals(out.get("field1").getClass(), Integer.class);
+        Assert.assertEquals(out.get("field1"), 10);
+        Assert.assertEquals(out.get("field2").getClass(), String.class);
+        Assert.assertEquals(out.get("field2"), "test");
+        Assert.assertEquals(out.get("field3").getClass(), Long.class);
+        Assert.assertEquals(out.get("field3"), 100L);
     }
 
     @Test
@@ -468,9 +555,9 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         SinkRecord sinkRecord = recordSchemaTest(obj, pulsarAvroSchema, expected, "STRUCT");
 
         Struct out = (Struct) sinkRecord.value();
-        Assert.assertEquals((int)out.get("field1"), 10);
-        Assert.assertEquals((String)out.get("field2"), "test");
-        Assert.assertEquals((long)out.get("field3"), 100L);
+        Assert.assertEquals(out.get("field1"), 10);
+        Assert.assertEquals(out.get("field2"), "test");
+        Assert.assertEquals(out.get("field3"), 100L);
     }
 
     @Test
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
index 9075dd9c3d3..60caa2bbe81 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
@@ -29,9 +29,11 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
 import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
 import org.testng.annotations.Test;
 
+import java.math.BigInteger;
 import java.util.List;
 
 import static org.testng.Assert.assertEquals;
@@ -167,6 +169,37 @@ public class PulsarSchemaToKafkaSchemaTest {
         }
     }
 
+    @Test
+    public void castToKafkaSchemaTest() {
+        assertEquals(Byte.class, // Long narrowed to INT8
+                KafkaConnectData.castToKafkaSchema(100L,
+                        org.apache.kafka.connect.data.Schema.INT8_SCHEMA).getClass());
+
+        assertEquals(Short.class, // Double narrowed to INT16
+                KafkaConnectData.castToKafkaSchema(100.0d,
+                        org.apache.kafka.connect.data.Schema.INT16_SCHEMA).getClass());
+
+        assertEquals(Integer.class, // Byte widened to INT32
+                KafkaConnectData.castToKafkaSchema((byte)5,
+                        org.apache.kafka.connect.data.Schema.INT32_SCHEMA).getClass());
+
+        assertEquals(Long.class, // Short widened to INT64
+                KafkaConnectData.castToKafkaSchema((short)5,
+                        org.apache.kafka.connect.data.Schema.INT64_SCHEMA).getClass());
+
+        assertEquals(Float.class, // Double narrowed to FLOAT32
+                KafkaConnectData.castToKafkaSchema(1.0d,
+                        org.apache.kafka.connect.data.Schema.FLOAT32_SCHEMA).getClass());
+
+        assertEquals(Double.class, // Float widened to FLOAT64
+                KafkaConnectData.castToKafkaSchema(1.5f,
+                        org.apache.kafka.connect.data.Schema.FLOAT64_SCHEMA).getClass());
+
+        assertEquals(Double.class, // non-wrapper Number (BigInteger) to FLOAT64
+                KafkaConnectData.castToKafkaSchema(new BigInteger("100"),
+                        org.apache.kafka.connect.data.Schema.FLOAT64_SCHEMA).getClass());
+    }
+
     @Test
     public void dateSchemaTest() {
         org.apache.kafka.connect.data.Schema kafkaSchema =