You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/06/09 07:15:47 UTC
[pulsar] 01/04: fix: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type... (#15598)
This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2b16111b332468929314592494832118734b7c61
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Mon May 16 07:05:05 2022 -0700
fix: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type... (#15598)
(cherry picked from commit f90ef9c6ad88c4f94ce1fcc682bbf3f3189cbf2a)
---
.../io/kafka/connect/schema/KafkaConnectData.java | 77 ++++++++++-
.../io/kafka/connect/KafkaConnectSinkTest.java | 141 +++++++++++++++++----
.../connect/PulsarSchemaToKafkaSchemaTest.java | 33 +++++
3 files changed, 218 insertions(+), 33 deletions(-)
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
index b1a370ddde4..e39ce086a53 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -27,22 +27,20 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
+@Slf4j
public class KafkaConnectData {
public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
if (kafkaSchema == null) {
return nativeObject;
}
- if (nativeObject == null) {
- return defaultOrThrow(kafkaSchema);
- }
-
if (nativeObject instanceof JsonNode) {
JsonNode node = (JsonNode) nativeObject;
return jsonAsConnectData(node, kafkaSchema);
@@ -51,6 +49,73 @@ public class KafkaConnectData {
return avroAsConnectData(avroRecord, kafkaSchema);
}
+ return castToKafkaSchema(nativeObject, kafkaSchema);
+ }
+
+ public static Object castToKafkaSchema(Object nativeObject, Schema kafkaSchema) {
+ if (nativeObject == null) {
+ return defaultOrThrow(kafkaSchema);
+ }
+
+ if (nativeObject instanceof Number) {
+ // This is needed in case
+ // Jackson decided to fit the value into some other type internally
+ // (e.g. Double instead of Float).
+ // Kafka's ConnectSchema expects the exact type:
+ // https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L47-L71
+ Number num = (Number) nativeObject;
+ switch (kafkaSchema.type()) {
+ case INT8:
+ if (!(nativeObject instanceof Byte)) {
+ if (log.isDebugEnabled()) {
+ log.debug("nativeObject of type {} converted to Byte", nativeObject.getClass());
+ }
+ return num.byteValue();
+ }
+ break;
+ case INT16:
+ if (!(nativeObject instanceof Short)) {
+ if (log.isDebugEnabled()) {
+ log.debug("nativeObject of type {} converted to Short", nativeObject.getClass());
+ }
+ return num.shortValue();
+ }
+ break;
+ case INT32:
+ if (!(nativeObject instanceof Integer)) {
+ if (log.isDebugEnabled()) {
+ log.debug("nativeObject of type {} converted to Integer", nativeObject.getClass());
+ }
+ return num.intValue();
+ }
+ break;
+ case INT64:
+ if (!(nativeObject instanceof Long)) {
+ if (log.isDebugEnabled()) {
+ log.debug("nativeObject of type {} converted to Long", nativeObject.getClass());
+ }
+ return num.longValue();
+ }
+ break;
+ case FLOAT32:
+ if (!(nativeObject instanceof Float)) {
+ if (log.isDebugEnabled()) {
+ log.debug("nativeObject of type {} converted to Float", nativeObject.getClass());
+ }
+ return num.floatValue();
+ }
+ break;
+ case FLOAT64:
+ if (!(nativeObject instanceof Double)) {
+ if (log.isDebugEnabled()) {
+ log.debug("nativeObject of type {} converted to Double", nativeObject.getClass());
+ }
+ return num.doubleValue();
+ }
+ break;
+ }
+ }
+
return nativeObject;
}
@@ -86,9 +151,9 @@ public class KafkaConnectData {
case BOOLEAN:
return jsonNode.booleanValue();
case NUMBER:
- jsonNode.doubleValue();
+ return jsonNode.doubleValue();
case STRING:
- jsonNode.textValue();
+ return jsonNode.textValue();
default:
throw new DataException("Don't know how to convert " + jsonNode
+ " to Connect data (schema is null).");
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 1fba098a228..14c7dcd7ef8 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -336,8 +336,28 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
assertEquals(result.get("keySchema"), expectedKeySchema);
assertEquals(result.get("valueSchema"), expectedSchema);
- SinkRecord sinkRecord = sink.toSinkRecord(record);
- return sinkRecord;
+ if (schema.getSchemaInfo().getType().isPrimitive()) {
+ // to test cast of primitive values
+ Message msgOut = mock(MessageImpl.class);
+ when(msgOut.getValue()).thenReturn(getGenericRecord(result.get("value"), schema));
+ when(msgOut.getKey()).thenReturn(result.get("key").toString());
+ when(msgOut.hasKey()).thenReturn(true);
+ when(msgOut.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
+
+ Record<GenericObject> recordOut = PulsarRecord.<String>builder()
+ .topicName("fake-topic")
+ .message(msgOut)
+ .schema(schema)
+ .ackFunction(status::incrementAndGet)
+ .failFunction(status::decrementAndGet)
+ .build();
+
+ SinkRecord sinkRecord = sink.toSinkRecord(recordOut);
+ return sinkRecord;
+ } else {
+ SinkRecord sinkRecord = sink.toSinkRecord(record);
+ return sinkRecord;
+ }
}
private GenericRecord getGenericRecord(Object value, Schema schema) {
@@ -353,71 +373,135 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
return rec;
}
+
+ @Test
+ public void genericRecordCastTest() throws Exception {
+ props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
+
+ KafkaConnectSink sink = new KafkaConnectSink();
+ sink.open(props, context);
+
+ AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
+ = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
+
+ final GenericData.Record obj = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+ // schema type INT32
+ obj.put("field1", (byte)10);
+ // schema type STRING
+ obj.put("field2", "test");
+ // schema type INT64
+ obj.put("field3", (short)100);
+
+ final GenericRecord rec = getGenericRecord(obj, pulsarAvroSchema);
+ Message msg = mock(MessageImpl.class);
+ when(msg.getValue()).thenReturn(rec);
+ when(msg.getKey()).thenReturn("key");
+ when(msg.hasKey()).thenReturn(true);
+ when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
+
+ final AtomicInteger status = new AtomicInteger(0);
+ Record<GenericObject> record = PulsarRecord.<String>builder()
+ .topicName("fake-topic")
+ .message(msg)
+ .schema(pulsarAvroSchema)
+ .ackFunction(status::incrementAndGet)
+ .failFunction(status::decrementAndGet)
+ .build();
+
+ SinkRecord sinkRecord = sink.toSinkRecord(record);
+
+ Struct out = (Struct) sinkRecord.value();
+ Assert.assertEquals(out.get("field1").getClass(), Integer.class);
+ Assert.assertEquals(out.get("field2").getClass(), String.class);
+ Assert.assertEquals(out.get("field3").getClass(), Long.class);
+
+ Assert.assertEquals(out.get("field1"), 10);
+ Assert.assertEquals(out.get("field2"), "test");
+ Assert.assertEquals(out.get("field3"), 100L);
+
+ sink.close();
+ }
+
@Test
public void bytesRecordSchemaTest() throws Exception {
byte[] in = "val".getBytes(StandardCharsets.US_ASCII);
SinkRecord sinkRecord = recordSchemaTest(in, Schema.BYTES, "val", "BYTES");
- byte[] out = (byte[]) sinkRecord.value();
- Assert.assertEquals(out, in);
+ // test/mock writes it as string
+ Assert.assertEquals(sinkRecord.value(), "val");
}
@Test
public void stringRecordSchemaTest() throws Exception {
SinkRecord sinkRecord = recordSchemaTest("val", Schema.STRING, "val", "STRING");
- String out = (String) sinkRecord.value();
- Assert.assertEquals(out, "val");
+ Assert.assertEquals(sinkRecord.value().getClass(), String.class);
+ Assert.assertEquals(sinkRecord.value(), "val");
}
@Test
public void booleanRecordSchemaTest() throws Exception {
SinkRecord sinkRecord = recordSchemaTest(true, Schema.BOOL, true, "BOOLEAN");
- boolean out = (boolean) sinkRecord.value();
- Assert.assertEquals(out, true);
+ Assert.assertEquals(sinkRecord.value().getClass(), Boolean.class);
+ Assert.assertEquals(sinkRecord.value(), true);
}
@Test
public void byteRecordSchemaTest() throws Exception {
// int 1 is coming back from ObjectMapper
SinkRecord sinkRecord = recordSchemaTest((byte)1, Schema.INT8, 1, "INT8");
- byte out = (byte) sinkRecord.value();
- Assert.assertEquals(out, 1);
+ Assert.assertEquals(sinkRecord.value().getClass(), Byte.class);
+ Assert.assertEquals(sinkRecord.value(), (byte)1);
}
@Test
public void shortRecordSchemaTest() throws Exception {
// int 1 is coming back from ObjectMapper
SinkRecord sinkRecord = recordSchemaTest((short)1, Schema.INT16, 1, "INT16");
- short out = (short) sinkRecord.value();
- Assert.assertEquals(out, 1);
+ Assert.assertEquals(sinkRecord.value().getClass(), Short.class);
+ Assert.assertEquals(sinkRecord.value(), (short)1);
}
@Test
public void integerRecordSchemaTest() throws Exception {
SinkRecord sinkRecord = recordSchemaTest(Integer.MAX_VALUE, Schema.INT32, Integer.MAX_VALUE, "INT32");
- int out = (int) sinkRecord.value();
- Assert.assertEquals(out, Integer.MAX_VALUE);
+ Assert.assertEquals(sinkRecord.value().getClass(), Integer.class);
+ Assert.assertEquals(sinkRecord.value(), Integer.MAX_VALUE);
}
@Test
public void longRecordSchemaTest() throws Exception {
SinkRecord sinkRecord = recordSchemaTest(Long.MAX_VALUE, Schema.INT64, Long.MAX_VALUE, "INT64");
- long out = (long) sinkRecord.value();
- Assert.assertEquals(out, Long.MAX_VALUE);
+ Assert.assertEquals(sinkRecord.value().getClass(), Long.class);
+ Assert.assertEquals(sinkRecord.value(), Long.MAX_VALUE);
+ }
+
+ @Test
+ public void longRecordSchemaTestCast() throws Exception {
+ // int 1 is coming from ObjectMapper, expect Long (as in schema) from sinkRecord
+ SinkRecord sinkRecord = recordSchemaTest(1L, Schema.INT64, 1, "INT64");
+ Assert.assertEquals(sinkRecord.value().getClass(), Long.class);
+ Assert.assertEquals(sinkRecord.value(), 1L);
}
@Test
public void floatRecordSchemaTest() throws Exception {
- // 1.0d is coming back from ObjectMapper
+ // 1.0d is coming back from ObjectMapper, expect Float (as in schema) from sinkRecord
SinkRecord sinkRecord = recordSchemaTest(1.0f, Schema.FLOAT, 1.0d, "FLOAT32");
- float out = (float) sinkRecord.value();
- Assert.assertEquals(out, 1.0d);
+ Assert.assertEquals(sinkRecord.value().getClass(), Float.class);
+ Assert.assertEquals(sinkRecord.value(), 1.0f);
}
@Test
public void doubleRecordSchemaTest() throws Exception {
SinkRecord sinkRecord = recordSchemaTest(Double.MAX_VALUE, Schema.DOUBLE, Double.MAX_VALUE, "FLOAT64");
- double out = (double) sinkRecord.value();
- Assert.assertEquals(out, Double.MAX_VALUE);
+ Assert.assertEquals(sinkRecord.value().getClass(), Double.class);
+ Assert.assertEquals(sinkRecord.value(), Double.MAX_VALUE);
+ }
+
+ @Test
+ public void doubleRecordSchemaTestCast() throws Exception {
+ SinkRecord sinkRecord = recordSchemaTest(1.0d, Schema.DOUBLE, 1.0d, "FLOAT64");
+ Assert.assertEquals(sinkRecord.value().getClass(), Double.class);
+ Assert.assertEquals(sinkRecord.value(), 1.0d);
}
@Test
@@ -444,9 +528,12 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
SinkRecord sinkRecord = recordSchemaTest(jsonNode, jsonSchema, expected, "STRUCT");
Struct out = (Struct) sinkRecord.value();
- Assert.assertEquals((int)out.get("field1"), 10);
- Assert.assertEquals((String)out.get("field2"), "test");
- Assert.assertEquals((long)out.get("field3"), 100L);
+ Assert.assertEquals(out.get("field1").getClass(), Integer.class);
+ Assert.assertEquals(out.get("field1"), 10);
+ Assert.assertEquals(out.get("field2").getClass(), String.class);
+ Assert.assertEquals(out.get("field2"), "test");
+ Assert.assertEquals(out.get("field3").getClass(), Long.class);
+ Assert.assertEquals(out.get("field3"), 100L);
}
@Test
@@ -468,9 +555,9 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
SinkRecord sinkRecord = recordSchemaTest(obj, pulsarAvroSchema, expected, "STRUCT");
Struct out = (Struct) sinkRecord.value();
- Assert.assertEquals((int)out.get("field1"), 10);
- Assert.assertEquals((String)out.get("field2"), "test");
- Assert.assertEquals((long)out.get("field3"), 100L);
+ Assert.assertEquals(out.get("field1"), 10);
+ Assert.assertEquals(out.get("field2"), "test");
+ Assert.assertEquals(out.get("field3"), 100L);
}
@Test
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
index 9075dd9c3d3..60caa2bbe81 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
@@ -29,9 +29,11 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
import org.testng.annotations.Test;
+import java.math.BigInteger;
import java.util.List;
import static org.testng.Assert.assertEquals;
@@ -167,6 +169,37 @@ public class PulsarSchemaToKafkaSchemaTest {
}
}
+ @Test
+ public void castToKafkaSchemaTest() {
+ assertEquals(Byte.class,
+ KafkaConnectData.castToKafkaSchema(100L,
+ org.apache.kafka.connect.data.Schema.INT8_SCHEMA).getClass());
+
+ assertEquals(Short.class,
+ KafkaConnectData.castToKafkaSchema(100.0d,
+ org.apache.kafka.connect.data.Schema.INT16_SCHEMA).getClass());
+
+ assertEquals(Integer.class,
+ KafkaConnectData.castToKafkaSchema((byte)5,
+ org.apache.kafka.connect.data.Schema.INT32_SCHEMA).getClass());
+
+ assertEquals(Long.class,
+ KafkaConnectData.castToKafkaSchema((short)5,
+ org.apache.kafka.connect.data.Schema.INT64_SCHEMA).getClass());
+
+ assertEquals(Float.class,
+ KafkaConnectData.castToKafkaSchema(1.0d,
+ org.apache.kafka.connect.data.Schema.FLOAT32_SCHEMA).getClass());
+
+ assertEquals(Double.class,
+ KafkaConnectData.castToKafkaSchema(1.5f,
+ org.apache.kafka.connect.data.Schema.FLOAT64_SCHEMA).getClass());
+
+ assertEquals(Double.class,
+ KafkaConnectData.castToKafkaSchema(new BigInteger("100"),
+ org.apache.kafka.connect.data.Schema.FLOAT64_SCHEMA).getClass());
+ }
+
@Test
public void dateSchemaTest() {
org.apache.kafka.connect.data.Schema kafkaSchema =