You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/06 03:10:49 UTC

[pulsar] branch master updated: Add support of pulsar-kafka-adapter for kafka-0.9 api (#4886)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f6ed037  Add support of pulsar-kafka-adapter for kafka-0.9 api (#4886)
f6ed037 is described below

commit f6ed0377fb47705a2a339bfcc72e8a519b266fb0
Author: Xiaobing Fang <bi...@qq.com>
AuthorDate: Tue Aug 6 11:10:43 2019 +0800

    Add support of pulsar-kafka-adapter for kafka-0.9 api (#4886)
    
    Fixes #4791
    
    **Motivation**
    
    Currently the Pulsar Kafka wrapper uses the Kafka 0.10.x API. However, some users still run a legacy Kafka version in their systems and are willing to move to Pulsar. This PR provides a pulsar-kafka adapter for Kafka API version 0.9.x, so that those users can migrate from Kafka 0.9 to Pulsar more easily.
---
 pulsar-client-kafka-compat/pom.xml                 |   3 +
 .../pulsar-client-kafka-shaded_0_9/pom.xml         | 271 +++++++++++++++++
 .../kafka/compat/examples/ProducerAvroExample.java |   1 +
 .../kafka/compat/examples/ProducerExample.java     |   1 +
 .../{ => pulsar-client-kafka-tests_0_9}/pom.xml    |  44 ++-
 .../compat/examples/ConsumerAvroExample.java}      |  39 ++-
 .../kafka/compat/examples/ConsumerExample.java}    |  44 +--
 .../kafka/compat/examples/ProducerAvroExample.java |   1 +
 .../kafka/compat/examples/ProducerExample.java     |   2 +-
 .../client/kafka/compat/examples/utils/Bar.java    |  30 ++
 .../client/kafka/compat/examples/utils/Foo.java    |  35 +++
 .../clients/consumer/PulsarKafkaConsumer.java      |   2 +-
 .../clients/producer/PulsarKafkaProducer.java      |   2 +-
 .../{ => pulsar-client-kafka_0_9}/pom.xml          |  45 ++-
 .../clients/consumer/PulsarKafkaConsumer.java      | 328 ++++++---------------
 .../clients/producer/PulsarKafkaProducer.java      | 131 +++-----
 .../client/kafka/compat/KafkaMessageRouter.java}   |  36 +--
 .../kafka/compat/PulsarClientKafkaConfig.java      | 117 ++++++++
 .../kafka/compat/PulsarConsumerKafkaConfig.java    |  72 +++++
 .../client/kafka/compat/PulsarKafkaSchema.java     |  77 +++++
 .../kafka/compat/PulsarProducerKafkaConfig.java    |  65 ++++
 .../clients/producer/PulsarKafkaProducerTest.java  | 192 ++++++++++++
 .../apache/pulsar/client/util}/MessageIdUtils.java |   2 +-
 23 files changed, 1143 insertions(+), 397 deletions(-)

diff --git a/pulsar-client-kafka-compat/pom.xml b/pulsar-client-kafka-compat/pom.xml
index f25274e..569647f 100644
--- a/pulsar-client-kafka-compat/pom.xml
+++ b/pulsar-client-kafka-compat/pom.xml
@@ -38,7 +38,10 @@
 
   <modules>
     <module>pulsar-client-kafka</module>
+    <module>pulsar-client-kafka_0_9</module>
     <module>pulsar-client-kafka-shaded</module>
+    <module>pulsar-client-kafka-shaded_0_9</module>
     <module>pulsar-client-kafka-tests</module>
+    <module>pulsar-client-kafka-tests_0_9</module>
   </modules>
 </project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-shaded_0_9/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded_0_9/pom.xml
new file mode 100644
index 0000000..a6a7ba3
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded_0_9/pom.xml
@@ -0,0 +1,271 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-client-kafka-compat</artifactId>
+    <version>2.5.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>pulsar-client-kafka_0_9</artifactId>
+  <name>Pulsar Kafka compatibility 0.9 :: API</name>
+
+  <description>Drop-in replacement for Kafka client library that publishes and consumes
+  messages on Pulsar topics</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-kafka_0_9-original</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <createDependencyReducedPom>true</createDependencyReducedPom>
+              <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+              <artifactSet>
+                <includes>
+                  <include>org.apache.kafka:kafka-clients</include>
+                  <include>org.apache.pulsar:pulsar-client-kafka_0_9-original</include>
+                  <include>org.apache.pulsar:pulsar-client-original</include>
+                  <include>org.apache.commons:commons-lang3</include>
+                  <include>commons-codec:commons-codec</include>
+                  <include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
+                  <include>commons-collections:commons-collections</include>
+                  <include>org.asynchttpclient:*</include>
+                  <include>io.netty:netty-codec-http</include>
+                  <include>io.netty:netty-transport-native-epoll</include>
+                  <include>org.reactivestreams:reactive-streams</include>
+                  <include>com.typesafe.netty:netty-reactive-streams</include>
+                  <include>org.javassist:javassist</include>
+                  <include>com.google.protobuf:protobuf-java</include>
+                  <include>com.google.guava:guava</include>
+                  <include>com.google.code.gson:gson</include>
+                  <include>com.fasterxml.jackson.core</include>
+                  <include>com.fasterxml.jackson.module</include>
+                  <include>com.fasterxml.jackson.dataformat</include>
+                  <include>io.netty:netty</include>
+                  <include>io.netty:netty-*</include>
+                  <include>org.apache.pulsar:pulsar-common</include>
+                  <include>org.apache.bookkeeper:circe-checksum</include>
+                  <include>com.yahoo.datasketches:sketches-core</include>
+                  <include>org.apache.httpcomponents:httpclient</include>
+                  <include>commons-logging:commons-logging</include>
+                  <include>org.apache.httpcomponents:httpcore</include>
+                  <include>org.eclipse.jetty:*</include>
+                  <include>com.yahoo.datasketches:*</include>
+                  <include>commons-*:*</include>
+                  <include>org.yaml:snakeyaml</include>
+                  <include>org.objenesis:*</include>
+
+                  <include>org.apache.avro:*</include>
+                  <!-- Avro transitive dependencies-->
+                  <include>org.codehaus.jackson:jackson-core-asl</include>
+                  <include>org.codehaus.jackson:jackson-mapper-asl</include>
+                  <include>com.thoughtworks.paranamer:paranamer</include>
+                  <include>org.xerial.snappy:snappy-java</include>
+                  <include>org.apache.commons:commons-compress</include>
+                  <include>org.tukaani:xz</include>
+                </includes>
+              </artifactSet>
+               <filters>
+                <filter>
+                   <artifact>commons-logging:commons-logging</artifact>
+                   <includes>
+                       <include>**</include>
+                   </includes>
+                </filter>
+              </filters>
+              <relocations>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.producer.KafkaProducer</pattern>
+                  <shadedPattern>org.apache.kafka.clients.producer.OriginalKafkaProducer</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.producer.PulsarKafkaProducer</pattern>
+                  <shadedPattern>org.apache.kafka.clients.producer.KafkaProducer</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.consumer.KafkaConsumer</pattern>
+                  <shadedPattern>org.apache.kafka.clients.consumer.OriginalKafkaConsumer</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.consumer.PulsarKafkaConsumer</pattern>
+                  <shadedPattern>org.apache.kafka.clients.consumer.KafkaConsumer</shadedPattern>
+                </relocation>
+
+                <!-- General relocation rules for Pulsar client dependencies -->
+
+                <relocation>
+                  <pattern>org.asynchttpclient</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.asynchttpclient</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.commons</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.commons</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.google</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.google</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.fasterxml.jackson</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>io.netty</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.io.netty</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.yahoo.datasketches</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.yahoo.datasketches</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.yahoo.sketches</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.yahoo.sketches</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.http</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.http</shadedPattern>
+                </relocation>
+                <relocation>
+                   <pattern>org.eclipse.jetty</pattern>
+                   <shadedPattern>org.apache.pulsar.shade.org.eclipse</shadedPattern>
+                </relocation>
+                <relocation>
+                   <pattern>org.reactivestreams</pattern>
+                   <shadedPattern>org.apache.pulsar.shade.org.reactivestreams</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.typesafe</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.typesafe</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.yahoo.memory</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.yahoo.memory</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.objenesis</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.objenesis</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.yaml</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.yaml</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.avro</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.avro</shadedPattern>
+                  <excludes>
+                    <exclude>org.apache.avro.reflect.AvroAlias</exclude>
+                    <exclude>org.apache.avro.reflect.AvroDefault</exclude>
+                    <exclude>org.apache.avro.reflect.AvroEncode</exclude>
+                    <exclude>org.apache.avro.reflect.AvroIgnore</exclude>
+                    <exclude>org.apache.avro.reflect.AvroMeta</exclude>
+                    <exclude>org.apache.avro.reflect.AvroName</exclude>
+                    <exclude>org.apache.avro.reflect.AvroSchema</exclude>
+                    <exclude>org.apache.avro.reflect.Nullable</exclude>
+                    <exclude>org.apache.avro.reflect.Stringable</exclude>
+                    <exclude>org.apache.avro.reflect.Union</exclude>
+                  </excludes>
+                </relocation>
+                <!--Avro transitive dependencies-->
+                <relocation>
+                  <pattern>org.codehaus.jackson</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.codehaus.jackson</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.thoughtworks.paranamer</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.thoughtworks.paranamer</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.xerial.snappy</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.xerial.snappy</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.commons</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.commons</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.tukaani</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.tukaani</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.bookkeeper</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.bookkeeper</shadedPattern>
+                </relocation>
+              </relocations>
+              <filters>
+                <filter>
+                  <artifact>org.apache.pulsar:pulsar-client-original</artifact>
+                  <includes>
+                    <include>**</include>
+                  </includes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <!-- This plugin is used to run a script after the package phase in order to rename
+            libnetty_transport_native_epoll_x86_64.so from Netty into
+            liborg_apache_pulsar_shade_netty_transport_native_epoll_x86_64.so
+            to reflect the shade that is being applied.
+         -->
+        <artifactId>exec-maven-plugin</artifactId>
+        <groupId>org.codehaus.mojo</groupId>
+        <executions>
+          <execution>
+            <id>rename-epoll-library</id>
+            <phase>package</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <executable>${project.parent.basedir}/../src/${rename.netty.native.libs}</executable>
+              <arguments>
+                <argument>${project.artifactId}</argument>
+              </arguments>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
index aa5e29a..5d9fcfc 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
@@ -61,6 +61,7 @@ public class ProducerAvroExample {
             log.info("Message {} sent successfully", i);
         }
 
