You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2019/08/06 16:54:04 UTC

[pulsar] branch master updated: Add support of pulsar-kafka-adapter for kafka-0.8 api (#4797)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b185db  Add support of pulsar-kafka-adapter for kafka-0.8 api (#4797)
7b185db is described below

commit 7b185dbef65fbacf0b0b22e8411905aefec51479
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Aug 6 09:53:56 2019 -0700

    Add support of pulsar-kafka-adapter for kafka-0.8 api (#4797)
    
    * Add support of pulsar-kafka-adapter for kafka-0.8 api
    
    clean up pulsar-kafka adapter
    
    add tests
    
    add low level consumer
    
    add simple consumer
    
    corrected pulsar-client-kafka_0_8
    
    fix the module name
    
    add batch and partitioned-topic support
    
    fix headers
    
    add getOffset api support
    
    added pulsarOffset request/response
    
     clean up
    
    * add pulsar-kafka integration test
    
    * use earliestTime offset
    
    * add default serializer/deserializer
---
 pulsar-client-kafka-compat/pom.xml                 |   3 +
 .../pulsar-client-kafka-shaded_0_8/pom.xml         | 433 +++++++++++++++++++++
 .../pom.xml                                        |  52 ++-
 .../compat/examples/HighLevelConsumerExample.java  | 121 ++++++
 .../compat/examples/LowLevelConsumerExample.java   | 146 +++++++
 .../kafka/compat/examples/ProducerExample.java     | 131 +++++++
 .../client/kafka/compat/examples/utils/Tweet.java  |  67 ++++
 .../pulsar-client-kafka/pom.xml                    |   5 +
 .../pom.xml                                        |  61 ++-
 .../kafka/clients/consumer/ConsumerConnector.java  | 230 +++++++++++
 .../kafka/clients/consumer/ConsumerIterator.java   | 143 +++++++
 .../clients/consumer/PulsarConsumerConfig.java     |  28 ++
 .../clients/consumer/PulsarKafkaConsumer.java      |  30 ++
 .../kafka/clients/consumer/PulsarKafkaStream.java  |  68 ++++
 .../clients/consumer/PulsarMessageAndMetadata.java |  92 +++++
 .../clients/producer/PulsarClientKafkaConfig.java  | 169 ++++++++
 .../clients/producer/PulsarKafkaProducer.java      | 303 ++++++++++++++
 .../consumer/PulsarByteBufferMessageSet.java       |  93 +++++
 .../simple/consumer/PulsarFetchResponse.java       |  47 +++
 .../simple/consumer/PulsarKafkaSimpleConsumer.java | 354 +++++++++++++++++
 .../clients/simple/consumer/PulsarMessage.java     |  41 ++
 .../simple/consumer/PulsarMsgAndOffset.java        |  40 ++
 .../simple/consumer/PulsarOffsetCommitRequest.java |  61 +++
 .../consumer/PulsarOffsetCommitResponse.java       |  43 ++
 .../simple/consumer/PulsarOffsetFetchRequest.java  |  36 ++
 .../simple/consumer/PulsarOffsetFetchResponse.java |  40 ++
 .../consumer/PulsarOffsetMetadataAndError.java     |  44 +++
 .../simple/consumer/PulsarOffsetRequest.java       |  40 ++
 .../simple/consumer/PulsarOffsetResponse.java      |  52 +++
 .../simple/consumer/PulsarPartitionMetadata.java   |  51 +++
 .../simple/consumer/PulsarTopicMetadata.java       |  42 ++
 .../consumer/PulsarTopicMetadataResponse.java      |  71 ++++
 .../clients/consumer/PulsarKafkaConsumerTest.java  |  75 ++++
 .../clients/producer/PulsarKafkaProducerTest.java  | 124 ++++++
 .../kafka/test/KafkaProducerConsumerTest.java      | 247 ++++++++++++
 .../test/KafkaProducerSimpleConsumerTest.java      | 256 ++++++++++++
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |   5 +
 .../pulsar/client/impl/ProducerBuilderImpl.java    |   5 +
 .../pulsar/client/impl/TopicMessageIdImpl.java     |   2 +-
 39 files changed, 3825 insertions(+), 26 deletions(-)

diff --git a/pulsar-client-kafka-compat/pom.xml b/pulsar-client-kafka-compat/pom.xml
index 569647f..5e0e896 100644
--- a/pulsar-client-kafka-compat/pom.xml
+++ b/pulsar-client-kafka-compat/pom.xml
@@ -38,10 +38,13 @@
 
   <modules>
     <module>pulsar-client-kafka</module>
+    <module>pulsar-client-kafka_0_8</module>
     <module>pulsar-client-kafka_0_9</module>
     <module>pulsar-client-kafka-shaded</module>
+    <module>pulsar-client-kafka-shaded_0_8</module>
     <module>pulsar-client-kafka-shaded_0_9</module>
     <module>pulsar-client-kafka-tests</module>
+    <module>pulsar-client-kafka-tests_0_8</module>
     <module>pulsar-client-kafka-tests_0_9</module>
   </modules>
 </project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-shaded_0_8/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded_0_8/pom.xml
new file mode 100644
index 0000000..6cf6c84
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded_0_8/pom.xml
@@ -0,0 +1,433 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-client-kafka-compat</artifactId>
+    <version>2.5.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>pulsar-client-kafka_0_8</artifactId>
+  <name>Pulsar Kafka compatibility 0.8 :: API</name>
+
+  <description>Drop-in replacement for Kafka client library that publishes and consumes
+  messages on Pulsar topics</description>
+
+  <properties>
+     <kafka_0_8.version>0.8.1.1</kafka_0_8.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-kafka_0_8-original</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <createDependencyReducedPom>true</createDependencyReducedPom>
+              <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+              <artifactSet>
+                <includes>
+                  <include>org.apache.kafka:kafka_2.9.2</include>
+                  <include>org.apache.pulsar:pulsar-client-kafka_0_8-original</include>
+                  <include>org.apache.pulsar:pulsar-client-original</include>
+                  <include>org.apache.commons:commons-lang3</include>
+                  <include>commons-codec:commons-codec</include>
+                  <include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
+                  <include>commons-collections:commons-collections</include>
+                  <include>org.asynchttpclient:*</include>
+                  <include>io.netty:netty-codec-http</include>
+                  <include>io.netty:netty-transport-native-epoll</include>
+                  <include>org.reactivestreams:reactive-streams</include>
+                  <include>com.typesafe.netty:netty-reactive-streams</include>
+                  <include>org.javassist:javassist</include>
+                  <include>com.google.protobuf:protobuf-java</include>
+                  <include>com.google.guava:guava</include>
+                  <include>com.google.code.gson:gson</include>
+                  <include>com.fasterxml.jackson.core</include>
+                  <include>com.fasterxml.jackson.module</include>
+                  <include>com.fasterxml.jackson.dataformat</include>
+                  <include>io.netty:netty</include>
+                  <include>io.netty:netty-*</include>
+                  <include>org.apache.pulsar:pulsar-common</include>
+                  <include>org.apache.bookkeeper:circe-checksum</include>
+                  <include>com.yahoo.datasketches:sketches-core</include>
+                  <include>org.apache.httpcomponents:httpclient</include>
+                  <include>commons-logging:commons-logging</include>
+                  <include>org.apache.httpcomponents:httpcore</include>
+                  <include>org.eclipse.jetty:*</include>
+                  <include>com.yahoo.datasketches:*</include>
+                  <include>commons-*:*</include>
+                  <include>org.yaml:snakeyaml</include>
+                  <include>org.objenesis:*</include>
+
+                  <include>org.apache.avro:*</include>
+                  <!-- Avro transitive dependencies-->
+                  <include>org.codehaus.jackson:jackson-core-asl</include>
+                  <include>org.codehaus.jackson:jackson-mapper-asl</include>
+                  <include>com.thoughtworks.paranamer:paranamer</include>
+                  <include>org.xerial.snappy:snappy-java</include>
+                  <include>org.apache.commons:commons-compress</include>
+                  <include>org.tukaani:xz</include>
+                </includes>
+              </artifactSet>
+               <filters>
+                <filter>
+                   <artifact>commons-logging:commons-logging</artifact>
+                   <includes>
+                       <include>**</include>
+                   </includes>
+                </filter>
+              </filters>
+              <relocations>
+                <relocation>
+                  <pattern>kafka.javaapi.producer.Producer</pattern>
+                  <shadedPattern>kafka.javaapi.producer.OriginalProducer</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.producer.PulsarKafkaProducer</pattern>
+                  <shadedPattern>kafka.javaapi.producer.Producer</shadedPattern>
+                </relocation>
+                
+                <relocation>
+                  <pattern>kafka.javaapi.consumer.ConsumerConnector</pattern>
+                  <shadedPattern>kafka.javaapi.consumer.OriginalConsumerConnector</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.consumer.ConsumerConnector</pattern>
+                  <shadedPattern>kafka.javaapi.consumer.ConsumerConnector</shadedPattern>
+                </relocation>
+                
+                <relocation>
+                  <pattern>kafka.consumer.KafkaStream</pattern>
+                  <shadedPattern>kafka.consumer.OriginalKafkaStream</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.consumer.PulsarKafkaStream</pattern>
+                  <shadedPattern>kafka.consumer.KafkaStream</shadedPattern>
+                </relocation>
+                
+                <relocation>
+                  <pattern>kafka.message.MessageAndMetadata</pattern>
+                  <shadedPattern>kafka.message.OriginalMessageAndMetadata</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.consumer.PulsarMessageAndMetadata</pattern>
+                  <shadedPattern>kafka.message.MessageAndMetadata</shadedPattern>
+                </relocation>
+                
+                <relocation>
+                  <pattern>kafka.consumer.ConsumerIterator</pattern>
+                  <shadedPattern>kafka.consumer.OriginalConsumerIterator</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.consumer.ConsumerIterator</pattern>
+                  <shadedPattern>kafka.consumer.ConsumerIterator</shadedPattern>
+                </relocation>
+                
+                <relocation>
+                  <pattern>kafka.consumer.Consumer</pattern>
+                  <shadedPattern>kafka.consumer.OriginalConsumer</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.consumer.PulsarKafkaConsumer</pattern>
+                  <shadedPattern>kafka.consumer.Consumer</shadedPattern>
+                </relocation>
+                
+                <relocation>
+                  <pattern>org.apache.kafka.clients.consumer.PulsarConsumerConfig</pattern>
+                  <shadedPattern>kafka.consumer.ConsumerConfig</shadedPattern>
+                </relocation>
+                
+                <!-- low-level consumer -->
+                <relocation>
+                  <pattern>kafka.javaapi.consumer.SimpleConsumer</pattern>
+                  <shadedPattern>kafka.javaapi.consumer.OriginalSimpleConsumer</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.simple.consumer.PulsarKafkaSimpleConsumer</pattern>
+                  <shadedPattern>kafka.javaapi.consumer.SimpleConsumer</shadedPattern>
+                </relocation>
+                
+                <relocation>
+                  <pattern>kafka.javaapi.OffsetFetchResponse</pattern>
+                  <shadedPattern>kafka.javaapi.OriginalOffsetFetchResponse</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.simple.consumer.PulsarOffsetFetchResponse</pattern>
+                  <shadedPattern>kafka.javaapi.OffsetFetchResponse</shadedPattern>
+                </relocation>
+                
+                <relocation>
+                  <pattern>kafka.javaapi.OffsetFetchRequest</pattern>
+                  <shadedPattern>kafka.javaapi.OriginalOffsetFetchRequest</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.simple.consumer.PulsarOffsetFetchRequest</pattern>
+                  <shadedPattern>kafka.javaapi.OffsetFetchRequest</shadedPattern>
+                </relocation>
+                
+                <relocation>
+                  <pattern>kafka.javaapi.OffsetCommitRequest</pattern>
+                  <shadedPattern>kafka.javaapi.OriginalOffsetCommitRequest</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.simple.consumer.PulsarOffsetCommitRequest</pattern>
+                  <shadedPattern>kafka.javaapi.OffsetCommitRequest</shadedPattern>
+                </relocation>
+                
+                <relocation>
+                  <pattern>kafka.javaapi.FetchResponse</pattern>
+                  <shadedPattern>kafka.javaapi.OriginalFetchResponse</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.simple.consumer.PulsarFetchResponse</pattern>
+                  <shadedPattern>kafka.javaapi.FetchResponse</shadedPattern>
+                </relocation>
+                
+                <relocation>
+                  <pattern>kafka.message.MessageAndOffset</pattern>
+                  <shadedPattern>kafka.message.MessageAndOffset</shadedPattern>
+                </relocation>
+                
+                <relocation>
+                  <pattern>kafka.javaapi.message.ByteBufferMessageSet</pattern>
+                  <shadedPattern>kafka.javaapi.message.OriginalByteBufferMessageSet</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.simple.consumer.PulsarByteBufferMessageSet</pattern>
+                  <shadedPattern>kafka.javaapi.message.ByteBufferMessageSet</shadedPattern>
+                </relocation>
+                
+                <relocation>
+                  <pattern>kafka.message.Message</pattern>
+                  <shadedPattern>kafka.message.OriginalMessage</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.simple.consumer.PulsarMessage</pattern>
+                  <shadedPattern>kafka.message.Message</shadedPattern>
+                </relocation>
+                
+                <relocation>
+                  <pattern>kafka.javaapi.TopicMetadataResponse</pattern>
+                  <shadedPattern>kafka.javaapi.OriginalTopicMetadataResponse</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.simple.consumer.PulsarTopicMetadataResponse</pattern>
+                  <shadedPattern>kafka.javaapi.TopicMetadataResponse</shadedPattern>
+                </relocation>
+                
+                <relocation>
+                  <pattern>kafka.javaapi.OffsetCommitResponse</pattern>
+                  <shadedPattern>kafka.javaapi.OriginalOffsetCommitResponse</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.simple.consumer.PulsarOffsetCommitResponse</pattern>
+                  <shadedPattern>kafka.javaapi.OffsetCommitResponse</shadedPattern>
+                </relocation>
+                
+                <relocation>
+                 <pattern>kafka.common.OffsetMetadataAndError</pattern>
+                  <shadedPattern>kafka.common.OriginalOffsetMetadataAndError</shadedPattern>
+                </relocation>
+                
+                <relocation>
+                  <pattern>org.apache.kafka.clients.simple.consumer.PulsarOffsetMetadataAndError</pattern>
+                  <shadedPattern>kafka.common.OffsetMetadataAndError</shadedPattern>
+                </relocation>
+                
+                <relocation>
+                  <pattern>kafka.javaapi.OffsetResponse</pattern>
+                  <shadedPattern>kafka.javaapi.OriginalOffsetResponse</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.simple.consumer.PulsarOffsetResponse</pattern>
+                  <shadedPattern>kafka.javaapi.OffsetResponse</shadedPattern>
+                </relocation>
+                
+                <relocation>
+                  <pattern>kafka.javaapi.OffsetRequest</pattern>
+                  <shadedPattern>kafka.javaapi.OriginalOffsetRequest</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.simple.consumer.PulsarOffsetRequest</pattern>
+                  <shadedPattern>kafka.javaapi.OffsetRequest</shadedPattern>
+                </relocation>
+                
+                
+                <!-- General relocation rules for Pulsar client dependencies -->
+
+                <relocation>
+                  <pattern>org.asynchttpclient</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.asynchttpclient</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.commons</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.commons</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.google</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.google</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.fasterxml.jackson</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>io.netty</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.io.netty</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.yahoo.datasketches</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.yahoo.datasketches</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.yahoo.sketches</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.yahoo.sketches</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.http</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.http</shadedPattern>
+                </relocation>
+                <relocation>
+                   <pattern>org.eclipse.jetty</pattern>
+                   <shadedPattern>org.apache.pulsar.shade.org.eclipse</shadedPattern>
+                </relocation>
+                <relocation>
+                   <pattern>org.reactivestreams</pattern>
+                   <shadedPattern>org.apache.pulsar.shade.org.reactivestreams</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.typesafe</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.typesafe</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.yahoo.memory</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.yahoo.memory</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.objenesis</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.objenesis</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.yaml</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.yaml</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.avro</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.avro</shadedPattern>
+                  <excludes>
+                    <exclude>org.apache.avro.reflect.AvroAlias</exclude>
+                    <exclude>org.apache.avro.reflect.AvroDefault</exclude>
+                    <exclude>org.apache.avro.reflect.AvroEncode</exclude>
+                    <exclude>org.apache.avro.reflect.AvroIgnore</exclude>
+                    <exclude>org.apache.avro.reflect.AvroMeta</exclude>
+                    <exclude>org.apache.avro.reflect.AvroName</exclude>
+                    <exclude>org.apache.avro.reflect.AvroSchema</exclude>
+                    <exclude>org.apache.avro.reflect.Nullable</exclude>
+                    <exclude>org.apache.avro.reflect.Stringable</exclude>
+                    <exclude>org.apache.avro.reflect.Union</exclude>
+                  </excludes>
+                </relocation>
+                <!--Avro transitive dependencies-->
+                <relocation>
+                  <pattern>org.codehaus.jackson</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.codehaus.jackson</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.thoughtworks.paranamer</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.thoughtworks.paranamer</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.xerial.snappy</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.xerial.snappy</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.commons</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.commons</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.tukaani</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.tukaani</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.bookkeeper</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.bookkeeper</shadedPattern>
+                </relocation>
+              </relocations>
+              <filters>
+                <filter>
+                  <artifact>org.apache.pulsar:pulsar-client-original</artifact>
+                  <includes>
+                    <include>**</include>
+                  </includes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <!-- This plugin is used to run a script after the package phase in order to rename
+            libnetty_transport_native_epoll_x86_64.so from Netty into
+            liborg_apache_pulsar_shade_netty_transport_native_epoll_x86_64.so
+            to reflect the shade that is being applied.
+         -->
+        <artifactId>exec-maven-plugin</artifactId>
+        <groupId>org.codehaus.mojo</groupId>
+        <executions>
+          <execution>
+            <id>rename-epoll-library</id>
+            <phase>package</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <executable>${project.parent.basedir}/../src/${rename.netty.native.libs}</executable>
+              <arguments>
+                <argument>${project.artifactId}</argument>
+              </arguments>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/pom.xml
similarity index 59%
copy from pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
copy to pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/pom.xml
index e8cffb9..8b9a009 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/pom.xml
@@ -31,37 +31,47 @@
     <relativePath>..</relativePath>
   </parent>
 
-  <artifactId>pulsar-client-kafka-original</artifactId>
-  <name>Pulsar Kafka compatibility :: API (original)</name>
+  <artifactId>pulsar-client-kafka-tests_0_8</artifactId>
+  <name>Pulsar Kafka-0.8 compatibility :: Tests</name>
 
-  <description>Kafka client library that publishes and consumes messages on Pulsar topics</description>
+  <description>Tests to verify the correct shading configuration for the pulsar-client-kafka wrapper</description>
 
   <dependencies>
     <dependency>
       <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-client</artifactId>
+      <artifactId>pulsar-client-kafka_0_8</artifactId>
       <version>${project.version}</version>
     </dependency>
 
     <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
-      <version>${kafka-client.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>net.jpountz.lz4</groupId>
-          <artifactId>lz4</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.lz4</groupId>
-          <artifactId>lz4-java</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.xerial.snappy</groupId>
-          <artifactId>snappy-java</artifactId>
-        </exclusion>
-      </exclusions>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-broker</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-broker</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>managed-ledger-original</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    
+    <dependency>
+      <groupId>com.beust</groupId>
+      <artifactId>jcommander</artifactId>
+      <scope>test</scope>
     </dependency>
+    
   </dependencies>
 
 </project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/HighLevelConsumerExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/HighLevelConsumerExample.java
new file mode 100644
index 0000000..b00e19d
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/HighLevelConsumerExample.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.pulsar.client.kafka.compat.examples.utils.Tweet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import kafka.serializer.StringDecoder;
+
+public class HighLevelConsumerExample {
+
+    static class Arguments {
+
+        @Parameter(names = { "-h", "--help" }, description = "Help message", help = true)
+        boolean help;
+
+        @Parameter(names = { "-u", "--service-url" }, description = "Service url", required = false)
+        public String serviceUrl = "pulsar://localhost:6650";
+
+        @Parameter(names = { "-t", "--topic-name" }, description = "Topic name", required = false)
+        public String topicName = "persistent://public/default/test";
+
+        @Parameter(names = { "-g", "--group-name" }, description = "Group name", required = false)
+        public String groupName = "high-level";
+
+        @Parameter(names = { "-m", "--total-messages" }, description = "total number of messages to consume (-1 = consume forever)")
+        public int totalMessages = 1;
+
+        @Parameter(names = { "-a", "--auto-commit-disable" }, description = "auto commit disable")
+        public boolean autoCommitDisable;
+    }
+
+    public static void main(String[] args) {
+
+        final Arguments arguments = new Arguments();
+        JCommander jc = new JCommander(arguments);
+        jc.setProgramName("pulsar-kafka-test");
+
+        try {
+            jc.parse(args);
+        } catch (ParameterException e) {
+            System.out.println(e.getMessage());
+            jc.usage();
+            System.exit(-1);
+        }
+
+        if (arguments.help) {
+            jc.usage();
+            System.exit(-1);
+        }
+
+        consumeMessage(arguments);
+    }
+
+    /**
+     * Consumes {@code totalMessages} tweets from the configured topic through the Kafka 0.8
+     * high-level consumer API (backed by the Pulsar adapter) and then shuts the connector down.
+     */
+    private static void consumeMessage(Arguments arguments) {
+
+        Properties properties = new Properties();
+        // The Pulsar adapter reuses Kafka's "zookeeper.connect" property to carry the Pulsar service url.
+        properties.put("zookeeper.connect", arguments.serviceUrl);
+        properties.put("group.id", arguments.groupName);
+        properties.put("consumer.id", "cons1");
+        properties.put("auto.commit.enable", Boolean.toString(!arguments.autoCommitDisable));
+        properties.put("auto.commit.interval.ms", "100");
+        properties.put("queued.max.message.chunks", "100");
+
+        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
+        ConsumerConnector connector = Consumer.createJavaConsumerConnector(consumerConfig);
+        // Request 2 streams for the single topic.
+        Map<String, Integer> topicCountMap = Collections.singletonMap(arguments.topicName, 2);
+        Map<String, List<KafkaStream<String, Tweet>>> streams = connector.createMessageStreams(topicCountMap,
+                new StringDecoder(null), new Tweet.TestDecoder());
+
+        // Iterate the requested topic's streams once (the previous code looped streams.size()
+        // times over the same list, which would re-consume when more topics are present) and
+        // break out as soon as the requested message count is reached: the stream iterator
+        // blocks for new messages, so without an explicit break the while-condition is never
+        // re-checked and connector.shutdown() is never reached.
+        int count = 0;
+        List<KafkaStream<String, Tweet>> kafkaStreams = streams.get(arguments.topicName);
+        receiveLoop:
+        while (count < arguments.totalMessages || arguments.totalMessages == -1) {
+            for (KafkaStream<String, Tweet> kafkaStream : kafkaStreams) {
+                for (MessageAndMetadata<String, Tweet> record : kafkaStream) {
+                    log.info("Received tweet: {}-{}", record.message().userName, record.message().message);
+                    count++;
+                    if (arguments.totalMessages != -1 && count >= arguments.totalMessages) {
+                        break receiveLoop;
+                    }
+                }
+            }
+        }
+
+        connector.shutdown();
+
+        log.info("successfully consumed message {}", count);
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(HighLevelConsumerExample.class);
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/LowLevelConsumerExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/LowLevelConsumerExample.java
new file mode 100644
index 0000000..d77617d
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/LowLevelConsumerExample.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.simple.consumer.PulsarMsgAndOffset;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Tweet;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Tweet.TestDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+
+import kafka.api.FetchRequestBuilder;
+import kafka.common.OffsetMetadataAndError;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetCommitRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.MessageAndOffset;
+
+public class LowLevelConsumerExample {
+
+    static class Arguments {
+        @Parameter(names = { "-h", "--help" }, description = "Help message", help = true)
+        boolean help;
+
+        @Parameter(names = { "-u", "--service-url" }, description = "Service url", required = false)
+        public String serviceUrl = "pulsar://localhost:6650";
+
+        @Parameter(names = { "-hu",
+                "--http-service-url" }, description = "Http service url to make admin api call", required = false)
+        public String httpServiceUrl = "http://localhost:8080";
+
+        @Parameter(names = { "-t", "--topic-name" }, description = "Topic name", required = false)
+        public String topicName = "persistent://public/default/test";
+
+        @Parameter(names = { "-g", "--group-name" }, description = "Group name", required = false)
+        public String groupName = "low-level";
+
+        @Parameter(names = { "-m", "--total-messages" }, description = "total number of messages to consume (-1 = consume forever)")
+        public int totalMessages = 1;
+
+        @Parameter(names = { "-p",
+                "--partition-index" }, description = "Partition-index (-1 if topic is not partitioned)", required = false)
+        public int partitionIndex = -1;
+    }
+
+    public static void main(String[] args) {
+
+        final Arguments arguments = new Arguments();
+        JCommander jc = new JCommander(arguments);
+        jc.setProgramName("pulsar-kafka-test");
+
+        try {
+            jc.parse(args);
+        } catch (ParameterException e) {
+            System.out.println(e.getMessage());
+            jc.usage();
+            System.exit(-1);
+        }
+
+        if (arguments.help) {
+            jc.usage();
+            System.exit(-1);
+        }
+
+        try {
+            consumeMessage(arguments);
+        } catch (Exception e) {
+            log.error("Failed to consume message", e);
+        }
+    }
+
+    /**
+     * Fetches messages with the Kafka 0.8 SimpleConsumer API (backed by the Pulsar adapter),
+     * committing the offset of each consumed message, until {@code totalMessages} have been
+     * received (-1 = consume forever).
+     */
+    private static void consumeMessage(Arguments arguments) {
+
+        Properties properties = new Properties();
+        properties.put(SimpleConsumer.HTTP_SERVICE_URL, arguments.httpServiceUrl);
+        SimpleConsumer consumer = new SimpleConsumer(arguments.serviceUrl, 0, 0, 0, "clientId", properties);
+
+        // EarliestTime() is a sentinel meaning "start from the beginning"; it is replaced with
+        // a real offset once the first message has been consumed.
+        long readOffset = kafka.api.OffsetRequest.EarliestTime();
+        TestDecoder decoder = new TestDecoder();
+        int count = 0;
+        while (count < arguments.totalMessages || arguments.totalMessages == -1) {
+            // Issue a fresh fetch on every pass. The previous code fetched once before the
+            // loop and re-iterated the same FetchResponse, which re-delivers (and re-counts)
+            // the same messages instead of fetching new ones.
+            kafka.api.FetchRequest fReq = new FetchRequestBuilder().clientId("c1")
+                    .addFetch(arguments.topicName, arguments.partitionIndex, readOffset, 100000).build();
+            FetchResponse fetchResponse = consumer.fetch(fReq);
+
+            // 1. Read from topic without subscription/consumer-group name.
+            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(arguments.topicName,
+                    arguments.partitionIndex)) {
+                MessageId msgIdOffset = (messageAndOffset instanceof PulsarMsgAndOffset)
+                        ? ((PulsarMsgAndOffset) messageAndOffset).getFullOffset()
+                        : null;
+                long currentOffset = messageAndOffset.offset();
+                if (currentOffset < readOffset) {
+                    // Skip messages replayed from before the requested offset.
+                    continue;
+                }
+                // Advance past this message so the next fetch starts after it.
+                readOffset = messageAndOffset.nextOffset();
+
+                ByteBuffer payload = messageAndOffset.message().payload();
+
+                // Use remaining() rather than limit(): the payload buffer's position is not
+                // guaranteed to be zero.
+                byte[] bytes = new byte[payload.remaining()];
+                payload.get(bytes);
+                Tweet tweet = decoder.fromBytes(bytes);
+                log.info("Received tweet: {}-{}", tweet.userName, tweet.message);
+                count++;
+
+                TopicAndPartition topicPartition = new TopicAndPartition(arguments.topicName, arguments.partitionIndex);
+                OffsetMetadataAndError offsetError = new OffsetMetadataAndError(msgIdOffset, null, (short) 0);
+                Map<TopicAndPartition, OffsetMetadataAndError> requestInfo = Collections.singletonMap(topicPartition,
+                        offsetError);
+                // 2. Commit offset for a given topic and subscription-name/consumer-name.
+                OffsetCommitRequest offsetReq = new OffsetCommitRequest(arguments.groupName, requestInfo, (short) -1, 0,
+                        "c1");
+                consumer.commitOffsets(offsetReq);
+
+                if (arguments.totalMessages != -1 && count >= arguments.totalMessages) {
+                    break;
+                }
+            }
+        }
+
+        consumer.close();
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(LowLevelConsumerExample.class);
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
new file mode 100644
index 0000000..360f79d
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples;
+
+import java.util.Properties;
+
+import org.apache.pulsar.client.kafka.compat.examples.utils.Tweet;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Tweet.TestEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.Partitioner;
+import kafka.producer.ProducerConfig;
+import kafka.serializer.StringEncoder;
+
+public class ProducerExample {
+
+    public static final String BROKER_URL = "metadata.broker.list";
+    public static final String PRODUCER_TYPE = "producer.type";
+    public static final String SERIALIZER_CLASS = "serializer.class";
+    public static final String KEY_SERIALIZER_CLASS = "key.serializer.class";
+    public static final String PARTITIONER_CLASS = "partitioner.class";
+    public static final String COMPRESSION_CODEC = "compression.codec";
+    public static final String QUEUE_BUFFERING_MAX_MS = "queue.buffering.max.ms";
+    public static final String QUEUE_BUFFERING_MAX_MESSAGES = "queue.buffering.max.messages";
+    public static final String QUEUE_ENQUEUE_TIMEOUT_MS = "queue.enqueue.timeout.ms";
+    public static final String BATCH_NUM_MESSAGES = "batch.num.messages";
+    public static final String CLIENT_ID = "client.id";
+
+    static class Arguments {
+
+        @Parameter(names = { "-u", "--service-url" }, description = "Service url", required = false)
+        public String serviceUrl = "pulsar://localhost:6650";
+
+        @Parameter(names = { "-t", "--topic-name" }, description = "Topic name", required = false)
+        public String topicName = "persistent://public/default/test";
+
+        @Parameter(names = { "-m", "--total-messages" }, description = "total number of messages to publish")
+        public int totalMessages = 1;
+
+        @Parameter(names = { "-mn", "--message-name" }, description = "Message payload value", required = false)
+        public String messageValue = "Hello-world";
+
+        @Parameter(names = { "-h", "--help" }, description = "Help message", help = true)
+        boolean help;
+    }
+
+    public static void main(String[] args) {
+
+        final Arguments arguments = new Arguments();
+        JCommander jc = new JCommander(arguments);
+        jc.setProgramName("pulsar-kafka-test");
+
+        try {
+            jc.parse(args);
+        } catch (ParameterException e) {
+            System.out.println(e.getMessage());
+            jc.usage();
+            System.exit(-1);
+        }
+
+        if (arguments.help) {
+            jc.usage();
+            System.exit(-1);
+        }
+
+        publishMessage(arguments);
+    }
+
+    /**
+     * Publishes {@code totalMessages} tweets with the Kafka 0.8 producer API (backed by the
+     * Pulsar adapter) using sync mode, gzip compression and batching.
+     */
+    private static void publishMessage(Arguments arguments) {
+        // Build the producer configuration.
+        Properties properties = new Properties();
+        properties.put(BROKER_URL, arguments.serviceUrl);
+        properties.put(PRODUCER_TYPE, "sync");
+        properties.put(SERIALIZER_CLASS, TestEncoder.class.getName());
+        properties.put(KEY_SERIALIZER_CLASS, StringEncoder.class.getName());
+        properties.put(PARTITIONER_CLASS, TestPartitioner.class.getName());
+        properties.put(COMPRESSION_CODEC, "gzip"); // compression: ZLIB
+        properties.put(QUEUE_ENQUEUE_TIMEOUT_MS, "-1"); // block queue if full => -1 = true
+        properties.put(QUEUE_BUFFERING_MAX_MESSAGES, "6000"); // queue max message
+        properties.put(QUEUE_BUFFERING_MAX_MS, "100"); // batch delay
+        properties.put(BATCH_NUM_MESSAGES, "500"); // batch msg
+        properties.put(CLIENT_ID, "test");
+        ProducerConfig config = new ProducerConfig(properties);
+        Producer<String, Tweet> producer = new Producer<>(config);
+
+        String name = "user";
+        String msg = arguments.messageValue;
+        for (int i = 0; i < arguments.totalMessages; i++) {
+            String sendMessage = msg + i;
+            Tweet tweet = new Tweet(name, sendMessage);
+            KeyedMessage<String, Tweet> message = new KeyedMessage<>(arguments.topicName, name, tweet);
+            producer.send(message);
+        }
+
+        producer.close();
+        log.info("Successfully published messages {}", arguments.totalMessages);
+
+    }
+
+    public static class TestPartitioner implements Partitioner {
+        @Override
+        public int partition(Object obj, int totalPartition) {
+            // floorMod keeps the result in [0, totalPartition) even for negative hash codes;
+            // a plain % can return a negative (invalid) partition index.
+            return Math.floorMod(obj.hashCode(), totalPartition);
+        }
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(ProducerExample.class);
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Tweet.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Tweet.java
new file mode 100644
index 0000000..d23411e
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Tweet.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples.utils;
+
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+
+import com.google.common.base.Objects;
+
+import kafka.serializer.Decoder;
+import kafka.serializer.Encoder;
+
+public class Tweet implements Serializable {
+    private static final long serialVersionUID = 1L;
+    // Public fields: the examples and the encoder/decoder below access them directly.
+    public String userName;
+    public String message;
+
+    public Tweet(String userName, String message) {
+        super();
+        this.userName = userName;
+        this.message = message;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(userName, message);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof Tweet) {
+            Tweet tweet = (Tweet) obj;
+            return Objects.equal(this.userName, tweet.userName) && Objects.equal(this.message, tweet.message);
+        }
+        return false;
+    }
+
+    /**
+     * Serializes a tweet as "userName,message" in UTF-8 (explicit charset instead of the
+     * platform default, so the bytes are stable across JVMs).
+     */
+    public static class TestEncoder implements Encoder<Tweet> {
+        @Override
+        public byte[] toBytes(Tweet tweet) {
+            return (tweet.userName + "," + tweet.message).getBytes(StandardCharsets.UTF_8);
+        }
+    }
+
+    /**
+     * Decodes the "userName,message" form produced by {@link TestEncoder}. The split is
+     * limited to 2 tokens so that commas inside the message body are preserved rather than
+     * truncated, and comma-less input yields an empty message instead of an
+     * ArrayIndexOutOfBoundsException.
+     */
+    public static class TestDecoder implements Decoder<Tweet> {
+        @Override
+        public Tweet fromBytes(byte[] input) {
+            String[] tokens = new String(input, StandardCharsets.UTF_8).split(",", 2);
+            return new Tweet(tokens[0], tokens.length > 1 ? tokens[1] : "");
+        }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
index e8cffb9..8f03825 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
@@ -42,6 +42,11 @@
       <artifactId>pulsar-client</artifactId>
       <version>${project.version}</version>
     </dependency>
+    
+    <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-lang3</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.kafka</groupId>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/pom.xml
similarity index 56%
copy from pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
copy to pulsar-client-kafka-compat/pulsar-client-kafka_0_8/pom.xml
index e8cffb9..dbc1108 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/pom.xml
@@ -30,23 +30,76 @@
     <version>2.5.0-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
+  
+  <properties>
+     <kafka_0_8.version>0.8.1.1</kafka_0_8.version>
+  </properties>
 
-  <artifactId>pulsar-client-kafka-original</artifactId>
-  <name>Pulsar Kafka compatibility :: API (original)</name>
+  <artifactId>pulsar-client-kafka_0_8-original</artifactId>
+  <name>Pulsar Kafka compatibility 0.8 :: API (original)</name>
 
   <description>Kafka client library that publishes and consumes messages on Pulsar topics</description>
 
   <dependencies>
+    
+     <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-broker</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    
+    
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-common</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-broker</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>managed-ledger-original</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    
+    
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-client</artifactId>
       <version>${project.version}</version>
     </dependency>
+    
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-admin</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    
+    <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-lang3</artifactId>
+    </dependency>
+    
+    <dependency>
+        <groupId>com.google.guava</groupId>
+        <artifactId>guava</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
-      <version>${kafka-client.version}</version>
+      <artifactId>kafka_2.9.2</artifactId>
+      <version>${kafka_0_8.version}</version>
       <exclusions>
         <exclusion>
           <groupId>net.jpountz.lz4</groupId>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/ConsumerConnector.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/ConsumerConnector.java
new file mode 100644
index 0000000..1ca0754
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/ConsumerConnector.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.producer.PulsarClientKafkaConfig;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.TopicFilter;
+import kafka.serializer.Decoder;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * It replaces : kafka.javaapi.consumer.ConsumerConnector but not extending kafka-interface because its method has
+ * KafkaStream api signature and KafkaStream is a scala class which creates unresolvable dependency conflict src:
+ * https://github.com/apache/kafka/blob/0.8.2.2/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
+ */
+@Slf4j
+public class ConsumerConnector {
+
+    private final PulsarClient client;
+    // Whether streams acknowledge messages automatically (from "auto.commit.enable").
+    private final boolean isAutoCommit;
+    // Shared builder used for every subscription created by this connector.
+    // NOTE(review): it is mutated per topic in createMessageStreamsByFilter — verify
+    // ConsumerBuilder#topic(...) does not accumulate topics across iterations.
+    private final ConsumerBuilder<byte[]> consumerBuilder;
+    private String clientId;
+    private String groupId;
+    @SuppressWarnings("rawtypes")
+    private final Set<PulsarKafkaStream> topicStreams;
+    // Initial position derived from "auto.offset.reset"; null means "do not seek".
+    private SubscriptionInitialPosition strategy = null;
+    // Single-threaded scheduler used to retry failed offset commits.
+    private final ScheduledExecutorService executor;
+
+    /**
+     * Builds a Pulsar-backed replacement for the Kafka 0.8 high-level consumer connector.
+     * The Kafka "zookeeper.connect" property is reused to carry the Pulsar service URL.
+     *
+     * @throws IllegalArgumentException if a Pulsar client cannot be created from the config
+     */
+    public ConsumerConnector(ConsumerConfig config) {
+        checkNotNull(config, "ConsumerConfig can't be null");
+        clientId = config.clientId();
+        groupId = config.groupId();
+        isAutoCommit = config.autoCommitEnable();
+        // Map Kafka's offset-reset values onto Pulsar's initial subscription positions.
+        if ("largest".equalsIgnoreCase(config.autoOffsetReset())) {
+            strategy = SubscriptionInitialPosition.Latest;
+        } else if ("smallest".equalsIgnoreCase(config.autoOffsetReset())) {
+            strategy = SubscriptionInitialPosition.Earliest;
+        }
+        String consumerId = !config.consumerId().isEmpty() ? config.consumerId().get() : null;
+        int maxMessage = config.queuedMaxMessages();
+        String serviceUrl = config.zkConnect();
+
+        Properties properties = config.props() != null && config.props().props() != null ? config.props().props()
+                : new Properties();
+        try {
+            client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).build();
+        } catch (PulsarClientException e) {
+            throw new IllegalArgumentException(
+                    "Failed to create pulsar-client using url = " + serviceUrl + ", properties = " + properties, e);
+        }
+
+        topicStreams = Sets.newConcurrentHashSet();
+        consumerBuilder = client.newConsumer();
+        // The Kafka group-id becomes the Pulsar subscription name.
+        consumerBuilder.subscriptionName(groupId);
+        if (properties.containsKey("queued.max.message.chunks") && config.queuedMaxMessages() > 0) {
+            consumerBuilder.receiverQueueSize(maxMessage);
+        }
+        if (consumerId != null) {
+            consumerBuilder.consumerName(consumerId);
+        }
+        if (properties.containsKey("auto.commit.interval.ms") && config.autoCommitIntervalMs() > 0) {
+            // Group acknowledgments so they are flushed at the configured commit interval.
+            consumerBuilder.acknowledgmentGroupTime(config.autoCommitIntervalMs(), TimeUnit.MILLISECONDS);
+        }
+        this.executor = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("pulsar-kafka"));
+    }
+
+    public <K, V> Map<String, List<PulsarKafkaStream<byte[], byte[]>>> createMessageStreams(
+            Map<String, Integer> topicCountMap) {
+        return createMessageStreamsByFilter(null, topicCountMap, null, null);
+    }
+
+    public <K, V> Map<String, List<PulsarKafkaStream<K, V>>> createMessageStreams(Map<String, Integer> topicCountMap,
+            Decoder<K> keyDecoder, Decoder<V> valueDecoder) {
+        return createMessageStreamsByFilter(null, topicCountMap, keyDecoder, valueDecoder);
+    }
+
+    /**
+     * Subscribes one Pulsar consumer per topic in {@code topicCountMap} and wraps it into
+     * {@code count} streams per topic, mirroring the Kafka thread-per-stream model.
+     *
+     * NOTE(review): {@code topicFilter} is silently ignored — confirm whether filter
+     * support was intentionally dropped for the Pulsar adapter.
+     *
+     * @throws RuntimeException if subscribing to any topic fails
+     */
+    public <K, V> Map<String, List<PulsarKafkaStream<K, V>>> createMessageStreamsByFilter(TopicFilter topicFilter,
+            Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder) {
+
+        Map<String, List<PulsarKafkaStream<K, V>>> streams = Maps.newHashMap();
+
+        topicCountMap.forEach((topic, count) -> {
+            try {
+                Consumer<byte[]> consumer = consumerBuilder.topic(topic).subscribe();
+                // Apply the "auto.offset.reset" position right after subscribing.
+                resetOffsets(consumer, strategy);
+                log.info("Creating stream for {}-{} with config {}", topic, groupId, consumerBuilder.toString());
+                for (int i = 0; i < count; i++) {
+                    PulsarKafkaStream<K, V> stream = new PulsarKafkaStream<>(keyDecoder, valueDecoder, consumer,
+                            isAutoCommit, clientId);
+                    // if multiple thread-count present then client expects multiple streams reading from the same
+                    // topic. so, create multiple stream using the same consumer
+                    streams.computeIfAbsent(topic, key -> Lists.newArrayList()).add(stream);
+                    topicStreams.add(stream);
+                }
+            } catch (PulsarClientException e) {
+                log.error("Failed to subscribe on topic {} with group-id {}, {}", topic, groupId, e.getMessage(), e);
+                throw new RuntimeException("Failed to subscribe on topic " + topic, e);
+            }
+        });
+        return streams;
+    }
+
+    public List<PulsarKafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter) {
+        throw new UnsupportedOperationException("method not supported");
+    }
+
+    public List<PulsarKafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int arg1) {
+        throw new UnsupportedOperationException("method not supported");
+    }
+
+    public <K, V> List<PulsarKafkaStream<K, V>> createMessageStreamsByFilter(TopicFilter topicFilter, int arg1,
+            Decoder<K> keyDecoder, Decoder<V> valueDecoder) {
+        throw new UnsupportedOperationException("method not supported");
+    }
+
+    // Triggers an async commit on every stream and returns the per-stream futures.
+    @SuppressWarnings("unchecked")
+    public List<CompletableFuture<Void>> commitOffsetsAsync() {
+        return topicStreams.stream().map(stream -> (CompletableFuture<Void>) stream.commitOffsets())
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Commit the offsets of all broker partitions connected by this connector.
+     * Fire-and-forget: failures are not surfaced; use {@link #commitOffsets(boolean)} to retry.
+     */
+    public void commitOffsets() {
+        commitOffsetsAsync();
+    }
+
+    // Commits all offsets; on failure, optionally reschedules itself every 30s on `executor`.
+    public void commitOffsets(boolean retryOnFailure) {
+        FutureUtil.waitForAll(commitOffsetsAsync()).handle((res, ex) -> {
+            if (ex != null) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Failed to commit offset {}, retrying {}", ex.getMessage(), retryOnFailure);
+                }
+                if (retryOnFailure) {
+                    this.executor.schedule(() -> commitOffsets(retryOnFailure), 30, TimeUnit.SECONDS);
+                }
+            }
+            return null;
+        });
+    }
+
+    /**
+     * Shut down the connector: stop the retry scheduler, close all streams, then the client.
+     * NOTE(review): the executor is shut down first, so commit retries still pending from
+     * commitOffsets(true) are dropped — confirm this is intended.
+     */
+    public void shutdown() {
+        if (executor != null) {
+            executor.shutdown();
+        }
+        if (topicStreams != null) {
+            topicStreams.forEach(stream -> {
+                try {
+                    stream.close();
+                } catch (Exception e) {
+                    log.warn("Failed to close stream {}, {}", stream, e.getMessage());
+                }
+            });
+        }
+        try {
+            client.close();
+        } catch (PulsarClientException e) {
+            log.warn("Failed to close client {}", e.getMessage());
+        }
+    }
+
+    // Seeks the consumer to earliest/latest per the configured strategy; a null strategy
+    // (no recognized "auto.offset.reset" value) leaves the subscription position untouched.
+    // Seek failures are logged but not propagated.
+    private void resetOffsets(Consumer<byte[]> consumer, SubscriptionInitialPosition strategy) {
+        if (strategy == null) {
+            return;
+        }
+        log.info("Resetting partition {} for group-id {} and seeking to {} position", consumer.getTopic(),
+                consumer.getSubscription(), strategy);
+        try {
+            if (strategy == SubscriptionInitialPosition.Earliest) {
+                consumer.seek(MessageId.earliest);
+            } else {
+                consumer.seek(MessageId.latest);
+            }
+        } catch (PulsarClientException e) {
+            log.warn("Failed to reset offset for consumer {} to {}, {}", consumer.getTopic(), strategy,
+                    e.getMessage(), e);
+        }
+    }
+
+    @VisibleForTesting
+    public ConsumerBuilder<byte[]> getConsumerBuilder() {
+        return consumerBuilder;
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/ConsumerIterator.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/ConsumerIterator.java
new file mode 100644
index 0000000..926e765
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/ConsumerIterator.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.util.Base64;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.pulsar.common.naming.TopicName;
+
+import kafka.serializer.Decoder;
+import kafka.serializer.StringDecoder;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Kafka-0.8-style message iterator backed by a Pulsar {@link Consumer}.
+ * <p>
+ * {@link #hasNext()} polls the consumer with a short timeout and buffers any
+ * received message; {@link #next()} drains that buffer (or blocks on the
+ * consumer when the buffer is empty) and converts the Pulsar message into a
+ * Kafka-compatible {@link PulsarMessageAndMetadata}.
+ */
+@Slf4j
+public class ConsumerIterator<K, V> implements Iterator<PulsarMessageAndMetadata<K, V>>, AutoCloseable {
+
+    private final Consumer<byte[]> consumer;
+    // Messages pre-fetched by hasNext() waiting to be handed out by next().
+    private final ConcurrentLinkedQueue<Message<byte[]>> receivedMessages;
+    private final Optional<Decoder<K>> keyDeSerializer;
+    private final Optional<Decoder<V>> valueDeSerializer;
+    // When true, next() cumulatively acknowledges each message as it is returned.
+    private final boolean isAutoCommit;
+    // Id of the last message returned by next(); acknowledged by commitOffsets().
+    private volatile MessageId lastConsumedMessageId;
+    // Fallback decoder used when no key/value decoder was configured.
+    private static final kafka.serializer.DefaultDecoder DEFAULT_DECODER = new kafka.serializer.DefaultDecoder(null);
+
+    public ConsumerIterator(Consumer<byte[]> consumer, ConcurrentLinkedQueue<Message<byte[]>> receivedMessages,
+            Optional<Decoder<K>> keyDeSerializer, Optional<Decoder<V>> valueDeSerializer, boolean isAutoCommit) {
+        this.consumer = consumer;
+        this.receivedMessages = receivedMessages;
+        this.keyDeSerializer = keyDeSerializer;
+        this.valueDeSerializer = valueDeSerializer;
+        this.isAutoCommit = isAutoCommit;
+    }
+
+    /**
+     * Returns true if a message could be received within a 10 ms poll.
+     * NOTE(review): unlike Kafka 0.8's blocking iterator this returns false on
+     * a poll timeout even though more messages may still exist on the topic —
+     * callers should be aware of this weaker contract.
+     */
+    @Override
+    public boolean hasNext() {
+        try {
+            Message<byte[]> msg = consumer.receive(10, TimeUnit.MILLISECONDS);
+            if (msg != null) {
+                // Buffer the message so the subsequent next() call returns it.
+                receivedMessages.offer(msg);
+                return true;
+            }
+        } catch (PulsarClientException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Failed to receive message for {}-{}, {}", consumer.getTopic(), consumer.getSubscription(),
+                        e.getMessage());
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Returns the next message, blocking on the Pulsar consumer if nothing was
+     * pre-fetched by {@link #hasNext()}.
+     *
+     * @throws RuntimeException if the blocking receive fails
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public PulsarMessageAndMetadata<K, V> next() {
+
+        Message<byte[]> msg = receivedMessages.poll();
+        if (msg == null) {
+            try {
+                // Nothing buffered: block until a message arrives.
+                msg = consumer.receive();
+            } catch (PulsarClientException e) {
+                log.warn("Failed to receive message for {}-{}, {}", consumer.getTopic(), consumer.getSubscription(),
+                        e.getMessage(), e);
+                throw new RuntimeException(
+                        "failed to receive message from " + consumer.getTopic() + "-" + consumer.getSubscription());
+            }
+        }
+
+        int partition = TopicName.getPartitionIndex(consumer.getTopic());
+        long offset = MessageIdUtils.getOffset(msg.getMessageId());
+        String key = msg.getKey();
+        byte[] value = msg.getValue();
+
+        K desKey = null;
+        V desValue = null;
+
+        if (StringUtils.isNotBlank(key)) {
+            if (keyDeSerializer.isPresent() && keyDeSerializer.get() instanceof StringDecoder) {
+                // String keys are stored as-is by the producer; no decoding needed.
+                desKey = (K) key;
+            } else {
+                // Non-string keys are Base64-encoded by the producer side; decode
+                // before applying the configured (or default) key decoder.
+                byte[] decodedBytes = Base64.getDecoder().decode(key);
+                desKey = keyDeSerializer.isPresent() ? keyDeSerializer.get().fromBytes(decodedBytes)
+                        : (K) DEFAULT_DECODER.fromBytes(decodedBytes);
+            }
+        }
+
+        if (value != null) {
+            desValue = valueDeSerializer.isPresent() ? valueDeSerializer.get().fromBytes(msg.getData())
+                    : (V) DEFAULT_DECODER.fromBytes(msg.getData());
+        }
+
+        PulsarMessageAndMetadata<K, V> msgAndMetadata = new PulsarMessageAndMetadata<>(consumer.getTopic(), partition,
+                null, offset, keyDeSerializer.orElse(null), valueDeSerializer.orElse(null), desKey, desValue);
+
+        if (isAutoCommit) {
+            // Commit the offset of previously dequeued messages
+            consumer.acknowledgeCumulativeAsync(msg);
+        }
+
+        lastConsumedMessageId = msg.getMessageId();
+        return msgAndMetadata;
+    }
+
+    /**
+     * Cumulatively acknowledges up to the last message returned by next().
+     * Completes immediately when nothing has been consumed yet.
+     */
+    protected CompletableFuture<Void> commitOffsets() {
+        MessageId msgId = lastConsumedMessageId;
+        if (msgId != null) {
+            return this.consumer.acknowledgeCumulativeAsync(msgId);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /** Closes the underlying Pulsar consumer. */
+    @Override
+    public void close() throws Exception {
+        if (consumer != null) {
+            consumer.close();
+        }
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerConfig.java
new file mode 100644
index 0000000..8aa0359
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerConfig.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.util.Properties;
+
+/**
+ * Thin subclass of the Kafka 0.8 {@code kafka.consumer.ConsumerConfig} used to
+ * build a config from {@link Properties} for the Pulsar-backed connector (see
+ * {@code PulsarKafkaConsumer#createJavaConsumerConnector}).
+ */
+public class PulsarConsumerConfig extends kafka.consumer.ConsumerConfig {
+
+    public PulsarConsumerConfig(Properties originalProps) {
+        super(originalProps);
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
new file mode 100644
index 0000000..cdd62a4
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+/**
+ * Replacement for the Kafka 0.8 entry point {@code kafka.consumer.Consumer}.
+ * It does not extend the Kafka type because that is a final Scala class.
+ */
+public class PulsarKafkaConsumer {
+
+    /** Creates a Pulsar-backed {@link ConsumerConnector} from the given config. */
+    public static ConsumerConnector createJavaConsumerConnector(PulsarConsumerConfig config) {
+        return new ConsumerConnector(config);
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaStream.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaStream.java
new file mode 100644
index 0000000..3a1231d
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaStream.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+
+import com.google.common.collect.Queues;
+
+import kafka.serializer.Decoder;
+
+/**
+ * We can't extend "kafka.consumer.KafkaStream<K,V>" because it's a Scala class which brings ambiguous overridden
+ * methods that give compilation errors.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class PulsarKafkaStream<K, V> implements Iterable<PulsarMessageAndMetadata<K, V>>, AutoCloseable {
+
+    private final Optional<Decoder<K>> keyDeSerializer;
+    private final Optional<Decoder<V>> valueDeSerializer;
+    // Single shared iterator: iterator() always returns this same instance.
+    private final ConsumerIterator<K, V> iterator;
+
+    // Buffer shared with the iterator for messages pre-fetched by hasNext().
+    private final ConcurrentLinkedQueue<Message<byte[]>> receivedMessages = Queues.newConcurrentLinkedQueue();
+
+    public PulsarKafkaStream(Decoder<K> keyDecoder, Decoder<V> valueDecoder, Consumer<byte[]> consumer,
+            boolean isAutoCommit, String clientId) {
+        this.keyDeSerializer = Optional.ofNullable(keyDecoder);
+        this.valueDeSerializer = Optional.ofNullable(valueDecoder);
+        this.iterator = new ConsumerIterator<>(consumer, receivedMessages, keyDeSerializer, valueDeSerializer,
+                isAutoCommit);
+    }
+
+    /** Returns the stream's single message iterator (not a fresh one per call). */
+    @Override
+    public ConsumerIterator<K, V> iterator() {
+        return iterator;
+    }
+
+    /** Cumulatively acknowledges everything consumed so far via the iterator. */
+    public CompletableFuture<Void> commitOffsets() {
+        return iterator.commitOffsets();
+    }
+
+    /** Closes the underlying consumer via the iterator. */
+    @Override
+    public void close() throws Exception {
+        iterator.close();
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarMessageAndMetadata.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarMessageAndMetadata.java
new file mode 100644
index 0000000..2c76501
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarMessageAndMetadata.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import kafka.message.Message;
+import kafka.message.MessageAndMetadata;
+import kafka.serializer.Decoder;
+
+/**
+ * Kafka 0.8 {@link MessageAndMetadata} populated from a Pulsar message.
+ * All accessors return the values captured at construction time; the raw
+ * Kafka {@link Message} is not available (see {@link #rawMessage$1()}).
+ */
+public class PulsarMessageAndMetadata<K, V> extends MessageAndMetadata<K, V> {
+
+    private static final long serialVersionUID = 1L;
+    private final String topic;
+    private final int partition;
+    private final long offset;
+    private final K key;
+    private final V value;
+    private final Decoder<K> keyDecoder;
+    private final Decoder<V> valueDecoder;
+
+    public PulsarMessageAndMetadata(String topic, int partition, Message rawMessage, long offset, Decoder<K> keyDecoder,
+            Decoder<V> valueDecoder, K key, V value) {
+        super(topic, partition, rawMessage, offset, keyDecoder, valueDecoder);
+        this.topic = topic;
+        this.partition = partition;
+        this.offset = offset;
+        this.keyDecoder = keyDecoder;
+        this.valueDecoder = valueDecoder;
+        this.key = key;
+        this.value = value;
+    }
+
+    @Override
+    public String topic() {
+        return topic;
+    }
+
+    // Scala Product API stub: this adapter exposes no product elements.
+    @Override
+    public int productArity() {
+        return 0;
+    }
+
+    @Override
+    public int partition() {
+        return partition;
+    }
+
+    @Override
+    public long offset() {
+        return offset;
+    }
+
+    @Override
+    public V message() {
+        return this.value;
+    }
+
+    @Override
+    public K key() {
+        return key;
+    }
+
+    @Override
+    public Decoder<V> valueDecoder() {
+        return this.valueDecoder;
+    }
+
+    @Override
+    public Decoder<K> keyDecoder() {
+        return this.keyDecoder;
+    }
+
+    // No raw Kafka message exists for a Pulsar-sourced record.
+    @Override
+    public Message rawMessage$1() {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarClientKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarClientKafkaConfig.java
new file mode 100644
index 0000000..c71d7a7
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarClientKafkaConfig.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+
+/**
+ * Maps {@code pulsar.*} entries from Kafka-style {@link Properties} onto a
+ * Pulsar {@link ClientBuilder} or {@link PulsarAdminBuilder} (authentication,
+ * TLS, timeouts, connection tuning).
+ */
+public class PulsarClientKafkaConfig {
+
+    /// Config variables
+    public static final String AUTHENTICATION_CLASS = "pulsar.authentication.class";
+    public static final String AUTHENTICATION_PARAMS_MAP = "pulsar.authentication.params.map";
+    public static final String AUTHENTICATION_PARAMS_STRING = "pulsar.authentication.params.string";
+    public static final String USE_TLS = "pulsar.use.tls";
+    public static final String TLS_TRUST_CERTS_FILE_PATH = "pulsar.tls.trust.certs.file.path";
+    public static final String TLS_ALLOW_INSECURE_CONNECTION = "pulsar.tls.allow.insecure.connection";
+    public static final String TLS_HOSTNAME_VERIFICATION = "pulsar.tls.hostname.verification";
+
+    public static final String OPERATION_TIMEOUT_MS = "pulsar.operation.timeout.ms";
+    public static final String STATS_INTERVAL_SECONDS = "pulsar.stats.interval.seconds";
+    public static final String NUM_IO_THREADS = "pulsar.num.io.threads";
+
+    public static final String CONNECTIONS_PER_BROKER = "pulsar.connections.per.broker";
+    public static final String ADMIN_SERVICE_URL = "pulsar.admin.service.url";
+
+    public static final String USE_TCP_NODELAY = "pulsar.use.tcp.nodelay";
+
+    public static final String CONCURRENT_LOOKUP_REQUESTS = "pulsar.concurrent.lookup.requests";
+    public static final String MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION = "pulsar.max.number.rejected.request.per.connection";
+
+    /**
+     * Builds a {@link ClientBuilder} from the {@code pulsar.*} properties.
+     * A null properties object yields a default builder. Authentication setup
+     * failures are rethrown as {@link RuntimeException}.
+     */
+    public static ClientBuilder getClientBuilder(Properties properties) {
+        ClientBuilder clientBuilder = PulsarClient.builder();
+        if (properties == null) {
+            return clientBuilder;
+        }
+
+        if (properties.containsKey(AUTHENTICATION_CLASS)) {
+            String className = properties.getProperty(AUTHENTICATION_CLASS);
+            try {
+                // Prefer string params, then a params map, else a no-arg auth instance.
+                if (properties.containsKey(AUTHENTICATION_PARAMS_STRING)) {
+                    String authParamsString = (String) properties.get(AUTHENTICATION_PARAMS_STRING);
+                    clientBuilder.authentication(className, authParamsString);
+                } else if (properties.containsKey(AUTHENTICATION_PARAMS_MAP)) {
+                    Map<String, String> authParams = (Map<String, String>) properties.get(AUTHENTICATION_PARAMS_MAP);
+                    clientBuilder.authentication(className, authParams);
+                } else {
+                    @SuppressWarnings("unchecked")
+                    Class<Authentication> clazz = (Class<Authentication>) Class.forName(className);
+                    Authentication auth = clazz.newInstance();
+                    clientBuilder.authentication(auth);
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        // TLS-related flags all default to false when absent.
+        clientBuilder.enableTls(Boolean.parseBoolean(properties.getProperty(USE_TLS, "false")));
+        clientBuilder.allowTlsInsecureConnection(
+                Boolean.parseBoolean(properties.getProperty(TLS_ALLOW_INSECURE_CONNECTION, "false")));
+        clientBuilder.enableTlsHostnameVerification(
+                Boolean.parseBoolean(properties.getProperty(TLS_HOSTNAME_VERIFICATION, "false")));
+
+        if (properties.containsKey(TLS_TRUST_CERTS_FILE_PATH)) {
+            clientBuilder.tlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH));
+        }
+
+        if (properties.containsKey(OPERATION_TIMEOUT_MS)) {
+            clientBuilder.operationTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)),
+                    TimeUnit.MILLISECONDS);
+        }
+
+        if (properties.containsKey(STATS_INTERVAL_SECONDS)) {
+            clientBuilder.statsInterval(Integer.parseInt(properties.getProperty(STATS_INTERVAL_SECONDS)),
+                    TimeUnit.SECONDS);
+        }
+
+        if (properties.containsKey(NUM_IO_THREADS)) {
+            clientBuilder.ioThreads(Integer.parseInt(properties.getProperty(NUM_IO_THREADS)));
+        }
+
+        if (properties.containsKey(CONNECTIONS_PER_BROKER)) {
+            clientBuilder.connectionsPerBroker(Integer.parseInt(properties.getProperty(CONNECTIONS_PER_BROKER)));
+        }
+
+        if (properties.containsKey(USE_TCP_NODELAY)) {
+            clientBuilder.enableTcpNoDelay(Boolean.parseBoolean(properties.getProperty(USE_TCP_NODELAY)));
+        }
+
+        if (properties.containsKey(CONCURRENT_LOOKUP_REQUESTS)) {
+            clientBuilder
+                    .maxConcurrentLookupRequests(Integer.parseInt(properties.getProperty(CONCURRENT_LOOKUP_REQUESTS)));
+        }
+
+        if (properties.containsKey(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)) {
+            clientBuilder.maxNumberOfRejectedRequestPerConnection(
+                    Integer.parseInt(properties.getProperty(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)));
+        }
+
+        return clientBuilder;
+    }
+
+    /**
+     * Builds a {@link PulsarAdminBuilder} from the {@code pulsar.*} properties.
+     *
+     * @param serviceUrl fallback HTTP service URL used when
+     *                   {@code pulsar.admin.service.url} is not set
+     */
+    public static PulsarAdminBuilder getAdminBuilder(String serviceUrl, Properties properties) {
+        PulsarAdminBuilder adminBuilder = PulsarAdmin.builder();
+        if (properties == null) {
+            return adminBuilder;
+        }
+
+        adminBuilder.serviceHttpUrl(properties.getProperty(ADMIN_SERVICE_URL, serviceUrl));
+
+        if (properties.containsKey(AUTHENTICATION_CLASS)) {
+            String className = properties.getProperty(AUTHENTICATION_CLASS);
+            try {
+                // Same authentication resolution order as getClientBuilder.
+                if (properties.containsKey(AUTHENTICATION_PARAMS_STRING)) {
+                    String authParamsString = (String) properties.get(AUTHENTICATION_PARAMS_STRING);
+                    adminBuilder.authentication(className, authParamsString);
+                } else if (properties.containsKey(AUTHENTICATION_PARAMS_MAP)) {
+                    Map<String, String> authParams = (Map<String, String>) properties.get(AUTHENTICATION_PARAMS_MAP);
+                    adminBuilder.authentication(className, authParams);
+                } else {
+                    @SuppressWarnings("unchecked")
+                    Class<Authentication> clazz = (Class<Authentication>) Class.forName(className);
+                    Authentication auth = clazz.newInstance();
+                    adminBuilder.authentication(auth);
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        adminBuilder.allowTlsInsecureConnection(
+                Boolean.parseBoolean(properties.getProperty(TLS_ALLOW_INSECURE_CONNECTION, "false")));
+        adminBuilder.enableTlsHostnameVerification(
+                Boolean.parseBoolean(properties.getProperty(TLS_HOSTNAME_VERIFICATION, "false")));
+
+        if (properties.containsKey(TLS_TRUST_CERTS_FILE_PATH)) {
+            adminBuilder.tlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH));
+        }
+
+        if (properties.containsKey(OPERATION_TIMEOUT_MS)) {
+            adminBuilder.connectionTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)),
+                    TimeUnit.MILLISECONDS);
+        }
+
+        return adminBuilder;
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
new file mode 100644
index 0000000..bec2c35
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.util.Base64;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+
+import com.google.common.annotations.VisibleForTesting;
+
+// Open questions (TODO):
+// 1. Which authentication method should be supported here?
+// 2. How should asynchronous publish failures be surfaced (currently they are only logged)?
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.DefaultPartitioner;
+import kafka.producer.KeyedMessage;
+import kafka.producer.Partitioner;
+import kafka.producer.ProducerConfig;
+import kafka.serializer.Encoder;
+import kafka.serializer.StringEncoder;
+import kafka.utils.VerifiableProperties;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class PulsarKafkaProducer<K, V> extends Producer<K, V> {
+
+    // Shared Pulsar client; one producer per topic is cached in 'producers'.
+    private final PulsarClient client;
+    private final ProducerBuilder<byte[]> pulsarProducerBuilder;
+    private final Partitioner partitioner;
+    private final Encoder<K> keySerializer;
+    private final Encoder<V> valueSerializer;
+    // True when producer.type=async: sends are fire-and-forget (failures only logged).
+    private final boolean isSendAsync;
+
+    // Kafka 0.8 producer property names honored by this adapter.
+    public static String KAFKA_KEY_MAX_QUEUE_BUFFERING_TIME_MS = "queue.buffering.max.ms";
+    public static String KAFKA_KEY_MAX_QUEUE_BUFFERING_MESSAGES = "queue.buffering.max.messages";
+    public static String KAFKA_KEY_MAX_BATCH_MESSAGES = "batch.num.messages";
+    public static String KAFKA_KEY_REQUEST_TIMEOUT_MS = "request.timeout.ms";
+
+    // Topic -> Pulsar producer, created lazily on first send to that topic.
+    private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer<byte[]>> producers = new ConcurrentHashMap<>();
+
+    /**
+     * Builds a Pulsar-backed producer from a Kafka 0.8 {@link ProducerConfig}:
+     * creates the Pulsar client from the broker list, instantiates the
+     * configured partitioner/serializers, and translates batching, queueing,
+     * timeout and compression settings onto the Pulsar producer builder.
+     *
+     * @throws IllegalArgumentException if the Pulsar client cannot be created
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public PulsarKafkaProducer(ProducerConfig config) {
+        super((kafka.producer.Producer) null);
+        partitioner = config.partitionerClass() != null
+                ? newInstance(config.partitionerClass(), Partitioner.class, config.props())
+                : new DefaultPartitioner(config.props());
+        // kafka-config returns default serializer if client doesn't configure it
+        checkNotNull(config.keySerializerClass(), "key-serializer class can't be null");
+        checkNotNull(config.serializerClass(), "value-serializer class can't be null");
+        keySerializer = newInstance(config.keySerializerClass(), Encoder.class, config.props());
+        valueSerializer = newInstance(config.serializerClass(), Encoder.class, config.props());
+
+        Properties properties = config.props() != null && config.props().props() != null ? config.props().props()
+                : new Properties();
+        String serviceUrl = config.brokerList();
+        try {
+            client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).build();
+        } catch (PulsarClientException e) {
+            throw new IllegalArgumentException(
+                    "Failed to create pulsar-client using url = " + serviceUrl + ", properties = " + properties, e);
+        }
+        pulsarProducerBuilder = client.newProducer();
+
+        // doc: https://kafka.apache.org/08/documentation.html#producerapi
+        // api-doc:
+        // https://github.com/apache/kafka/blob/0.8.2.2/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+
+        // queue.enqueue.timeout.ms: The amount of time to block before dropping messages when running in async mode and
+        // the buffer has reached queue.buffering.max.messages. If set to 0 events will be enqueued immediately or
+        // dropped if the queue is full (the producer send call will never block). If set to -1 the producer will block
+        // indefinitely and never willingly drop a send.
+        boolean blockIfQueueFull = config.queueEnqueueTimeoutMs() == -1 ? true : false;
+        // This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values
+        // are (1) async for asynchronous send and (2) sync for synchronous send. By setting the producer to async we
+        // allow batching together of requests (which is great for throughput) but open the possibility of a failure of
+        // the client machine dropping unsent data.
+        isSendAsync = "async".equalsIgnoreCase(config.producerType());
+        CompressionType compressionType = CompressionType.NONE;
+        // Valid values are "none", "gzip" and "snappy".
+        // NOTE: Kafka's "gzip" is mapped to Pulsar's ZLIB codec here.
+        if ("gzip".equals(config.compressionCodec().name())) {
+            compressionType = CompressionType.ZLIB;
+        } else if ("snappy".equals(config.compressionCodec().name())) {
+            compressionType = CompressionType.SNAPPY;
+        }
+        long batchDelayMs = config.queueBufferingMaxMs();
+
+        // Only apply each Pulsar setting when the corresponding Kafka property
+        // was explicitly provided, so Pulsar defaults are otherwise preserved.
+        if (properties.containsKey(KAFKA_KEY_MAX_QUEUE_BUFFERING_MESSAGES)) {
+            pulsarProducerBuilder.maxPendingMessages(config.queueBufferingMaxMessages());
+        }
+        if (properties.containsKey(KAFKA_KEY_MAX_BATCH_MESSAGES)) {
+            pulsarProducerBuilder.batchingMaxMessages(config.batchNumMessages());
+        }
+        if (properties.containsKey(KAFKA_KEY_MAX_QUEUE_BUFFERING_TIME_MS)) {
+            pulsarProducerBuilder.batchingMaxPublishDelay(batchDelayMs, TimeUnit.MILLISECONDS);
+        }
+        if (properties.containsKey(KAFKA_KEY_REQUEST_TIMEOUT_MS)) {
+            pulsarProducerBuilder.sendTimeout(config.requestTimeoutMs(), TimeUnit.MILLISECONDS);
+        }
+
+        pulsarProducerBuilder.blockIfQueueFull(blockIfQueueFull).compressionType(compressionType);
+
+    }
+
+    /** Convenience constructor: adapts an existing Kafka producer's config. */
+    public PulsarKafkaProducer(kafka.producer.Producer<K, V> producer) {
+        this(producer.config());
+    }
+
+    /**
+     * Publishes one keyed message to its topic via a lazily created, cached
+     * Pulsar producer. In async mode the send is fire-and-forget: failures are
+     * only logged. In sync mode a publish failure is rethrown as
+     * {@link IllegalStateException}.
+     *
+     * @throws IllegalArgumentException if a producer cannot be created for the topic
+     */
+    @Override
+    public void send(KeyedMessage<K, V> message) {
+        org.apache.pulsar.client.api.Producer<byte[]> producer;
+
+        try {
+            producer = producers.computeIfAbsent(message.topic(), topic -> createNewProducer(topic));
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Failed to create producer for " + message.topic(), e);
+        }
+
+        TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
+        buildMessage(messageBuilder, message);
+
+        if (isSendAsync) {
+            // what if message publish fails:
+            // according to : https://kafka.apache.org/08/documentation.html#producerapi
+            // async: opens the possibility of a failure of the client machine dropping unsent data
+            messageBuilder.sendAsync().handle((res, ex) -> {
+                if (ex != null) {
+                    log.warn("publish failed for {}", producer.getTopic(), ex);
+                }
+                return null;
+            });
+        } else {
+            try {
+                messageBuilder.send();
+            } catch (PulsarClientException e) {
+                log.warn("publish failed for {}", producer.getTopic(), e);
+                throw new IllegalStateException("Failed to publish message " + message.topic(), e);
+            }
+        }
+
+    }
+
+    /**
+     * Publishes every message of the batch, in order, via {@link #send(KeyedMessage)}.
+     * A {@code null} batch is a no-op.
+     */
+    @Override
+    public void send(List<KeyedMessage<K, V>> messages) {
+        if (messages == null) {
+            return;
+        }
+        for (KeyedMessage<K, V> message : messages) {
+            send(message);
+        }
+    }
+
+    /**
+     * Copies the Kafka message key (when present) and the serialized payload onto
+     * the Pulsar message builder.
+     */
+    private void buildMessage(TypedMessageBuilder<byte[]> builder, KeyedMessage<K, V> message) {
+        K msgKey = message.key();
+        if (msgKey != null) {
+            builder.key(getKey(message.topic(), msgKey));
+        }
+        builder.value(valueSerializer.toBytes(message.message()));
+    }
+
+    /**
+     * Converts the Kafka key into a Pulsar string key.
+     * <p>
+     * If the configured key serializer is a {@link StringEncoder} the key is already a
+     * String and used as-is; otherwise it is serialized to bytes and base64-encoded.
+     *
+     * @param topic topic the key belongs to (kept for caller symmetry; unused here)
+     * @param key   the Kafka message key; callers only invoke this for non-null keys
+     */
+    private String getKey(String topic, K key) {
+        // instanceof already evaluates to false for null, so the former explicit
+        // null check was redundant.
+        if (keySerializer instanceof StringEncoder) {
+            return (String) key;
+        } else {
+            // NOTE(review): assumes a non-StringEncoder keySerializer is always configured;
+            // a null keySerializer NPEs here exactly as before this change.
+            byte[] keyBytes = keySerializer.toBytes(key);
+            return Base64.getEncoder().encodeToString(keyBytes);
+        }
+    }
+
+    // Creates (and returns) a Pulsar producer for the given topic, preserving the
+    // kafka-0.8 partitioning contract via a custom message router.
+    private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String topic) {
+        try {
+            // Mutates the shared builder before cloning: routing mode/router apply to
+            // every subsequently created producer as well.
+            pulsarProducerBuilder.messageRoutingMode(MessageRoutingMode.CustomPartition);
+            pulsarProducerBuilder.messageRouter(new MessageRouter() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+                    // https://kafka.apache.org/08/documentation.html#producerapi
+                    // The default partitioner is based on the hash of the key.
+                    return partitioner.partition(msg.getKey(), metadata.numPartitions());
+                }
+            });
+            log.info("Creating producer for topic {} with config {}", topic, pulsarProducerBuilder.toString());
+            // Clone so per-topic settings don't leak back into the shared builder.
+            return pulsarProducerBuilder.clone().topic(topic).create();
+        } catch (PulsarClientException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Loads the class named {@code key}, instantiates it (preferring a
+     * {@code VerifiableProperties} constructor) and casts it to {@code t}.
+     *
+     * @throws IllegalArgumentException if the class cannot be found or is not an instance of {@code t}
+     */
+    public <T> T newInstance(String key, Class<T> t, VerifiableProperties properties) {
+        Class<?> c;
+        try {
+            c = Class.forName(key);
+        } catch (ClassNotFoundException e) {
+            // Preserve the cause so the original lookup failure is visible to callers.
+            throw new IllegalArgumentException("class not found for :" + key, e);
+        }
+        // Class.forName never returns null (it throws instead), so the former
+        // `if (c == null) return null;` branch was dead code and has been removed.
+        Object o = newInstance(c, properties);
+        if (!t.isInstance(o)) {
+            throw new IllegalArgumentException(c.getName() + " is not an instance of " + t.getName());
+        }
+        return t.cast(o);
+    }
+
+    /**
+     * Instantiates {@code c}, first trying a constructor that takes
+     * {@link VerifiableProperties} (the kafka-0.8 plugin convention) and falling
+     * back to the public no-argument constructor.
+     *
+     * @throws IllegalArgumentException if no usable constructor exists or instantiation fails
+     */
+    public static <T> T newInstance(Class<T> c, VerifiableProperties properties) {
+        try {
+            try {
+                Constructor<T> constructor = c.getConstructor(VerifiableProperties.class);
+                constructor.setAccessible(true);
+                return constructor.newInstance(properties);
+            } catch (Exception e) {
+                // Ok.. not a default implementation class
+                // (deliberately swallowed: fall through to the no-arg constructor below)
+            }
+            return c.newInstance();
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("Could not instantiate class " + c.getName(), e);
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException(
+                    "Could not instantiate class " + c.getName() + " Does it have a public no-argument constructor?",
+                    e);
+        } catch (NullPointerException e) {
+            throw new IllegalArgumentException("Requested class was null", e);
+        }
+    }
+
+    /**
+     * Closes every cached per-topic Pulsar producer and then the shared Pulsar client.
+     * Failures are logged and do not abort shutdown of the remaining resources.
+     */
+    @Override
+    public void close() {
+        if (producers != null) {
+            producers.forEach((topic, producer) -> {
+                try {
+                    producer.close();
+                } catch (PulsarClientException e) {
+                    // Pass the Throwable itself as the last argument so SLF4J logs the
+                    // stack trace (previously e.getMessage() was passed and, being a
+                    // surplus non-Throwable argument, silently discarded).
+                    log.warn("Failed to close producer for {}", topic, e);
+                }
+            });
+        }
+        if (client != null) {
+            try {
+                client.close();
+            } catch (PulsarClientException e) {
+                log.warn("Failed to close pulsar-client {}", e.getMessage());
+            }
+        }
+    }
+
+    // Test-only accessors: expose internal collaborators so unit tests can verify
+    // how the producer was configured.
+
+    @VisibleForTesting
+    public ProducerBuilder<byte[]> getPulsarProducerBuilder() {
+        return pulsarProducerBuilder;
+    }
+
+    @VisibleForTesting
+    public Partitioner getPartitioner() {
+        return partitioner;
+    }
+
+    @VisibleForTesting
+    public Encoder<K> getKeySerializer() {
+        return keySerializer;
+    }
+
+    @VisibleForTesting
+    public Encoder<V> getValueSerializer() {
+        return valueSerializer;
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarByteBufferMessageSet.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarByteBufferMessageSet.java
new file mode 100644
index 0000000..68ee247
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarByteBufferMessageSet.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+
+import com.google.common.collect.Queues;
+
+import kafka.message.MessageAndOffset;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class PulsarByteBufferMessageSet extends kafka.javaapi.message.ByteBufferMessageSet {
+    // Single streaming iterator backed by the Pulsar reader; iterator() intentionally
+    // returns the same instance on every call because iteration consumes the stream.
+    private final MessageAndOffsetIterator iterator;
+
+    public PulsarByteBufferMessageSet(Reader<byte[]> reader) {
+        super(Collections.emptyList());
+        this.iterator = new MessageAndOffsetIterator(reader);
+    }
+
+    @Override
+    public Iterator<MessageAndOffset> iterator() {
+        return iterator;
+    }
+
+    /**
+     * Iterator that lazily pulls messages from a Pulsar {@link Reader} and adapts
+     * them to kafka-0.8 {@link MessageAndOffset} records.
+     */
+    public static class MessageAndOffsetIterator implements Iterator<MessageAndOffset> {
+
+        private final Reader<byte[]> reader;
+        // Buffers messages pre-fetched by hasNext() until next() consumes them.
+        private final ConcurrentLinkedQueue<org.apache.pulsar.client.api.Message<byte[]>> receivedMessages;
+
+        public MessageAndOffsetIterator(Reader<byte[]> reader) {
+            this.reader = reader;
+            this.receivedMessages = Queues.newConcurrentLinkedQueue();
+        }
+
+        @Override
+        public boolean hasNext() {
+            try {
+                // Short poll: returns false when no message arrives within 10ms, which
+                // ends the current fetch batch (more data may still arrive later).
+                org.apache.pulsar.client.api.Message<byte[]> msg = reader.readNext(10, TimeUnit.MILLISECONDS);
+                if (msg != null) {
+                    receivedMessages.offer(msg);
+                    return true;
+                }
+            } catch (PulsarClientException e) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Failed to receive message for {}, {}", reader.getTopic(), e.getMessage());
+                }
+            }
+            return false;
+        }
+
+        @Override
+        public PulsarMsgAndOffset next() {
+
+            org.apache.pulsar.client.api.Message<byte[]> msg = receivedMessages.poll();
+            if (msg == null) {
+                // Nothing pre-fetched by hasNext(): block until a message arrives.
+                try {
+                    msg = reader.readNext();
+                } catch (PulsarClientException e) {
+                    log.warn("Failed to receive message for {}, {}", reader.getTopic(), e.getMessage(), e);
+                    // Propagate with the cause attached (it was previously dropped),
+                    // so callers can see the underlying client failure.
+                    throw new RuntimeException("failed to receive message from " + reader.getTopic(), e);
+                }
+            }
+
+            String key = msg.getKey();
+            byte[] value = msg.getValue();
+
+            return new PulsarMsgAndOffset(new PulsarMessage(key, value), msg.getMessageId());
+        }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarFetchResponse.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarFetchResponse.java
new file mode 100644
index 0000000..6f47e8b
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarFetchResponse.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.Map;
+
+import org.apache.pulsar.client.api.Reader;
+
+import kafka.javaapi.FetchResponse;
+
+/**
+ * kafka-0.8 {@link FetchResponse} adapter that serves message sets out of Pulsar
+ * readers instead of a Kafka broker response buffer.
+ */
+public class PulsarFetchResponse extends FetchResponse {
+
+    // Maps topic name -> Pulsar reader positioned at the requested fetch offset.
+    // Null when the fetch failed (hasError == true).
+    private final Map<String, Reader<byte[]>> topicReaderMap;
+    private boolean hasError = false;
+
+    public PulsarFetchResponse(Map<String, Reader<byte[]>> topicConsumerMap, boolean hasError) {
+        super(null);
+        this.topicReaderMap = topicConsumerMap;
+        this.hasError = hasError;
+    }
+
+    @Override
+    public boolean hasError() {
+        return this.hasError;
+    }
+
+    @Override
+    public PulsarByteBufferMessageSet messageSet(String topic, int partition) {
+        // A failed fetch is constructed with a null reader map; fail fast with a
+        // clear message instead of the guaranteed NPE when callers skip hasError().
+        if (topicReaderMap == null) {
+            throw new IllegalStateException("fetch failed, no message-set available for " + topic);
+        }
+        return new PulsarByteBufferMessageSet(topicReaderMap.get(topic));
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarKafkaSimpleConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarKafkaSimpleConsumer.java
new file mode 100644
index 0000000..b2d39a3
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarKafkaSimpleConsumer.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.kafka.clients.producer.PulsarClientKafkaConfig;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
+
+import com.google.common.collect.Maps;
+
+import kafka.api.FetchRequest;
+import kafka.api.PartitionFetchInfo;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.ErrorMapping;
+import kafka.common.OffsetMetadataAndError;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetCommitResponse;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Note: <br/>
+ * - SimpleConsumer doesn't work well with pulsar batch messages, because the client application uses a raw offset to
+ * commit the offset for a given group-id. <br/>
+ * - In order to work with partitioned topics and batch messages: use the custom api
+ * {@link PulsarMsgAndOffset#getFullOffset()} to fetch the offset as a {@link MessageId} and commit the offset with
+ * the same message-id via {@code PulsarOffsetMetadataAndError(messageId, ...)}.
+ *
+ */
+@Slf4j
+public class PulsarKafkaSimpleConsumer extends SimpleConsumer {
+
+    private final String host;
+    private final int port;
+    private final String clientId;
+    private final PulsarClient client;
+    private final PulsarAdmin admin;
+    // Cache of (topic, group-id) -> subscribed pulsar consumer, used to ack committed offsets.
+    private final Map<TopicGroup, Consumer<byte[]>> topicConsumerMap;
+    private final SubscriptionType subscriptionType;
+    public static final String SUBSCRIPTION_TYPE = "pulsar.subscription.type";
+    public static final String HTTP_SERVICE_URL = "pulsar.http.service.url";
+
+    public PulsarKafkaSimpleConsumer(String host, int port, int soTimeout, int bufferSize, String clientId) {
+        this(host, port, soTimeout, bufferSize, clientId, new Properties());
+    }
+
+    /**
+     * 
+     * @param host
+     *            pulsar-broker service url
+     * @param port
+     *            n/a
+     * @param soTimeout
+     *            n/a
+     * @param bufferSize
+     *            n/a
+     * @param clientId
+     *            client-id
+     * @param properties
+     *            properties to retrieve authentication params by {@link PulsarClientKafkaConfig}
+     */
+    public PulsarKafkaSimpleConsumer(String host, int port, int soTimeout, int bufferSize, String clientId,
+            Properties properties) {
+        super(host, port, soTimeout, bufferSize, clientId);
+        this.host = host;
+        this.port = port;
+        this.clientId = clientId;
+        try {
+            client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(host).build();
+        } catch (PulsarClientException e) {
+            log.warn("Failed to create pulsar client for {} and properties {}", host, properties);
+            throw new RuntimeException("Failed to create pulsar client " + host, e);
+        }
+        try {
+            // Admin client serves metadata/offset lookups; falls back to the broker
+            // url when no explicit http service url is configured.
+            String url = properties.getProperty(HTTP_SERVICE_URL, host);
+            admin = PulsarClientKafkaConfig.getAdminBuilder(url, properties).build();
+        } catch (PulsarClientException e) {
+            log.warn("Failed to create pulsar admin for {} and properties {}", host, properties);
+            throw new RuntimeException("Failed to create pulsar admin " + host, e);
+        }
+        this.topicConsumerMap = new ConcurrentHashMap<>(8, 0.75f, 1);
+        this.subscriptionType = getSubscriptionType(properties);
+    }
+
+    /**
+     * Serves a kafka fetch request by creating one Pulsar reader per requested topic,
+     * positioned at the requested offset. On any failure an error response is returned
+     * (callers must check {@code hasError()} before reading message sets).
+     */
+    @Override
+    public PulsarFetchResponse fetch(FetchRequest request) {
+        try {
+            Map<String, Reader<byte[]>> topicReaderMap = createTopicReaders(request);
+            return new PulsarFetchResponse(topicReaderMap, false);
+        } catch (Exception e) {
+            // Append the throwable so the stack trace is logged, not just the message.
+            log.warn("Failed to process fetch request{}, {}", request, e.getMessage(), e);
+            return new PulsarFetchResponse(null, true);
+        }
+    }
+
+    // Builds one Pulsar reader per requested topic, starting at the message-id that
+    // corresponds to the requested kafka offset.
+    private Map<String, Reader<byte[]>> createTopicReaders(FetchRequest request) {
+        Map<String, Reader<byte[]>> topicReaderMap = Maps.newHashMap();
+        scala.collection.immutable.Map<String, scala.collection.immutable.Map<TopicAndPartition, PartitionFetchInfo>> reqInfo = request
+                .requestInfoGroupedByTopic();
+        Map<String, scala.collection.immutable.Map<TopicAndPartition, PartitionFetchInfo>> topicPartitionMap = scala.collection.JavaConverters
+                .mapAsJavaMapConverter(reqInfo).asJava();
+        for (Entry<String, scala.collection.immutable.Map<TopicAndPartition, PartitionFetchInfo>> topicPartition : topicPartitionMap
+                .entrySet()) {
+            final String topicName = topicPartition.getKey();
+            Map<TopicAndPartition, PartitionFetchInfo> topicOffsetMap = scala.collection.JavaConverters
+                    .mapAsJavaMapConverter(topicPartition.getValue()).asJava();
+            if (topicOffsetMap != null && !topicOffsetMap.isEmpty()) {
+                // pulsar-kafka adapter doesn't deal with partition so, assuming only 1 topic-metadata per topic name
+                Entry<TopicAndPartition, PartitionFetchInfo> topicOffset = topicOffsetMap.entrySet().iterator().next();
+                long offset = topicOffset.getValue().offset();
+                String topic = getTopicName(topicOffset.getKey());
+                MessageId msgId = getMessageId(offset);
+                try {
+                    Reader<byte[]> reader = client.newReader().readerName(clientId).topic(topic).startMessageId(msgId)
+                            .create();
+                    log.info("Successfully created reader for {} at msg-id {}", topic, msgId);
+                    topicReaderMap.put(topicName, reader);
+                } catch (PulsarClientException e) {
+                    log.warn("Failed to create reader for topic {}", topic, e);
+                    throw new RuntimeException("Failed to create reader for " + topic, e);
+                }
+            }
+        }
+        return topicReaderMap;
+    }
+
+    // Maps the kafka sentinel offsets (earliest/latest) to Pulsar message-ids and
+    // decodes any other value as an encoded (ledger, entry) offset.
+    private MessageId getMessageId(long offset) {
+        if (kafka.api.OffsetRequest.EarliestTime() == offset) {
+            return MessageId.earliest;
+        } else if (kafka.api.OffsetRequest.LatestTime() == offset) {
+            return MessageId.latest;
+        } else {
+            return MessageIdUtils.getMessageId(offset);
+        }
+    }
+
+    /** Answers a topic-metadata request using the Pulsar admin client. */
+    @Override
+    public PulsarTopicMetadataResponse send(TopicMetadataRequest request) {
+        List<String> topics = request.topics();
+        PulsarTopicMetadataResponse response = new PulsarTopicMetadataResponse(admin, host, port, topics);
+        return response;
+    }
+
+    // It's the counterpart of: OffsetResponse getOffsetsBefore(OffsetRequest or).
+    // Only the earliest/latest sentinel times are supported.
+    public PulsarOffsetResponse getOffsetsBefore(PulsarOffsetRequest or) {
+        Map<TopicAndPartition, PartitionOffsetRequestInfo> request = or.getRequestInfo();
+        Map<TopicAndPartition, Long> offsetResponse = Maps.newHashMap();
+        for (Entry<TopicAndPartition, PartitionOffsetRequestInfo> topicPartitionRequest : request.entrySet()) {
+            TopicAndPartition topic = topicPartitionRequest.getKey();
+            long time = topicPartitionRequest.getValue().time();
+            if (time != kafka.api.OffsetRequest.EarliestTime() && time != kafka.api.OffsetRequest.LatestTime()) {
+                throw new IllegalArgumentException("Time has to be from EarliestTime or LatestTime");
+            }
+            offsetResponse.put(topic, time);
+        }
+        return new PulsarOffsetResponse(offsetResponse);
+    }
+
+    /**
+     * <pre>
+     * Overridden method: OffsetCommitResponse commitOffsets(OffsetCommitRequest request)
+     * 
+     * Note:
+     * created PulsarOffsetCommitResponse as OffsetCommitRequest doesn't provide getters
+     * 
+     * </pre>
+     */
+    public OffsetCommitResponse commitOffsets(PulsarOffsetCommitRequest request) {
+
+        PulsarOffsetCommitResponse response = new PulsarOffsetCommitResponse(null);
+        for (Entry<String, MessageId> topicOffset : request.getTopicOffsetMap().entrySet()) {
+            final String topic = topicOffset.getKey();
+            final String groupId = request.getGroupId();
+            try {
+                // Cumulative ack on the group's subscription marks everything up to the
+                // committed message-id as consumed.
+                Consumer<byte[]> consumer = getConsumer(topic, groupId);
+                consumer.acknowledgeCumulative(topicOffset.getValue());
+            } catch (Exception e) {
+                log.warn("Failed to ack message for topic {}-{}", topic, topicOffset.getValue(), e);
+                response.hasError = true;
+                TopicAndPartition topicPartition = new TopicAndPartition(topic, 0);
+                response.errors.computeIfAbsent(topicPartition, tp -> ErrorMapping.UnknownCode());
+            }
+        }
+
+        return response;
+    }
+
+    /**
+     * <pre>
+     * Overridden method: OffsetFetchResponse fetchOffsets(OffsetFetchRequest request)
+     * 
+     * Note:
+     * created PulsarOffsetFetchRequest as OffsetFetchRequest doesn't have getters for any field
+     * and PulsarOffsetFetchResponse created as base-class doesn't have setters to set state
+     * &#64;param request
+     * &#64;return
+     * 
+     * </pre>
+     */
+    public PulsarOffsetFetchResponse fetchOffsets(PulsarOffsetFetchRequest request) {
+        final String groupId = request.groupId;
+        Map<TopicAndPartition, OffsetMetadataAndError> responseMap = Maps.newHashMap();
+        PulsarOffsetFetchResponse response = new PulsarOffsetFetchResponse(responseMap);
+        for (TopicAndPartition topicMetadata : request.requestInfo) {
+            final String topicName = getTopicName(topicMetadata);
+            try {
+                // Derive the committed offset from the subscription cursor's read-position.
+                PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
+                CursorStats cursor = stats.cursors != null ? stats.cursors.get(groupId) : null;
+                if (cursor != null) {
+                    String readPosition = cursor.readPosition;
+                    MessageId msgId = null;
+                    if (readPosition != null && readPosition.contains(":")) {
+                        try {
+                            String[] position = readPosition.split(":");
+                            msgId = new MessageIdImpl(Long.parseLong(position[0]), Long.parseLong(position[1]), -1);
+                        } catch (Exception e) {
+                            log.warn("Invalid read-position {} for {}-{}", readPosition, topicName, groupId);
+                        }
+                    }
+                    msgId = msgId == null ? MessageId.earliest : msgId;
+                    OffsetMetadataAndError oE = new OffsetMetadataAndError(MessageIdUtils.getOffset(msgId), null,
+                            ErrorMapping.NoError());
+                    responseMap.put(topicMetadata, oE);
+                }
+            } catch (Exception e) {
+                OffsetMetadataAndError oE = new OffsetMetadataAndError(0, null, ErrorMapping.UnknownCode());
+                responseMap.put(topicMetadata, oE);
+            }
+        }
+        return response;
+    }
+
+    // Resolves the concrete partition topic name when a partition index is present.
+    public static String getTopicName(TopicAndPartition topicMetadata) {
+        return topicMetadata.partition() > -1
+                ? TopicName.get(topicMetadata.topic()).getPartition(topicMetadata.partition()).toString()
+                : topicMetadata.topic();
+    }
+
+    /** Closes all cached consumers, the Pulsar client and the admin client (best effort). */
+    @Override
+    public void close() {
+
+        if (topicConsumerMap != null) {
+            topicConsumerMap.forEach((topic, consumer) -> {
+                try {
+                    consumer.close();
+                } catch (PulsarClientException e) {
+                    log.warn("Failed to close consumer for topic {}", topic, e);
+                }
+            });
+            topicConsumerMap.clear();
+        }
+
+        if (client != null) {
+            try {
+                client.close();
+            } catch (PulsarClientException e) {
+                log.warn("Failed to close pulsar-client ", e);
+            }
+        }
+
+        if (admin != null) {
+            try {
+                admin.close();
+            } catch (Exception e) {
+                log.warn("Failed to close pulsar-admin ", e);
+            }
+        }
+    }
+
+    // Returns the cached consumer for (topic, group), subscribing lazily on first use.
+    private Consumer<byte[]> getConsumer(String topic, String groupId) {
+        TopicGroup topicGroup = new TopicGroup(topic, groupId);
+
+        return topicConsumerMap.computeIfAbsent(topicGroup, (topicName) -> {
+            try {
+                return client.newConsumer().topic(topic).subscriptionName(groupId).subscriptionType(subscriptionType)
+                        .subscribe();
+            } catch (PulsarClientException e) {
+                log.error("Failed to create consumer for topic {}", topic, e);
+                throw new RuntimeException("Failed to create consumer for topic " + topic, e);
+            }
+        });
+    }
+
+    /** Composite map key pairing a topic with a consumer group id (typo "grouoId" fixed). */
+    public static class TopicGroup {
+        protected String topic;
+        protected String groupId;
+
+        public TopicGroup(String topic, String groupId) {
+            super();
+            this.topic = topic;
+            this.groupId = groupId;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(topic, groupId);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj instanceof TopicGroup) {
+                TopicGroup t = (TopicGroup) obj;
+                return Objects.equals(topic, t.topic) && Objects.equals(groupId, t.groupId);
+            }
+            return false;
+        }
+
+    }
+
+    private SubscriptionType getSubscriptionType(Properties properties) {
+        // Use containsKey: Properties.contains() (inherited from Hashtable) checks
+        // VALUES, not keys, so the configured subscription type was silently ignored.
+        String subType = properties != null && properties.containsKey(SUBSCRIPTION_TYPE)
+                ? properties.getProperty(SUBSCRIPTION_TYPE)
+                : SubscriptionType.Failover.toString();
+        try {
+            return SubscriptionType.valueOf(subType);
+        } catch (IllegalArgumentException ie) {
+            // Unknown value: fall back to the default rather than failing construction.
+            return SubscriptionType.Failover;
+        }
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarMessage.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarMessage.java
new file mode 100644
index 0000000..b32e39a
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarMessage.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * kafka-0.8 {@link kafka.message.Message} adapter carrying a Pulsar payload and
+ * exposing the Pulsar string key as the Kafka message key.
+ */
+public class PulsarMessage extends kafka.message.Message {
+
+    private final String key;
+
+    public PulsarMessage(String key, byte[] bytes) {
+        super(bytes);
+        this.key = key;
+    }
+
+    @Override
+    public ByteBuffer key() {
+        // Encode explicitly as UTF-8: getBytes() without a charset uses the platform
+        // default and can corrupt non-ASCII keys on non-UTF-8 JVMs.
+        return key != null ? ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)) : null;
+    }
+
+    @Override
+    public boolean hasKey() {
+        return key != null;
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarMsgAndOffset.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarMsgAndOffset.java
new file mode 100644
index 0000000..ebeec88
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarMsgAndOffset.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.util.MessageIdUtils;
+
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+
+/**
+ * kafka-0.8 {@link MessageAndOffset} adapter that additionally retains the full Pulsar
+ * {@link MessageId}, since the derived long offset is lossy for partitioned topics
+ * and batch messages.
+ */
+public class PulsarMsgAndOffset extends MessageAndOffset {
+
+    private static final long serialVersionUID = 1L;
+    private final MessageId messageId;
+
+    public PulsarMsgAndOffset(Message message, MessageId messageId) {
+        super(message, MessageIdUtils.getOffset(messageId));
+        this.messageId = messageId;
+    }
+
+    /** Returns the exact Pulsar {@link MessageId}; prefer this over the long offset for commits. */
+    public MessageId getFullOffset() {
+        return this.messageId;
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetCommitRequest.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetCommitRequest.java
new file mode 100644
index 0000000..ab13c9c
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetCommitRequest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.util.MessageIdUtils;
+
+import com.google.common.collect.Maps;
+
+import kafka.common.OffsetMetadataAndError;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetCommitRequest;
+
+public class PulsarOffsetCommitRequest extends OffsetCommitRequest {
+    private final String groupId;
+    private final Map<String, MessageId> topicOffsetMap = Maps.newHashMap();
+
+    public PulsarOffsetCommitRequest(String groupId, Map<TopicAndPartition, PulsarOffsetMetadataAndError> requestInfo,
+            short versionId, int correlationId, String clientId) {
+        super(groupId, Collections.emptyMap(), versionId, correlationId, clientId);
+        this.groupId = groupId;
+        for (Entry<TopicAndPartition, PulsarOffsetMetadataAndError> topicOffset : requestInfo.entrySet()) {
+            String topicName = PulsarKafkaSimpleConsumer.getTopicName(topicOffset.getKey());
+            OffsetMetadataAndError offsetMetadata = topicOffset.getValue();
+            MessageId msgId = null;
+            if (offsetMetadata instanceof PulsarOffsetMetadataAndError) {
+                msgId = ((PulsarOffsetMetadataAndError) offsetMetadata).getMessageId();
+            }
+            msgId = msgId == null ? MessageIdUtils.getMessageId(topicOffset.getValue().offset()) : msgId;
+            topicOffsetMap.put(topicName, msgId);
+        }
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public Map<String, MessageId> getTopicOffsetMap() {
+        return topicOffsetMap;
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetCommitResponse.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetCommitResponse.java
new file mode 100644
index 0000000..e32ffc5
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetCommitResponse.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.Map;
+
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetCommitResponse;
+
+public class PulsarOffsetCommitResponse extends OffsetCommitResponse {
+
+    protected boolean hasError = false;
+    protected Map<TopicAndPartition, Object> errors;
+
+    public PulsarOffsetCommitResponse(kafka.api.OffsetCommitResponse underlying) {
+        super(underlying);
+    }
+
+    public boolean hasError() {
+        return hasError;
+    }
+
+    @Override
+    public Map<TopicAndPartition, Object> errors() {
+        return errors;
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetFetchRequest.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetFetchRequest.java
new file mode 100644
index 0000000..98ab6ca
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetFetchRequest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.List;
+
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetFetchRequest;
+
+public class PulsarOffsetFetchRequest extends OffsetFetchRequest {
+    protected final String groupId;
+    protected final List<TopicAndPartition> requestInfo;
+
+    public PulsarOffsetFetchRequest(String groupId, List<TopicAndPartition> requestInfo, short versionId,
+            int correlationId, String clientId) {
+        super(groupId, requestInfo, versionId, correlationId, clientId);
+        this.groupId = groupId;
+        this.requestInfo = requestInfo;
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetFetchResponse.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetFetchResponse.java
new file mode 100644
index 0000000..cf8d60b
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetFetchResponse.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.Map;
+
+import kafka.common.OffsetMetadataAndError;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetFetchResponse;
+
+public class PulsarOffsetFetchResponse extends OffsetFetchResponse {
+
+    private final Map<TopicAndPartition, OffsetMetadataAndError> response;
+
+    public PulsarOffsetFetchResponse(Map<TopicAndPartition, OffsetMetadataAndError> response) {
+        super(null);
+        this.response = response;
+    }
+
+    @Override
+    public Map<TopicAndPartition, OffsetMetadataAndError> offsets() {
+        return response;
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetMetadataAndError.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetMetadataAndError.java
new file mode 100644
index 0000000..06e8c3d
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetMetadataAndError.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.util.MessageIdUtils;
+
+import kafka.common.OffsetMetadataAndError;
+
+public class PulsarOffsetMetadataAndError extends OffsetMetadataAndError {
+
+    private static final long serialVersionUID = 1L;
+    private final MessageId messageId;
+
+    public PulsarOffsetMetadataAndError(long offset, String metadata, short error) {
+        super(offset, metadata, error);
+        this.messageId = null;
+    }
+
+    public PulsarOffsetMetadataAndError(MessageId messageId, String metadata, short error) {
+        super(MessageIdUtils.getOffset(messageId), metadata, error);
+        this.messageId = messageId;
+    }
+
+    public MessageId getMessageId() {
+        return messageId;
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetRequest.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetRequest.java
new file mode 100644
index 0000000..bb7666e
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetRequest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.Map;
+
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetRequest;
+
+public class PulsarOffsetRequest extends OffsetRequest {
+
+    private final Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo;
+
+    public PulsarOffsetRequest(Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo, short versionId,
+            String clientId) {
+        super(requestInfo, versionId, clientId);
+        this.requestInfo = requestInfo;
+    }
+
+    public Map<TopicAndPartition, PartitionOffsetRequestInfo> getRequestInfo() {
+        return requestInfo;
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetResponse.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetResponse.java
new file mode 100644
index 0000000..f7ce44d
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetResponse.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.Map;
+
+import kafka.common.ErrorMapping;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetResponse;
+
+public class PulsarOffsetResponse extends OffsetResponse {
+
+    private final Map<TopicAndPartition, Long> offsetResoponse;
+
+    public PulsarOffsetResponse(Map<TopicAndPartition, Long> offsetResoponse) {
+        super(null);
+        this.offsetResoponse = offsetResoponse;
+    }
+
+    @Override
+    public long[] offsets(String topic, int partition) {
+        Long offset = offsetResoponse.get(new TopicAndPartition(topic, partition));
+        long[] offsets = { offset != null ? offset : 0 };
+        return offsets;
+    }
+
+    @Override
+    public boolean hasError() {
+        return false;
+    }
+
+    @Override
+    public short errorCode(String topic, int partition) {
+        return ErrorMapping.NoError();
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarPartitionMetadata.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarPartitionMetadata.java
new file mode 100644
index 0000000..f61e7d3
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarPartitionMetadata.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.Collections;
+import java.util.List;
+
+import kafka.cluster.Broker;
+import kafka.javaapi.PartitionMetadata;
+
+public class PulsarPartitionMetadata extends PartitionMetadata {
+
+    private final List<Broker> hosts;
+
+    public PulsarPartitionMetadata(String hostUrl, int port) {
+        super(null);
+        this.hosts = Collections.singletonList(new Broker(0, hostUrl, port));
+    }
+
+    @Override
+    public List<Broker> replicas() {
+        return hosts;
+    }
+
+    @Override
+    public Broker leader() {
+        return hosts.get(0);
+    }
+
+    @Override
+    public int partitionId() {
+        // it always returns partition=-1
+        return -1;
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarTopicMetadata.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarTopicMetadata.java
new file mode 100644
index 0000000..4dd6660
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarTopicMetadata.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.Collections;
+import java.util.List;
+
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+
+public class PulsarTopicMetadata extends TopicMetadata {
+
+    private final String hostUrl;
+    private final int port;
+
+    public PulsarTopicMetadata(String hostUrl, int port, String topic) {
+        super(null);
+        this.hostUrl = hostUrl;
+        this.port = port;
+    }
+
+    @Override
+    public List<PartitionMetadata> partitionsMetadata() {
+        return Collections.singletonList(new PulsarPartitionMetadata(hostUrl, port));
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarTopicMetadataResponse.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarTopicMetadataResponse.java
new file mode 100644
index 0000000..e1d39c5
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarTopicMetadataResponse.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.List;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.TopicName;
+
+import com.google.common.collect.Lists;
+
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataResponse;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class PulsarTopicMetadataResponse extends TopicMetadataResponse {
+
+    private final List<String> topics;
+    private final String hostUrl;
+    private final int port;
+    private final PulsarAdmin admin;
+
+    public PulsarTopicMetadataResponse(PulsarAdmin admin, String hostUrl, int port, List<String> topics) {
+        super(null);
+        this.hostUrl = hostUrl;
+        this.port = port;
+        this.topics = topics;
+        this.admin = admin;
+    }
+
+    @Override
+    public List<TopicMetadata> topicsMetadata() {
+        List<TopicMetadata> metadataList = Lists.newArrayList();
+        topics.forEach(topic -> {
+            try {
+                int partitions;
+                partitions = admin.topics().getPartitionedTopicMetadata(topic).partitions;
+                if (partitions > 0) {
+                    for (int partition = 0; partition < partitions; partition++) {
+                        String topicName = TopicName.get(topic).getPartition(partition).toString();
+                        metadataList.add(new PulsarTopicMetadata(hostUrl, port, topicName));
+                    }
+                } else {
+                    metadataList.add(new PulsarTopicMetadata(hostUrl, port, topic));
+                }
+            } catch (PulsarAdminException e) {
+                log.error("Failed to get partitioned metadata for {}", topic, e);
+                throw new RuntimeException("Failed to get partitioned-metadata", e);
+            }
+        });
+        return metadataList;
+    }
+
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumerTest.java
new file mode 100644
index 0000000..77d7e8c
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumerTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import static org.testng.Assert.assertEquals;
+
+import java.lang.reflect.Field;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.testng.annotations.Test;
+
+import kafka.consumer.ConsumerConfig;
+
+public class PulsarKafkaConsumerTest {
+
+    @Test
+    public void testPulsarKafkaConsumerWithDefaultConfig() throws Exception {
+        // https://kafka.apache.org/08/documentation.html#consumerconfigs
+        Properties properties = new Properties();
+        properties.put("zookeeper.connect", "http://localhost:8080/");
+        properties.put("group.id", "group1");
+
+        ConsumerConfig config = new ConsumerConfig(properties);
+        ConsumerConnector connector = new ConsumerConnector(config);
+        ConsumerBuilderImpl<byte[]> consumerBuilder = (ConsumerBuilderImpl<byte[]>) connector.getConsumerBuilder();
+        Field confField = consumerBuilder.getClass().getDeclaredField("conf");
+        confField.setAccessible(true);
+        ConsumerConfigurationData conf = (ConsumerConfigurationData) confField.get(consumerBuilder);
+        assertEquals(conf.getSubscriptionName(), "group1");
+        assertEquals(conf.getReceiverQueueSize(), 1000);
+    }
+    
+    @Test
+    public void testPulsarKafkaConsumerConfig() throws Exception {
+        // https://kafka.apache.org/08/documentation.html#consumerconfigs
+        Properties properties = new Properties();
+        properties.put("zookeeper.connect", "http://localhost:8080/");
+        properties.put("group.id", "group1");
+        properties.put("consumer.id", "cons1");
+        properties.put("auto.commit.enable", "true");
+        properties.put("auto.commit.interval.ms", "100");
+        properties.put("queued.max.message.chunks", "100");
+
+        ConsumerConfig config = new ConsumerConfig(properties);
+        ConsumerConnector connector = new ConsumerConnector(config);
+        ConsumerBuilderImpl<byte[]> consumerBuilder = (ConsumerBuilderImpl<byte[]>) connector.getConsumerBuilder();
+        Field confField = consumerBuilder.getClass().getDeclaredField("conf");
+        confField.setAccessible(true);
+        ConsumerConfigurationData conf = (ConsumerConfigurationData) confField.get(consumerBuilder);
+        assertEquals(conf.getSubscriptionName(), "group1");
+        assertEquals(conf.getReceiverQueueSize(), 100);
+        assertEquals(conf.getAcknowledgementsGroupTimeMicros(), TimeUnit.MILLISECONDS.toMicros(100));
+        System.out.println(conf);
+    }
+    
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
new file mode 100644
index 0000000..4cccf83
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import static org.testng.Assert.assertEquals;
+
+import java.lang.reflect.Field;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.testng.annotations.Test;
+
+import kafka.producer.Partitioner;
+import kafka.producer.ProducerConfig;
+import kafka.serializer.Encoder;
+
+public class PulsarKafkaProducerTest {
+
+    private static final String BROKER_URL = "metadata.broker.list";
+    private static final String PRODUCER_TYPE = "producer.type";
+    private static final String SERIALIZER_CLASS = "serializer.class";
+    private static final String KEY_SERIALIZER_CLASS = "key.serializer.class";
+    private static final String PARTITIONER_CLASS = "partitioner.class";
+    private static final String COMPRESSION_CODEC = "compression.codec";
+    private static final String QUEUE_BUFFERING_MAX_MS = "queue.buffering.max.ms";
+    private static final String QUEUE_BUFFERING_MAX_MESSAGES = "queue.buffering.max.messages";
+    private static final String QUEUE_ENQUEUE_TIMEOUT_MS = "queue.enqueue.timeout.ms";
+    private static final String BATCH_NUM_MESSAGES = "batch.num.messages";
+    private static final String CLIENT_ID = "client.id";
+
+    @Test
+    public void testPulsarKafkaProducerWithDefaultConfig() throws Exception {
+        // https://kafka.apache.org/08/documentation.html#producerconfigs
+        Properties properties = new Properties();
+        properties.put(BROKER_URL, "http://localhost:8080/");
+
+        ProducerConfig config = new ProducerConfig(properties);
+        PulsarKafkaProducer<byte[], byte[]> producer = new PulsarKafkaProducer<>(config);
+        ProducerBuilderImpl<byte[]> producerBuilder = (ProducerBuilderImpl<byte[]>) producer.getPulsarProducerBuilder();
+        Field field = ProducerBuilderImpl.class.getDeclaredField("conf");
+        field.setAccessible(true);
+        ProducerConfigurationData conf = (ProducerConfigurationData) field.get(producerBuilder);
+        System.out.println("getMaxPendingMessages= " + conf.getMaxPendingMessages());
+        assertEquals(conf.getCompressionType(), CompressionType.NONE);
+        assertEquals(conf.isBlockIfQueueFull(), true);
+        assertEquals(conf.getMaxPendingMessages(), 1000);
+        assertEquals(conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MILLISECONDS.toMicros(1));
+        assertEquals(conf.getBatchingMaxMessages(), 1000);
+    }
+
+    @Test
+    public void testPulsarKafkaProducer() throws Exception {
+        // https://kafka.apache.org/08/documentation.html#producerconfigs
+        Properties properties = new Properties();
+        properties.put(BROKER_URL, "http://localhost:8080/");
+        properties.put(COMPRESSION_CODEC, "gzip"); // compression: ZLIB
+        properties.put(QUEUE_ENQUEUE_TIMEOUT_MS, "-1"); // block queue if full => -1 = true
+        properties.put(QUEUE_BUFFERING_MAX_MESSAGES, "6000"); // queue max message
+        properties.put(QUEUE_BUFFERING_MAX_MS, "100"); // batch delay
+        properties.put(BATCH_NUM_MESSAGES, "500"); // batch msg
+        properties.put(CLIENT_ID, "test");
+        ProducerConfig config = new ProducerConfig(properties);
+        PulsarKafkaProducer<byte[], byte[]> producer = new PulsarKafkaProducer<>(config);
+        ProducerBuilderImpl<byte[]> producerBuilder = (ProducerBuilderImpl<byte[]>) producer.getPulsarProducerBuilder();
+        Field field = ProducerBuilderImpl.class.getDeclaredField("conf");
+        field.setAccessible(true);
+        ProducerConfigurationData conf = (ProducerConfigurationData) field.get(producerBuilder);
+        assertEquals(conf.getCompressionType(), CompressionType.ZLIB);
+        assertEquals(conf.isBlockIfQueueFull(), true);
+        assertEquals(conf.getMaxPendingMessages(), 6000);
+        assertEquals(conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MILLISECONDS.toMicros(100));
+        assertEquals(conf.getBatchingMaxMessages(), 500);
+    }
+
+    @Test
+    public void testPulsarKafkaProducerWithSerializer() throws Exception {
+        Properties properties = new Properties();
+        properties.put(BROKER_URL, "http://localhost:8080/");
+        properties.put(PRODUCER_TYPE, "sync");
+        properties.put(SERIALIZER_CLASS, TestEncoder.class.getName());
+        properties.put(KEY_SERIALIZER_CLASS, TestEncoder.class.getName());
+        properties.put(PARTITIONER_CLASS, TestPartitioner.class.getName());
+        ProducerConfig config = new ProducerConfig(properties);
+        PulsarKafkaProducer<byte[], byte[]> producer = new PulsarKafkaProducer<>(config);
+        assertEquals(producer.getKeySerializer().getClass(), TestEncoder.class);
+        assertEquals(producer.getValueSerializer().getClass(), TestEncoder.class);
+        assertEquals(producer.getPartitioner().getClass(), TestPartitioner.class);
+
+    }
+
+    public static class TestEncoder implements Encoder<String> {
+        @Override
+        public byte[] toBytes(String value) {
+            return value.getBytes();
+        }
+    }
+
+    public static class TestPartitioner implements Partitioner {
+        @Override
+        public int partition(Object obj, int totalPartition) {
+            return obj.hashCode() % totalPartition;
+        }
+    }
+
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/pulsar/client/kafka/test/KafkaProducerConsumerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/pulsar/client/kafka/test/KafkaProducerConsumerTest.java
new file mode 100644
index 0000000..bbf61be
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/pulsar/client/kafka/test/KafkaProducerConsumerTest.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.ConsumerConnector;
+import org.apache.kafka.clients.consumer.PulsarKafkaStream;
+import org.apache.kafka.clients.consumer.PulsarMessageAndMetadata;
+import org.apache.kafka.clients.producer.PulsarKafkaProducer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
+
+import kafka.consumer.ConsumerConfig;
+import kafka.producer.KeyedMessage;
+import kafka.producer.Partitioner;
+import kafka.producer.ProducerConfig;
+import kafka.serializer.Decoder;
+import kafka.serializer.Encoder;
+import kafka.serializer.StringDecoder;
+import kafka.serializer.StringEncoder;
+
+public class KafkaProducerConsumerTest extends ProducerConsumerBase { // end-to-end tests for the kafka 0.8 high-level consumer/producer adapter
+
+    private static final String BROKER_URL = "metadata.broker.list"; // kafka 0.8 producer config keys; the adapter maps these onto pulsar client settings
+    private static final String PRODUCER_TYPE = "producer.type";
+    private static final String SERIALIZER_CLASS = "serializer.class";
+    private static final String KEY_SERIALIZER_CLASS = "key.serializer.class";
+    private static final String PARTITIONER_CLASS = "partitioner.class";
+    private static final String COMPRESSION_CODEC = "compression.codec";
+    private static final String QUEUE_BUFFERING_MAX_MS = "queue.buffering.max.ms";
+    private static final String QUEUE_BUFFERING_MAX_MESSAGES = "queue.buffering.max.messages";
+    private static final String QUEUE_ENQUEUE_TIMEOUT_MS = "queue.enqueue.timeout.ms";
+    private static final String BATCH_NUM_MESSAGES = "batch.num.messages";
+    private static final String CLIENT_ID = "client.id";
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception { // start the embedded broker and the base producer/consumer fixtures before each test
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception { // tear the embedded broker down after each test
+        super.internalCleanup();
+    }
+
+    @Test // round trip: messages sent via the kafka 0.8 producer API arrive on the kafka 0.8 high-level consumer streams
+    public void testPulsarKafkaProducerWithSerializer() throws Exception {
+        final String serviceUrl = lookupUrl.toString();
+        final String topicName = "persistent://my-property/my-ns/my-topic1";
+
+        // (1) Create consumer
+        Properties properties = new Properties();
+        properties.put("zookeeper.connect", serviceUrl); // adapter reuses the zookeeper.connect key to carry the pulsar service URL
+        properties.put("group.id", "group1");
+        properties.put("consumer.id", "cons1");
+        properties.put("auto.commit.enable", "true");
+        properties.put("auto.commit.interval.ms", "100");
+        properties.put("queued.max.message.chunks", "100");
+
+        ConsumerConfig conSConfig = new ConsumerConfig(properties);
+        ConsumerConnector connector = new ConsumerConnector(conSConfig);
+        Map<String, Integer> topicCountMap = Collections.singletonMap(topicName, 2); // request 2 streams for the topic
+        Map<String, List<PulsarKafkaStream<String, Tweet>>> streams = connector.createMessageStreams(topicCountMap,
+                new StringDecoder(null), new TestDecoder());
+
+        // (2) Create producer
+        Properties properties2 = new Properties();
+        properties2.put(BROKER_URL, serviceUrl);
+        properties2.put(PRODUCER_TYPE, "sync");
+        properties2.put(SERIALIZER_CLASS, TestEncoder.class.getName());
+        properties2.put(KEY_SERIALIZER_CLASS, StringEncoder.class.getName());
+        properties2.put(PARTITIONER_CLASS, TestPartitioner.class.getName());
+        properties2.put(COMPRESSION_CODEC, "gzip"); // compression: ZLIB
+        properties2.put(QUEUE_ENQUEUE_TIMEOUT_MS, "-1"); // block queue if full => -1 = true
+        properties2.put(QUEUE_BUFFERING_MAX_MESSAGES, "6000"); // queue max message
+        properties2.put(QUEUE_BUFFERING_MAX_MS, "100"); // batch delay
+        properties2.put(BATCH_NUM_MESSAGES, "500"); // batch msg
+        properties2.put(CLIENT_ID, "test");
+        ProducerConfig config = new ProducerConfig(properties2);
+        PulsarKafkaProducer<String, Tweet> producer = new PulsarKafkaProducer<>(config);
+
+        String name = "user";
+        String msg = "Hello World!";
+        Set<Tweet> published = Sets.newHashSet();
+        Set<Tweet> received = Sets.newHashSet();
+        int total = 10;
+        for (int i = 0; i < total; i++) {
+            String sendMessage = msg + i;
+            Tweet tweet = new Tweet(name, sendMessage);
+            KeyedMessage<String, Tweet> message = new KeyedMessage<>(topicName, name, tweet);
+            published.add(tweet);
+            producer.send(message);
+        }
+        while (received.size() < total) { // keep draining the streams until every published tweet has been seen
+            for (int i = 0; i < streams.size(); i++) { // NOTE(review): streams is the topic map (size 1), so this outer loop runs once — looks redundant
+                List<PulsarKafkaStream<String, Tweet>> kafkaStreams = streams.get(topicName);
+                assertEquals(kafkaStreams.size(), 2);
+                for (PulsarKafkaStream<String, Tweet> kafkaStream : kafkaStreams) {
+                    for (PulsarMessageAndMetadata<String, KafkaProducerConsumerTest.Tweet> record : kafkaStream) {
+                        received.add(record.message());
+                        assertEquals(record.key(), name);
+                    }
+                }
+            }
+        }
+        assertEquals(published.size(), received.size()); // set equality: everything published was received, nothing extra
+        published.removeAll(received);
+        assertTrue(published.isEmpty());
+    }
+
+    public static class Tweet implements Serializable { // simple value type used as the message payload
+        private static final long serialVersionUID = 1L;
+        public String userName;
+        public String message;
+
+        public Tweet(String userName, String message) {
+            super();
+            this.userName = userName;
+            this.message = message;
+        }
+
+        @Override
+        public int hashCode() { // must agree with equals: both derive from (userName, message)
+            return Objects.hashCode(userName, message);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj instanceof Tweet) {
+                Tweet tweet = (Tweet) obj;
+                return Objects.equal(this.userName, tweet.userName) && Objects.equal(this.message, tweet.message);
+            }
+            return false;
+        }
+    }
+
+    public static class TestEncoder implements Encoder<Tweet> { // serializes a Tweet as "userName,message" bytes; inverse of TestDecoder
+        @Override
+        public byte[] toBytes(Tweet tweet) {
+            return (tweet.userName + "," + tweet.message).getBytes();
+        }
+    }
+
+    public static class TestDecoder implements Decoder<Tweet> { // inverse of TestEncoder
+        @Override
+        public Tweet fromBytes(byte[] input) {
+            String[] tokens = (new String(input)).split(","); // NOTE(review): naive split — a message containing a comma would be truncated
+            return new Tweet(tokens[0], tokens[1]);
+        }
+    }
+
+    public static class TestPartitioner implements Partitioner { // routes a message to a partition derived from the key's hash
+        @Override
+        public int partition(Object obj, int totalPartition) {
+            return (obj.hashCode() & Integer.MAX_VALUE) % totalPartition; // mask the sign bit: hashCode() may be negative and Java % keeps the dividend's sign
+        }
+    }
+
+    
+    @Test // same round trip as above, but relying on the default byte[] serializer/deserializer
+    public void testProducerConsumerWithoutSerializer() throws Exception {
+        final String serviceUrl = lookupUrl.toString();
+        final String topicName = "persistent://my-property/my-ns/my-topic1";
+        // (1) Create consumer
+        Properties properties = new Properties();
+        properties.put("zookeeper.connect", serviceUrl); // adapter reuses the zookeeper.connect key to carry the pulsar service URL
+        properties.put("group.id", "group1");
+        properties.put("consumer.id", "cons1");
+        properties.put("auto.commit.enable", "true");
+        properties.put("auto.commit.interval.ms", "100");
+        properties.put("queued.max.message.chunks", "100");
+
+        ConsumerConfig conSConfig = new ConsumerConfig(properties);
+        ConsumerConnector connector = new ConsumerConnector(conSConfig);
+        Map<String, Integer> topicCountMap = Collections.singletonMap(topicName, 2);
+        Map<String, List<PulsarKafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap); // no decoders supplied -> raw byte[] streams
+
+        // (2) Create producer
+        Properties properties2 = new Properties();
+        properties2.put(BROKER_URL, serviceUrl);
+        properties2.put(PRODUCER_TYPE, "sync");
+        properties2.put(PARTITIONER_CLASS, TestPartitioner.class.getName());
+        properties2.put(COMPRESSION_CODEC, "gzip"); // compression: ZLIB
+        properties2.put(QUEUE_ENQUEUE_TIMEOUT_MS, "-1"); // block queue if full => -1 = true
+        properties2.put(QUEUE_BUFFERING_MAX_MESSAGES, "6000"); // queue max message
+        properties2.put(QUEUE_BUFFERING_MAX_MS, "100"); // batch delay
+        properties2.put(BATCH_NUM_MESSAGES, "500"); // batch msg
+        properties2.put(CLIENT_ID, "test");
+        ProducerConfig config = new ProducerConfig(properties2);
+        PulsarKafkaProducer<byte[], byte[]> producer = new PulsarKafkaProducer<>(config);
+
+        String name = "user";
+        String msg = "Hello World!";
+        int total = 10;
+        for (int i = 0; i < total; i++) {
+            String sendMessage = msg + i;
+            KeyedMessage<byte[], byte[]> message = new KeyedMessage<>(topicName, name.getBytes(), sendMessage.getBytes());
+            producer.send(message);
+        }
+        int count = 0;
+        while (count < total) { // keep draining until all messages are observed; only counts, does not verify payloads
+            for (int i = 0; i < streams.size(); i++) { // NOTE(review): streams is the topic map (size 1), so this outer loop runs once — looks redundant
+                List<PulsarKafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
+                assertEquals(kafkaStreams.size(), 2);
+                for (PulsarKafkaStream<byte[], byte[]> kafkaStream : kafkaStreams) {
+                    for (PulsarMessageAndMetadata<byte[], byte[]> record : kafkaStream) {
+                        count++;
+                        System.out.println(new String(record.message())); // NOTE(review): debug print left in the test — consider removing or using a logger
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/pulsar/client/kafka/test/KafkaProducerSimpleConsumerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/pulsar/client/kafka/test/KafkaProducerSimpleConsumerTest.java
new file mode 100644
index 0000000..3010c54
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/pulsar/client/kafka/test/KafkaProducerSimpleConsumerTest.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.producer.PulsarKafkaProducer;
+import org.apache.kafka.clients.simple.consumer.PulsarKafkaSimpleConsumer;
+import org.apache.kafka.clients.simple.consumer.PulsarMsgAndOffset;
+import org.apache.kafka.clients.simple.consumer.PulsarOffsetCommitRequest;
+import org.apache.kafka.clients.simple.consumer.PulsarOffsetFetchRequest;
+import org.apache.kafka.clients.simple.consumer.PulsarOffsetMetadataAndError;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
+
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.common.OffsetMetadataAndError;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.message.MessageAndOffset;
+import kafka.producer.KeyedMessage;
+import kafka.producer.Partitioner;
+import kafka.producer.ProducerConfig;
+import kafka.serializer.Decoder;
+import kafka.serializer.Encoder;
+import kafka.serializer.StringEncoder;
+
+public class KafkaProducerSimpleConsumerTest extends ProducerConsumerBase { // exercises the kafka 0.8 SimpleConsumer (low-level) adapter API
+
+    private static final String BROKER_URL = "metadata.broker.list"; // kafka 0.8 producer config keys; the adapter maps these onto pulsar client settings
+    private static final String PRODUCER_TYPE = "producer.type";
+    private static final String KEY_SERIALIZER_CLASS = "key.serializer.class";
+    private static final String PARTITIONER_CLASS = "partitioner.class";
+    private static final String COMPRESSION_CODEC = "compression.codec";
+    private static final String QUEUE_BUFFERING_MAX_MS = "queue.buffering.max.ms";
+    private static final String QUEUE_BUFFERING_MAX_MESSAGES = "queue.buffering.max.messages";
+    private static final String QUEUE_ENQUEUE_TIMEOUT_MS = "queue.enqueue.timeout.ms";
+    private static final String BATCH_NUM_MESSAGES = "batch.num.messages";
+    private static final String CLIENT_ID = "client.id";
+    
+    private final static int publishPartition = 1; // TestPartitioner routes every message here
+    
+    @DataProvider(name = "partitions")
+    public Object[][] totalPartitions() { // 0 = non-partitioned topic, 10 = partitioned topic
+        return new Object[][] { { 0 }, { 10 } };
+    }
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception { // start the embedded broker and the base producer/consumer fixtures before each test
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception { // tear the embedded broker down after each test
+        super.internalCleanup();
+    }
+
+    @Test(dataProvider="partitions") // publish via kafka 0.8 producer, then fetch + commit offsets via the SimpleConsumer adapter
+    public void testPulsarKafkaProducerWithSerializer(int partitions) throws Exception {
+        final String serviceUrl = lookupUrl.toString();
+        final String topicName = "persistent://my-property/my-ns/my-topic";
+        final String groupId = "group1";
+        
+        int partition = -1; // -1 selects the non-partitioned topic; otherwise fetch from the partition TestPartitioner targets
+        if (partitions > 0) {
+            admin.topics().createPartitionedTopic(topicName, 10);
+            partition = publishPartition;
+        }
+        
+        // create subscription
+        Consumer<byte[]> cons = pulsarClient.newConsumer().topic(topicName).subscriptionName(groupId).subscribe(); // pre-create the subscription so published messages are retained
+        cons.close();
+
+        // (2) Create producer
+        Properties properties2 = new Properties();
+        properties2.put(BROKER_URL, serviceUrl);
+        properties2.put(PRODUCER_TYPE, "sync");
+        properties2.put(KEY_SERIALIZER_CLASS, StringEncoder.class.getName());
+        properties2.put(PARTITIONER_CLASS, TestPartitioner.class.getName());
+        properties2.put(COMPRESSION_CODEC, "gzip"); // compression: ZLIB
+        properties2.put(QUEUE_ENQUEUE_TIMEOUT_MS, "-1"); // block queue if full => -1 = true
+        properties2.put(QUEUE_BUFFERING_MAX_MESSAGES, "6000"); // queue max message
+        properties2.put(QUEUE_BUFFERING_MAX_MS, "100"); // batch delay
+        properties2.put(BATCH_NUM_MESSAGES, "500"); // batch msg
+        properties2.put(CLIENT_ID, "test");
+        ProducerConfig config = new ProducerConfig(properties2);
+        PulsarKafkaProducer<String, byte[]> producer = new PulsarKafkaProducer<>(config);
+
+        String name = "user";
+        String msg = "Hello World!";
+        Set<String> published = Sets.newHashSet();
+        Set<String> received = Sets.newHashSet();
+        int total = 10;
+        for (int i = 0; i < total; i++) {
+            String sendMessage = msg + i;
+            KeyedMessage<String, byte[]> message = new KeyedMessage<>(topicName, name, sendMessage.getBytes());
+            published.add(sendMessage);
+            producer.send(message);
+        }
+        
+        
+        // (2) Consume using simple consumer
+        PulsarKafkaSimpleConsumer consumer = new PulsarKafkaSimpleConsumer(serviceUrl, 0, 0, 0, "clientId");
+        List<String> topics = Collections.singletonList(topicName);
+        TopicMetadataRequest req = new TopicMetadataRequest(topics);
+        kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
+        
+        List<TopicMetadata> metaData = resp.topicsMetadata();
+        PartitionMetadata part = metaData.get(0).partitionsMetadata().get(0); // NOTE(review): unused — metadata round trip only exercises the request path
+        
+        long readOffset = kafka.api.OffsetRequest.EarliestTime(); // start reading from the beginning of the topic
+        FetchRequest fReq = new FetchRequestBuilder()
+                .clientId("c1")
+                .addFetch(topicName, partition, readOffset, 100000)
+                .build();
+        FetchResponse fetchResponse = consumer.fetch(fReq);
+        
+        long lastOffset = 0;
+        MessageId offset = null; // pulsar MessageId of the last message seen; used for the commit below
+        for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topicName, partition)) {
+            long currentOffset = messageAndOffset.offset();
+            if (currentOffset < readOffset) {
+                continue;
+            }
+            offset = ((PulsarMsgAndOffset)messageAndOffset).getFullOffset();
+            lastOffset = messageAndOffset.offset();
+            ByteBuffer payload = messageAndOffset.message().payload();
+
+            byte[] bytes = new byte[payload.limit()];
+            payload.get(bytes);
+            received.add(new String(bytes, "UTF-8"));
+        }
+        lastOffset -= 1; // NOTE(review): steps back to the last consumed entry; verify this off-by-one against the commit semantics asserted below
+        
+        assertEquals(published.size(), received.size()); // set equality: everything published was fetched
+        published.removeAll(received);
+        assertTrue(published.isEmpty());
+        
+        TopicAndPartition topicPartition = new TopicAndPartition(topicName, partition);
+        PulsarOffsetMetadataAndError offsetError = new PulsarOffsetMetadataAndError(offset, null, (short) 0);
+        Map<TopicAndPartition, PulsarOffsetMetadataAndError> requestInfo = Collections.singletonMap(topicPartition,
+                offsetError);
+        PulsarOffsetCommitRequest offsetReq = new PulsarOffsetCommitRequest(groupId, requestInfo, (short) -1, 0, "c1");
+        consumer.commitOffsets(offsetReq); // commit the last-seen offset for the group
+        
+        final long expectedReadOffsetPosition = lastOffset;
+        
+        retryStrategically((test) -> fetchOffset(consumer, topicPartition, groupId) == expectedReadOffsetPosition, 10, 150); // commit is applied asynchronously; poll until it lands
+        
+        long offset1 = fetchOffset(consumer, topicPartition, groupId);
+        MessageIdImpl actualMsgId = ((MessageIdImpl)MessageIdUtils.getMessageId(offset1));
+        
+        MessageIdImpl expectedMsgId = (MessageIdImpl) offset;
+        assertEquals(actualMsgId.getLedgerId(), expectedMsgId.getLedgerId());
+        assertEquals(actualMsgId.getEntryId(), expectedMsgId.getEntryId() + 1); // cursor points one entry past the last committed message
+    }
+
+    private long fetchOffset(PulsarKafkaSimpleConsumer consumer, TopicAndPartition topicPartition, String groupId) { // reads the committed offset for (topic, partition, group) via the kafka OffsetFetch API
+        List<TopicAndPartition> fetchReqInfo = Collections.singletonList(topicPartition);
+        PulsarOffsetFetchRequest fetchOffsetRequest = new PulsarOffsetFetchRequest(groupId, fetchReqInfo, (short)-1, 0, "test");
+        OffsetMetadataAndError offsetResponse = consumer.fetchOffsets(fetchOffsetRequest).offsets().get(topicPartition);
+        return offsetResponse.offset();
+    }
+
+    public static class Tweet implements Serializable { // NOTE(review): apparently unused in this test class (tests use raw byte[] payloads) — consider removing
+        private static final long serialVersionUID = 1L;
+        public String userName;
+        public String message;
+
+        public Tweet(String userName, String message) {
+            super();
+            this.userName = userName;
+            this.message = message;
+        }
+
+        @Override
+        public int hashCode() { // must agree with equals: both derive from (userName, message)
+            return Objects.hashCode(userName, message);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj instanceof Tweet) {
+                Tweet tweet = (Tweet) obj;
+                return Objects.equal(this.userName, tweet.userName) && Objects.equal(this.message, tweet.message);
+            }
+            return false;
+        }
+    }
+
+    public static class TestEncoder implements Encoder<Tweet> { // NOTE(review): apparently unused in this test class — consider removing
+        @Override
+        public byte[] toBytes(Tweet tweet) {
+            return (tweet.userName + "," + tweet.message).getBytes();
+        }
+    }
+
+    public static class TestDecoder implements Decoder<Tweet> { // NOTE(review): apparently unused in this test class — consider removing
+        @Override
+        public Tweet fromBytes(byte[] input) {
+            String[] tokens = (new String(input)).split(","); // naive split — a message containing a comma would be truncated
+            return new Tweet(tokens[0], tokens[1]);
+        }
+    }
+
+    public static class TestPartitioner implements Partitioner {
+        @Override
+        public int partition(Object obj, int totalPartition) {
+            // deliberately route every message to the fixed test partition so the consumer knows where to fetch from
+            return publishPartition;
+        }
+    }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 9eed128..6965378 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -317,4 +317,9 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
     public ConsumerConfigurationData<T> getConf() {
         return conf;
     }
+    @Override
+    public String toString() {
+        // Object#toString() must never return null; String.valueOf renders an unset conf as "null"
+        return String.valueOf(conf);
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index d8ad226..7976a04 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -270,4 +270,9 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
                     "should be set as " + MessageRoutingMode.CustomPartition);
         }
     }
+    @Override
+    public String toString() {
+        // Object#toString() must never return null; String.valueOf renders an unset conf as "null"
+        return String.valueOf(conf);
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
index ed3fa09..626d662 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
@@ -28,7 +28,7 @@ public class TopicMessageIdImpl implements MessageId {
     private final String topicName;
     private final MessageId messageId;
 
-    TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) {
+    public TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) {
         this.messageId = messageId;
         this.topicPartitionName = topicPartitionName;
         this.topicName = topicName;