Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/09/01 06:40:12 UTC
[pulsar] branch master updated: KCA: handle kafka's logical schemas (#16485)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fe68a8e872f KCA: handle kafka's logical schemas (#16485)
fe68a8e872f is described below
commit fe68a8e872ff39369d5d401c8fc68da866c9b2dd
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Wed Aug 31 23:40:03 2022 -0700
KCA: handle kafka's logical schemas (#16485)
---
.../io/kafka/connect/schema/KafkaConnectData.java | 58 ++++++
.../connect/schema/PulsarSchemaToKafkaSchema.java | 82 ++++++--
.../io/kafka/connect/KafkaConnectSinkTest.java | 219 ++++++++++++++++-----
3 files changed, 290 insertions(+), 69 deletions(-)
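For context: Kafka Connect models Timestamp, Date, Time, and Decimal as "logical types" layered over primitive schemas (INT64, INT32, INT32, and BYTES respectively) and identified by schema name. A minimal sketch of the round trip this change builds on, using only the standard org.apache.kafka.connect.data API (the class name and values below are illustrative, not from this commit):

import java.math.BigDecimal;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Timestamp;

public class LogicalTypeRoundTrip {
    public static void main(String[] args) {
        // Timestamp: logical java.util.Date over a physical INT64 (epoch millis).
        Schema ts = Timestamp.SCHEMA;
        long wire = Timestamp.fromLogical(ts, new java.util.Date());
        java.util.Date restored = Timestamp.toLogical(ts, wire);

        // Decimal: logical BigDecimal over physical BYTES (the unscaled value as
        // big-endian two's complement); the scale rides in a schema parameter.
        Schema dec = Decimal.schema(2);
        byte[] encoded = Decimal.fromLogical(dec, new BigDecimal("12.34"));
        BigDecimal decoded = Decimal.toLogical(dec, encoded);
        System.out.println(restored + " / " + decoded); // decoded prints 12.34
    }
}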
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
index 671495c6df6..557cfbb9dd8 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -30,9 +30,13 @@ import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.DataException;
import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -122,6 +126,34 @@ public class KafkaConnectData {
}
public static Object castToKafkaSchema(Object nativeObject, Schema kafkaSchema) {
+ // special case for a few classes defined in org.apache.kafka.connect.data
+ // and listed as LOGICAL_TYPE_CLASSES in org.apache.kafka.connect.data.ConnectSchema
+ if (PulsarSchemaToKafkaSchema.matchesToKafkaLogicalSchema(kafkaSchema)) {
+ if (Timestamp.LOGICAL_NAME.equals(kafkaSchema.name())) {
+ if (nativeObject instanceof java.util.Date) {
+ return nativeObject;
+ }
+ return Timestamp.toLogical(kafkaSchema, ((Number) nativeObject).longValue());
+ } else if (Date.LOGICAL_NAME.equals(kafkaSchema.name())) {
+ if (nativeObject instanceof java.util.Date) {
+ return nativeObject;
+ }
+ return Date.toLogical(kafkaSchema, ((Number) nativeObject).intValue());
+ } else if (Time.LOGICAL_NAME.equals(kafkaSchema.name())) {
+ if (nativeObject instanceof java.util.Date) {
+ return nativeObject;
+ }
+ return Time.toLogical(kafkaSchema, ((Number) nativeObject).intValue());
+ } else if (Decimal.LOGICAL_NAME.equals(kafkaSchema.name())) {
+ if (nativeObject instanceof java.math.BigDecimal) {
+ return nativeObject;
+ }
+ return Decimal.toLogical(kafkaSchema, (byte[]) nativeObject);
+ }
+ throw new IllegalStateException("Unsupported Kafka Logical Schema " + kafkaSchema.name()
+ + " for value " + nativeObject);
+ }
+
if (nativeObject instanceof Number) {
// This is needed in case
// jackson decided to fit value into some other type internally
@@ -242,6 +274,32 @@ public class KafkaConnectData {
return defaultOrThrow(kafkaSchema);
}
+ // special case for a few classes defined in org.apache.kafka.connect.data
+ // and listed as LOGICAL_TYPE_CLASSES in org.apache.kafka.connect.data.ConnectSchema
+ // time/date as String is not supported because the format to parse is not clear
+ // (add it as a config param?)
+ if (PulsarSchemaToKafkaSchema.matchesToKafkaLogicalSchema(kafkaSchema)) {
+ if (Timestamp.LOGICAL_NAME.equals(kafkaSchema.name())) {
+ return Timestamp.toLogical(kafkaSchema, jsonNode.longValue());
+ } else if (Date.LOGICAL_NAME.equals(kafkaSchema.name())) {
+ return Date.toLogical(kafkaSchema, jsonNode.intValue());
+ } else if (Time.LOGICAL_NAME.equals(kafkaSchema.name())) {
+ return Time.toLogical(kafkaSchema, jsonNode.intValue());
+ } else if (Decimal.LOGICAL_NAME.equals(kafkaSchema.name())) {
+ if (jsonNode.isNumber()) {
+ return jsonNode.decimalValue();
+ }
+ try {
+ return Decimal.toLogical(kafkaSchema, jsonNode.binaryValue());
+ } catch (IOException e) {
+ throw new IllegalStateException("Could not convert Kafka Logical Schema " + kafkaSchema.name()
+ + " for jsonNode " + jsonNode + " into Decimal");
+ }
+ }
+ throw new IllegalStateException("Unsupported Kafka Logical Schema " + kafkaSchema.name()
+ + " for jsonNode " + jsonNode);
+ }
+
switch (kafkaSchema.type()) {
case INT8:
Preconditions.checkArgument(jsonNode.isNumber());
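The two new branches above do the same job on the two ingestion paths: castToKafkaSchema promotes values decoded from native/Avro objects, and the JSON path promotes values arriving as Jackson JsonNodes; in both cases the wire-format primitive becomes the logical Java type Kafka Connect expects. A sketch of those promotions using the standard toLogical converters (the values are illustrative):

// Illustrative values only; the conversions are the standard Kafka Connect ones.
java.util.Date ts  = Timestamp.toLogical(Timestamp.SCHEMA, 946684800000L); // epoch millis
java.util.Date day = Date.toLogical(Date.SCHEMA, 19358);                   // days since epoch
java.util.Date tod = Time.toLogical(Time.SCHEMA, 11 * 3_600_000);          // millis past midnight
java.math.BigDecimal dec = Decimal.toLogical(Decimal.schema(2),
        new byte[]{0x04, (byte) 0xD2});                                    // unscaled 1234 -> 12.34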
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
index 72d68610bdb..d1834e2f9dd 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
@@ -22,6 +22,7 @@ package org.apache.pulsar.io.kafka.connect.schema;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ExecutionError;
import com.google.common.util.concurrent.UncheckedExecutionException;
import java.nio.charset.StandardCharsets;
@@ -29,8 +30,12 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
+import org.apache.kafka.connect.errors.DataException;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
@@ -38,6 +43,7 @@ import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
@Slf4j
public class PulsarSchemaToKafkaSchema {
private static final ImmutableMap<SchemaType, Schema> pulsarSchemaTypeToKafkaSchema;
+ private static final ImmutableSet<String> kafkaLogicalSchemas;
private static final AvroData avroData = new AvroData(1000);
private static final Cache<byte[], Schema> schemaCache =
CacheBuilder.newBuilder().maximumSize(10000)
@@ -56,6 +62,16 @@ public class PulsarSchemaToKafkaSchema {
.put(SchemaType.BYTES, Schema.BYTES_SCHEMA)
.put(SchemaType.DATE, Date.SCHEMA)
.build();
+ kafkaLogicalSchemas = ImmutableSet.<String>builder()
+ .add(Timestamp.LOGICAL_NAME)
+ .add(Date.LOGICAL_NAME)
+ .add(Time.LOGICAL_NAME)
+ .add(Decimal.LOGICAL_NAME)
+ .build();
+ }
+
+ public static boolean matchesToKafkaLogicalSchema(Schema kafkaSchema) {
+ return kafkaLogicalSchemas.contains(kafkaSchema.name());
}
// Parse json to shaded schema
@@ -67,30 +83,58 @@ public class PulsarSchemaToKafkaSchema {
}
public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
- if (pulsarSchema != null && pulsarSchema.getSchemaInfo() != null) {
- if (pulsarSchemaTypeToKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) {
- return pulsarSchemaTypeToKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
- }
+ if (pulsarSchema == null || pulsarSchema.getSchemaInfo() == null) {
+ throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Schema is required.", null);
+ }
- try {
- return schemaCache.get(pulsarSchema.getSchemaInfo().getSchema(), () -> {
- if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
- KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema;
- return SchemaBuilder.map(getKafkaConnectSchema(kvSchema.getKeySchema()),
- getKafkaConnectSchema(kvSchema.getValueSchema()))
- .build();
+ String logicalSchemaName = pulsarSchema.getSchemaInfo().getName();
+ if (kafkaLogicalSchemas.contains(logicalSchemaName)) {
+ if (Timestamp.LOGICAL_NAME.equals(logicalSchemaName)) {
+ return Timestamp.SCHEMA;
+ } else if (Date.LOGICAL_NAME.equals(logicalSchemaName)) {
+ return Date.SCHEMA;
+ } else if (Time.LOGICAL_NAME.equals(logicalSchemaName)) {
+ return Time.SCHEMA;
+ } else if (Decimal.LOGICAL_NAME.equals(logicalSchemaName)) {
+ String scaleString = null;
+ final int scale;
+ if (pulsarSchema.getSchemaInfo().getProperties() != null) {
+ scaleString = pulsarSchema.getSchemaInfo().getProperties().get("scale");
+ }
+ if (scaleString == null) {
+ throw new DataException("Invalid Decimal schema: scale parameter not found.");
+ } else {
+ try {
+ scale = Integer.parseInt(scaleString);
+ } catch (NumberFormatException nfe) {
+ throw new DataException("Invalid scale parameter found in Decimal schema: ", nfe);
}
- org.apache.pulsar.kafka.shade.avro.Schema avroSchema =
- parseAvroSchema(new String(pulsarSchema.getSchemaInfo().getSchema(),
- StandardCharsets.UTF_8));
- return avroData.toConnectSchema(avroSchema);
- });
- } catch (ExecutionException | UncheckedExecutionException | ExecutionError ee) {
- throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Failed to convert to Kafka Schema.", ee);
+ }
+ return Decimal.schema(scale);
}
+ throw new IllegalStateException("Unsupported Kafka Logical Schema " + logicalSchemaName);
+ }
+
+ if (pulsarSchemaTypeToKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) {
+ return pulsarSchemaTypeToKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
}
- throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Schema is required.", null);
+ try {
+ return schemaCache.get(pulsarSchema.getSchemaInfo().getSchema(), () -> {
+ if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
+ KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema;
+ return SchemaBuilder.map(getKafkaConnectSchema(kvSchema.getKeySchema()),
+ getKafkaConnectSchema(kvSchema.getValueSchema()))
+ .build();
+ }
+ org.apache.pulsar.kafka.shade.avro.Schema avroSchema =
+ parseAvroSchema(new String(pulsarSchema.getSchemaInfo().getSchema(),
+ StandardCharsets.UTF_8));
+ return avroData.toConnectSchema(avroSchema);
+ });
+ } catch (ExecutionException | UncheckedExecutionException | ExecutionError ee) {
+ throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Failed to convert to Kafka Schema.", ee);
+ }
}
private static IllegalStateException logAndThrowOnUnsupportedSchema(
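On the schema side, getKafkaConnectSchema now short-circuits on the SchemaInfo name before consulting the primitive type map or the Avro path, and for Decimal the scale must arrive as a "scale" schema property. A sketch of a matching Pulsar SchemaInfo, reusing the SchemaInfoImpl setters the new tests use (the scale of 2 is illustrative):

Map<String, String> props = new HashMap<>();
props.put("scale", "2");
SchemaInfo info = new SchemaInfoImpl()
        .setName(Decimal.LOGICAL_NAME) // "org.apache.kafka.connect.data.Decimal"
        .setType(SchemaType.BYTES)
        .setProperties(props)
        .setSchema(new byte[0]);
// getKafkaConnectSchema(...) resolves this to Decimal.schema(2); if the
// "scale" property is missing or non-numeric it throws DataException.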
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 7661e5fc98d..4f3996c8fde 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
@@ -35,7 +36,11 @@ import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -52,9 +57,12 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.io.core.SinkContext;
@@ -71,10 +79,13 @@ import org.testng.collections.Maps;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.Serializable;
+import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.HashSet;
@@ -83,6 +94,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
@@ -104,10 +116,35 @@ import static org.testng.Assert.fail;
@SuppressWarnings({"unchecked", "rawtypes"})
@Slf4j
-public class KafkaConnectSinkTest extends ProducerConsumerBase {
+public class KafkaConnectSinkTest extends ProducerConsumerBase {
+
+ public class TestSchema implements Schema<byte[]>, Serializable {
+
+ private SchemaInfo schemaInfo;
+
+ public TestSchema(SchemaInfo schemaInfo) {
+ this.schemaInfo = schemaInfo;
+ }
+
+ @Override
+ public byte[] encode(byte[] data) {
+ return data;
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo() {
+ return schemaInfo;
+ }
+
+ @Override
+ public Schema<byte[]> clone() {
+ return null;
+ }
+ }
public class ResultCaptor<T> implements Answer {
private T result = null;
+
public T getResult() {
return result;
}
@@ -119,7 +156,7 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
}
}
- private String offsetTopicName = "persistent://my-property/my-ns/kafka-connect-sink-offset";
+ private String offsetTopicName = "persistent://my-property/my-ns/kafka-connect-sink-offset";
private Path file;
private Map<String, Object> props;
@@ -319,11 +356,11 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
}
private SinkRecord recordSchemaTest(Object value, Schema schema, Object expected, String expectedSchema) throws Exception {
- return recordSchemaTest(value, schema, "key", "STRING", expected, expectedSchema);
+ return recordSchemaTest(value, schema, "key", "STRING", expected, expectedSchema);
}
private SinkRecord recordSchemaTest(Object value, Schema schema, Object expectedKey, String expectedKeySchema,
- Object expected, String expectedSchema) throws Exception {
+ Object expected, String expectedSchema) throws Exception {
props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
KafkaConnectSink sink = new KafkaConnectSink();
@@ -354,7 +391,8 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
List<String> lines = Files.readAllLines(file, StandardCharsets.US_ASCII);
ObjectMapper om = new ObjectMapper();
- Map<String, Object> result = om.readValue(lines.get(0), new TypeReference<Map<String, Object>>(){});
+ Map<String, Object> result = om.readValue(lines.get(0), new TypeReference<Map<String, Object>>() {
+ });
assertEquals(expectedKey, result.get("key"));
assertEquals(expected, result.get("value"));
@@ -398,12 +436,12 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
.map(f -> new Field(f.name(), f.pos()))
.collect(Collectors.toList());
- return new GenericAvroRecord(new byte[]{ 1 }, avroSchema, fields, avroRecord);
+ return new GenericAvroRecord(new byte[]{1}, avroSchema, fields, avroRecord);
} else {
rec = MockGenericObjectWrapper.builder()
.nativeObject(value)
.schemaType(schema != null ? schema.getSchemaInfo().getType() : null)
- .schemaVersion(new byte[]{ 1 }).build();
+ .schemaVersion(new byte[]{1}).build();
}
return rec;
}
@@ -421,11 +459,11 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
final GenericData.Record obj = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
// schema type INT32
- obj.put("field1", (byte)10);
+ obj.put("field1", (byte) 10);
// schema type STRING
obj.put("field2", "test");
// schema type INT64
- obj.put("field3", (short)100);
+ obj.put("field3", (short) 100);
final GenericRecord rec = getGenericRecord(obj, pulsarAvroSchema);
Message msg = mock(MessageImpl.class);
@@ -482,17 +520,17 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
@Test
public void byteRecordSchemaTest() throws Exception {
// int 1 is coming back from ObjectMapper
- SinkRecord sinkRecord = recordSchemaTest((byte)1, Schema.INT8, 1, "INT8");
+ SinkRecord sinkRecord = recordSchemaTest((byte) 1, Schema.INT8, 1, "INT8");
Assert.assertEquals(sinkRecord.value().getClass(), Byte.class);
- Assert.assertEquals(sinkRecord.value(), (byte)1);
+ Assert.assertEquals(sinkRecord.value(), (byte) 1);
}
@Test
public void shortRecordSchemaTest() throws Exception {
// int 1 is coming back from ObjectMapper
- SinkRecord sinkRecord = recordSchemaTest((short)1, Schema.INT16, 1, "INT16");
+ SinkRecord sinkRecord = recordSchemaTest((short) 1, Schema.INT16, 1, "INT16");
Assert.assertEquals(sinkRecord.value().getClass(), Short.class);
- Assert.assertEquals(sinkRecord.value(), (short)1);
+ Assert.assertEquals(sinkRecord.value(), (short) 1);
}
@Test
@@ -650,6 +688,77 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
Assert.assertEquals(key, 11);
}
+ @Test
+ public void kafkaLogicalTypesTimestampTest() {
+ Schema schema = new TestSchema(new SchemaInfoImpl()
+ .setName(Timestamp.LOGICAL_NAME)
+ .setType(SchemaType.INT64)
+ .setSchema(new byte[0]));
+
+ org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
+ .getKafkaConnectSchema(schema);
+
+ java.util.Date date = getDateFromString("12/30/1999 11:12:13");
+ Object connectData = KafkaConnectData
+ .getKafkaConnectData(Timestamp.fromLogical(kafkaSchema, date), kafkaSchema);
+
+ org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+ }
+
+ @Test
+ public void kafkaLogicalTypesTimeTest() {
+ Schema schema = new TestSchema(new SchemaInfoImpl()
+ .setName(Time.LOGICAL_NAME)
+ .setType(SchemaType.INT32)
+ .setSchema(new byte[0]));
+
+ org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
+ .getKafkaConnectSchema(schema);
+
+ java.util.Date date = getDateFromString("01/01/1970 11:12:13");
+ Object connectData = KafkaConnectData
+ .getKafkaConnectData(Time.fromLogical(kafkaSchema, date), kafkaSchema);
+
+ org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+ }
+
+ @Test
+ public void kafkaLogicalTypesDateTest() {
+ Schema schema = new TestSchema(new SchemaInfoImpl()
+ .setName(Date.LOGICAL_NAME)
+ .setType(SchemaType.INT32)
+ .setSchema(new byte[0]));
+
+ org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
+ .getKafkaConnectSchema(schema);
+
+ java.util.Date date = getDateFromString("12/31/2022 00:00:00");
+ Object connectData = KafkaConnectData
+ .getKafkaConnectData(Date.fromLogical(kafkaSchema, date), kafkaSchema);
+
+ org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+ }
+
+ @Test
+ public void kafkaLogicalTypesDecimalTest() {
+ Map<String, String> props = new HashMap<>();
+ props.put("scale", "10");
+ Schema schema = new TestSchema(new SchemaInfoImpl()
+ .setName(Decimal.LOGICAL_NAME)
+ .setType(SchemaType.BYTES)
+ .setProperties(props)
+ .setSchema(new byte[0]));
+
+ org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
+ .getKafkaConnectSchema(schema);
+
+ Object connectData = KafkaConnectData
+ .getKafkaConnectData(Decimal.fromLogical(kafkaSchema, BigDecimal.valueOf(100L, 10)), kafkaSchema);
+
+ org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
+ }
+
+
@Test
public void connectDataComplexAvroSchemaGenericRecordTest() {
AvroSchema<PulsarSchemaToKafkaSchemaTest.ComplexStruct> pulsarAvroSchema
@@ -744,28 +853,28 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
@Test
public void connectDataPrimitiveArraysTest() throws Exception {
- testPojoAsAvroAndJsonConversionToConnectData(new String[] {"test", "test2"});
+ testPojoAsAvroAndJsonConversionToConnectData(new String[]{"test", "test2"});
- testPojoAsAvroAndJsonConversionToConnectData(new char[] {'a', 'b', 'c'});
- testPojoAsAvroAndJsonConversionToConnectData(new Character[] {'a', 'b', 'c'});
+ testPojoAsAvroAndJsonConversionToConnectData(new char[]{'a', 'b', 'c'});
+ testPojoAsAvroAndJsonConversionToConnectData(new Character[]{'a', 'b', 'c'});
- testPojoAsAvroAndJsonConversionToConnectData(new byte[] {Byte.MIN_VALUE, Byte.MAX_VALUE});
- testPojoAsAvroAndJsonConversionToConnectData(new Byte[] {Byte.MIN_VALUE, Byte.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new byte[]{Byte.MIN_VALUE, Byte.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new Byte[]{Byte.MIN_VALUE, Byte.MAX_VALUE});
- testPojoAsAvroAndJsonConversionToConnectData(new short[] {Short.MIN_VALUE, Short.MAX_VALUE});
- testPojoAsAvroAndJsonConversionToConnectData(new Short[] {Short.MIN_VALUE, Short.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new short[]{Short.MIN_VALUE, Short.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new Short[]{Short.MIN_VALUE, Short.MAX_VALUE});
- testPojoAsAvroAndJsonConversionToConnectData(new int[] {Integer.MIN_VALUE, Integer.MAX_VALUE});
- testPojoAsAvroAndJsonConversionToConnectData(new Integer[] {Integer.MIN_VALUE, Integer.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new int[]{Integer.MIN_VALUE, Integer.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new Integer[]{Integer.MIN_VALUE, Integer.MAX_VALUE});
- testPojoAsAvroAndJsonConversionToConnectData(new long[] {Long.MIN_VALUE, Long.MAX_VALUE});
- testPojoAsAvroAndJsonConversionToConnectData(new Long[] {Long.MIN_VALUE, Long.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new long[]{Long.MIN_VALUE, Long.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new Long[]{Long.MIN_VALUE, Long.MAX_VALUE});
- testPojoAsAvroAndJsonConversionToConnectData(new float[] {Float.MIN_VALUE, Float.MAX_VALUE});
- testPojoAsAvroAndJsonConversionToConnectData(new Float[] {Float.MIN_VALUE, Float.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new float[]{Float.MIN_VALUE, Float.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new Float[]{Float.MIN_VALUE, Float.MAX_VALUE});
- testPojoAsAvroAndJsonConversionToConnectData(new double[] {Double.MIN_VALUE, Double.MAX_VALUE});
- testPojoAsAvroAndJsonConversionToConnectData(new Double[] {Double.MIN_VALUE, Double.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new double[]{Double.MIN_VALUE, Double.MAX_VALUE});
+ testPojoAsAvroAndJsonConversionToConnectData(new Double[]{Double.MIN_VALUE, Double.MAX_VALUE});
}
private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo) throws IOException {
@@ -853,20 +962,20 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
expectedValue.put("doubleField", 0.0d);
KeyValue<GenericRecord, GenericRecord> kv = new KeyValue<>(getGenericRecord(key, pulsarAvroSchema),
- getGenericRecord(value, pulsarAvroSchema));
+ getGenericRecord(value, pulsarAvroSchema));
SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema),
expectedKey, "STRUCT", expectedValue, "STRUCT");
Struct outValue = (Struct) sinkRecord.value();
- Assert.assertEquals((int)outValue.get("field1"), 10);
- Assert.assertEquals((String)outValue.get("field2"), "value");
- Assert.assertEquals((long)outValue.get("field3"), 100L);
+ Assert.assertEquals((int) outValue.get("field1"), 10);
+ Assert.assertEquals((String) outValue.get("field2"), "value");
+ Assert.assertEquals((long) outValue.get("field3"), 100L);
Struct outKey = (Struct) sinkRecord.key();
- Assert.assertEquals((int)outKey.get("field1"), 11);
- Assert.assertEquals((String)outKey.get("field2"), "key");
- Assert.assertEquals((long)outKey.get("field3"), 101L);
+ Assert.assertEquals((int) outKey.get("field1"), 11);
+ Assert.assertEquals((String) outKey.get("field2"), "key");
+ Assert.assertEquals((long) outKey.get("field3"), 101L);
}
@Test
@@ -1186,7 +1295,7 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
when(msg.getMessageId()).then(x -> new MessageIdImpl(ledgerId.get(), entryId.get(), 0));
when(msg.hasIndex()).thenReturn(false);
- final int partition = (int)(i % numPartitions);
+ final int partition = (int) (i % numPartitions);
final AtomicInteger status = new AtomicInteger(0);
Record<GenericObject> record = PulsarRecord.<String>builder()
.topicName(topicName)
@@ -1257,14 +1366,14 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
.setCharField('c')
.setStringField("some text")
- .setByteArr(new byte[] {1 ,2})
- .setShortArr(new short[] {3, 4})
- .setIntArr(new int[] {5, 6})
- .setLongArr(new long[] {7, 8})
- .setFloatArr(new float[] {9.0f, 10.0f})
- .setDoubleArr(new double[] {11.0d, 12.0d})
+ .setByteArr(new byte[]{1, 2})
+ .setShortArr(new short[]{3, 4})
+ .setIntArr(new int[]{5, 6})
+ .setLongArr(new long[]{7, 8})
+ .setFloatArr(new float[]{9.0f, 10.0f})
+ .setDoubleArr(new double[]{11.0d, 12.0d})
.setCharArr(new char[]{'a', 'b'})
- .setStringArr(new String[] {"abc", "def"});
+ .setStringArr(new String[]{"abc", "def"});
}
private static GenericData.Record getStructRecord() {
@@ -1306,13 +1415,13 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
rec.put("doubleField", 6.1d);
rec.put("charField", 'c');
rec.put("stringField", "some string");
- rec.put("byteArr", new byte[] {(byte) 1, (byte) 2});
- rec.put("shortArr", new short[] {(short) 3, (short) 4});
- rec.put("intArr", new int[] {5, 6});
- rec.put("longArr", new long[] {7L, 8L});
- rec.put("floatArr", new float[] {9.0f, 10.0f});
- rec.put("doubleArr", new double[] {11.0d, 12.0d});
- rec.put("charArr", new char[] {'a', 'b', 'c'});
+ rec.put("byteArr", new byte[]{(byte) 1, (byte) 2});
+ rec.put("shortArr", new short[]{(short) 3, (short) 4});
+ rec.put("intArr", new int[]{5, 6});
+ rec.put("longArr", new long[]{7L, 8L});
+ rec.put("floatArr", new float[]{9.0f, 10.0f});
+ rec.put("doubleArr", new double[]{11.0d, 12.0d});
+ rec.put("charArr", new char[]{'a', 'b', 'c'});
Map<String, GenericData.Record> map = new HashMap<>();
map.put("key1", getStructRecord());
@@ -1322,4 +1431,14 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
return rec;
}
+
+ @SneakyThrows
+ private java.util.Date getDateFromString(String dateInString) {
+ SimpleDateFormat formatter = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
+ formatter.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+ java.util.Date parsedDate = formatter.parse(dateInString);
+ return parsedDate;
+ }
+
}
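The validateValue calls in the four new tests are the real assertion: ConnectSchema.validateValue checks a value's Java class against the class the (logical) schema expects, so a raw primitive that skipped the logical conversion fails it. A minimal sketch of that behavior (standard Kafka Connect API; the error text is paraphrased):

// Accepts the logical Java type...
ConnectSchema.validateValue(Timestamp.SCHEMA, new java.util.Date());
// ...while a raw epoch-millis long throws, which is exactly the mismatch
// the new castToKafkaSchema branch prevents.
try {
    ConnectSchema.validateValue(Timestamp.SCHEMA, 946684800000L);
} catch (org.apache.kafka.connect.errors.DataException expected) {
    // e.g. an "Invalid Java object for schema ..." style error
}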