+        producer.flush();
         producer.close();
     }
 
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
index a95413c..34e008e 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
@@ -45,6 +45,7 @@ public class ProducerExample {
             log.info("Message {} sent successfully", i);
         }
 
+        producer.flush();
         producer.close();
     }
 
diff --git a/pulsar-client-kafka-compat/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/pom.xml
similarity index 52%
copy from pulsar-client-kafka-compat/pom.xml
copy to pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/pom.xml
index f25274e..c63c8dc 100644
--- a/pulsar-client-kafka-compat/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/pom.xml
@@ -26,19 +26,45 @@
 
   <parent>
     <groupId>org.apache.pulsar</groupId>
-    <artifactId>pulsar</artifactId>
+    <artifactId>pulsar-client-kafka-compat</artifactId>
     <version>2.5.0-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
 
-  <artifactId>pulsar-client-kafka-compat</artifactId>
-  <name>Pulsar Kafka compatibility</name>
+  <artifactId>pulsar-client-kafka_0_9-tests</artifactId>
+  <name>Pulsar Kafka compatibility 0.9 :: Tests</name>
 
-  <packaging>pom</packaging>
+  <description>Tests to verify the correct shading configuration for the pulsar-client-kafka wrapper</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-kafka_0_9</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-broker</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-broker</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>managed-ledger-original</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+  </dependencies>
 
-  <modules>
-    <module>pulsar-client-kafka</module>
-    <module>pulsar-client-kafka-shaded</module>
-    <module>pulsar-client-kafka-tests</module>
-  </modules>
 </project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerAvroExample.java
similarity index 62%
copy from pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
copy to pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerAvroExample.java
index aa5e29a..3e39b8d 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerAvroExample.java
@@ -18,11 +18,11 @@
  */
 package org.apache.pulsar.client.kafka.compat.examples;
 
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.kafka.compat.examples.utils.Bar;
@@ -30,17 +30,20 @@ import org.apache.pulsar.client.kafka.compat.examples.utils.Foo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Properties;
 
