You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/11 00:23:15 UTC
[pulsar] 08/08: Kafka connect sink adaptor to support non-primitive
schemas (#10410)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3505b98b7ea0c115bb946c7a225a6b6b1e2ac42b
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Thu Jun 10 13:39:14 2021 -0700
Kafka connect sink adaptor to support non-primitive schemas (#10410)
(cherry picked from commit cf50da373500416dd2e9243088ec0b58496fa7bd)
---
pom.xml | 1 -
.../pulsar/io/kafka/connect/KafkaConnectSink.java | 68 +------
.../connect/schema/PulsarSchemaToKafkaSchema.java | 105 +++++++++++
.../io/kafka/connect/KafkaConnectSinkTest.java | 33 +++-
.../connect/PulsarSchemaToKafkaSchemaTest.java | 208 +++++++++++++++++++++
5 files changed, 341 insertions(+), 74 deletions(-)
diff --git a/pom.xml b/pom.xml
index de59f44..d24e404 100644
--- a/pom.xml
+++ b/pom.xml
@@ -210,7 +210,6 @@ flexible messaging model and an intuitive client API.</description>
<javassist.version>3.25.0-GA</javassist.version>
<failsafe.version>2.3.1</failsafe.version>
<skyscreamer.version>1.5.0</skyscreamer.version>
- <confluent.version>5.2.2</confluent.version>
<objenesis.version>3.1</objenesis.version>
<awaitility.version>4.0.3</awaitility.version>
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index a75f1ee..cdd6c04b 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.io.kafka.connect;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -42,6 +41,7 @@ import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
import java.util.List;
import java.util.Map;
@@ -59,37 +59,8 @@ import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_
@Slf4j
public class KafkaConnectSink implements Sink<GenericObject> {
-
private boolean unwrapKeyValueIfAvailable;
- private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema;
- private final static ImmutableMap<SchemaType, Schema> pulsarSchemaTypeTypeToKafkaSchema;
-
- static {
- primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder()
- .put(Boolean.class, Schema.BOOLEAN_SCHEMA)
- .put(Byte.class, Schema.INT8_SCHEMA)
- .put(Short.class, Schema.INT16_SCHEMA)
- .put(Integer.class, Schema.INT32_SCHEMA)
- .put(Long.class, Schema.INT64_SCHEMA)
- .put(Float.class, Schema.FLOAT32_SCHEMA)
- .put(Double.class, Schema.FLOAT64_SCHEMA)
- .put(String.class, Schema.STRING_SCHEMA)
- .put(byte[].class, Schema.BYTES_SCHEMA)
- .build();
- pulsarSchemaTypeTypeToKafkaSchema = ImmutableMap.<SchemaType, Schema>builder()
- .put(SchemaType.BOOLEAN, Schema.BOOLEAN_SCHEMA)
- .put(SchemaType.INT8, Schema.INT8_SCHEMA)
- .put(SchemaType.INT16, Schema.INT16_SCHEMA)
- .put(SchemaType.INT32, Schema.INT32_SCHEMA)
- .put(SchemaType.INT64, Schema.INT64_SCHEMA)
- .put(SchemaType.FLOAT, Schema.FLOAT32_SCHEMA)
- .put(SchemaType.DOUBLE, Schema.FLOAT64_SCHEMA)
- .put(SchemaType.STRING, Schema.STRING_SCHEMA)
- .put(SchemaType.BYTES, Schema.BYTES_SCHEMA)
- .build();
- }
-
private PulsarKafkaSinkContext sinkContext;
@VisibleForTesting
PulsarKafkaSinkTaskContext taskContext;
@@ -252,37 +223,6 @@ public class KafkaConnectSink implements Sink<GenericObject> {
}
}
- /**
- * org.apache.kafka.connect.data.Schema for the object
- * @param obj - Object to get schema of.
- * @return org.apache.kafka.connect.data.Schema
- */
- private static Schema getKafkaConnectSchemaForObject(Object obj) {
- if (obj != null && primitiveTypeToSchema.containsKey(obj.getClass())) {
- return primitiveTypeToSchema.get(obj.getClass());
- }
- return null;
- }
-
- public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema, Object obj) {
- if (pulsarSchema != null && pulsarSchema.getSchemaInfo() != null
- && pulsarSchemaTypeTypeToKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) {
- return pulsarSchemaTypeTypeToKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
- }
-
- Schema result = getKafkaConnectSchemaForObject(obj);
- if (result == null) {
- throw new IllegalStateException("Unsupported kafka schema for Pulsar Schema "
- + (pulsarSchema == null || pulsarSchema.getSchemaInfo() == null
- ? "null"
- : pulsarSchema.getSchemaInfo().toString())
- + " object class "
- + (obj == null ? "null" : obj.getClass().getCanonicalName()));
- }
- return result;
- }
-
-
@SuppressWarnings("rawtypes")
private SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
final int partition = sourceRecord.getPartitionIndex().orElse(0);
@@ -301,8 +241,8 @@ public class KafkaConnectSink implements Sink<GenericObject> {
KeyValue kv = (KeyValue) sourceRecord.getValue().getNativeObject();
key = kv.getKey();
value = kv.getValue();
- keySchema = getKafkaConnectSchema(kvSchema.getKeySchema(), key);
- valueSchema = getKafkaConnectSchema(kvSchema.getValueSchema(), value);
+ keySchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getKeySchema());
+ valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getValueSchema());
} else {
if (sourceRecord.getMessage().get().hasBase64EncodedKey()) {
key = sourceRecord.getMessage().get().getKeyBytes();
@@ -311,8 +251,8 @@ public class KafkaConnectSink implements Sink<GenericObject> {
key = sourceRecord.getKey().orElse(null);
keySchema = Schema.STRING_SCHEMA;
}
+ valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(sourceRecord.getSchema());
value = sourceRecord.getValue().getNativeObject();
- valueSchema = getKafkaConnectSchema(sourceRecord.getSchema(), value);
}
long offset = sourceRecord.getRecordSequence()
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
new file mode 100644
index 0000000..c5bde39
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect.schema;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ExecutionError;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Maps Pulsar schemas onto Kafka Connect schemas.
+ *
+ * <p>Resolution order used by {@link #getKafkaConnectSchema}:
+ * <ol>
+ *   <li>schema types listed in the fixed lookup table (primitives and {@code DATE}) map to a
+ *       pre-built Kafka Connect schema;</li>
+ *   <li>{@code KEY_VALUE} schemas become a Kafka Connect map schema built from the converted
+ *       key and value schemas;</li>
+ *   <li>anything else is parsed as an Avro schema definition and converted via {@link AvroData}.</li>
+ * </ol>
+ * Results of steps 2 and 3 are memoized in a size- and time-bounded Guava cache.
+ */
+@Slf4j
+public class PulsarSchemaToKafkaSchema {
+    /** Pulsar schema types that have a direct, pre-built Kafka Connect counterpart. */
+    private static final ImmutableMap<SchemaType, Schema> pulsarSchemaTypeToKafkaSchema;
+    private static final AvroData avroData = new AvroData(1000);
+    // At most 10000 entries, each dropped 30 minutes after its last access.
+    // NOTE(review): the key type is byte[], and Java arrays compare by identity, so a lookup
+    // presumably only hits when getSchema() hands back the very same array instance - confirm
+    // that this is the intended caching granularity.
+    private static final Cache<byte[], Schema> schemaCache =
+            CacheBuilder.newBuilder().maximumSize(10000)
+                    .expireAfterAccess(30, TimeUnit.MINUTES).build();
+
+    static {
+        pulsarSchemaTypeToKafkaSchema = ImmutableMap.<SchemaType, Schema>builder()
+                .put(SchemaType.BOOLEAN, Schema.BOOLEAN_SCHEMA)
+                .put(SchemaType.INT8, Schema.INT8_SCHEMA)
+                .put(SchemaType.INT16, Schema.INT16_SCHEMA)
+                .put(SchemaType.INT32, Schema.INT32_SCHEMA)
+                .put(SchemaType.INT64, Schema.INT64_SCHEMA)
+                .put(SchemaType.FLOAT, Schema.FLOAT32_SCHEMA)
+                .put(SchemaType.DOUBLE, Schema.FLOAT64_SCHEMA)
+                .put(SchemaType.STRING, Schema.STRING_SCHEMA)
+                .put(SchemaType.BYTES, Schema.BYTES_SCHEMA)
+                .put(SchemaType.DATE, Date.SCHEMA)
+                .build();
+    }
+
+    /**
+     * Parses an Avro schema definition (JSON) into the shaded Avro schema type.
+     * Validation of field defaults is switched off on the parser.
+     *
+     * @param schemaJson Avro schema definition as a JSON string
+     * @return the parsed (shaded) Avro schema
+     */
+    private static org.apache.pulsar.kafka.shade.avro.Schema parseAvroSchema(String schemaJson) {
+        final org.apache.pulsar.kafka.shade.avro.Schema.Parser parser = new org.apache.pulsar.kafka.shade.avro.Schema.Parser();
+        parser.setValidateDefaults(false);
+        return parser.parse(schemaJson);
+    }
+
+    /**
+     * Returns the Kafka Connect schema that corresponds to the given Pulsar schema.
+     *
+     * @param pulsarSchema Pulsar schema to convert; it and its schema info must be non-null
+     * @return the matching Kafka Connect schema
+     * @throws IllegalStateException if the schema (or its schema info) is null, or if the
+     *                               conversion fails; a conversion failure is attached as the cause
+     */
+    public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
+        if (pulsarSchema == null || pulsarSchema.getSchemaInfo() == null) {
+            throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Schema is required.", null);
+        }
+
+        // ImmutableMap never holds null values, so a single get() doubles as the containsKey check.
+        final Schema directMatch = pulsarSchemaTypeToKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
+        if (directMatch != null) {
+            return directMatch;
+        }
+
+        try {
+            return schemaCache.get(pulsarSchema.getSchemaInfo().getSchema(),
+                    () -> convertNonPrimitiveSchema(pulsarSchema));
+        } catch (ExecutionException | UncheckedExecutionException | ExecutionError ee) {
+            throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Failed to convert to Kafka Schema.", ee);
+        }
+    }
+
+    /**
+     * Cache loader: converts a schema that is not covered by the fixed lookup table.
+     * {@code KEY_VALUE} schemas map to a Kafka Connect map schema; everything else is treated
+     * as an Avro schema definition encoded as UTF-8 JSON.
+     */
+    private static Schema convertNonPrimitiveSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
+        if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
+            KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema;
+            return SchemaBuilder.map(getKafkaConnectSchema(kvSchema.getKeySchema()),
+                                     getKafkaConnectSchema(kvSchema.getValueSchema()))
+                    .build();
+        }
+        org.apache.pulsar.kafka.shade.avro.Schema avroSchema =
+                parseAvroSchema(new String(pulsarSchema.getSchemaInfo().getSchema(), UTF_8));
+        return avroData.toConnectSchema(avroSchema);
+    }
+
+    /**
+     * Logs the failure and builds the exception describing it. The exception is returned rather
+     * than thrown so that call sites can write {@code throw logAndThrowOnUnsupportedSchema(...)}.
+     *
+     * @param pulsarSchema schema that could not be converted; may be null
+     * @param prefix       leading text of the error message
+     * @param cause        underlying failure, or null when there is none
+     * @return the exception for the caller to throw
+     */
+    private static IllegalStateException logAndThrowOnUnsupportedSchema(org.apache.pulsar.client.api.Schema pulsarSchema,
+                                                                        String prefix,
+                                                                        Throwable cause) {
+        String msg = prefix + " Pulsar Schema: "
+                + (pulsarSchema == null || pulsarSchema.getSchemaInfo() == null
+                ? "null" : pulsarSchema.getSchemaInfo().toString());
+        // Hand the cause to the logger as well, so the log entry shows why the conversion failed.
+        log.error(msg, cause);
+        return new IllegalStateException(msg, cause);
+    }
+}
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index c388836..6f4f954 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -30,8 +30,10 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
@@ -47,6 +49,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -281,62 +284,74 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
@Test
public void bytesRecordSchemaTest() throws Exception {
- recordSchemaTest("val".getBytes(StandardCharsets.US_ASCII), null, "val", "BYTES");
recordSchemaTest("val".getBytes(StandardCharsets.US_ASCII), Schema.BYTES, "val", "BYTES");
}
@Test
public void stringRecordSchemaTest() throws Exception {
- recordSchemaTest("val", null, "val", "STRING");
recordSchemaTest("val", Schema.STRING, "val", "STRING");
}
@Test
public void booleanRecordSchemaTest() throws Exception {
- recordSchemaTest(true, null, true, "BOOLEAN");
recordSchemaTest(true, Schema.BOOL, true, "BOOLEAN");
}
@Test
public void byteRecordSchemaTest() throws Exception {
// int 1 is coming back from ObjectMapper
- recordSchemaTest((byte)1, null, 1, "INT8");
recordSchemaTest((byte)1, Schema.INT8, 1, "INT8");
}
@Test
public void shortRecordSchemaTest() throws Exception {
// int 1 is coming back from ObjectMapper
- recordSchemaTest((short)1, null, 1, "INT16");
recordSchemaTest((short)1, Schema.INT16, 1, "INT16");
}
@Test
public void integerRecordSchemaTest() throws Exception {
- recordSchemaTest(Integer.MAX_VALUE, null, Integer.MAX_VALUE, "INT32");
recordSchemaTest(Integer.MAX_VALUE, Schema.INT32, Integer.MAX_VALUE, "INT32");
}
@Test
public void longRecordSchemaTest() throws Exception {
- recordSchemaTest(Long.MAX_VALUE, null, Long.MAX_VALUE, "INT64");
recordSchemaTest(Long.MAX_VALUE, Schema.INT64, Long.MAX_VALUE, "INT64");
}
@Test
public void floatRecordSchemaTest() throws Exception {
// 1.0d is coming back from ObjectMapper
- recordSchemaTest(1.0f, null, 1.0d, "FLOAT32");
recordSchemaTest(1.0f, Schema.FLOAT, 1.0d, "FLOAT32");
}
@Test
public void doubleRecordSchemaTest() throws Exception {
- recordSchemaTest(Double.MAX_VALUE, null, Double.MAX_VALUE, "FLOAT64");
recordSchemaTest(Double.MAX_VALUE, Schema.DOUBLE, Double.MAX_VALUE, "FLOAT64");
}
@Test
+    // Sends a POJO through a JSON schema built with always-allow-null disabled and expects
+    // the sink to report a STRUCT schema together with the three field values.
+    public void jsonSchemaTest() throws Exception {
+        Map<String, Object> expectedFields = new LinkedHashMap<>();
+        expectedFields.put("field1", 10);
+        expectedFields.put("field2", "test");
+        // integer is coming back from ObjectMapper
+        expectedFields.put("field3", 100);
+
+        PulsarSchemaToKafkaSchemaTest.StructWithAnnotations pojo =
+                new PulsarSchemaToKafkaSchemaTest.StructWithAnnotations();
+        pojo.setField1(10);
+        pojo.setField2("test");
+        pojo.setField3(100L);
+
+        SchemaDefinition<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> definition =
+                SchemaDefinition.<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations>builder()
+                        .withPojo(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class)
+                        .withAlwaysAllowNull(false)
+                        .build();
+        JSONSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pojoSchema = JSONSchema.of(definition);
+
+        recordSchemaTest(pojo, pojoSchema, expectedFields, "STRUCT");
+    }
+
+ @Test
public void unknownRecordSchemaTest() throws Exception {
Object obj = new Object();
props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
new file mode 100644
index 0000000..9075dd9
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka.connect;
+
+import com.google.common.collect.Lists;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.reflect.AvroDefault;
+import org.apache.avro.reflect.Nullable;
+import org.apache.kafka.connect.data.Date;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Tests the conversion of Pulsar schemas to Kafka Connect schemas
+ * performed by {@link PulsarSchemaToKafkaSchema}.
+ */
+@Slf4j
+public class PulsarSchemaToKafkaSchemaTest {
+
+    /** Field names expected in the struct schema derived from {@link StructWithAnnotations}. */
+    static final List<String> STRUCT_FIELDS = Lists.newArrayList("field1", "field2", "field3");
+
+    /** POJO used as the source of the Avro and JSON schemas under test. */
+    @Data
+    static class StructWithAnnotations {
+        int field1;
+        @Nullable
+        String field2;
+        // NOTE(review): the default is a JSON string although the field is a Long - presumably
+        // deliberate, to exercise a default that does not match the field type; confirm.
+        @AvroDefault("\"1000\"")
+        Long field3;
+    }
+
+    /** Runs the converter under test on the given Pulsar schema. */
+    private static org.apache.kafka.connect.data.Schema toKafka(Schema<?> pulsarSchema) {
+        return PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarSchema);
+    }
+
+    /** Asserts that the Pulsar schema converts to a Kafka Connect schema of the expected type. */
+    private static void assertConvertsTo(Schema<?> pulsarSchema,
+                                         org.apache.kafka.connect.data.Schema.Type expectedType) {
+        assertEquals(toKafka(pulsarSchema).type(), expectedType);
+    }
+
+    /** Asserts that the Pulsar schema converts to a map schema with the expected key/value types. */
+    private static void assertConvertsToMap(Schema<?> pulsarSchema,
+                                            org.apache.kafka.connect.data.Schema.Type expectedKeyType,
+                                            org.apache.kafka.connect.data.Schema.Type expectedValueType) {
+        final org.apache.kafka.connect.data.Schema converted = toKafka(pulsarSchema);
+        assertEquals(converted.type(), org.apache.kafka.connect.data.Schema.Type.MAP);
+        assertEquals(converted.keySchema().type(), expectedKeyType);
+        assertEquals(converted.valueSchema().type(), expectedValueType);
+    }
+
+    /** Asserts that the Pulsar schema converts to a struct exposing exactly the expected fields. */
+    private static void assertConvertsToStruct(Schema<?> pulsarSchema) {
+        final org.apache.kafka.connect.data.Schema converted = toKafka(pulsarSchema);
+        assertEquals(converted.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT);
+        assertEquals(converted.fields().size(), STRUCT_FIELDS.size());
+        STRUCT_FIELDS.forEach(fieldName -> assertEquals(converted.field(fieldName).name(), fieldName));
+    }
+
+    @Test
+    public void bytesSchemaTest() {
+        assertConvertsTo(Schema.BYTES, org.apache.kafka.connect.data.Schema.Type.BYTES);
+        assertConvertsTo(Schema.BYTEBUFFER, org.apache.kafka.connect.data.Schema.Type.BYTES);
+    }
+
+    @Test
+    public void stringSchemaTest() {
+        assertConvertsTo(Schema.STRING, org.apache.kafka.connect.data.Schema.Type.STRING);
+    }
+
+    @Test
+    public void booleanSchemaTest() {
+        assertConvertsTo(Schema.BOOL, org.apache.kafka.connect.data.Schema.Type.BOOLEAN);
+    }
+
+    @Test
+    public void int8SchemaTest() {
+        assertConvertsTo(Schema.INT8, org.apache.kafka.connect.data.Schema.Type.INT8);
+    }
+
+    @Test
+    public void int16SchemaTest() {
+        assertConvertsTo(Schema.INT16, org.apache.kafka.connect.data.Schema.Type.INT16);
+    }
+
+    @Test
+    public void int32SchemaTest() {
+        assertConvertsTo(Schema.INT32, org.apache.kafka.connect.data.Schema.Type.INT32);
+    }
+
+    @Test
+    public void int64SchemaTest() {
+        assertConvertsTo(Schema.INT64, org.apache.kafka.connect.data.Schema.Type.INT64);
+    }
+
+    @Test
+    public void float32SchemaTest() {
+        assertConvertsTo(Schema.FLOAT, org.apache.kafka.connect.data.Schema.Type.FLOAT32);
+    }
+
+    @Test
+    public void float64SchemaTest() {
+        assertConvertsTo(Schema.DOUBLE, org.apache.kafka.connect.data.Schema.Type.FLOAT64);
+    }
+
+    @Test
+    public void kvBytesSchemaTest() {
+        assertConvertsToMap(Schema.KV_BYTES(),
+                org.apache.kafka.connect.data.Schema.Type.BYTES,
+                org.apache.kafka.connect.data.Schema.Type.BYTES);
+    }
+
+    // Despite the method name, the key/value pair checked here is STRING/INT64.
+    @Test
+    public void kvBytesIntSchemaTests() {
+        assertConvertsToMap(KeyValueSchemaImpl.of(Schema.STRING, Schema.INT64),
+                org.apache.kafka.connect.data.Schema.Type.STRING,
+                org.apache.kafka.connect.data.Schema.Type.INT64);
+    }
+
+    @Test
+    public void avroSchemaTest() {
+        assertConvertsToStruct(AvroSchema.of(StructWithAnnotations.class));
+    }
+
+    @Test
+    public void jsonSchemaTest() {
+        final SchemaDefinition<StructWithAnnotations> definition =
+                SchemaDefinition.<StructWithAnnotations>builder()
+                        .withPojo(StructWithAnnotations.class)
+                        .withAlwaysAllowNull(false)
+                        .build();
+        assertConvertsToStruct(JSONSchema.of(definition));
+    }
+
+    @Test
+    public void dateSchemaTest() {
+        assertEquals(toKafka(Schema.DATE).type(), Date.SCHEMA.type());
+    }
+
+    // The schemas below are not supported; the converter is expected to reject each of them.
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void timeSchemaTest() {
+        toKafka(Schema.TIME);
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void timestampSchemaTest() {
+        toKafka(Schema.TIMESTAMP);
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void instantSchemaTest() {
+        toKafka(Schema.INSTANT);
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void localDateSchemaTest() {
+        toKafka(Schema.LOCAL_DATE);
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void localTimeSchemaTest() {
+        toKafka(Schema.LOCAL_TIME);
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void localDatetimeSchemaTest() {
+        toKafka(Schema.LOCAL_DATE_TIME);
+    }
+
+}