You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/06/09 06:50:05 UTC
[pulsar] branch master updated: Fix: org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY} + tests (#15988)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9e7aa0f43ad Fix: org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY} + tests (#15988)
9e7aa0f43ad is described below
commit 9e7aa0f43add7be841cf6b1791ec34c2ced43f1f
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Wed Jun 8 23:49:58 2022 -0700
Fix: org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY} + tests (#15988)
---
.../io/kafka/connect/schema/KafkaConnectData.java | 148 +++++++---
.../io/kafka/connect/KafkaConnectSinkTest.java | 299 +++++++++++++++++++++
.../connect/PulsarSchemaToKafkaSchemaTest.java | 109 +++++++-
3 files changed, 522 insertions(+), 34 deletions(-)
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
index 8374dd24bf7..671495c6df6 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -22,6 +22,7 @@ package org.apache.pulsar.io.kafka.connect.schema;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import java.io.IOException;
+import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -37,31 +38,90 @@ import org.apache.pulsar.client.api.schema.GenericRecord;
@Slf4j
public class KafkaConnectData {
+
+ private static List<Object> arrayToList(Object nativeObject, Schema kafkaValueSchema) {
+ Preconditions.checkArgument(nativeObject.getClass().isArray());
+ int length = Array.getLength(nativeObject);
+ List<Object> out = new ArrayList<>(length);
+ for (int i = 0; i < length; i++) {
+ // this handles primitive values too
+ Object elem = Array.get(nativeObject, i);
+ out.add(getKafkaConnectData(elem, kafkaValueSchema));
+ }
+ return out;
+ }
+
+ @SuppressWarnings("unchecked")
public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
if (kafkaSchema == null) {
return nativeObject;
}
+ if (nativeObject == null) {
+ return defaultOrThrow(kafkaSchema);
+ }
+
if (nativeObject instanceof JsonNode) {
JsonNode node = (JsonNode) nativeObject;
return jsonAsConnectData(node, kafkaSchema);
- } else if (nativeObject instanceof GenericData.Record) {
- GenericData.Record avroRecord = (GenericData.Record) nativeObject;
- return avroAsConnectData(avroRecord, kafkaSchema);
- } else if (nativeObject instanceof GenericRecord) {
- // Pulsar's GenericRecord
- GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
- return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
}
- return castToKafkaSchema(nativeObject, kafkaSchema);
+ switch (kafkaSchema.type()) {
+ case ARRAY:
+ if (nativeObject instanceof List) {
+ List arr = (List) nativeObject;
+ return arr.stream()
+ .map(x -> getKafkaConnectData(x, kafkaSchema.valueSchema()))
+ .toList();
+ } else if (nativeObject.getClass().isArray()) {
+ return arrayToList(nativeObject, kafkaSchema.valueSchema());
+ }
+ throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+ + " into kafka ARRAY");
+ case MAP:
+ if (nativeObject instanceof Map) {
+ Map<Object, Object> map = (Map<Object, Object>) nativeObject;
+ Map<Object, Object> responseMap = new HashMap<>(map.size());
+ for (Map.Entry<Object, Object> kv : map.entrySet()) {
+ Object key = getKafkaConnectData(kv.getKey(), kafkaSchema.keySchema());
+ Object val = getKafkaConnectData(kv.getValue(), kafkaSchema.valueSchema());
+ responseMap.put(key, val);
+ }
+ return responseMap;
+ } else if (nativeObject instanceof org.apache.pulsar.common.schema.KeyValue) {
+ org.apache.pulsar.common.schema.KeyValue kv =
+ (org.apache.pulsar.common.schema.KeyValue) nativeObject;
+ Map<Object, Object> responseMap = new HashMap<>();
+ Object key = getKafkaConnectData(kv.getKey(), kafkaSchema.keySchema());
+ Object val = getKafkaConnectData(kv.getValue(), kafkaSchema.valueSchema());
+ responseMap.put(key, val);
+ return responseMap;
+ }
+ throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+ + " into kafka MAP");
+ case STRUCT:
+ if (nativeObject instanceof GenericData.Record) {
+ GenericData.Record avroRecord = (GenericData.Record) nativeObject;
+ return avroAsConnectData(avroRecord, kafkaSchema);
+ } else if (nativeObject instanceof GenericRecord) {
+ GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
+ // Pulsar's GenericRecord
+ if (pulsarGenericRecord.getNativeObject() instanceof JsonNode
+ || pulsarGenericRecord.getNativeObject() instanceof GenericData.Record) {
+ return getKafkaConnectData(pulsarGenericRecord.getNativeObject(), kafkaSchema);
+ }
+ return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
+ }
+ throw new IllegalStateException("Don't know how to convert " + nativeObject.getClass()
+ + " into kafka STRUCT");
+ default:
+ Preconditions.checkArgument(kafkaSchema.type().isPrimitive(),
+ "Expected primitive schema but got " + kafkaSchema.type());
+ return castToKafkaSchema(nativeObject, kafkaSchema);
+ }
}
public static Object castToKafkaSchema(Object nativeObject, Schema kafkaSchema) {
- if (nativeObject == null) {
- return defaultOrThrow(kafkaSchema);
- }
-
if (nativeObject instanceof Number) {
// This is needed in case
// jackson decided to fit value into some other type internally
@@ -121,6 +181,19 @@ public class KafkaConnectData {
}
}
+ if (nativeObject instanceof Character) {
+ Character ch = (Character) nativeObject;
+ if (kafkaSchema.type() == Schema.Type.STRING) {
+ return ch.toString();
+ }
+ return castToKafkaSchema(Character.getNumericValue(ch), kafkaSchema);
+ }
+
+ if (kafkaSchema.type() == Schema.Type.STRING && nativeObject instanceof CharSequence) {
+ // e.g. org.apache.avro.util.Utf8
+ return nativeObject.toString();
+ }
+
return nativeObject;
}
@@ -161,23 +234,8 @@ public class KafkaConnectData {
if (jsonNode == null || jsonNode.isNull()) {
return null;
}
- switch (jsonNode.getNodeType()) {
- case BINARY:
- try {
- return jsonNode.binaryValue();
- } catch (IOException e) {
- throw new DataException("Cannot get binary value for " + jsonNode);
- }
- case BOOLEAN:
- return jsonNode.booleanValue();
- case NUMBER:
- return jsonNode.doubleValue();
- case STRING:
- return jsonNode.textValue();
- default:
- throw new DataException("Don't know how to convert " + jsonNode
- + " to Connect data (schema is null).");
- }
+ throw new DataException("Don't know how to convert " + jsonNode
+ + " to Connect data (schema is null).");
}
if (jsonNode == null || jsonNode.isNull()) {
@@ -186,39 +244,65 @@ public class KafkaConnectData {
switch (kafkaSchema.type()) {
case INT8:
+ Preconditions.checkArgument(jsonNode.isNumber());
return (byte) jsonNode.shortValue();
case INT16:
+ Preconditions.checkArgument(jsonNode.isNumber());
return jsonNode.shortValue();
case INT32:
+ if (jsonNode.isTextual() && jsonNode.textValue().length() == 1) {
+ // char encoded as String instead of Integer
+ return Character.getNumericValue(jsonNode.textValue().charAt(0));
+ }
+ Preconditions.checkArgument(jsonNode.isNumber());
return jsonNode.intValue();
case INT64:
+ Preconditions.checkArgument(jsonNode.isNumber());
return jsonNode.longValue();
case FLOAT32:
+ Preconditions.checkArgument(jsonNode.isNumber());
return jsonNode.floatValue();
case FLOAT64:
+ Preconditions.checkArgument(jsonNode.isNumber());
return jsonNode.doubleValue();
case BOOLEAN:
+ Preconditions.checkArgument(jsonNode.isBoolean());
return jsonNode.booleanValue();
case STRING:
+ Preconditions.checkArgument(jsonNode.isTextual());
return jsonNode.textValue();
case BYTES:
+ Preconditions.checkArgument(jsonNode.isBinary());
try {
return jsonNode.binaryValue();
} catch (IOException e) {
throw new DataException("Cannot get binary value for " + jsonNode + " with schema " + kafkaSchema);
}
case ARRAY:
- List<Object> list = new ArrayList<>();
+ if (jsonNode.isTextual() && kafkaSchema.valueSchema().type() == Schema.Type.INT32) {
+ // char[] encoded as String in json
+ List<Object> list = new ArrayList<>();
+ for (char ch: jsonNode.textValue().toCharArray()) {
+ list.add(Character.getNumericValue(ch));
+ }
+ return list;
+ }
+
Preconditions.checkArgument(jsonNode.isArray(), "jsonNode has to be an array");
- for (Iterator<JsonNode> it = jsonNode.elements(); it.hasNext(); ) {
+ List<Object> list = new ArrayList<>();
+ for (Iterator<JsonNode> it = jsonNode.elements(); it.hasNext();) {
list.add(jsonAsConnectData(it.next(), kafkaSchema.valueSchema()));
}
return list;
case MAP:
+ Preconditions.checkArgument(jsonNode.isObject(), "jsonNode has to be an Object node");
+ Preconditions.checkArgument(kafkaSchema.keySchema().type() == Schema.Type.STRING,
+ "kafka key schema for json map is expected to be STRING");
Map<String, Object> map = new HashMap<>();
for (Iterator<Map.Entry<String, JsonNode>> it = jsonNode.fields(); it.hasNext(); ) {
Map.Entry<String, JsonNode> elem = it.next();
- map.put(elem.getKey(), jsonAsConnectData(elem.getValue(), kafkaSchema.valueSchema()));
+ map.put(elem.getKey(),
+ jsonAsConnectData(elem.getValue(), kafkaSchema.valueSchema()));
}
return map;
case STRUCT:
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 23d9f1b5ce2..2c0ea31ac4c 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -22,8 +22,17 @@ package org.apache.pulsar.io.kafka.connect;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -47,6 +56,8 @@ import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
+import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -56,11 +67,14 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.collections.Maps;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.AbstractMap;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -538,6 +552,13 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
expected.put("field2", "test");
// integer is coming back from ObjectMapper
expected.put("field3", 100);
+ expected.put("byteField", 0);
+ expected.put("shortField", 0);
+ expected.put("intField", 0);
+ expected.put("longField", 0);
+ // double is coming back from ObjectMapper
+ expected.put("floatField", 0.0d);
+ expected.put("doubleField", 0.0d);
SinkRecord sinkRecord = recordSchemaTest(jsonNode, jsonSchema, expected, "STRUCT");
@@ -565,6 +586,13 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
expected.put("field2", "test");
// integer is coming back from ObjectMapper
expected.put("field3", 100);
+ expected.put("byteField", 0);
+ expected.put("shortField", 0);
+ expected.put("intField", 0);
+ expected.put("longField", 0);
+ // double is coming back from ObjectMapper
+ expected.put("floatField", 0.0d);
+ expected.put("doubleField", 0.0d);
SinkRecord sinkRecord = recordSchemaTest(obj, pulsarAvroSchema, expected, "STRUCT");
@@ -615,6 +643,167 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
Assert.assertEquals(key, 11);
}
+ @Test
+ public void connectDataComplexAvroSchemaGenericRecordTest() {
+ AvroSchema<PulsarSchemaToKafkaSchemaTest.ComplexStruct> pulsarAvroSchema
+ = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.ComplexStruct.class);
+
+ final GenericData.Record key = getComplexStructRecord();
+ final GenericData.Record value = getComplexStructRecord();
+ KeyValue<GenericRecord, GenericRecord> kv = new KeyValue<>(getGenericRecord(key, pulsarAvroSchema),
+ getGenericRecord(value, pulsarAvroSchema));
+
+ org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
+ .getKafkaConnectSchema(Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema));
+
+ Object connectData = KafkaConnectData.getKafkaConnectData(kv, kafkaSchema);
+
+ org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+ }
+
+ @Test
+ public void connectDataPojoArrTest() throws Exception {
+ PulsarSchemaToKafkaSchemaTest.ComplexStruct[] pojo =
+ new PulsarSchemaToKafkaSchemaTest.ComplexStruct[]{
+ getPojoComplexStruct(),
+ getPojoComplexStruct(),
+ getPojoComplexStruct()
+ };
+
+ testPojoAsAvroAndJsonConversionToConnectData(pojo);
+ }
+
+ @Test
+ public void connectDataPojoListTest() throws Exception {
+ List<PulsarSchemaToKafkaSchemaTest.ComplexStruct> pojo =
+ Lists.newArrayList(
+ getPojoComplexStruct(),
+ getPojoComplexStruct(),
+ getPojoComplexStruct()
+ );
+
+ /*
+ Need this because of (AFAICT)
+ https://issues.apache.org/jira/browse/AVRO-1183
+ https://github.com/apache/pulsar/issues/4851
+ to generate proper schema
+ */
+ PulsarSchemaToKafkaSchemaTest.ComplexStruct[] pojoForSchema =
+ new PulsarSchemaToKafkaSchemaTest.ComplexStruct[]{
+ getPojoComplexStruct(),
+ getPojoComplexStruct(),
+ getPojoComplexStruct()
+ };
+
+ AvroSchema pulsarAvroSchema = AvroSchema.of(pojoForSchema.getClass());
+
+ testPojoAsAvroAndJsonConversionToConnectData(pojo, pulsarAvroSchema);
+ }
+
+ @Test
+ public void connectDataPojoMapTest() throws Exception {
+ Map<String, PulsarSchemaToKafkaSchemaTest.ComplexStruct> pojo =
+ Maps.newHashMap();
+ pojo.put("key1", getPojoComplexStruct());
+ pojo.put("key2", getPojoComplexStruct());
+
+ testPojoAsAvroAndJsonConversionToConnectData(pojo);
+ }
+
+ @Test
+ public void connectDataPrimitivesTest() throws Exception {
+ testPojoAsAvroAndJsonConversionToConnectData("test");
+
+ testPojoAsAvroAndJsonConversionToConnectData('a');
+
+ testPojoAsAvroAndJsonConversionToConnectData(Byte.MIN_VALUE);
+ testPojoAsAvroAndJsonConversionToConnectData(Byte.MAX_VALUE);
+
+ testPojoAsAvroAndJsonConversionToConnectData(Short.MIN_VALUE);
+ testPojoAsAvroAndJsonConversionToConnectData(Short.MAX_VALUE);
+
+ testPojoAsAvroAndJsonConversionToConnectData(Integer.MIN_VALUE);
+ testPojoAsAvroAndJsonConversionToConnectData(Integer.MAX_VALUE);
+
+ testPojoAsAvroAndJsonConversionToConnectData(Long.MIN_VALUE);
+ testPojoAsAvroAndJsonConversionToConnectData(Long.MAX_VALUE);
+
+ testPojoAsAvroAndJsonConversionToConnectData(Float.MIN_VALUE);
+ testPojoAsAvroAndJsonConversionToConnectData(Float.MAX_VALUE);
+
+ testPojoAsAvroAndJsonConversionToConnectData(Double.MIN_VALUE);
+ testPojoAsAvroAndJsonConversionToConnectData(Double.MAX_VALUE);
+ }
+
+ @Test
+ public void connectDataPrimitiveArraysTest() throws Exception {
+ testPojoAsAvroAndJsonConversionToConnectData(new String[] {"test", "test2"});
+
+ testPojoAsAvroAndJsonConversionToConnectData(new char[] {'a', 'b', 'c'});
+ testPojoAsAvroAndJsonConversionToConnectData(new Character[] {'a', 'b', 'c'});
+
+ testPojoAsAvroAndJsonConversionToConnectData(new byte[] {Byte.MIN_VALUE, Byte.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new Byte[] {Byte.MIN_VALUE, Byte.MAX_VALUE});
+
+ testPojoAsAvroAndJsonConversionToConnectData(new short[] {Short.MIN_VALUE, Short.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new Short[] {Short.MIN_VALUE, Short.MAX_VALUE});
+
+ testPojoAsAvroAndJsonConversionToConnectData(new int[] {Integer.MIN_VALUE, Integer.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new Integer[] {Integer.MIN_VALUE, Integer.MAX_VALUE});
+
+ testPojoAsAvroAndJsonConversionToConnectData(new long[] {Long.MIN_VALUE, Long.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new Long[] {Long.MIN_VALUE, Long.MAX_VALUE});
+
+ testPojoAsAvroAndJsonConversionToConnectData(new float[] {Float.MIN_VALUE, Float.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new Float[] {Float.MIN_VALUE, Float.MAX_VALUE});
+
+ testPojoAsAvroAndJsonConversionToConnectData(new double[] {Double.MIN_VALUE, Double.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new Double[] {Double.MIN_VALUE, Double.MAX_VALUE});
+ }
+
+ private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo) throws IOException {
+ AvroSchema pulsarAvroSchema = AvroSchema.of(pojo.getClass());
+ testPojoAsAvroAndJsonConversionToConnectData(pojo, pulsarAvroSchema);
+ }
+
+ private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo, AvroSchema pulsarAvroSchema) throws IOException {
+ Object value = pojoAsAvroRecord(pojo, pulsarAvroSchema);
+
+ org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
+ .getKafkaConnectSchema(pulsarAvroSchema);
+
+ Object connectData = KafkaConnectData.getKafkaConnectData(value, kafkaSchema);
+
+ org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+
+ Object jsonNode = pojoAsJsonNode(pojo);
+ connectData = KafkaConnectData.getKafkaConnectData(jsonNode, kafkaSchema);
+ org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+ }
+
+ private JsonNode pojoAsJsonNode(Object pojo) {
+ ObjectMapper om = new ObjectMapper();
+ JsonNode json = om.valueToTree(pojo);
+ return json;
+ }
+
+ private Object pojoAsAvroRecord(Object pojo, AvroSchema pulsarAvroSchema) throws IOException {
+ DatumWriter writer = new ReflectDatumWriter<>();
+
+ writer.setSchema(pulsarAvroSchema.getAvroSchema());
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ Encoder enc = new EncoderFactory().directBinaryEncoder(out, null);
+ writer.write(pojo, enc);
+ enc.flush();
+ byte[] data = out.toByteArray();
+
+ DatumReader<GenericRecord> reader = new GenericDatumReader<>(pulsarAvroSchema.getAvroSchema());
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
+ Object value = reader.read(null, decoder);
+ return value;
+ }
+
@Test
public void schemaKeyValueAvroSchemaTest() throws Exception {
AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
@@ -635,12 +824,26 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
expectedKey.put("field2", "key");
// integer is coming back from ObjectMapper
expectedKey.put("field3", 101);
+ expectedKey.put("byteField", 0);
+ expectedKey.put("shortField", 0);
+ expectedKey.put("intField", 0);
+ expectedKey.put("longField", 0);
+ // double is coming back from ObjectMapper
+ expectedKey.put("floatField", 0.0d);
+ expectedKey.put("doubleField", 0.0d);
Map<String, Object> expectedValue = new LinkedHashMap<>();
expectedValue.put("field1", 10);
expectedValue.put("field2", "value");
// integer is coming back from ObjectMapper
expectedValue.put("field3", 100);
+ expectedValue.put("byteField", 0);
+ expectedValue.put("shortField", 0);
+ expectedValue.put("intField", 0);
+ expectedValue.put("longField", 0);
+ // double is coming back from ObjectMapper
+ expectedValue.put("floatField", 0.0d);
+ expectedValue.put("doubleField", 0.0d);
KeyValue<GenericRecord, GenericRecord> kv = new KeyValue<>(getGenericRecord(key, pulsarAvroSchema),
getGenericRecord(value, pulsarAvroSchema));
@@ -781,4 +984,100 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
sink.close();
}
+ private static PulsarSchemaToKafkaSchemaTest.StructWithAnnotations getPojoStructWithAnnotations() {
+ return new PulsarSchemaToKafkaSchemaTest.StructWithAnnotations()
+ .setField1(1)
+ .setField2("field2")
+ .setField3(100L)
+ .setByteField((byte) 1)
+ .setShortField((short) 2)
+ .setIntField(3)
+ .setLongField(4)
+ .setFloatField(5.0f)
+ .setDoubleField(6.0d);
+ }
+
+ private static PulsarSchemaToKafkaSchemaTest.ComplexStruct getPojoComplexStruct() {
+ return new PulsarSchemaToKafkaSchemaTest.ComplexStruct()
+ .setStringList(Lists.newArrayList("str11", "str22"))
+ .setStructArr(new PulsarSchemaToKafkaSchemaTest.StructWithAnnotations[]{getPojoStructWithAnnotations()})
+ .setStructList(Lists.newArrayList(getPojoStructWithAnnotations()))
+ .setStruct(getPojoStructWithAnnotations())
+ .setStructMap(Map.of("key1", getPojoStructWithAnnotations(),
+ "key2", getPojoStructWithAnnotations()))
+
+ .setByteField((byte) 1)
+ .setShortField((short) 2)
+ .setIntField(3)
+ .setLongField(4)
+ .setFloatField(5.0f)
+ .setDoubleField(6.0d)
+ .setCharField('c')
+ .setStringField("some text")
+
+ .setByteArr(new byte[] {1 ,2})
+ .setShortArr(new short[] {3, 4})
+ .setIntArr(new int[] {5, 6})
+ .setLongArr(new long[] {7, 8})
+ .setFloatArr(new float[] {9.0f, 10.0f})
+ .setDoubleArr(new double[] {11.0d, 12.0d})
+ .setCharArr(new char[]{'a', 'b'})
+ .setStringArr(new String[] {"abc", "def"});
+ }
+
+ private static GenericData.Record getStructRecord() {
+ AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
+ = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
+
+ final GenericData.Record rec = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+
+ rec.put("field1", 11);
+ rec.put("field2", "str99");
+ rec.put("field3", 101L);
+ rec.put("byteField", (byte) 1);
+ rec.put("shortField", (short) 2);
+ rec.put("intField", 3);
+ rec.put("longField", 4L);
+ rec.put("floatField", 5.0f);
+ rec.put("doubleField", 6.0d);
+
+ return rec;
+ }
+
+ private static GenericData.Record getComplexStructRecord() {
+ AvroSchema<PulsarSchemaToKafkaSchemaTest.ComplexStruct> pulsarAvroSchema
+ = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.ComplexStruct.class);
+
+ final GenericData.Record rec = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+
+ rec.put("stringArr", new String[]{"str1", "str2"});
+ rec.put("stringList", Lists.newArrayList("str11", "str22"));
+ rec.put("structArr", new GenericData.Record[]{getStructRecord(), getStructRecord()});
+ rec.put("structList", Lists.newArrayList(getStructRecord(), getStructRecord()));
+
+ rec.put("struct", getStructRecord());
+ rec.put("byteField", (byte) 1);
+ rec.put("shortField", (short) 2);
+ rec.put("intField", 3);
+ rec.put("longField", 4L);
+ rec.put("floatField", 5.1f);
+ rec.put("doubleField", 6.1d);
+ rec.put("charField", 'c');
+ rec.put("stringField", "some string");
+ rec.put("byteArr", new byte[] {(byte) 1, (byte) 2});
+ rec.put("shortArr", new short[] {(short) 3, (short) 4});
+ rec.put("intArr", new int[] {5, 6});
+ rec.put("longArr", new long[] {7L, 8L});
+ rec.put("floatArr", new float[] {9.0f, 10.0f});
+ rec.put("doubleArr", new double[] {11.0d, 12.0d});
+ rec.put("charArr", new char[] {'a', 'b', 'c'});
+
+ Map<String, GenericData.Record> map = new HashMap<>();
+ map.put("key1", getStructRecord());
+ map.put("key2", getStructRecord());
+
+ rec.put("structMap", map);
+
+ return rec;
+ }
}
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
index 60caa2bbe81..ecf0633f588 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.io.kafka.connect;
import com.google.common.collect.Lists;
import lombok.Data;
+import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.reflect.AvroDefault;
import org.apache.avro.reflect.Nullable;
@@ -35,6 +36,7 @@ import org.testng.annotations.Test;
import java.math.BigInteger;
import java.util.List;
+import java.util.Map;
import static org.testng.Assert.assertEquals;
@@ -44,15 +46,90 @@ import static org.testng.Assert.assertEquals;
@Slf4j
public class PulsarSchemaToKafkaSchemaTest {
- static final List<String> STRUCT_FIELDS = Lists.newArrayList("field1", "field2", "field3");
+ static final List<String> STRUCT_FIELDS = Lists.newArrayList(
+ "field1",
+ "field2",
+ "field3",
+ "byteField",
+ "shortField",
+ "intField",
+ "longField",
+ "floatField",
+ "doubleField"
+ );
+ static final List<String> COMPLEX_STRUCT_FIELDS = Lists.newArrayList(
+ "stringArr",
+ "stringList",
+ "structArr",
+ "structList",
+ "structMap",
+ "struct",
+ "byteField",
+ "shortField",
+ "intField",
+ "longField",
+ "floatField",
+ "doubleField",
+ "charField",
+ "stringField",
+ "byteArr",
+ "shortArr",
+ "intArr",
+ "longArr",
+ "floatArr",
+ "doubleArr",
+ "charArr"
+ );
@Data
+ @Accessors(chain = true)
static class StructWithAnnotations {
int field1;
@Nullable
String field2;
- @AvroDefault("\"1000\"")
+ @AvroDefault("1000")
Long field3;
+
+ @AvroDefault("0")
+ byte byteField;
+ @AvroDefault("0")
+ short shortField;
+ @AvroDefault("0")
+ int intField;
+ @AvroDefault("0")
+ long longField;
+ @AvroDefault("0")
+ float floatField;
+ @AvroDefault("0")
+ double doubleField;
+ }
+
+ @Data
+ @Accessors(chain = true)
+ static class ComplexStruct {
+ List<String> stringList;
+ StructWithAnnotations[] structArr;
+ List<StructWithAnnotations> structList;
+ Map<String, StructWithAnnotations> structMap;
+ StructWithAnnotations struct;
+
+ byte byteField;
+ short shortField;
+ int intField;
+ long longField;
+ float floatField;
+ double doubleField;
+ char charField;
+ String stringField;
+
+ byte[] byteArr;
+ short[] shortArr;
+ int[] intArr;
+ long[] longArr;
+ float[] floatArr;
+ double[] doubleArr;
+ char[] charArr;
+ String[] stringArr;
}
@Test
@@ -153,6 +230,18 @@ public class PulsarSchemaToKafkaSchemaTest {
}
}
+ @Test
+ public void avroComplexSchemaTest() {
+ AvroSchema<ComplexStruct> pulsarAvroSchema = AvroSchema.of(ComplexStruct.class);
+ org.apache.kafka.connect.data.Schema kafkaSchema =
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema);
+ assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT);
+ assertEquals(kafkaSchema.fields().size(), COMPLEX_STRUCT_FIELDS.size());
+ for (String name: COMPLEX_STRUCT_FIELDS) {
+ assertEquals(kafkaSchema.field(name).name(), name);
+ }
+ }
+
@Test
public void jsonSchemaTest() {
JSONSchema<StructWithAnnotations> jsonSchema = JSONSchema
@@ -169,6 +258,22 @@ public class PulsarSchemaToKafkaSchemaTest {
}
}
+ @Test
+ public void jsonComplexSchemaTest() {
+ JSONSchema<ComplexStruct> jsonSchema = JSONSchema
+ .of(SchemaDefinition.<ComplexStruct>builder()
+ .withPojo(ComplexStruct.class)
+ .withAlwaysAllowNull(false)
+ .build());
+ org.apache.kafka.connect.data.Schema kafkaSchema =
+ PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema);
+ assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT);
+ assertEquals(kafkaSchema.fields().size(), COMPLEX_STRUCT_FIELDS.size());
+ for (String name: COMPLEX_STRUCT_FIELDS) {
+ assertEquals(kafkaSchema.field(name).name(), name);
+ }
+ }
+
@Test
public void castToKafkaSchemaTest() {
assertEquals(Byte.class,