You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/11 00:23:15 UTC

[pulsar] 08/08: Kafka connect sink adaptor to support non-primitive schemas (#10410)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3505b98b7ea0c115bb946c7a225a6b6b1e2ac42b
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Thu Jun 10 13:39:14 2021 -0700

    Kafka connect sink adaptor to support non-primitive schemas (#10410)
    
    (cherry picked from commit cf50da373500416dd2e9243088ec0b58496fa7bd)
---
 pom.xml                                            |   1 -
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  |  68 +------
 .../connect/schema/PulsarSchemaToKafkaSchema.java  | 105 +++++++++++
 .../io/kafka/connect/KafkaConnectSinkTest.java     |  33 +++-
 .../connect/PulsarSchemaToKafkaSchemaTest.java     | 208 +++++++++++++++++++++
 5 files changed, 341 insertions(+), 74 deletions(-)

diff --git a/pom.xml b/pom.xml
index de59f44..d24e404 100644
--- a/pom.xml
+++ b/pom.xml
@@ -210,7 +210,6 @@ flexible messaging model and an intuitive client API.</description>
     <javassist.version>3.25.0-GA</javassist.version>
     <failsafe.version>2.3.1</failsafe.version>
     <skyscreamer.version>1.5.0</skyscreamer.version>
-    <confluent.version>5.2.2</confluent.version>
     <objenesis.version>3.1</objenesis.version>
     <awaitility.version>4.0.3</awaitility.version>
 
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index a75f1ee..cdd6c04b 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.io.kafka.connect;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -42,6 +41,7 @@ import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
 
 import java.util.List;
 import java.util.Map;
@@ -59,37 +59,8 @@ import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_
 
 @Slf4j
 public class KafkaConnectSink implements Sink<GenericObject> {
-
     private boolean unwrapKeyValueIfAvailable;
 
-    private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema;
-    private final static ImmutableMap<SchemaType, Schema> pulsarSchemaTypeTypeToKafkaSchema;
-
-    static {
-        primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder()
-                .put(Boolean.class, Schema.BOOLEAN_SCHEMA)
-                .put(Byte.class, Schema.INT8_SCHEMA)
-                .put(Short.class, Schema.INT16_SCHEMA)
-                .put(Integer.class, Schema.INT32_SCHEMA)
-                .put(Long.class, Schema.INT64_SCHEMA)
-                .put(Float.class, Schema.FLOAT32_SCHEMA)
-                .put(Double.class, Schema.FLOAT64_SCHEMA)
-                .put(String.class, Schema.STRING_SCHEMA)
-                .put(byte[].class, Schema.BYTES_SCHEMA)
-                .build();
-        pulsarSchemaTypeTypeToKafkaSchema = ImmutableMap.<SchemaType, Schema>builder()
-                .put(SchemaType.BOOLEAN, Schema.BOOLEAN_SCHEMA)
-                .put(SchemaType.INT8, Schema.INT8_SCHEMA)
-                .put(SchemaType.INT16, Schema.INT16_SCHEMA)
-                .put(SchemaType.INT32, Schema.INT32_SCHEMA)
-                .put(SchemaType.INT64, Schema.INT64_SCHEMA)
-                .put(SchemaType.FLOAT, Schema.FLOAT32_SCHEMA)
-                .put(SchemaType.DOUBLE, Schema.FLOAT64_SCHEMA)
-                .put(SchemaType.STRING, Schema.STRING_SCHEMA)
-                .put(SchemaType.BYTES, Schema.BYTES_SCHEMA)
-                .build();
-    }
-
     private PulsarKafkaSinkContext sinkContext;
     @VisibleForTesting
     PulsarKafkaSinkTaskContext taskContext;
@@ -252,37 +223,6 @@ public class KafkaConnectSink implements Sink<GenericObject> {
         }
     }
 
-    /**
-     * org.apache.kafka.connect.data.Schema for the object
-     * @param obj - Object to get schema of.
-     * @return org.apache.kafka.connect.data.Schema
-     */
-    private static Schema getKafkaConnectSchemaForObject(Object obj) {
-        if (obj != null && primitiveTypeToSchema.containsKey(obj.getClass())) {
-            return primitiveTypeToSchema.get(obj.getClass());
-        }
-        return null;
-    }
-
-    public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema, Object obj) {
-        if (pulsarSchema != null && pulsarSchema.getSchemaInfo() != null
-                && pulsarSchemaTypeTypeToKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) {
-            return pulsarSchemaTypeTypeToKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
-        }
-
-        Schema result = getKafkaConnectSchemaForObject(obj);
-        if (result == null) {
-            throw new IllegalStateException("Unsupported kafka schema for Pulsar Schema "
-                    + (pulsarSchema == null || pulsarSchema.getSchemaInfo() == null
-                        ? "null"
-                        : pulsarSchema.getSchemaInfo().toString())
-                    + " object class "
-                    + (obj == null ? "null" : obj.getClass().getCanonicalName()));
-        }
-        return result;
-    }
-
-
     @SuppressWarnings("rawtypes")
     private SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
         final int partition = sourceRecord.getPartitionIndex().orElse(0);
@@ -301,8 +241,8 @@ public class KafkaConnectSink implements Sink<GenericObject> {
             KeyValue kv = (KeyValue) sourceRecord.getValue().getNativeObject();
             key = kv.getKey();
             value = kv.getValue();
-            keySchema = getKafkaConnectSchema(kvSchema.getKeySchema(), key);
-            valueSchema = getKafkaConnectSchema(kvSchema.getValueSchema(), value);
+            keySchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getKeySchema());
+            valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getValueSchema());
         } else {
             if (sourceRecord.getMessage().get().hasBase64EncodedKey()) {
                 key = sourceRecord.getMessage().get().getKeyBytes();
@@ -311,8 +251,8 @@ public class KafkaConnectSink implements Sink<GenericObject> {
                 key = sourceRecord.getKey().orElse(null);
                 keySchema = Schema.STRING_SCHEMA;
             }
+            valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(sourceRecord.getSchema());
             value = sourceRecord.getValue().getNativeObject();
-            valueSchema = getKafkaConnectSchema(sourceRecord.getSchema(), value);
         }
 
         long offset = sourceRecord.getRecordSequence()
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
new file mode 100644
index 0000000..c5bde39
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect.schema;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ExecutionError;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+@Slf4j
+public class PulsarSchemaToKafkaSchema {
+    private final static ImmutableMap<SchemaType, Schema> pulsarSchemaTypeToKafkaSchema;
+    private static final AvroData avroData = new AvroData(1000);
+    private static final Cache<byte[], Schema> schemaCache =
+            CacheBuilder.newBuilder().maximumSize(10000)
+                    .expireAfterAccess(30, TimeUnit.MINUTES).build();
+
+    static {
+        pulsarSchemaTypeToKafkaSchema = ImmutableMap.<SchemaType, Schema>builder()
+                .put(SchemaType.BOOLEAN, Schema.BOOLEAN_SCHEMA)
+                .put(SchemaType.INT8, Schema.INT8_SCHEMA)
+                .put(SchemaType.INT16, Schema.INT16_SCHEMA)
+                .put(SchemaType.INT32, Schema.INT32_SCHEMA)
+                .put(SchemaType.INT64, Schema.INT64_SCHEMA)
+                .put(SchemaType.FLOAT, Schema.FLOAT32_SCHEMA)
+                .put(SchemaType.DOUBLE, Schema.FLOAT64_SCHEMA)
+                .put(SchemaType.STRING, Schema.STRING_SCHEMA)
+                .put(SchemaType.BYTES, Schema.BYTES_SCHEMA)
+                .put(SchemaType.DATE, Date.SCHEMA)
+                .build();
+    }
+
+    // Parse a JSON schema definition into a shaded Avro Schema object
+    private static org.apache.pulsar.kafka.shade.avro.Schema parseAvroSchema(String schemaJson) {
+        final org.apache.pulsar.kafka.shade.avro.Schema.Parser parser = new org.apache.pulsar.kafka.shade.avro.Schema.Parser();
+        parser.setValidateDefaults(false);
+        return parser.parse(schemaJson);
+    }
+
+    public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
+        if (pulsarSchema != null && pulsarSchema.getSchemaInfo() != null) {
+            if (pulsarSchemaTypeToKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) {
+                return pulsarSchemaTypeToKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
+            }
+
+            try {
+                return schemaCache.get(pulsarSchema.getSchemaInfo().getSchema(), () -> {
+                    if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
+                        KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema;
+                        return SchemaBuilder.map(getKafkaConnectSchema(kvSchema.getKeySchema()),
+                                                 getKafkaConnectSchema(kvSchema.getValueSchema()))
+                                    .build();
+                    }
+                    org.apache.pulsar.kafka.shade.avro.Schema avroSchema =
+                            parseAvroSchema(new String(pulsarSchema.getSchemaInfo().getSchema(), UTF_8));
+                    return avroData.toConnectSchema(avroSchema);
+                });
+            } catch (ExecutionException | UncheckedExecutionException | ExecutionError ee) {
+                throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Failed to convert to Kafka Schema.", ee);
+            }
+        }
+
+        throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Schema is required.", null);
+    }
+
+    private static IllegalStateException logAndThrowOnUnsupportedSchema(org.apache.pulsar.client.api.Schema pulsarSchema,
+                                                       String prefix,
+                                                       Throwable cause) {
+        String msg = prefix + " Pulsar Schema: "
+                + (pulsarSchema == null || pulsarSchema.getSchemaInfo() == null
+                ? "null" : pulsarSchema.getSchemaInfo().toString());
+        log.error(msg);
+        return new IllegalStateException(msg, cause);
+    }
+}
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index c388836..6f4f954 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -30,8 +30,10 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.util.MessageIdUtils;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
@@ -47,6 +49,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -281,62 +284,74 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
 
     @Test
     public void bytesRecordSchemaTest() throws Exception {
-        recordSchemaTest("val".getBytes(StandardCharsets.US_ASCII), null, "val", "BYTES");
         recordSchemaTest("val".getBytes(StandardCharsets.US_ASCII), Schema.BYTES, "val", "BYTES");
     }
 
     @Test
     public void stringRecordSchemaTest() throws Exception {
-        recordSchemaTest("val", null, "val", "STRING");
         recordSchemaTest("val", Schema.STRING, "val", "STRING");
     }
 
     @Test
     public void booleanRecordSchemaTest() throws Exception {
-        recordSchemaTest(true, null, true, "BOOLEAN");
         recordSchemaTest(true, Schema.BOOL, true, "BOOLEAN");
     }
 
     @Test
     public void byteRecordSchemaTest() throws Exception {
         // int 1 is coming back from ObjectMapper
-        recordSchemaTest((byte)1, null, 1, "INT8");
         recordSchemaTest((byte)1, Schema.INT8, 1, "INT8");
     }
 
     @Test
     public void shortRecordSchemaTest() throws Exception {
         // int 1 is coming back from ObjectMapper
-        recordSchemaTest((short)1, null, 1, "INT16");
         recordSchemaTest((short)1, Schema.INT16, 1, "INT16");
     }
 
     @Test
     public void integerRecordSchemaTest() throws Exception {
-        recordSchemaTest(Integer.MAX_VALUE, null, Integer.MAX_VALUE, "INT32");
         recordSchemaTest(Integer.MAX_VALUE, Schema.INT32, Integer.MAX_VALUE, "INT32");
     }
 
     @Test
     public void longRecordSchemaTest() throws Exception {
-        recordSchemaTest(Long.MAX_VALUE, null, Long.MAX_VALUE, "INT64");
         recordSchemaTest(Long.MAX_VALUE, Schema.INT64, Long.MAX_VALUE, "INT64");
     }
 
     @Test
     public void floatRecordSchemaTest() throws Exception {
         // 1.0d is coming back from ObjectMapper
-        recordSchemaTest(1.0f, null, 1.0d, "FLOAT32");
         recordSchemaTest(1.0f, Schema.FLOAT, 1.0d, "FLOAT32");
     }
 
     @Test
     public void doubleRecordSchemaTest() throws Exception {
-        recordSchemaTest(Double.MAX_VALUE, null, Double.MAX_VALUE, "FLOAT64");
         recordSchemaTest(Double.MAX_VALUE, Schema.DOUBLE, Double.MAX_VALUE, "FLOAT64");
     }
 
     @Test