-public class ProducerAvroExample {
+public class ConsumerAvroExample {
+
     public static void main(String[] args) {
         String topic = "persistent://public/default/test-avro";
 
         Properties props = new Properties();
         props.put("bootstrap.servers", "pulsar://localhost:6650");
-
-        props.put("key.serializer", IntegerSerializer.class.getName());
-        props.put("value.serializer", StringSerializer.class.getName());
+        props.put("group.id", "my-subscription-name");
+        props.put("enable.auto.commit", "false");
+        props.put("key.deserializer", IntegerDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
 
         AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
         AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
@@ -53,16 +56,20 @@ public class ProducerAvroExample {
         foo.setField2("field2");
         foo.setField3(3);
 
+        @SuppressWarnings("resource")
+        Consumer<Foo, Bar> consumer = new KafkaConsumer<>(props, fooSchema, barSchema);
+        consumer.subscribe(Arrays.asList(topic));
 
-        Producer<Foo, Bar> producer = new KafkaProducer<>(props, fooSchema, barSchema);
+        while (true) {
+            ConsumerRecords<Foo, Bar> records = consumer.poll(100);
+            records.forEach(record -> {
+                log.info("Received record: {}", record);
+            });
 
-        for (int i = 0; i < 10; i++) {
-            producer.send(new ProducerRecord<Foo, Bar>(topic, i, foo, bar));
-            log.info("Message {} sent successfully", i);
+            // Commit last offset
+            consumer.commitSync();
         }
-
-        producer.close();
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ProducerExample.class);
+    private static final Logger log = LoggerFactory.getLogger(ConsumerExample.class);
 }
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerExample.java
similarity index 51%
copy from pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
copy to pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerExample.java
index a95413c..983d7b7 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerExample.java
@@ -18,35 +18,43 @@
  */
 package org.apache.pulsar.client.kafka.compat.examples;
 
+import java.util.Arrays;
 import java.util.Properties;
 
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ProducerExample {
+public class ConsumerExample {
+
     public static void main(String[] args) {
         String topic = "persistent://public/default/test";
 
         Properties props = new Properties();
         props.put("bootstrap.servers", "pulsar://localhost:6650");
-
-        props.put("key.serializer", IntegerSerializer.class.getName());
-        props.put("value.serializer", StringSerializer.class.getName());
-
-        Producer<Integer, String> producer = new KafkaProducer<>(props);
-
-        for (int i = 0; i < 10; i++) {
-            producer.send(new ProducerRecord<Integer, String>(topic, i, Integer.toString(i)));
-            log.info("Message {} sent successfully", i);
+        props.put("group.id", "my-subscription-name");
+        props.put("enable.auto.commit", "false");
+        props.put("key.deserializer", IntegerDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
+
+        @SuppressWarnings("resource")
+        Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
+        consumer.subscribe(Arrays.asList(topic));
+
+        while (true) {
+            ConsumerRecords<Integer, String> records = consumer.poll(100);
+            records.forEach(record -> {
+                log.info("Received record: {}", record);
+            });
+
+            // Commit last offset
+            consumer.commitSync();
         }
-
-        producer.close();
     }
 
-    private static final Logger log = LoggerFactory.getLogger(ProducerExample.class);
+    private static final Logger log = LoggerFactory.getLogger(ConsumerExample.class);
 }
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
similarity index 99%
copy from pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
copy to pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
index aa5e29a..5d9fcfc 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
@@ -61,6 +61,7 @@ public class ProducerAvroExample {
             log.info("Message {} sent successfully", i);
         }
 
+        producer.flush();
         producer.close();
     }
 
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
similarity index 98%
copy from pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
copy to pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
index a95413c..f089b26 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
@@ -34,7 +34,6 @@ public class ProducerExample {
 
         Properties props = new Properties();
         props.put("bootstrap.servers", "pulsar://localhost:6650");
-
         props.put("key.serializer", IntegerSerializer.class.getName());
         props.put("value.serializer", StringSerializer.class.getName());
 
@@ -45,6 +44,7 @@ public class ProducerExample {
             log.info("Message {} sent successfully", i);
         }
 
+        producer.flush();
         producer.close();
     }
 
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Bar.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Bar.java
new file mode 100644
index 0000000..8120900
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Bar.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples.utils;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+@Data
+@ToString
+@EqualsAndHashCode
+public class Bar {
+    private boolean field1;
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Foo.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Foo.java
new file mode 100644
index 0000000..d584f51
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Foo.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples.utils;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.avro.reflect.Nullable;
+
+@Data
+@ToString
+@EqualsAndHashCode
+public class Foo {
+    @Nullable
+    private String field1;
+    @Nullable
+    private String field2;
+    private int field3;
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 15e23a2..06b1c6e 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -60,7 +60,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
+import org.apache.pulsar.client.util.MessageIdUtils;
 import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 14dd78b..cacca60 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -57,7 +57,7 @@ import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
 import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
 import org.apache.pulsar.client.kafka.compat.KafkaProducerInterceptorWrapper;
-import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
+import org.apache.pulsar.client.util.MessageIdUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/pulsar-client-kafka-compat/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/pom.xml
similarity index 52%
copy from pulsar-client-kafka-compat/pom.xml
copy to pulsar-client-kafka-compat/pulsar-client-kafka_0_9/pom.xml
index f25274e..0740e78 100644
--- a/pulsar-client-kafka-compat/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/pom.xml
@@ -26,19 +26,46 @@
 
   <parent>
     <groupId>org.apache.pulsar</groupId>
-    <artifactId>pulsar</artifactId>
+    <artifactId>pulsar-client-kafka-compat</artifactId>
     <version>2.5.0-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
 
-  <artifactId>pulsar-client-kafka-compat</artifactId>
-  <name>Pulsar Kafka compatibility</name>
+  <properties>
+    <kafka_0_9.version>0.9.0.1</kafka_0_9.version>
+  </properties>
 
-  <packaging>pom</packaging>
+  <artifactId>pulsar-client-kafka_0_9-original</artifactId>
+  <name>Pulsar Kafka compatibility 0.9 :: API (original)</name>
+
+  <description>Kafka client library that publishes and consumes messages on Pulsar topics</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka_0_9.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>net.jpountz.lz4</groupId>
+          <artifactId>lz4</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.lz4</groupId>
+          <artifactId>lz4-java</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.xerial.snappy</groupId>
+          <artifactId>snappy-java</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
 
-  <modules>
-    <module>pulsar-client-kafka</module>
-    <module>pulsar-client-kafka-shaded</module>
-    <module>pulsar-client-kafka-tests</module>
-  </modules>
 </project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
similarity index 72%
copy from pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
copy to pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 15e23a2..6d3c383 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -18,11 +18,8 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Base64;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -45,7 +42,7 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.pulsar.client.api.MessageListener;
@@ -60,11 +57,11 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
 import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
 import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.MessageIdUtils;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 
@@ -88,14 +85,13 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     private final Set<TopicPartition> unpolledPartitions = new HashSet<>();
     private final SubscriptionInitialPosition strategy;
 
-    private List<ConsumerInterceptor<K, V>> interceptors;
-
     private volatile boolean closed = false;
 
     private final int maxRecordsInSinglePoll;
 
     private final Properties properties;
 
+
     private static class QueueItem {
         final org.apache.pulsar.client.api.Consumer<byte[]> consumer;
         final Message<byte[]> message;
@@ -117,7 +113,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     public PulsarKafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer,
                                Deserializer<V> valueDeserializer) {
         this(new ConsumerConfig(configs),
-                new PulsarKafkaSchema<K>(keyDeserializer), new PulsarKafkaSchema<V>(valueDeserializer));
+                new PulsarKafkaSchema<>(keyDeserializer), new PulsarKafkaSchema<>(valueDeserializer));
     }
 
     public PulsarKafkaConsumer(Map<String, Object> configs, Schema<K> keySchema, Schema<V> valueSchema) {
@@ -168,18 +164,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
 
         String serviceUrl = consumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
 
-        // If MAX_POLL_RECORDS_CONFIG is provided then use the config, else use default value.
-        if(consumerConfig.values().containsKey(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)){
-            maxRecordsInSinglePoll = consumerConfig.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
-        } else {
-            maxRecordsInSinglePoll = 1000;
-        }
-
-        interceptors = (List) consumerConfig.getConfiguredInstances(
-                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class);
+        // ConsumerConfig.MAX_POLL_RECORDS_CONFIG does not exist in Kafka 0.9, so always use the default value.
+        maxRecordsInSinglePoll = 1000;
 
         this.properties = new Properties();
-        consumerConfig.originals().forEach((k, v) -> properties.put(k, v));
+        consumerConfig.originals().forEach(properties::put);
         ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
         // Since this client instance is going to be used just for the consumers, we can enable Nagle to group
         // all the acknowledgments sent to broker within a short time frame
@@ -192,26 +181,10 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     }
 
     private SubscriptionInitialPosition getStrategy(final String strategy) {
-        switch(strategy) {
-    	    case "earliest":
-    	        return SubscriptionInitialPosition.Earliest;
-    	    default:
-                return SubscriptionInitialPosition.Latest;
-    	}
-    }
-    
-    @Override
-    public void received(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> msg) {
-        // Block listener thread if the application is slowing down
-        try {
-            receivedMessages.put(new QueueItem(consumer, msg));
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            if (closed) {
-                // Consumer was closed and the thread was interrupted. Nothing to worry about here
-            } else {
-                throw new RuntimeException(e);
-            }
+        if ("earliest".equals(strategy)) {
+            return SubscriptionInitialPosition.Earliest;
+        } else {
+            return SubscriptionInitialPosition.Latest;
         }
     }
 
@@ -220,24 +193,18 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
         throw new UnsupportedOperationException("Cannot access the partitions assignements");
     }
 
-    /**
-     * Get the current subscription. Will return the same topics used in the most recent call to
-     * {@link #subscribe(Collection, ConsumerRebalanceListener)}, or an empty set if no such call has been made.
-     *
-     * @return The set of topics currently subscribed to
-     */
     @Override
     public Set<String> subscription() {
         return consumers.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet());
     }
 
     @Override
-    public void subscribe(Collection<String> topics) {
+    public void subscribe(List<String> topics) {
         subscribe(topics, null);
     }
 
     @Override
-    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
+    public void subscribe(List<String> topics, ConsumerRebalanceListener callback) {
         List<CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>>> futures = new ArrayList<>();
 
         List<TopicPartition> topicPartitions = new ArrayList<>();
@@ -251,6 +218,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                 consumerBuilder.subscriptionType(SubscriptionType.Failover);
                 consumerBuilder.messageListener(this);
                 consumerBuilder.subscriptionName(groupId);
+                consumerBuilder.topics(topics);
                 if (numberOfPartitions > 1) {
                     // Subscribe to each partition
                     consumerBuilder.consumerName(ConsumerName.generateRandomName());
@@ -260,8 +228,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                                 .topic(partitionName).subscribeAsync();
                         int partitionIndex = i;
                         TopicPartition tp = new TopicPartition(
-                            TopicName.get(topic).getPartitionedTopicName(),
-                            partitionIndex);
+                                TopicName.get(topic).getPartitionedTopicName(),
+                                partitionIndex);
                         futures.add(future.thenApply(consumer -> {
                             log.info("Add consumer {} for partition {}", consumer, tp);
                             consumers.putIfAbsent(tp, consumer);
@@ -274,8 +242,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                     CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
                             .subscribeAsync();
                     TopicPartition tp = new TopicPartition(
-                        TopicName.get(topic).getPartitionedTopicName(),
-                        0);
+                            TopicName.get(topic).getPartitionedTopicName(),
+                            0);
                     futures.add(future.thenApply(consumer -> {
                         log.info("Add consumer {} for partition {}", consumer, tp);
                         consumers.putIfAbsent(tp, consumer);
@@ -285,7 +253,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                 }
             }
             unpolledPartitions.addAll(topicPartitions);
-            
+
             // Wait for all consumers to be ready
             futures.forEach(CompletableFuture::join);
 
@@ -309,17 +277,12 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     }
 
     @Override
-    public void assign(Collection<TopicPartition> partitions) {
+    public void assign(List<TopicPartition> list) {
         throw new UnsupportedOperationException("Cannot manually assign partitions");
     }
 
     @Override
-    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
-        throw new UnsupportedOperationException("Cannot subscribe with topic name pattern");
-    }
-
-    @Override
-    public void subscribe(Pattern pattern) {
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) {
         throw new UnsupportedOperationException("Cannot subscribe with topic name pattern");
     }
 
@@ -334,7 +297,6 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
         });
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public ConsumerRecords<K, V> poll(long timeoutMillis) {
         try {
@@ -357,8 +319,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
 
                 TopicPartition tp = new TopicPartition(topic, partition);
                 if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {
-                	log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
-                	resetOffsets(tp);
+                    log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
+                    resetOffsets(tp);
                 }
 
                 K key = getKey(topic, msg);
@@ -367,17 +329,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                 }
                 V value = valueSchema.decode(msg.getData());
 
-                TimestampType timestampType = TimestampType.LOG_APPEND_TIME;
-                long timestamp = msg.getPublishTime();
-
-                if (msg.getEventTime() > 0) {
-                    // If we have Event time, use that in preference
-                    timestamp = msg.getEventTime();
-                    timestampType = TimestampType.CREATE_TIME;
-                }
-
-                ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(topic, partition, offset, timestamp,
-                        timestampType, -1, msg.hasKey() ? msg.getKey().length() : 0, msg.getData().length, key, value);
+                ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(topic, partition, offset, key, value);
 
                 records.computeIfAbsent(tp, k -> new ArrayList<>()).add(consumerRecord);
 
@@ -398,39 +350,13 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                 commitAsync();
             }
 
-            // If no interceptor is provided, interceptors list will an empty list, original ConsumerRecords will be return.
-            return applyConsumerInterceptorsOnConsume(interceptors, new ConsumerRecords<>(records));
+            return new ConsumerRecords<>(records);
         } catch (InterruptedException e) {
             throw new RuntimeException(e);
         }
     }
 
     @Override
-    public ConsumerRecords<K, V> poll(Duration duration) {
-        return poll(duration.toMillis());
-    }
-
-    @SuppressWarnings("unchecked")
-    private K getKey(String topic, Message<byte[]> msg) {
-        if (!msg.hasKey()) {
-            return null;
-        }
-
-        if (keySchema instanceof PulsarKafkaSchema) {
-            PulsarKafkaSchema<K> pulsarKafkaSchema = (PulsarKafkaSchema) keySchema;
-            Deserializer<K> kafkaDeserializer = pulsarKafkaSchema.getKafkaDeserializer();
-            if (kafkaDeserializer instanceof StringDeserializer) {
-                return (K) msg.getKey();
-            }
-            pulsarKafkaSchema.setTopic(topic);
-        }
-        // Assume base64 encoding
-        byte[] data = Base64.getDecoder().decode(msg.getKey());
-        return keySchema.decode(data);
-
-    }
-
-    @Override
     public void commitSync() {
         try {
             doCommitOffsets(getCurrentOffsetsMap()).get();
@@ -440,11 +366,6 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     }
 
     @Override
-    public void commitSync(Duration duration) {
-        commitSync();
-    }
-
-    @Override
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
         try {
             doCommitOffsets(offsets).get();
@@ -454,11 +375,6 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     }
 
     @Override
-    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map, Duration duration) {
-        commitSync(map);
-    }
-
-    @Override
     public void commitAsync() {
         doCommitOffsets(getCurrentOffsetsMap());
     }
@@ -483,7 +399,6 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
-        applyConsumerInterceptorsOnCommit(interceptors, offsets);
         offsets.forEach((topicPartition, offsetAndMetadata) -> {
             org.apache.pulsar.client.api.Consumer<byte[]> consumer = consumers.get(topicPartition);
             lastCommittedOffset.put(topicPartition, offsetAndMetadata);
@@ -503,43 +418,6 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
         return offsets;
     }
 
-    /**
-     * Apply all onConsume methods in a list of ConsumerInterceptors.
-     * Catch any exception during the process.
-     *
-     * @param interceptors     Interceptors provided.
-     * @param consumerRecords  ConsumerRecords returned by calling {@link this#poll(long)}.
-     * @return                 ConsumerRecords after applying all ConsumerInterceptor in interceptors list.
-     */
-    private ConsumerRecords applyConsumerInterceptorsOnConsume(List<ConsumerInterceptor<K, V>> interceptors, ConsumerRecords consumerRecords) {
-        ConsumerRecords processedConsumerRecords = consumerRecords;
-        for (ConsumerInterceptor interceptor : interceptors) {
-            try {
-                processedConsumerRecords = interceptor.onConsume(processedConsumerRecords);
-            } catch (Exception e) {
-                log.warn("Error executing onConsume for interceptor {}.", interceptor.getClass().getCanonicalName(), e);
-            }
-        }
-        return processedConsumerRecords;
-    }
-
-    /**
-     * Apply all onCommit methods in a list of ConsumerInterceptors.
-     * Catch any exception during the process.
-     *
-     * @param interceptors   Interceptors provided.
-     * @param offsets        Offsets need to be commit.
-     */
-    private void applyConsumerInterceptorsOnCommit(List<ConsumerInterceptor<K, V>> interceptors, Map<TopicPartition, OffsetAndMetadata> offsets) {
-        for (ConsumerInterceptor interceptor : interceptors) {
-            try {
-                interceptor.onCommit(offsets);
-            } catch (Exception e) {
-                log.warn("Error executing onCommit for interceptor {}.", interceptor.getClass().getCanonicalName(), e);
-            }
-        }
-    }
-
     @Override
     public void seek(TopicPartition partition, long offset) {
         MessageId msgId = MessageIdUtils.getMessageId(offset);
@@ -556,20 +434,15 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     }
 
     @Override
-    public void seek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
-        seek(topicPartition, offsetAndMetadata.offset());
-    }
-
-    @Override
-    public void seekToBeginning(Collection<TopicPartition> partitions) {
+    public void seekToBeginning(TopicPartition... partitions) {
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
-        if (partitions.isEmpty()) {
-            partitions = consumers.keySet();
+        if (partitions.length == 0) {
+            partitions = consumers.keySet().toArray(new TopicPartition[0]);
         }
         lastCommittedOffset.clear();
         lastReceivedOffset.clear();
-        
+
         for (TopicPartition tp : partitions) {
             org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
             if (c == null) {
@@ -584,11 +457,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     }
 
     @Override
-    public void seekToEnd(Collection<TopicPartition> partitions) {
+    public void seekToEnd(TopicPartition... partitions) {
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
-        if (partitions.isEmpty()) {
-            partitions = consumers.keySet();
+        if (partitions.length == 0) {
+            partitions = consumers.keySet().toArray(new TopicPartition[0]);
         }
         lastCommittedOffset.clear();
         lastReceivedOffset.clear();
@@ -616,42 +489,17 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     }
 
     @Override
-    public long position(TopicPartition topicPartition, Duration duration) {
-        return position(topicPartition);
-    }
-
-    private SubscriptionInitialPosition resetOffsets(final TopicPartition partition) {
-    	log.info("Resetting partition {} and seeking to {} position", partition, strategy);
-        if (strategy == SubscriptionInitialPosition.Earliest) {
-            seekToBeginning(Collections.singleton(partition));
-        } else {
-            seekToEnd(Collections.singleton(partition));
-        } 
-        return strategy;
-    }
-    
-    @Override
     public OffsetAndMetadata committed(TopicPartition partition) {
         return lastCommittedOffset.get(partition);
     }
 
     @Override
-    public OffsetAndMetadata committed(TopicPartition topicPartition, Duration duration) {
-        return committed(topicPartition);
-    }
-
-    @Override
     public Map<MetricName, ? extends Metric> metrics() {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public List<PartitionInfo> partitionsFor(String topic) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public List<PartitionInfo> partitionsFor(String s, Duration duration) {
+    public List<PartitionInfo> partitionsFor(String s) {
         throw new UnsupportedOperationException();
     }
 
@@ -661,62 +509,17 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     }
 
     @Override
-    public Map<String, List<PartitionInfo>> listTopics(Duration duration) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Set<TopicPartition> paused() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void pause(Collection<TopicPartition> partitions) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void resume(Collection<TopicPartition> partitions) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map, Duration duration) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection, Duration duration) {
+    public void pause(TopicPartition... topicPartitions) {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection, Duration duration) {
+    public void resume(TopicPartition... topicPartitions) {
         throw new UnsupportedOperationException();
     }
 
     @Override
     public void close() {
-        close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public void close(long timeout, TimeUnit unit) {
         try {
             closed = true;
 
@@ -724,20 +527,71 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                 commitAsync();
             }
 
-            client.closeAsync().get(timeout, unit);
+            client.closeAsync().get(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             throw new RuntimeException(e);
         }
     }
 
     @Override
-    public void close(Duration duration) {
-        close(duration.toMillis(), TimeUnit.MILLISECONDS);
+    public void wakeup() {
+        throw new UnsupportedOperationException();
     }
 
+    /**
+     * This method is called whenever a new message is received.
+     * <p>
+     * Messages are guaranteed to be delivered in order and from the same thread for a single consumer.
+     * <p>
+     * This method will only be called once for each message, unless either the application or the broker crashes.
+     * <p>
+     * The application is responsible for handling any exception that could be thrown while processing the message.
+     *
+     * @param consumer the consumer that received the message
+     * @param msg the message that was received
+     */
     @Override
-    public void wakeup() {
-        throw new UnsupportedOperationException();
+    public void received(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> msg) {
+        // Block listener thread if the application is slowing down
+        try {
+            receivedMessages.put(new QueueItem(consumer, msg));
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            if (closed) {
+                // Consumer was closed and the thread was interrupted. Nothing to worry about here
+            } else {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private SubscriptionInitialPosition resetOffsets(final TopicPartition partition) {
+        log.info("Resetting partition {} and seeking to {} position", partition, strategy);
+        if (strategy == SubscriptionInitialPosition.Earliest) {
+            seekToBeginning(partition);
+        } else {
+            seekToEnd(partition);
+        }
+        return strategy;
     }
 
+    @SuppressWarnings("unchecked")
+    private K getKey(String topic, Message<byte[]> msg) {
+        if (!msg.hasKey()) {
+            return null;
+        }
+
+        if (keySchema instanceof PulsarKafkaSchema) {
+            PulsarKafkaSchema<K> pulsarKafkaSchema = (PulsarKafkaSchema) keySchema;
+            Deserializer<K> kafkaDeserializer = pulsarKafkaSchema.getKafkaDeserializer();
+            if (kafkaDeserializer instanceof StringDeserializer) {
+                return (K) msg.getKey();
+            }
+            pulsarKafkaSchema.setTopic(topic);
+        }
+        // Assume base64 encoding
+        byte[] data = Base64.getDecoder().decode(msg.getKey());
+        return keySchema.decode(data);
+
+    }
 }
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
similarity index 75%
copy from pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
copy to pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 14dd78b..b853459 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -19,13 +19,13 @@
 package org.apache.kafka.clients.producer;
 
 import java.nio.charset.StandardCharsets;
-import java.time.Duration;
 import java.util.Base64;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -34,30 +34,25 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
-
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
 import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
 import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
-import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
-import org.apache.pulsar.client.kafka.compat.KafkaProducerInterceptorWrapper;
-import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
+import org.apache.pulsar.client.util.MessageIdUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,10 +69,10 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
     private final Partitioner partitioner;
     private volatile Cluster cluster = Cluster.empty();
 
-    private List<ProducerInterceptor<K, V>> interceptors;
-
     private final Properties properties;
 
+    private static final Logger logger = LoggerFactory.getLogger(PulsarKafkaProducer.class);
+
     public PulsarKafkaProducer(Map<String, Object> configs) {
         this(new ProducerConfig(configs), null, null);
     }
@@ -128,7 +123,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
         partitioner.configure(producerConfig.originals());
 
         this.properties = new Properties();
-        producerConfig.originals().forEach((k, v) -> properties.put(k, v));
+        producerConfig.originals().forEach(properties::put);
 
         long keepAliveIntervalMs = Long.parseLong(properties.getProperty(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "30000"));
 
@@ -137,9 +132,13 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
             // Support Kafka's ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG in ms.
             // If passed in value is greater than Integer.MAX_VALUE in second will throw IllegalArgumentException.
             int keepAliveInterval = Math.toIntExact(keepAliveIntervalMs / 1000);
-            client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).keepAliveInterval(keepAliveInterval, TimeUnit.SECONDS).build();
+            client = PulsarClientKafkaConfig.getClientBuilder(properties)
+                                            .serviceUrl(serviceUrl)
+                                            .keepAliveInterval(keepAliveInterval, TimeUnit.SECONDS)
+                                            .build();
         } catch (ArithmeticException e) {
-            String errorMessage = String.format("Invalid value %d for 'connections.max.idle.ms'. Please use a value smaller than %d000 milliseconds.", keepAliveIntervalMs, Integer.MAX_VALUE);
+            String errorMessage = String.format("Invalid value %d for 'connections.max.idle.ms'. " +
+                    "Please use a value smaller than %d000 milliseconds.", keepAliveIntervalMs, Integer.MAX_VALUE);
             logger.error(errorMessage);
             throw new IllegalArgumentException(errorMessage);
         } catch (PulsarClientException e) {
@@ -170,47 +169,20 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
         // Kafka, on the other hand, still blocks for "max.block.ms" time and then gives error.
         boolean shouldBlockPulsarProducer = sendTimeoutMillis > 0;
         pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer);
-
-        interceptors = (List) producerConfig.getConfiguredInstances(
-                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
-    }
-
-    @Override
-    public void initTransactions() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void beginTransaction() throws ProducerFencedException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, String s) throws ProducerFencedException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void commitTransaction() throws ProducerFencedException {
-        throw new UnsupportedOperationException();
     }
 
-    @Override
-    public void abortTransaction() throws ProducerFencedException {
-        throw new UnsupportedOperationException();
-    }
 
     @Override
-    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
-        return send(record, null);
+    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
+        return send(producerRecord, null);
     }
 
     @Override
-    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, Callback callback) {
         org.apache.pulsar.client.api.Producer<byte[]> producer;
 
         try {
-            producer = producers.computeIfAbsent(record.topic(), topic -> createNewProducer(topic));
+            producer = producers.computeIfAbsent(producerRecord.topic(), topic -> createNewProducer(topic));
         } catch (Exception e) {
             if (callback != null) {
                 callback.onCompletion(null, e);
@@ -220,12 +192,11 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
             return future;
         }
 
-        TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
-        int messageSize = buildMessage(messageBuilder, record);
+        TypedMessageBuilder<byte[]> messageBuilder = buildMessage(producer, producerRecord);
 
         CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
         messageBuilder.sendAsync().thenAccept((messageId) -> {
-            future.complete(getRecordMetadata(record.topic(), messageBuilder, messageId, messageSize));
+            future.complete(getRecordMetadata(producerRecord.topic(), messageId));
         }).exceptionally(ex -> {
             future.completeExceptionally(ex);
             return null;
@@ -251,7 +222,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
     }
 
     @Override
-    public List<PartitionInfo> partitionsFor(String topic) {
+    public List<PartitionInfo> partitionsFor(String s) {
         throw new UnsupportedOperationException();
     }
 
@@ -275,44 +246,41 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
         }
     }
 
-    @Override
-    public void close(Duration duration) {
-        close(duration.toMillis(), TimeUnit.MILLISECONDS);
-    }
-
     private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String topic) {
         try {
             // Add the partitions info for the new topic
-            synchronized (this){
-                cluster = cluster.withPartitions(readPartitionsInfo(topic));
-            }
-            List<org.apache.pulsar.client.api.ProducerInterceptor> wrappedInterceptors = interceptors.stream()
-                    .map(interceptor -> new KafkaProducerInterceptorWrapper(interceptor, keySchema, valueSchema, topic))
-                    .collect(Collectors.toList());
+            synchronized (this) { cluster = addPartitionsInfo(cluster, topic); } // field read and write-back under one lock
             return pulsarProducerBuilder.clone()
                     .topic(topic)
-                    .intercept(wrappedInterceptors.toArray(new org.apache.pulsar.client.api.ProducerInterceptor[wrappedInterceptors.size()]))
                     .create();
         } catch (PulsarClientException e) {
             throw new RuntimeException(e);
         }
     }
 
-    private Map<TopicPartition, PartitionInfo> readPartitionsInfo(String topic) {
+    /**
+     * Builds a new {@link Cluster} holding every partition already known to {@code cluster} plus the
+     * partitions of {@code topic} as currently reported by the Pulsar client.
+     */
+    private synchronized Cluster addPartitionsInfo(Cluster cluster, String topic) {
         List<String> partitions = client.getPartitionsForTopic(topic).join();
-
-        Map<TopicPartition, PartitionInfo> partitionsInfo = new HashMap<>();
-
+        // read partitions info
+        Set<PartitionInfo> partitionsInfo = new HashSet<>();
         for (int i = 0; i < partitions.size(); i++) {
-            TopicPartition tp = new TopicPartition(topic, i);
-            PartitionInfo pi = new PartitionInfo(topic, i, null, null, null);
-            partitionsInfo.put(tp, pi);
+            partitionsInfo.add(new PartitionInfo(topic, i, null, null, null));
         }
-
-        return partitionsInfo;
+        // carry over the partitions of every topic already known, otherwise earlier topics vanish from the cluster
+        Set<PartitionInfo> combinedPartitions = new HashSet<>();
+        for (String knownTopic : cluster.topics()) {
+            combinedPartitions.addAll(cluster.partitionsForTopic(knownTopic));
+        }
+        combinedPartitions.addAll(partitionsInfo);
+        return new Cluster(cluster.nodes(), combinedPartitions, new HashSet<>(cluster.unauthorizedTopics()));
     }
 
-    private int buildMessage(TypedMessageBuilder<byte[]> builder, ProducerRecord<K, V> record) {
+    private TypedMessageBuilder<byte[]> buildMessage(org.apache.pulsar.client.api.Producer<byte[]> producer, ProducerRecord<K, V> record) {
+        TypedMessageBuilder<byte[]> builder = producer.newMessage();
+
         byte[] keyBytes = null;
         if (record.key() != null) {
             String key = getKey(record.topic(), record.key());
@@ -320,10 +288,6 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
             builder.key(key);
         }
 
-        if (record.timestamp() != null) {
-            builder.eventTime(record.timestamp());
-        }
-
         if (valueSchema instanceof PulsarKafkaSchema) {
             ((PulsarKafkaSchema<V>) valueSchema).setTopic(record.topic());
         }
@@ -338,8 +302,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
             int partition = partitioner.partition(record.topic(), record.key(), keyBytes, record.value(), value, cluster);
             builder.property(KafkaMessageRouter.PARTITION_ID, Integer.toString(partition));
         }
-
-        return value.length;
+        return builder;
     }
 
     private String getKey(String topic, K key) {
@@ -354,8 +317,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
         return Base64.getEncoder().encodeToString(keyBytes);
     }
 
-    private RecordMetadata getRecordMetadata(String topic, TypedMessageBuilder<byte[]> msgBuilder, MessageId messageId,
-            int size) {
+    private RecordMetadata getRecordMetadata(String topic, MessageId messageId) {
         MessageIdImpl msgId = (MessageIdImpl) messageId;
 
         // Combine ledger id and entry id to form offset
@@ -363,9 +325,6 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
         int partition = msgId.getPartitionIndex();
 
         TopicPartition tp = new TopicPartition(topic, partition);
-        TypedMessageBuilderImpl<byte[]> mb = (TypedMessageBuilderImpl<byte[]>) msgBuilder;
-        return new RecordMetadata(tp, offset, 0L, mb.getPublishTime(), 0L, mb.hasKey() ? mb.getKey().length() : 0, size);
+        return new RecordMetadata(tp, offset, 0L);
     }
-
-    private static final Logger logger = LoggerFactory.getLogger(PulsarKafkaProducer.class);
 }
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaMessageRouter.java
similarity index 50%
copy from pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java
copy to pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaMessageRouter.java
index d8f7680..d3fb915 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaMessageRouter.java
@@ -18,27 +18,27 @@
  */
 package org.apache.pulsar.client.kafka.compat;
 
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.MessageIdImpl;
+import java.util.concurrent.ThreadLocalRandom;
 
-public class MessageIdUtils {
-    public static final long getOffset(MessageId messageId) {
-        MessageIdImpl msgId = (MessageIdImpl) messageId;
-        long ledgerId = msgId.getLedgerId();
-        long entryId = msgId.getEntryId();
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.impl.RoundRobinPartitionMessageRouterImpl;
 
-        // Combine ledger id and entry id to form offset
-        // Use less than 32 bits to represent entry id since it will get
-        // rolled over way before overflowing the max int range
-        long offset = (ledgerId << 28) | entryId;
-        return offset;
-    }
+public class KafkaMessageRouter extends RoundRobinPartitionMessageRouterImpl {
+    // Message property carrying an explicit partition index; when present, choosePartition returns it as-is.
+    public static final String PARTITION_ID = "pulsar.partition.id";
 
-    public static final MessageId getMessageId(long offset) {
-        // Demultiplex ledgerId and entryId from offset
-        long ledgerId = offset >>> 28;
-        long entryId = offset & 0x0F_FF_FF_FFL;
+    public KafkaMessageRouter(long maxBatchingDelayMs) {
+        super(HashingScheme.JavaStringHash, ThreadLocalRandom.current().nextInt(), true, maxBatchingDelayMs);
+    }
 
-        return new MessageIdImpl(ledgerId, entryId, -1);
+    @Override
+    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+        if (msg.hasProperty(PARTITION_ID)) {
+            return Integer.parseInt(msg.getProperty(PARTITION_ID));
+        } else {
+            return super.choosePartition(msg, metadata);
+        }
     }
 }
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
new file mode 100644
index 0000000..c38d93d
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+/** Builds a Pulsar {@link ClientBuilder} from Kafka-style {@link Properties} using the {@code pulsar.*} keys below. */
+public class PulsarClientKafkaConfig {
+
+    /// Config variables
+    public static final String AUTHENTICATION_CLASS = "pulsar.authentication.class";
+    public static final String AUTHENTICATION_PARAMS_MAP = "pulsar.authentication.params.map";
+    public static final String AUTHENTICATION_PARAMS_STRING = "pulsar.authentication.params.string";
+    public static final String USE_TLS = "pulsar.use.tls";
+    public static final String TLS_TRUST_CERTS_FILE_PATH = "pulsar.tls.trust.certs.file.path";
+    public static final String TLS_ALLOW_INSECURE_CONNECTION = "pulsar.tls.allow.insecure.connection";
+    public static final String TLS_HOSTNAME_VERIFICATION = "pulsar.tls.hostname.verification";
+
+    public static final String OPERATION_TIMEOUT_MS = "pulsar.operation.timeout.ms";
+    public static final String STATS_INTERVAL_SECONDS = "pulsar.stats.interval.seconds";
+    public static final String NUM_IO_THREADS = "pulsar.num.io.threads";
+
+    public static final String CONNECTIONS_PER_BROKER = "pulsar.connections.per.broker";
+
+    public static final String USE_TCP_NODELAY = "pulsar.use.tcp.nodelay";
+
+    public static final String CONCURRENT_LOOKUP_REQUESTS = "pulsar.concurrent.lookup.requests";
+    public static final String MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION = "pulsar.max.number.rejected.request.per.connection";
+    /** Returns a new builder configured from {@code properties}; any authentication setup failure is rethrown as RuntimeException. */
+    public static ClientBuilder getClientBuilder(Properties properties) {
+        ClientBuilder clientBuilder = PulsarClient.builder();
+
+        if (properties.containsKey(AUTHENTICATION_CLASS)) {
+            String className = properties.getProperty(AUTHENTICATION_CLASS);
+            try {
+                if (properties.containsKey(AUTHENTICATION_PARAMS_STRING)) {
+                    String authParamsString = (String) properties.get(AUTHENTICATION_PARAMS_STRING);
+                    clientBuilder.authentication(className, authParamsString);
+                } else if (properties.containsKey(AUTHENTICATION_PARAMS_MAP)) {
+                    Map<String, String> authParams = (Map<String, String>) properties.get(AUTHENTICATION_PARAMS_MAP);
+                    clientBuilder.authentication(className, authParams);
+                } else {
+                    @SuppressWarnings("unchecked")
+                    Class<Authentication> clazz = (Class<Authentication>) Class.forName(className);
+                    Authentication auth = clazz.getDeclaredConstructor().newInstance(); // Class#newInstance is deprecated
+                    clientBuilder.authentication(auth);
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+        // The three TLS switches are always applied and default to false when the property is absent.
+        clientBuilder.enableTls(Boolean.parseBoolean(properties.getProperty(USE_TLS, "false")));
+        clientBuilder.allowTlsInsecureConnection(
+                Boolean.parseBoolean(properties.getProperty(TLS_ALLOW_INSECURE_CONNECTION, "false")));
+        clientBuilder.enableTlsHostnameVerification(
+                Boolean.parseBoolean(properties.getProperty(TLS_HOSTNAME_VERIFICATION, "false")));
+
+        if (properties.containsKey(TLS_TRUST_CERTS_FILE_PATH)) {
+            clientBuilder.tlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH));
+        }
+
+        if (properties.containsKey(OPERATION_TIMEOUT_MS)) {
+            clientBuilder.operationTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)),
+                    TimeUnit.MILLISECONDS);
+        }
+
+        if (properties.containsKey(STATS_INTERVAL_SECONDS)) {
+            clientBuilder.statsInterval(Integer.parseInt(properties.getProperty(STATS_INTERVAL_SECONDS)),
+                    TimeUnit.SECONDS);
+        }
+
+        if (properties.containsKey(NUM_IO_THREADS)) {
+            clientBuilder.ioThreads(Integer.parseInt(properties.getProperty(NUM_IO_THREADS)));
+        }
+
+        if (properties.containsKey(CONNECTIONS_PER_BROKER)) {
+            clientBuilder.connectionsPerBroker(Integer.parseInt(properties.getProperty(CONNECTIONS_PER_BROKER)));
+        }
+
+        if (properties.containsKey(USE_TCP_NODELAY)) {
+            clientBuilder.enableTcpNoDelay(Boolean.parseBoolean(properties.getProperty(USE_TCP_NODELAY)));
+        }
+
+        if (properties.containsKey(CONCURRENT_LOOKUP_REQUESTS)) {
+            clientBuilder.maxConcurrentLookupRequests(Integer.parseInt(properties.getProperty(CONCURRENT_LOOKUP_REQUESTS)));
+        }
+
+        if (properties.containsKey(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)) {
+            clientBuilder.maxNumberOfRejectedRequestPerConnection(
+                    Integer.parseInt(properties.getProperty(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)));
+        }
+
+        return clientBuilder;
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
new file mode 100644
index 0000000..a527827
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+/** Builds a Pulsar {@link ConsumerBuilder} from Kafka-style {@link Properties} using the {@code pulsar.consumer.*} keys below. */
+public class PulsarConsumerKafkaConfig {
+
+    /// Config variables
+    public static final String CONSUMER_NAME = "pulsar.consumer.name";
+    public static final String RECEIVER_QUEUE_SIZE = "pulsar.consumer.receiver.queue.size";
+    public static final String ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS = "pulsar.consumer.acknowledgments.group.time.millis";
+    public static final String TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS = "pulsar.consumer.total.receiver.queue.size.across.partitions";
+    public static final String SUBSCRIPTION_TOPICS_MODE = "pulsar.consumer.subscription.topics.mode";
+    /** Returns a consumer builder from {@code client}, applying only the keys present in {@code properties}. */
+    public static ConsumerBuilder<byte[]> getConsumerBuilder(PulsarClient client, Properties properties) {
+        ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer();
+
+        if (properties.containsKey(CONSUMER_NAME)) {
+            consumerBuilder.consumerName(properties.getProperty(CONSUMER_NAME));
+        }
+
+        if (properties.containsKey(RECEIVER_QUEUE_SIZE)) {
+            consumerBuilder.receiverQueueSize(Integer.parseInt(properties.getProperty(RECEIVER_QUEUE_SIZE)));
+        }
+
+        if (properties.containsKey(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)) {
+            consumerBuilder.maxTotalReceiverQueueSizeAcrossPartitions(
+                    Integer.parseInt(properties.getProperty(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)));
+        }
+
+        if (properties.containsKey(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)) {
+            consumerBuilder.acknowledgmentGroupTime(
+                    Long.parseLong(properties.getProperty(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)), TimeUnit.MILLISECONDS);
+        }
+
+        if (properties.containsKey(SUBSCRIPTION_TOPICS_MODE)) {
+            RegexSubscriptionMode mode;
+            try {
+                mode = RegexSubscriptionMode.valueOf(properties.getProperty(SUBSCRIPTION_TOPICS_MODE));
+            } catch (IllegalArgumentException e) {
+                throw new IllegalArgumentException("Illegal subscription mode, valid values are: "
+                    + Arrays.asList(RegexSubscriptionMode.values()), e); // keep the original failure as the cause
+            }
+            consumerBuilder.subscriptionTopicsMode(mode);
+        }
+
+        return consumerBuilder;
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
new file mode 100644
index 0000000..807f482
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+import static com.google.common.base.Preconditions.checkArgument;
+/** Pulsar {@link Schema} that delegates encode/decode to a Kafka {@link Serializer} / {@link Deserializer}. */
+public class PulsarKafkaSchema<T> implements Schema<T> {
+
+    private final Serializer<T> kafkaSerializer;
+
+    private final Deserializer<T> kafkaDeserializer;
+    // Topic handed to the Kafka serializer/deserializer on every call; stays null until setTopic is invoked.
+    private String topic;
+
+    public PulsarKafkaSchema(Serializer<T> serializer) {
+        this(serializer, null);
+    }
+
+    public PulsarKafkaSchema(Deserializer<T> deserializer) {
+        this(null, deserializer);
+    }
+
+    public PulsarKafkaSchema(Serializer<T> serializer, Deserializer<T> deserializer) {
+        this.kafkaSerializer = serializer;
+        this.kafkaDeserializer = deserializer;
+    }
+
+    public Serializer<T> getKafkaSerializer() {
+        return kafkaSerializer;
+    }
+
+    public Deserializer<T> getKafkaDeserializer() {
+        return kafkaDeserializer;
+    }
+    // NOTE(review): topic is a plain mutable field; sharing one instance across threads/topics may race - confirm.
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+    // Serializes with the Kafka serializer for the current topic; checkArgument rejects a schema built without one.
+    @Override
+    public byte[] encode(T message) {
+        checkArgument(kafkaSerializer != null, "Kafka serializer is not initialized yet");
+        return kafkaSerializer.serialize(this.topic, message);
+    }
+    // Deserializes with the Kafka deserializer for the current topic; checkArgument rejects a schema built without one.
+    @Override
+    public T decode(byte[] message) {
+        checkArgument(kafkaDeserializer != null, "Kafka deserializer is not initialized yet");
+        return kafkaDeserializer.deserialize(this.topic, message);
+    }
+    // Always reports the schema info of Schema.BYTES.
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return Schema.BYTES.getSchemaInfo();
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
new file mode 100644
index 0000000..5a9a651
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import java.util.Properties;
+
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+/** Builds a Pulsar {@link ProducerBuilder} from Kafka-style {@link Properties} using the {@code pulsar.producer.*} keys below. */
+public class PulsarProducerKafkaConfig {
+
+    /// Config variables
+    public static final String PRODUCER_NAME = "pulsar.producer.name";
+    public static final String INITIAL_SEQUENCE_ID = "pulsar.producer.initial.sequence.id";
+
+    public static final String MAX_PENDING_MESSAGES = "pulsar.producer.max.pending.messages";
+    public static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = "pulsar.producer.max.pending.messages.across.partitions";
+    public static final String BATCHING_ENABLED = "pulsar.producer.batching.enabled";
+    public static final String BATCHING_MAX_MESSAGES = "pulsar.producer.batching.max.messages";
+    /** Returns a producer builder from {@code client}; optional keys are applied only when present in {@code properties}. */
+    public static ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client, Properties properties) {
+        ProducerBuilder<byte[]> producerBuilder = client.newProducer();
+
+        if (properties.containsKey(PRODUCER_NAME)) {
+            producerBuilder.producerName(properties.getProperty(PRODUCER_NAME));
+        }
+
+        if (properties.containsKey(INITIAL_SEQUENCE_ID)) {
+            producerBuilder.initialSequenceId(Long.parseLong(properties.getProperty(INITIAL_SEQUENCE_ID)));
+        }
+
+        if (properties.containsKey(MAX_PENDING_MESSAGES)) {
+            producerBuilder.maxPendingMessages(Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES)));
+        }
+
+        if (properties.containsKey(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)) {
+            producerBuilder.maxPendingMessagesAcrossPartitions(
+                    Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)));
+        }
+        // Batching is always set explicitly and defaults to enabled when the property is absent.
+        producerBuilder.enableBatching(Boolean.parseBoolean(properties.getProperty(BATCHING_ENABLED, "true")));
+
+        if (properties.containsKey(BATCHING_MAX_MESSAGES)) {
+            producerBuilder.batchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES)));
+        }
+
+        return producerBuilder;
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
new file mode 100644
index 0000000..4d12418
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.anyVararg;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import org.apache.avro.reflect.Nullable;
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.Assert;
+import org.testng.IObjectFactory;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+/** Unit tests for {@link PulsarKafkaProducer} driven entirely by Mockito/PowerMock stand-ins for the Pulsar client. */
+@PrepareForTest({PulsarClientKafkaConfig.class, PulsarProducerKafkaConfig.class})
+@PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.kafka.clients.producer.ProducerInterceptor"})
+public class PulsarKafkaProducerTest {
+    /** POJO used as the record key type (and Avro key schema source) in {@link #testPulsarKafkaSendAvro()}. */
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Foo {
+        @Nullable
+        private String field1;
+        @Nullable
+        private String field2;
+        private int field3;
+    }
+
+    /** POJO used as the record value type (and Avro value schema source) in {@link #testPulsarKafkaSendAvro()}. */
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Bar {
+        private boolean field1;
+    }
+
+    /** Supplies the PowerMock object factory; necessary to make PowerMockito.mockStatic work with TestNG. */
+    @ObjectFactory
+    public IObjectFactory getObjectFactory() {
+        return new org.powermock.modules.testng.PowerMockObjectFactory();
+    }
+
+    /** Verifies that the configured idle and request timeouts reach the builders as keep-alive and send timeout. */
+    @Test
+    public void testPulsarKafkaProducer() {
+        ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
+        ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
+        doAnswer(invocation -> {
+            Assert.assertEquals((int)invocation.getArguments()[0], 1000000, "Send timeout is supposed to be 1000000.");
+            return mockProducerBuilder;
+        }).when(mockProducerBuilder).sendTimeout(anyInt(), any(TimeUnit.class));
+        doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
+        doAnswer(invocation -> {
+            Assert.assertEquals((int)invocation.getArguments()[0], 1000, "Keep alive interval is supposed to be 1000.");
+            return mockClientBuilder;
+        }).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
+        // Let the static config factories hand out the mocks prepared above.
+        PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
+        PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
+        when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
+        when(PulsarProducerKafkaConfig.getProducerBuilder(any(), any())).thenReturn(mockProducerBuilder);
+
+        Properties properties = new Properties();
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
+        properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
+        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
+        // Constructing the producer is the action under test; the builder calls it makes are verified below.
+        new PulsarKafkaProducer<>(properties);
+
+        verify(mockClientBuilder, times(1)).keepAliveInterval(1000, TimeUnit.SECONDS);
+        verify(mockProducerBuilder, times(1)).sendTimeout(1000000, TimeUnit.MILLISECONDS);
+    }
+
+    /** Verifies that sending a record with Avro-schema key and value ends in sendAsync() on the message builder. */
+    @Test
+    public void testPulsarKafkaSendAvro() throws PulsarClientException {
+        // Arrange
+        PulsarClient mockClient = mock(PulsarClient.class);
+        ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
+        org.apache.pulsar.client.api.Producer mockProducer = mock(org.apache.pulsar.client.api.Producer.class);
+        ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
+        CompletableFuture mockPartitionFuture = new CompletableFuture();
+        CompletableFuture mockSendAsyncFuture = new CompletableFuture();
+        TypedMessageBuilder mockTypedMessageBuilder = mock(TypedMessageBuilderImpl.class);
+        mockPartitionFuture.complete(new ArrayList<>());
+        mockSendAsyncFuture.complete(new MessageIdImpl(1, 1, 1));
+        doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
+        doReturn(mockClientBuilder).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
+        doReturn(mockClient).when(mockClientBuilder).build();
+        doReturn(mockPartitionFuture).when(mockClient).getPartitionsForTopic(anyString());
+        doReturn(mockProducerBuilder).when(mockProducerBuilder).topic(anyString());
+        doReturn(mockProducerBuilder).when(mockProducerBuilder).clone();
+        doReturn(mockProducer).when(mockProducerBuilder).create();
+        doReturn(mockTypedMessageBuilder).when(mockProducer).newMessage();
+        doReturn(mockSendAsyncFuture).when(mockTypedMessageBuilder).sendAsync();
+        PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
+        PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
+        when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
+        when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);
+
+        Properties properties = new Properties();
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
+        properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
+        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
+
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        // Act
+        PulsarKafkaProducer<Foo, Bar> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, fooSchema, barSchema);
+        Bar bar = new Bar();
+        bar.setField1(true);
+        Foo foo = new Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+        pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1, foo, bar));
+
+        // Verify
+        verify(mockTypedMessageBuilder).sendAsync();
+    }
+
+    /** Verifies that a 'connections.max.idle.ms' of (Integer.MAX_VALUE + 1) seconds is rejected by the constructor. */
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid value 2147483648000 for 'connections.max.idle.ms'. Please use a value smaller than 2147483647000 milliseconds.")
+    public void testPulsarKafkaProducerKeepAliveIntervalIllegalArgumentException() {
+        Properties properties = new Properties();
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
+        properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, Long.toString((Integer.MAX_VALUE + 1L) * 1000));
+        // The constructor itself is expected to throw; the @Test attributes above pin the type and message.
+        new PulsarKafkaProducer<>(properties);
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java
similarity index 97%
rename from pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java
rename to pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java
index d8f7680..34920c3 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.kafka.compat;
+package org.apache.pulsar.client.util;
 
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;