You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/07 16:02:32 UTC

[pulsar] branch master updated: Support Pulsar schema for pulsar kafka client wrapper (#4534)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 99d3c11  Support Pulsar schema for pulsar kafka client wrapper (#4534)
99d3c11 is described below

commit 99d3c11ec551dccbf85fbb77e268e1a97f3a806c
Author: tuteng <eg...@gmail.com>
AuthorDate: Mon Jul 8 00:02:26 2019 +0800

    Support Pulsar schema for pulsar kafka client wrapper (#4534)
    
    Fixes https://github.com/apache/pulsar/issues/4228
    
    Master Issue: https://github.com/apache/pulsar/issues/4228
    
    ### Motivation
    
    Allow Pulsar schemas to be used with the Pulsar Kafka client wrapper.
    
    ### Modifications
    
    Add Pulsar schema support to the Pulsar Kafka client wrapper (consumer and producer).
    
    ### Verifying this change
    
    Added unit tests.
---
 .../kafka/compat/examples/ConsumerAvroExample.java |  75 ++++++
 .../kafka/compat/examples/ProducerAvroExample.java |  68 +++++
 .../client/kafka/compat/examples/utils/Bar.java    |  30 +++
 .../client/kafka/compat/examples/utils/Foo.java    |  35 +++
 .../clients/consumer/PulsarKafkaConsumer.java      | 108 ++++----
 .../clients/producer/PulsarKafkaProducer.java      |  83 +++---
 .../compat/KafkaProducerInterceptorWrapper.java    |  51 ++--
 .../client/kafka/compat/PulsarKafkaSchema.java     |  77 ++++++
 .../clients/producer/PulsarKafkaProducerTest.java  |  91 ++++++-
 .../KafkaProducerInterceptorWrapperTest.java       |   8 +-
 tests/pulsar-kafka-compat-client-test/pom.xml      |   5 +
 .../integration/compat/kafka/KafkaApiTest.java     | 277 +++++++++++++++++++++
 12 files changed, 804 insertions(+), 104 deletions(-)

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerAvroExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerAvroExample.java
new file mode 100644
index 0000000..3e39b8d
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerAvroExample.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Bar;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Foo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+public class ConsumerAvroExample {
+
+    // Bound to this class (not ConsumerExample) so log lines are attributed correctly.
+    private static final Logger log = LoggerFactory.getLogger(ConsumerAvroExample.class);
+
+    /**
+     * Subscribes to the example topic through the Kafka consumer API, passing Pulsar Avro
+     * schemas for the key (Foo) and value (Bar), and logs every record received.
+     * Loops forever; stop the process to end the example.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) {
+        String topic = "persistent://public/default/test-avro";
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "pulsar://localhost:6650");
+        props.put("group.id", "my-subscription-name");
+        props.put("enable.auto.commit", "false");
+        // NOTE(review): these deserializer classes are presumably superseded by the
+        // schemas handed to the constructor below -- confirm against the consumer wrapper.
+        props.put("key.deserializer", IntegerDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
+
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+
+        @SuppressWarnings("resource")
+        Consumer<Foo, Bar> consumer = new KafkaConsumer<>(props, fooSchema, barSchema);
+        consumer.subscribe(Arrays.asList(topic));
+
+        while (true) {
+            ConsumerRecords<Foo, Bar> records = consumer.poll(100);
+            records.forEach(record -> log.info("Received record: {}", record));
+
+            // Auto-commit is disabled above, so commit explicitly after each poll.
+            consumer.commitSync();
+        }
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
new file mode 100644
index 0000000..aa5e29a
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Bar;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Foo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+public class ProducerAvroExample {
+    // Bound to this class (not ProducerExample) so log lines are attributed correctly.
+    private static final Logger log = LoggerFactory.getLogger(ProducerAvroExample.class);
+
+    /** Sends ten records (Foo key, Bar value) to the example topic using Pulsar Avro schemas. */
+    public static void main(String[] args) {
+        String topic = "persistent://public/default/test-avro";
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "pulsar://localhost:6650");
+        props.put("key.serializer", IntegerSerializer.class.getName());
+        props.put("value.serializer", StringSerializer.class.getName());
+
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+
+        Bar bar = new Bar();
+        bar.setField1(true);
+
+        Foo foo = new Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+
+        // try-with-resources closes the producer even if a send() call throws.
+        try (Producer<Foo, Bar> producer = new KafkaProducer<>(props, fooSchema, barSchema)) {
+            for (int i = 0; i < 10; i++) {
+                // The loop index is passed as the record's partition argument.
+                producer.send(new ProducerRecord<Foo, Bar>(topic, i, foo, bar));
+                log.info("Message {} sent successfully", i);
+            }
+        }
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Bar.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Bar.java
new file mode 100644
index 0000000..8120900
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Bar.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples.utils;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/** Value POJO for the Avro examples; {@code @Data} already supplies toString/equals/hashCode. */
+@Data
+public class Bar {
+    // Single boolean payload field; accessors are generated by Lombok.
+    private boolean field1;
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Foo.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Foo.java
new file mode 100644
index 0000000..d584f51
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Foo.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples.utils;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.avro.reflect.Nullable;
+
+/** Key POJO for the Avro examples; {@code @Data} already supplies toString/equals/hashCode. */
+@Data
+public class Foo {
+    // String fields explicitly marked with Avro's reflect @Nullable.
+    @Nullable private String field1;
+    @Nullable private String field2;
+
+    // Primitive int field, not marked nullable.
+    private int field3;
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index bbab13e..15e23a2 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -41,7 +41,6 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
@@ -49,20 +48,22 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
 import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
 import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -74,8 +75,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
 
     private final PulsarClient client;
 
-    private final Deserializer<K> keyDeserializer;
-    private final Deserializer<V> valueDeserializer;
+    private final Schema<K> keySchema;
+    private final Schema<V> valueSchema;
 
     private final String groupId;
     private final boolean isAutoCommit;
@@ -110,66 +111,75 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     private final BlockingQueue<QueueItem> receivedMessages = new ArrayBlockingQueue<>(1000);
 
     public PulsarKafkaConsumer(Map<String, Object> configs) {
-        this(configs, null, null);
+        this(new ConsumerConfig(configs), null, null);
     }
 
     public PulsarKafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer,
-            Deserializer<V> valueDeserializer) {
-        this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
-                keyDeserializer, valueDeserializer);
+                               Deserializer<V> valueDeserializer) {
+        this(new ConsumerConfig(configs),
+                new PulsarKafkaSchema<K>(keyDeserializer), new PulsarKafkaSchema<V>(valueDeserializer));
+    }
+
+    public PulsarKafkaConsumer(Map<String, Object> configs, Schema<K> keySchema, Schema<V> valueSchema) {
+        this(new ConsumerConfig(configs), keySchema, valueSchema);
     }
 
     public PulsarKafkaConsumer(Properties properties) {
-        this(properties, null, null);
+        this(new ConsumerConfig(properties), null, null);
     }
 
     public PulsarKafkaConsumer(Properties properties, Deserializer<K> keyDeserializer,
-            Deserializer<V> valueDeserializer) {
-        this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)),
-                keyDeserializer, valueDeserializer);
+                               Deserializer<V> valueDeserializer) {
+        this(new ConsumerConfig(properties),
+                new PulsarKafkaSchema<>(keyDeserializer), new PulsarKafkaSchema<>(valueDeserializer));
+    }
+
+    public PulsarKafkaConsumer(Properties properties, Schema<K> keySchema, Schema<V> valueSchema) {
+        this(new ConsumerConfig(properties), keySchema, valueSchema);
     }
 
     @SuppressWarnings("unchecked")