+    public void jsonSchemaTest() throws Exception {
+        JSONSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> jsonSchema = JSONSchema
+                .of(SchemaDefinition.<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations>builder()
+                        .withPojo(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class)
+                        .withAlwaysAllowNull(false)
+                        .build());
+        PulsarSchemaToKafkaSchemaTest.StructWithAnnotations obj = new PulsarSchemaToKafkaSchemaTest.StructWithAnnotations();
+        obj.setField1(10);
+        obj.setField2("test");
+        obj.setField3(100L);
+
+        Map<String, Object> expected = new LinkedHashMap<>();
+        expected.put("field1", 10);
+        expected.put("field2", "test");
+        // integer is coming back from ObjectMapper
+        expected.put("field3", 100);
+
+        recordSchemaTest(obj, jsonSchema, expected, "STRUCT");
+    }
+
+    @Test
     public void unknownRecordSchemaTest() throws Exception {
         Object obj = new Object();
         props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
new file mode 100644
index 0000000..9075dd9
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka.connect;
+
+import com.google.common.collect.Lists;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.reflect.AvroDefault;
+import org.apache.avro.reflect.Nullable;
+import org.apache.kafka.connect.data.Date;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Test the conversion of Pulsar Schema to Kafka Schema.
+ */
+@Slf4j
+public class PulsarSchemaToKafkaSchemaTest {
+
+    static final List<String> STRUCT_FIELDS = Lists.newArrayList("field1", "field2", "field3");
+
+    @Data
+    static class StructWithAnnotations {
+        int field1;
+        @Nullable
+        String field2;
+        @AvroDefault("\"1000\"")
+        Long field3;
+    }
+
+    @Test
+    public void bytesSchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BYTES);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.BYTES);
+
+        kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BYTEBUFFER);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.BYTES);
+    }
+
+    @Test
+    public void stringSchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.STRING);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRING);
+    }
+
+    @Test
+    public void booleanSchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BOOL);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.BOOLEAN);
+    }
+
+    @Test
+    public void int8SchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT8);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.INT8);
+    }
+
+    @Test
+    public void int16SchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT16);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.INT16);
+    }
+
+    @Test
+    public void int32SchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT32);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.INT32);
+    }
+
+    @Test
+    public void int64SchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT64);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.INT64);
+    }
+
+    @Test
+    public void float32SchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.FLOAT);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.FLOAT32);
+    }
+
+    @Test
+    public void float64SchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.DOUBLE);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.FLOAT64);
+    }
+
+    @Test
+    public void kvBytesSchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.KV_BYTES());
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.MAP);
+        assertEquals(kafkaSchema.keySchema().type(), org.apache.kafka.connect.data.Schema.Type.BYTES);
+        assertEquals(kafkaSchema.valueSchema().type(), org.apache.kafka.connect.data.Schema.Type.BYTES);
+    }
+
+    @Test
+    public void kvBytesIntSchemaTests() {
+        Schema pulsarKvSchema = KeyValueSchemaImpl.of(Schema.STRING, Schema.INT64);
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarKvSchema);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.MAP);
+        assertEquals(kafkaSchema.keySchema().type(), org.apache.kafka.connect.data.Schema.Type.STRING);
+        assertEquals(kafkaSchema.valueSchema().type(), org.apache.kafka.connect.data.Schema.Type.INT64);
+    }
+
+    @Test
+    public void avroSchemaTest() {
+        AvroSchema<StructWithAnnotations> pulsarAvroSchema = AvroSchema.of(StructWithAnnotations.class);
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT);
+        assertEquals(kafkaSchema.fields().size(), STRUCT_FIELDS.size());
+        for (String name: STRUCT_FIELDS) {
+            assertEquals(kafkaSchema.field(name).name(), name);
+        }
+    }
+
+    @Test
+    public void jsonSchemaTest() {
+        JSONSchema<StructWithAnnotations> jsonSchema = JSONSchema
+                .of(SchemaDefinition.<StructWithAnnotations>builder()
+                .withPojo(StructWithAnnotations.class)
+                .withAlwaysAllowNull(false)
+                .build());
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT);
+        assertEquals(kafkaSchema.fields().size(), STRUCT_FIELDS.size());
+        for (String name: STRUCT_FIELDS) {
+            assertEquals(kafkaSchema.field(name).name(), name);
+        }
+    }
+
+    @Test
+    public void dateSchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.DATE);
+        assertEquals(kafkaSchema.type(), Date.SCHEMA.type());
+    }
+
+    // not supported schemas below:
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void timeSchemaTest() {
+        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.TIME);
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void timestampSchemaTest() {
+        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.TIMESTAMP);
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void instantSchemaTest() {
+        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INSTANT);
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void localDateSchemaTest() {
+        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_DATE);
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void localTimeSchemaTest() {
+        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_TIME);
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void localDatetimeSchemaTest() {
+        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_DATE_TIME);
+    }
+
+}