-    private PulsarKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer,
-            Deserializer<V> valueDeserializer) {
+    private PulsarKafkaConsumer(ConsumerConfig consumerConfig, Schema<K> keySchema, Schema<V> valueSchema) {
 
-        if (keyDeserializer == null) {
-            this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                    Deserializer.class);
-            this.keyDeserializer.configure(config.originals(), true);
+        if (keySchema == null) {
+            Deserializer<K> kafkaKeyDeserializer = consumerConfig.getConfiguredInstance(
+                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+            kafkaKeyDeserializer.configure(consumerConfig.originals(), true);
+            this.keySchema = new PulsarKafkaSchema<>(kafkaKeyDeserializer);
         } else {
-            this.keyDeserializer = keyDeserializer;
-            config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+            this.keySchema = keySchema;
+            consumerConfig.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
         }
 
-        if (valueDeserializer == null) {
-            this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                    Deserializer.class);
-            this.valueDeserializer.configure(config.originals(), true);
+        if (valueSchema == null) {
+            Deserializer<V> kafkaValueDeserializer = consumerConfig.getConfiguredInstance(
+                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+            kafkaValueDeserializer.configure(consumerConfig.originals(), true);
+            this.valueSchema = new PulsarKafkaSchema<>(kafkaValueDeserializer);
         } else {
-            this.valueDeserializer = valueDeserializer;
-            config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+            this.valueSchema = valueSchema;
+            consumerConfig.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
         }
 
-        groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
-        isAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
-        strategy = getStrategy(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+        groupId = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG);
+        isAutoCommit = consumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+        strategy = getStrategy(consumerConfig.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         log.info("Offset reset strategy has been assigned value {}", strategy);
 
-        String serviceUrl = config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
+        String serviceUrl = consumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
 
         // If MAX_POLL_RECORDS_CONFIG is provided then use the config, else use default value.
-        if(config.values().containsKey(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)){
-            maxRecordsInSinglePoll = config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
+        if(consumerConfig.values().containsKey(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)){
+            maxRecordsInSinglePoll = consumerConfig.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
         } else {
             maxRecordsInSinglePoll = 1000;
         }
 
-        interceptors = (List) config.getConfiguredInstances(
+        interceptors = (List) consumerConfig.getConfiguredInstances(
                 ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class);
 
         this.properties = new Properties();
-        config.originals().forEach((k, v) -> properties.put(k, v));
+        consumerConfig.originals().forEach((k, v) -> properties.put(k, v));
         ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
         // Since this client instance is going to be used just for the consumers, we can enable Nagle to group
         // all the acknowledgments sent to broker within a short time frame
@@ -352,7 +362,10 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                 }
 
                 K key = getKey(topic, msg);
-                V value = valueDeserializer.deserialize(topic, msg.getData());
+                if (valueSchema instanceof PulsarKafkaSchema) {
+                    ((PulsarKafkaSchema<V>) valueSchema).setTopic(topic);
+                }
+                V value = valueSchema.decode(msg.getData());
 
                 TimestampType timestampType = TimestampType.LOG_APPEND_TIME;
                 long timestamp = msg.getPublishTime();
@@ -403,13 +416,18 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
             return null;
         }
 
-        if (keyDeserializer instanceof StringDeserializer) {
-            return (K) msg.getKey();
-        } else {
-            // Assume base64 encoding
-            byte[] data = Base64.getDecoder().decode(msg.getKey());
-            return keyDeserializer.deserialize(topic, data);
+        if (keySchema instanceof PulsarKafkaSchema) {
+            PulsarKafkaSchema<K> pulsarKafkaSchema = (PulsarKafkaSchema) keySchema;
+            Deserializer<K> kafkaDeserializer = pulsarKafkaSchema.getKafkaDeserializer();
+            if (kafkaDeserializer instanceof StringDeserializer) {
+                return (K) msg.getKey();
+            }
+            pulsarKafkaSchema.setTopic(topic);
         }
+        // Assume base64 encoding
+        byte[] data = Base64.getDecoder().decode(msg.getKey());
+        return keySchema.decode(data);
+
     }
 
     @Override
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 00c49bb..1c5758f 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -43,20 +43,21 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
+import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
 import org.apache.pulsar.client.kafka.compat.KafkaProducerInterceptorWrapper;
 import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
-import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
-import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,59 +68,68 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
 
     private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer<byte[]>> producers = new ConcurrentHashMap<>();
 
-    private final Serializer<K> keySerializer;
-    private final Serializer<V> valueSerializer;
+    private final Schema<K> keySchema;
+    private final Schema<V> valueSchema;
 
     private final Partitioner partitioner;
     private volatile Cluster cluster = Cluster.empty();
 
     private List<ProducerInterceptor<K, V>> interceptors;
 
+    private final Properties properties;
+
     public PulsarKafkaProducer(Map<String, Object> configs) {
-        this(configs, null, null);
+        this(new ProducerConfig(configs), null, null);
     }
 
     public PulsarKafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer,
-            Serializer<V> valueSerializer) {
-        this(configs, new Properties(), keySerializer, valueSerializer);
+                               Serializer<V> valueSerializer) {
+        this(new ProducerConfig(configs), new PulsarKafkaSchema<>(keySerializer), new PulsarKafkaSchema<>(valueSerializer));
+    }
+
+    public PulsarKafkaProducer(Map<String, Object> configs, Schema<K> keySchema, Schema<V> valueSchema) {
+        this(new ProducerConfig(configs), keySchema, valueSchema);
     }
 
     public PulsarKafkaProducer(Properties properties) {
-        this(properties, null, null);
+        this(new ProducerConfig(properties), null, null);
     }
 
     public PulsarKafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
-        this(new HashMap<>(), properties, keySerializer, valueSerializer);
+        this(new ProducerConfig(properties), new PulsarKafkaSchema<>(keySerializer), new PulsarKafkaSchema<>(valueSerializer));
     }
 
-    @SuppressWarnings({ "unchecked", "deprecation" })
-    private PulsarKafkaProducer(Map<String, Object> conf, Properties properties, Serializer<K> keySerializer,
-            Serializer<V> valueSerializer) {
-        properties.forEach((k, v) -> conf.put((String) k, v));
+    public PulsarKafkaProducer(Properties properties, Schema<K> keySchema, Schema<V> valueSchema) {
+        this(new ProducerConfig(properties), keySchema, valueSchema);
+    }
 
-        ProducerConfig producerConfig = new ProducerConfig(conf);
+    @SuppressWarnings({ "unchecked", "deprecation" })
+    private PulsarKafkaProducer(ProducerConfig producerConfig, Schema<K> keySchema, Schema<V> valueSchema) {
 
-        if (keySerializer == null) {
-            this.keySerializer = producerConfig.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                    Serializer.class);
-            this.keySerializer.configure(producerConfig.originals(), true);
+        if (keySchema == null) {
+            Serializer<K> kafkaKeySerializer = producerConfig.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
+            kafkaKeySerializer.configure(producerConfig.originals(), true);
+            this.keySchema = new PulsarKafkaSchema<>(kafkaKeySerializer);
         } else {
-            this.keySerializer = keySerializer;
+            this.keySchema = keySchema;
             producerConfig.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
         }
 
-        if (valueSerializer == null) {
-            this.valueSerializer = producerConfig.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                    Serializer.class);
-            this.valueSerializer.configure(producerConfig.originals(), false);
+        if (valueSchema == null) {
+            Serializer<V> kafkaValueSerializer = producerConfig.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
+            kafkaValueSerializer.configure(producerConfig.originals(), false);
+            this.valueSchema = new PulsarKafkaSchema<>(kafkaValueSerializer);
         } else {
-            this.valueSerializer = valueSerializer;
+            this.valueSchema = valueSchema;
             producerConfig.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
         }
 
         partitioner = producerConfig.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
         partitioner.configure(producerConfig.originals());
 
+        this.properties = new Properties();
+        producerConfig.originals().forEach((k, v) -> properties.put(k, v));
+
         long keepAliveIntervalMs = Long.parseLong(properties.getProperty(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "30000"));
 
         String serviceUrl = producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
@@ -275,7 +285,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
             // Add the partitions info for the new topic
             cluster = cluster.withPartitions(readPartitionsInfo(topic));
             List<org.apache.pulsar.client.api.ProducerInterceptor> wrappedInterceptors = interceptors.stream()
-                    .map(interceptor -> new KafkaProducerInterceptorWrapper(interceptor, keySerializer, valueSerializer, topic))
+                    .map(interceptor -> new KafkaProducerInterceptorWrapper(interceptor, keySchema, valueSchema, topic))
                     .collect(Collectors.toList());
             return pulsarProducerBuilder.clone()
                     .topic(topic)
@@ -312,7 +322,10 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
             builder.eventTime(record.timestamp());
         }
 
-        byte[] value = valueSerializer.serialize(record.topic(), record.value());
+        if (valueSchema instanceof PulsarKafkaSchema) {
+            ((PulsarKafkaSchema<V>) valueSchema).setTopic(record.topic());
+        }
+        byte[] value = valueSchema.encode(record.value());
         builder.value(value);
 
         if (record.partition() != null) {
@@ -329,12 +342,14 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
 
     private String getKey(String topic, K key) {
         // If key is a String, we can use it as it is, otherwise, serialize to byte[] and encode in base64
-        if (keySerializer instanceof StringSerializer) {
+        if (key instanceof String) {
             return (String) key;
-        } else {
-            byte[] keyBytes = keySerializer.serialize(topic, key);
-            return Base64.getEncoder().encodeToString(keyBytes);
         }
+        if (keySchema instanceof PulsarKafkaSchema) {
+            ((PulsarKafkaSchema) keySchema).setTopic(topic);
+        }
+        byte[] keyBytes = keySchema.encode(key);
+        return Base64.getEncoder().encodeToString(keyBytes);
     }
 
     private RecordMetadata getRecordMetadata(String topic, TypedMessageBuilder<byte[]> msgBuilder, MessageId messageId,
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
index 24697cd..dbe827f 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
@@ -67,9 +67,9 @@ public class KafkaProducerInterceptorWrapper<K, V> implements ProducerIntercepto
     final private org.apache.kafka.clients.producer.ProducerInterceptor<K, V> kafkaProducerInterceptor;
 
     // For serializer key/value, and to determine the deserializer for key/value.
-    private final Serializer<K> keySerializer;
+    private final Schema<K> keySchema;
 
-    private final Serializer<V> valueSerializer;
+    private final Schema<V> valueSchema;
 
     // Keep the topic, as each Pulsar producer will tie to a Kafka topic, and ProducerInterceptor will tie to a Pulsar
     // producer, it's safe to set it as final.
@@ -91,12 +91,12 @@ public class KafkaProducerInterceptorWrapper<K, V> implements ProducerIntercepto
      * @param topic                    Topic this {@link ProducerInterceptor} will be associated to.
      */
     public KafkaProducerInterceptorWrapper(org.apache.kafka.clients.producer.ProducerInterceptor<K, V> kafkaProducerInterceptor,
-                                           Serializer<K> keySerializer,
-                                           Serializer<V> valueSerializer,
+                                           Schema<K> keySchema,
+                                           Schema<V> valueSchema,
                                            String topic) {
         this.kafkaProducerInterceptor = kafkaProducerInterceptor;
-        this.keySerializer = keySerializer;
-        this.valueSerializer = valueSerializer;
+        this.keySchema = keySchema;
+        this.valueSchema = valueSchema;
         this.topic = topic;
     }
 
@@ -163,7 +163,10 @@ public class KafkaProducerInterceptorWrapper<K, V> implements ProducerIntercepto
     private Message<byte[]> toPulsarMessage(ProducerRecord<K, V> producerRecord) {
         TypedMessageBuilderImpl typedMessageBuilder = new TypedMessageBuilderImpl(null, scheme);
         typedMessageBuilder.key(serializeKey(topic, producerRecord.key()));
-        typedMessageBuilder.value(valueSerializer.serialize(topic, producerRecord.value()));
+        if (valueSchema instanceof PulsarKafkaSchema) {
+            ((PulsarKafkaSchema<V>) valueSchema).setTopic(topic);
+        }
+        typedMessageBuilder.value(valueSchema.encode(producerRecord.value()));
         typedMessageBuilder.eventTime(eventTime);
         typedMessageBuilder.property(KafkaMessageRouter.PARTITION_ID, partitionID);
         return typedMessageBuilder.getMessage();
@@ -178,8 +181,14 @@ public class KafkaProducerInterceptorWrapper<K, V> implements ProducerIntercepto
      * @return Kafka record.
      */
     private ProducerRecord<K, V> toKafkaRecord(Message<byte[]> message) {
-        Deserializer valueDeserializer = getDeserializer(valueSerializer);
-        V value = (V) valueDeserializer.deserialize(topic, message.getValue());
+        V value;
+        if (valueSchema instanceof PulsarKafkaSchema) { // Kafka-backed: resolve the deserializer from its serializer
+            PulsarKafkaSchema<V> pulsarValueKafkaSchema = (PulsarKafkaSchema<V>) valueSchema;
+            Deserializer valueDeserializer = getDeserializer(pulsarValueKafkaSchema.getKafkaSerializer());
+            value = (V) valueDeserializer.deserialize(topic, message.getValue());
+        } else {
+            value = valueSchema.decode(message.getValue());
+        }
         try {
             scheme = (Schema<byte[]>) FieldUtils.readField(message, "schema", true);
             PulsarApi.MessageMetadata.Builder messageMetadataBuilder = ((MessageImpl<byte[]>)message).getMessageBuilder();
@@ -198,22 +207,28 @@ public class KafkaProducerInterceptorWrapper<K, V> implements ProducerIntercepto
 
     private String serializeKey(String topic, K key) {
         // If key is a String, we can use it as it is, otherwise, serialize to byte[] and encode in base64
-        if (keySerializer instanceof StringSerializer) {
+        if (key instanceof String) {
             return (String) key;
-        } else {
-            byte[] keyBytes = keySerializer.serialize(topic, key);
-            return Base64.getEncoder().encodeToString(keyBytes);
         }
+        if (keySchema instanceof PulsarKafkaSchema) {
+            ((PulsarKafkaSchema<K>) keySchema).setTopic(topic); // Kafka serializers take the topic on every call
+        }
+        byte[] keyBytes = keySchema.encode(key); // NOTE(review): a null key also lands here (null is not instanceof String) - confirm encode() tolerates it
+        return Base64.getEncoder().encodeToString(keyBytes);
     }
 
     private K deserializeKey(String topic, String key) {
-        // If key is a String, we can use it as it is, otherwise, serialize to byte[] and encode in base64
-        if (keySerializer instanceof StringSerializer) {
-            return (K) key;
-        } else {
-            Deserializer keyDeserializer = getDeserializer(keySerializer);
+        if (keySchema instanceof PulsarKafkaSchema) {
+            PulsarKafkaSchema<K> pulsarKeyKafkaSchema = (PulsarKafkaSchema<K>) keySchema;
+            // A key written with StringSerializer was stored as-is by serializeKey(), so return it unchanged
+            if (pulsarKeyKafkaSchema.getKafkaSerializer() instanceof StringSerializer) {
+                return (K) key;
+            }
+            // Any other key was Base64-encoded on the way in: decode it, then run the matching Kafka deserializer
+            Deserializer keyDeserializer = getDeserializer(pulsarKeyKafkaSchema.getKafkaSerializer());
             return (K) keyDeserializer.deserialize(topic, Base64.getDecoder().decode(key));
         }
+        return keySchema.decode(Base64.getDecoder().decode(key)); // NOTE(review): serializeKey() passes String keys through unencoded, so this decode assumes a non-String key - confirm
     }
 
     /**
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
new file mode 100644
index 0000000..807f482
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class PulsarKafkaSchema<T> implements Schema<T> { // Pulsar Schema backed by Kafka (de)serializers
+    // Used by encode(); null when the instance was built from a deserializer only.
+    private final Serializer<T> kafkaSerializer;
+    // Used by decode(); null when the instance was built from a serializer only.
+    private final Deserializer<T> kafkaDeserializer;
+    // Topic handed to the Kafka (de)serializer by encode()/decode(); assigned through setTopic().
+    private String topic;
+
+    public PulsarKafkaSchema(Serializer<T> serializer) {
+        this(serializer, null);
+    }
+
+    public PulsarKafkaSchema(Deserializer<T> deserializer) {
+        this(null, deserializer);
+    }
+
+    public PulsarKafkaSchema(Serializer<T> serializer, Deserializer<T> deserializer) {
+        this.kafkaSerializer = serializer;
+        this.kafkaDeserializer = deserializer;
+    }
+
+    public Serializer<T> getKafkaSerializer() {
+        return kafkaSerializer;
+    }
+
+    public Deserializer<T> getKafkaDeserializer() {
+        return kafkaDeserializer;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic; // NOTE(review): unsynchronized; an instance shared across threads/topics could race - confirm
+    }
+
+    @Override
+    public byte[] encode(T message) {
+        checkArgument(kafkaSerializer != null, "Kafka serializer is not initialized yet"); // IllegalArgumentException if absent
+        return kafkaSerializer.serialize(this.topic, message);
+    }
+
+    @Override
+    public T decode(byte[] message) {
+        checkArgument(kafkaDeserializer != null, "Kafka deserializer is not initialized yet"); // IllegalArgumentException if absent
+        return kafkaDeserializer.deserialize(this.topic, message);
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return Schema.BYTES.getSchemaInfo(); // reports the BYTES schema info regardless of T
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
index 2c4af1e..1ded3c6 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -18,19 +18,23 @@
  */
 package org.apache.kafka.clients.producer;
 
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.avro.reflect.Nullable;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
-import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
-import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
@@ -64,6 +68,24 @@ import static org.mockito.Mockito.when;
 @PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.kafka.clients.producer.ProducerInterceptor"})
 public class PulsarKafkaProducerTest {
 
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Foo {
+        @Nullable
+        private String field1;
+        @Nullable
+        private String field2;
+        private int field3;
+    }
+
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Bar {
+        private boolean field1;
+    }
+
     @ObjectFactory
     // Necessary to make PowerMockito.mockStatic work with TestNG.
     public IObjectFactory getObjectFactory() {
@@ -103,7 +125,7 @@ public class PulsarKafkaProducerTest {
         properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
         properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
 
-        new PulsarKafkaProducer<>(properties, null, null);
+        new PulsarKafkaProducer<>(properties);
 
         verify(mockClientBuilder, times(1)).keepAliveInterval(1000, TimeUnit.SECONDS);
         verify(mockProducerBuilder, times(1)).sendTimeout(1000000, TimeUnit.MILLISECONDS);
@@ -149,7 +171,7 @@ public class PulsarKafkaProducerTest {
         properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
 
         // Act
-        PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, null, null);
+        PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties);
 
         pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,"key", "value"));
 
@@ -157,6 +179,65 @@ public class PulsarKafkaProducerTest {
         verify(mockProducerBuilder, times(1)).intercept(anyVararg());
     }
 
+    @Test
+    public void testPulsarKafkaSendAvro() throws PulsarClientException {
+        // Arrange
+        PulsarClient mockClient = mock(PulsarClient.class);
+        ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
+        org.apache.pulsar.client.api.Producer mockProducer = mock(org.apache.pulsar.client.api.Producer.class);
+        ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
+        CompletableFuture mockPartitionFuture = new CompletableFuture();
+        CompletableFuture mockSendAsyncFuture = new CompletableFuture();
+        TypedMessageBuilder mockTypedMessageBuilder = mock(TypedMessageBuilderImpl.class);
+
+        mockPartitionFuture.complete(new ArrayList<>());
+        mockSendAsyncFuture.complete(new MessageIdImpl(1, 1, 1));
+        doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
+        doReturn(mockClientBuilder).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
+        doReturn(mockClient).when(mockClientBuilder).build();
+        doReturn(mockPartitionFuture).when(mockClient).getPartitionsForTopic(anyString());
+        doReturn(mockProducerBuilder).when(mockProducerBuilder).topic(anyString());
+        doReturn(mockProducerBuilder).when(mockProducerBuilder).clone();
+        doReturn(mockProducerBuilder).when(mockProducerBuilder).intercept(anyVararg());
+        doReturn(mockProducer).when(mockProducerBuilder).create();
+        doReturn(mockTypedMessageBuilder).when(mockProducer).newMessage();
+        doReturn(mockSendAsyncFuture).when(mockTypedMessageBuilder).sendAsync();
+        PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
+        PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
+        when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
+        when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);
+
+        Properties properties = new Properties();
+        List interceptors =  new ArrayList();
+        interceptors.add("org.apache.kafka.clients.producer.PulsarKafkaProducerTest$PulsarKafkaProducerInterceptor");
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
+        properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
+        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
+        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
+
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        // Act
+        PulsarKafkaProducer<Foo, Bar> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, fooSchema, barSchema);
+
+        Bar bar = new Bar();
+        bar.setField1(true);
+
+        Foo foo = new Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+
+        pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,foo, bar));
+
+        // Verify
+        verify(mockTypedMessageBuilder, times(1)).sendAsync();
+        verify(mockProducerBuilder, times(1)).intercept(anyVararg());
+    }
+
     @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid value 2147483648000 for 'connections.max.idle.ms'. Please use a value smaller than 2147483647000 milliseconds.")
     public void testPulsarKafkaProducerKeepAliveIntervalIllegalArgumentException() {
         Properties properties = new Properties();
@@ -166,7 +247,7 @@ public class PulsarKafkaProducerTest {
         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
         properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, Long.toString((Integer.MAX_VALUE + 1L) * 1000));
 
-        new PulsarKafkaProducer<>(properties, null, null);
+        new PulsarKafkaProducer<>(properties);
     }
 
     public static class PulsarKafkaProducerInterceptor implements org.apache.kafka.clients.producer.ProducerInterceptor<String, String> {
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
index aadfce8..0f15691 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.pulsar.client.api.ProducerInterceptor;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.ProducerInterceptors;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.impl.schema.BytesSchema;
@@ -97,9 +98,12 @@ public class KafkaProducerInterceptorWrapperTest {
             }
         }).when(mockInterceptor2).onSend(any(ProducerRecord.class));
 
+
+        Schema<String> pulsarKeySerializeSchema = new PulsarKafkaSchema<>(new StringSerializer());
+        Schema<byte[]> pulsarValueSerializeSchema = new PulsarKafkaSchema<>(new ByteArraySerializer());
         ProducerInterceptors producerInterceptors = new ProducerInterceptors(Arrays.asList(new ProducerInterceptor[]{
-                new KafkaProducerInterceptorWrapper(mockInterceptor1, new StringSerializer(), new ByteArraySerializer(), topic),
-                new KafkaProducerInterceptorWrapper(mockInterceptor2, new StringSerializer(), new ByteArraySerializer(), topic)}));
+                new KafkaProducerInterceptorWrapper(mockInterceptor1, pulsarKeySerializeSchema, pulsarValueSerializeSchema, topic),
+                new KafkaProducerInterceptorWrapper(mockInterceptor2, pulsarKeySerializeSchema, pulsarValueSerializeSchema, topic)}));
 
         TypedMessageBuilderImpl typedMessageBuilder = new TypedMessageBuilderImpl(null, new BytesSchema());
         typedMessageBuilder.key("original key");
diff --git a/tests/pulsar-kafka-compat-client-test/pom.xml b/tests/pulsar-kafka-compat-client-test/pom.xml
index 97b0859..0119d32 100644
--- a/tests/pulsar-kafka-compat-client-test/pom.xml
+++ b/tests/pulsar-kafka-compat-client-test/pom.xml
@@ -59,6 +59,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
       <artifactId>pulsar-client-kafka</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>
diff --git a/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java b/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
index 0e520de..632cee9 100644
--- a/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
+++ b/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
@@ -33,8 +33,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import lombok.Cleanup;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.avro.reflect.Nullable;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -53,12 +57,37 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
 import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 @Slf4j
 public class KafkaApiTest extends PulsarStandaloneTestSuite {
 
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Foo {
+        @Nullable
+        private String field1;
+        @Nullable
+        private String field2;
+        private int field3;
+    }
+
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Bar {
+        private boolean field1;
+    }
+
     private static String getPlainTextServiceUrl() {
         return container.getPlainTextServiceUrl();
     }
@@ -609,4 +638,252 @@ public class KafkaApiTest extends PulsarStandaloneTestSuite {
 
         producer.close();
     }
+
+    @Test
+    public void testProducerAvroSchemaWithPulsarKafkaClient() throws Exception {
+        String topic = "testProducerAvroSchemaWithPulsarKafkaClient";
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
+        org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer =
+                pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscribe();
+        Properties props = new Properties();
+        props.put("bootstrap.servers", getPlainTextServiceUrl());
+        props.put("key.serializer", IntegerSerializer.class.getName());
+        props.put("value.serializer", StringSerializer.class.getName());
+
+        Producer<Bar, Foo> producer = new KafkaProducer<>(props, barSchema, fooSchema);
+        for (int i = 0; i < 10; i++) {
+            Bar bar = new Bar();
+            bar.setField1(true);
+
+            Foo foo = new Foo();
+            foo.setField1("field1");
+            foo.setField2("field2");
+            foo.setField3(i);
+            producer.send(new ProducerRecord<Bar, Foo>(topic, bar, foo));
+        }
+        producer.flush();
+        producer.close();
+
+        for (int i = 0; i < 10; i++) {
+            Message<byte[]> msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
+            Foo value = fooSchema.decode(msg.getValue());
+            Assert.assertEquals(value.getField1(), "field1");
+            Assert.assertEquals(value.getField2(), "field2");
+            Assert.assertEquals(value.getField3(), i);
+            pulsarConsumer.acknowledge(msg);
+        }
+    }
+
+    @Test
+    public void testConsumerAvroSchemaWithPulsarKafkaClient() throws Exception {
+        String topic = "testConsumerAvroSchemaWithPulsarKafkaClient";
+
+        StringSchema stringSchema = new StringSchema();
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", getPlainTextServiceUrl());
+        props.put("group.id", "my-subscription-name");
+        props.put("enable.auto.commit", "false");
+        props.put("key.deserializer", StringDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
+
+        @Cleanup
+        Consumer<String, Foo> consumer = new KafkaConsumer<String, Foo>(props, new StringSchema(), fooSchema);
+        consumer.subscribe(Arrays.asList(topic));
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
+        org.apache.pulsar.client.api.Producer<Foo> pulsarProducer = pulsarClient.newProducer(fooSchema).topic(topic).create();
+
+        for (int i = 0; i < 10; i++) {
+            Foo foo = new Foo();
+            foo.setField1("field1");
+            foo.setField2("field2");
+            foo.setField3(i);
+            pulsarProducer.newMessage().keyBytes(stringSchema.encode(Integer.toString(i))).value(foo).send();
+        }
+
+        AtomicInteger received = new AtomicInteger();
+        while (received.get() < 10) {
+            ConsumerRecords<String, Foo> records = consumer.poll(100);
+            if (!records.isEmpty()) {
+                records.forEach(record -> {
+                    Assert.assertEquals(record.key(), Integer.toString(received.get()));
+                    Foo value = record.value();
+                    Assert.assertEquals(value.getField1(), "field1");
+                    Assert.assertEquals(value.getField2(), "field2");
+                    Assert.assertEquals(value.getField3(), received.get());
+                    received.incrementAndGet();
+                });
+
+                consumer.commitSync();
+            }
+        }
+    }
+
+    @Test
+    public void testProducerConsumerAvroSchemaWithPulsarKafkaClient() throws Exception {
+        String topic = "testProducerConsumerAvroSchemaWithPulsarKafkaClient";
+
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", getPlainTextServiceUrl());
+        props.put("group.id", "my-subscription-name");
+        props.put("enable.auto.commit", "false");
+        props.put("key.serializer", IntegerSerializer.class.getName());
+        props.put("value.serializer", StringSerializer.class.getName());
+        props.put("key.deserializer", StringDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
+
+        @Cleanup
+        Consumer<Bar, Foo> consumer = new KafkaConsumer<>(props, barSchema, fooSchema);
+        consumer.subscribe(Arrays.asList(topic));
+
+        Producer<Bar, Foo> producer = new KafkaProducer<>(props, barSchema, fooSchema);
+
+        for (int i = 0; i < 10; i++) {
+            Bar bar = new Bar();
+            bar.setField1(true);
+
+            Foo foo = new Foo();
+            foo.setField1("field1");
+            foo.setField2("field2");
+            foo.setField3(i);
+            producer.send(new ProducerRecord<>(topic, bar, foo));
+        }
+        producer.flush();
+        producer.close();
+
+        AtomicInteger received = new AtomicInteger();
+        while (received.get() < 10) {
+            ConsumerRecords<Bar, Foo> records = consumer.poll(100);
+            if (!records.isEmpty()) {
+                records.forEach(record -> {
+                    Bar key = record.key();
+                    Assert.assertTrue(key.isField1());
+                    Foo value = record.value();
+                    Assert.assertEquals(value.getField1(), "field1");
+                    Assert.assertEquals(value.getField2(), "field2");
+                    Assert.assertEquals(value.getField3(), received.get());
+                    received.incrementAndGet();
+                });
+
+                consumer.commitSync();
+            }
+        }
+    }
+
+    @Test
+    public void testProducerConsumerJsonSchemaWithPulsarKafkaClient() throws Exception {
+        String topic = "testProducerConsumerJsonSchemaWithPulsarKafkaClient";
+
+        JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", getPlainTextServiceUrl());
+        props.put("group.id", "my-subscription-name");
+        props.put("enable.auto.commit", "false");
+        props.put("key.serializer", IntegerSerializer.class.getName());
+        props.put("value.serializer", StringSerializer.class.getName());
+        props.put("key.deserializer", StringDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
+
+        @Cleanup
+        Consumer<Bar, Foo> consumer = new KafkaConsumer<>(props, barSchema, fooSchema);
+        consumer.subscribe(Arrays.asList(topic));
+
+        Producer<Bar, Foo> producer = new KafkaProducer<>(props, barSchema, fooSchema);
+
+        for (int i = 0; i < 10; i++) {
+            Bar bar = new Bar();
+            bar.setField1(true);
+
+            Foo foo = new Foo();
+            foo.setField1("field1");
+            foo.setField2("field2");
+            foo.setField3(i);
+            producer.send(new ProducerRecord<>(topic, bar, foo));
+        }
+        producer.flush();
+        producer.close();
+
+        AtomicInteger received = new AtomicInteger();
+        while (received.get() < 10) {
+            ConsumerRecords<Bar, Foo> records = consumer.poll(100);
+            if (!records.isEmpty()) {
+                records.forEach(record -> {
+                    Bar key = record.key();
+                    Assert.assertTrue(key.isField1());
+                    Foo value = record.value();
+                    Assert.assertEquals(value.getField1(), "field1");
+                    Assert.assertEquals(value.getField2(), "field2");
+                    Assert.assertEquals(value.getField3(), received.get());
+                    received.incrementAndGet();
+                });
+
+                consumer.commitSync();
+            }
+        }
+    }
+
+    @Test
+    public void testProducerConsumerMixedSchemaWithPulsarKafkaClient() throws Exception {
+        String topic = "testProducerConsumerMixedSchemaWithPulsarKafkaClient";
+
+        Schema<String> keySchema = new PulsarKafkaSchema<>(new StringSerializer(), new StringDeserializer());
+        JSONSchema<Foo> valueSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", getPlainTextServiceUrl());
+        props.put("group.id", "my-subscription-name");
+        props.put("enable.auto.commit", "false");
+        props.put("key.serializer", IntegerSerializer.class.getName());
+        props.put("value.serializer", StringSerializer.class.getName());
+        props.put("key.deserializer", StringDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
+
+        @Cleanup
+        Consumer<String, Foo> consumer = new KafkaConsumer<>(props, keySchema, valueSchema);
+        consumer.subscribe(Arrays.asList(topic));
+
+        Producer<String, Foo> producer = new KafkaProducer<>(props, keySchema, valueSchema);
+
+        for (int i = 0; i < 10; i++) {
+            Foo foo = new Foo();
+            foo.setField1("field1");
+            foo.setField2("field2");
+            foo.setField3(i);
+            producer.send(new ProducerRecord<>(topic, "hello" + i, foo));
+        }
+        producer.flush();
+        producer.close();
+
+        AtomicInteger received = new AtomicInteger();
+        while (received.get() < 10) {
+            ConsumerRecords<String, Foo> records = consumer.poll(100);
+            if (!records.isEmpty()) {
+                records.forEach(record -> {
+                    String key = record.key();
+                    Assert.assertEquals(key, "hello" + received.get());
+                    Foo value = record.value();
+                    Assert.assertEquals(value.getField1(), "field1");
+                    Assert.assertEquals(value.getField2(), "field2");
+                    Assert.assertEquals(value.getField3(), received.get());
+                    received.incrementAndGet();
+                });
+
+                consumer.commitSync();
+            }
+        }
+    }
 }