You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2019/08/06 16:54:04 UTC
[pulsar] branch master updated: Add support of pulsar-kafka-adapter
for kafka-0.8 api (#4797)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7b185db Add support of pulsar-kafka-adapter for kafka-0.8 api (#4797)
7b185db is described below
commit 7b185dbef65fbacf0b0b22e8411905aefec51479
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Aug 6 09:53:56 2019 -0700
Add support of pulsar-kafka-adapter for kafka-0.8 api (#4797)
* Add support pulsar-kafka-adapter for kafka-0.8 api
clean up pulsar-kafka adapter
add tests
add low level consumer
add simple consumer
corrected pulsar-client-kafka_0_8
fix the module name
add batch and partitioned-topic support
fix headers
add getOffset api support
added pulsarOffset request/response
clean up
* add pulsar-kafka integration test
* use earliestTime offset
* add default serializer/de
---
pulsar-client-kafka-compat/pom.xml | 3 +
.../pulsar-client-kafka-shaded_0_8/pom.xml | 433 +++++++++++++++++++++
.../pom.xml | 52 ++-
.../compat/examples/HighLevelConsumerExample.java | 121 ++++++
.../compat/examples/LowLevelConsumerExample.java | 146 +++++++
.../kafka/compat/examples/ProducerExample.java | 131 +++++++
.../client/kafka/compat/examples/utils/Tweet.java | 67 ++++
.../pulsar-client-kafka/pom.xml | 5 +
.../pom.xml | 61 ++-
.../kafka/clients/consumer/ConsumerConnector.java | 230 +++++++++++
.../kafka/clients/consumer/ConsumerIterator.java | 143 +++++++
.../clients/consumer/PulsarConsumerConfig.java | 28 ++
.../clients/consumer/PulsarKafkaConsumer.java | 30 ++
.../kafka/clients/consumer/PulsarKafkaStream.java | 68 ++++
.../clients/consumer/PulsarMessageAndMetadata.java | 92 +++++
.../clients/producer/PulsarClientKafkaConfig.java | 169 ++++++++
.../clients/producer/PulsarKafkaProducer.java | 303 ++++++++++++++
.../consumer/PulsarByteBufferMessageSet.java | 93 +++++
.../simple/consumer/PulsarFetchResponse.java | 47 +++
.../simple/consumer/PulsarKafkaSimpleConsumer.java | 354 +++++++++++++++++
.../clients/simple/consumer/PulsarMessage.java | 41 ++
.../simple/consumer/PulsarMsgAndOffset.java | 40 ++
.../simple/consumer/PulsarOffsetCommitRequest.java | 61 +++
.../consumer/PulsarOffsetCommitResponse.java | 43 ++
.../simple/consumer/PulsarOffsetFetchRequest.java | 36 ++
.../simple/consumer/PulsarOffsetFetchResponse.java | 40 ++
.../consumer/PulsarOffsetMetadataAndError.java | 44 +++
.../simple/consumer/PulsarOffsetRequest.java | 40 ++
.../simple/consumer/PulsarOffsetResponse.java | 52 +++
.../simple/consumer/PulsarPartitionMetadata.java | 51 +++
.../simple/consumer/PulsarTopicMetadata.java | 42 ++
.../consumer/PulsarTopicMetadataResponse.java | 71 ++++
.../clients/consumer/PulsarKafkaConsumerTest.java | 75 ++++
.../clients/producer/PulsarKafkaProducerTest.java | 124 ++++++
.../kafka/test/KafkaProducerConsumerTest.java | 247 ++++++++++++
.../test/KafkaProducerSimpleConsumerTest.java | 256 ++++++++++++
.../pulsar/client/impl/ConsumerBuilderImpl.java | 5 +
.../pulsar/client/impl/ProducerBuilderImpl.java | 5 +
.../pulsar/client/impl/TopicMessageIdImpl.java | 2 +-
39 files changed, 3825 insertions(+), 26 deletions(-)
diff --git a/pulsar-client-kafka-compat/pom.xml b/pulsar-client-kafka-compat/pom.xml
index 569647f..5e0e896 100644
--- a/pulsar-client-kafka-compat/pom.xml
+++ b/pulsar-client-kafka-compat/pom.xml
@@ -38,10 +38,13 @@
<modules>
<module>pulsar-client-kafka</module>
+ <module>pulsar-client-kafka_0_8</module>
<module>pulsar-client-kafka_0_9</module>
<module>pulsar-client-kafka-shaded</module>
+ <module>pulsar-client-kafka-shaded_0_8</module>
<module>pulsar-client-kafka-shaded_0_9</module>
<module>pulsar-client-kafka-tests</module>
+ <module>pulsar-client-kafka-tests_0_8</module>
<module>pulsar-client-kafka-tests_0_9</module>
</modules>
</project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-shaded_0_8/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded_0_8/pom.xml
new file mode 100644
index 0000000..6cf6c84
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded_0_8/pom.xml
@@ -0,0 +1,433 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-kafka-compat</artifactId>
+ <version>2.5.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>pulsar-client-kafka_0_8</artifactId>
+ <name>Pulsar Kafka compatibility 0.8 :: API</name>
+
+ <description>Drop-in replacement for Kafka client library that publishes and consumes
+ messages on Pulsar topics</description>
+
+ <properties>
+ <kafka_0_8.version>0.8.1.1</kafka_0_8.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-kafka_0_8-original</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+ <artifactSet>
+ <includes>
+ <include>org.apache.kafka:kafka_2.9.2</include>
+ <include>org.apache.pulsar:pulsar-client-kafka_0_8-original</include>
+ <include>org.apache.pulsar:pulsar-client-original</include>
+ <include>org.apache.commons:commons-lang3</include>
+ <include>commons-codec:commons-codec</include>
+ <include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
+ <include>commons-collections:commons-collections</include>
+ <include>org.asynchttpclient:*</include>
+ <include>io.netty:netty-codec-http</include>
+ <include>io.netty:netty-transport-native-epoll</include>
+ <include>org.reactivestreams:reactive-streams</include>
+ <include>com.typesafe.netty:netty-reactive-streams</include>
+ <include>org.javassist:javassist</include>
+ <include>com.google.protobuf:protobuf-java</include>
+ <include>com.google.guava:guava</include>
+ <include>com.google.code.gson:gson</include>
+ <include>com.fasterxml.jackson.core</include>
+ <include>com.fasterxml.jackson.module</include>
+ <include>com.fasterxml.jackson.dataformat</include>
+ <include>io.netty:netty</include>
+ <include>io.netty:netty-*</include>
+ <include>org.apache.pulsar:pulsar-common</include>
+ <include>org.apache.bookkeeper:circe-checksum</include>
+ <include>com.yahoo.datasketches:sketches-core</include>
+ <include>org.apache.httpcomponents:httpclient</include>
+ <include>commons-logging:commons-logging</include>
+ <include>org.apache.httpcomponents:httpcore</include>
+ <include>org.eclipse.jetty:*</include>
+ <include>com.yahoo.datasketches:*</include>
+ <include>commons-*:*</include>
+ <include>org.yaml:snakeyaml</include>
+ <include>org.objenesis:*</include>
+
+ <include>org.apache.avro:*</include>
+ <!-- Avro transitive dependencies-->
+ <include>org.codehaus.jackson:jackson-core-asl</include>
+ <include>org.codehaus.jackson:jackson-mapper-asl</include>
+ <include>com.thoughtworks.paranamer:paranamer</include>
+ <include>org.xerial.snappy:snappy-java</include>
+ <include>org.apache.commons:commons-compress</include>
+ <include>org.tukaani:xz</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>commons-logging:commons-logging</artifact>
+ <includes>
+ <include>**</include>
+ </includes>
+ </filter>
+ </filters>
+ <relocations>
+ <relocation>
+ <pattern>kafka.javaapi.producer.Producer</pattern>
+ <shadedPattern>kafka.javaapi.producer.OriginalProducer</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.producer.PulsarKafkaProducer</pattern>
+ <shadedPattern>kafka.javaapi.producer.Producer</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>kafka.javaapi.consumer.ConsumerConnector</pattern>
+ <shadedPattern>kafka.javaapi.consumer.OriginalConsumerConnector</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.consumer.ConsumerConnector</pattern>
+ <shadedPattern>kafka.javaapi.consumer.ConsumerConnector</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>kafka.consumer.KafkaStream</pattern>
+ <shadedPattern>kafka.consumer.OriginalKafkaStream</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.consumer.PulsarKafkaStream</pattern>
+ <shadedPattern>kafka.consumer.KafkaStream</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>kafka.message.MessageAndMetadata</pattern>
+ <shadedPattern>kafka.message.OriginalMessageAndMetadata</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.consumer.PulsarMessageAndMetadata</pattern>
+ <shadedPattern>kafka.message.MessageAndMetadata</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>kafka.consumer.ConsumerIterator</pattern>
+ <shadedPattern>kafka.consumer.OriginalConsumerIterator</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.consumer.ConsumerIterator</pattern>
+ <shadedPattern>kafka.consumer.ConsumerIterator</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>kafka.consumer.Consumer</pattern>
+ <shadedPattern>kafka.consumer.OriginalConsumer</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.consumer.PulsarKafkaConsumer</pattern>
+ <shadedPattern>kafka.consumer.Consumer</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>org.apache.kafka.clients.consumer.PulsarConsumerConfig</pattern>
+ <shadedPattern>kafka.consumer.ConsumerConfig</shadedPattern>
+ </relocation>
+
+ <!-- low-level consumer -->
+ <relocation>
+ <pattern>kafka.javaapi.consumer.SimpleConsumer</pattern>
+ <shadedPattern>kafka.javaapi.consumer.OriginalSimpleConsumer</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.simple.consumer.PulsarKafkaSimpleConsumer</pattern>
+ <shadedPattern>kafka.javaapi.consumer.SimpleConsumer</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>kafka.javaapi.OffsetFetchResponse</pattern>
+ <shadedPattern>kafka.javaapi.OriginalOffsetFetchResponse</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.simple.consumer.PulsarOffsetFetchResponse</pattern>
+ <shadedPattern>kafka.javaapi.OffsetFetchResponse</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>kafka.javaapi.OffsetFetchRequest</pattern>
+ <shadedPattern>kafka.javaapi.OriginalOffsetFetchRequest</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.simple.consumer.PulsarOffsetFetchRequest</pattern>
+ <shadedPattern>kafka.javaapi.OffsetFetchRequest</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>kafka.javaapi.OffsetCommitRequest</pattern>
+ <shadedPattern>kafka.javaapi.OriginalOffsetCommitRequest</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.simple.consumer.PulsarOffsetCommitRequest</pattern>
+ <shadedPattern>kafka.javaapi.OffsetCommitRequest</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>kafka.javaapi.FetchResponse</pattern>
+ <shadedPattern>kafka.javaapi.OriginalFetchResponse</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.simple.consumer.PulsarFetchResponse</pattern>
+ <shadedPattern>kafka.javaapi.FetchResponse</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>kafka.message.MessageAndOffset</pattern>
+ <shadedPattern>kafka.message.MessageAndOffset</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>kafka.javaapi.message.ByteBufferMessageSet</pattern>
+ <shadedPattern>kafka.javaapi.message.OriginalByteBufferMessageSet</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.simple.consumer.PulsarByteBufferMessageSet</pattern>
+ <shadedPattern>kafka.javaapi.message.ByteBufferMessageSet</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>kafka.message.Message</pattern>
+ <shadedPattern>kafka.message.OriginalMessage</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.simple.consumer.PulsarMessage</pattern>
+ <shadedPattern>kafka.message.Message</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>kafka.javaapi.TopicMetadataResponse</pattern>
+ <shadedPattern>kafka.javaapi.OriginalTopicMetadataResponse</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.simple.consumer.PulsarTopicMetadataResponse</pattern>
+ <shadedPattern>kafka.javaapi.TopicMetadataResponse</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>kafka.javaapi.OffsetCommitResponse</pattern>
+ <shadedPattern>kafka.javaapi.OriginalOffsetCommitResponse</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.simple.consumer.PulsarOffsetCommitResponse</pattern>
+ <shadedPattern>kafka.javaapi.OffsetCommitResponse</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>kafka.common.OffsetMetadataAndError</pattern>
+ <shadedPattern>kafka.common.OriginalOffsetMetadataAndError</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>org.apache.kafka.clients.simple.consumer.PulsarOffsetMetadataAndError</pattern>
+ <shadedPattern>kafka.common.OffsetMetadataAndError</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>kafka.javaapi.OffsetResponse</pattern>
+ <shadedPattern>kafka.javaapi.OriginalOffsetResponse</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.simple.consumer.PulsarOffsetResponse</pattern>
+ <shadedPattern>kafka.javaapi.OffsetResponse</shadedPattern>
+ </relocation>
+
+ <relocation>
+ <pattern>kafka.javaapi.OffsetRequest</pattern>
+ <shadedPattern>kafka.javaapi.OriginalOffsetRequest</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.simple.consumer.PulsarOffsetRequest</pattern>
+ <shadedPattern>kafka.javaapi.OffsetRequest</shadedPattern>
+ </relocation>
+
+
+ <!-- General relocation rules for Pulsar client dependencies -->
+
+ <relocation>
+ <pattern>org.asynchttpclient</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.asynchttpclient</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.apache.commons</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.google</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.fasterxml.jackson</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.netty</pattern>
+ <shadedPattern>org.apache.pulsar.shade.io.netty</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.yahoo.datasketches</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.yahoo.datasketches</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.yahoo.sketches</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.yahoo.sketches</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.http</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.apache.http</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.eclipse.jetty</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.eclipse</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.reactivestreams</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.reactivestreams</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.typesafe</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.typesafe</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.yahoo.memory</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.yahoo.memory</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.objenesis</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.objenesis</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.yaml</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.yaml</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.avro</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.apache.avro</shadedPattern>
+ <excludes>
+ <exclude>org.apache.avro.reflect.AvroAlias</exclude>
+ <exclude>org.apache.avro.reflect.AvroDefault</exclude>
+ <exclude>org.apache.avro.reflect.AvroEncode</exclude>
+ <exclude>org.apache.avro.reflect.AvroIgnore</exclude>
+ <exclude>org.apache.avro.reflect.AvroMeta</exclude>
+ <exclude>org.apache.avro.reflect.AvroName</exclude>
+ <exclude>org.apache.avro.reflect.AvroSchema</exclude>
+ <exclude>org.apache.avro.reflect.Nullable</exclude>
+ <exclude>org.apache.avro.reflect.Stringable</exclude>
+ <exclude>org.apache.avro.reflect.Union</exclude>
+ </excludes>
+ </relocation>
+ <!--Avro transitive dependencies-->
+ <relocation>
+ <pattern>org.codehaus.jackson</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.codehaus.jackson</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.thoughtworks.paranamer</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.thoughtworks.paranamer</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.xerial.snappy</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.xerial.snappy</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.apache.commons</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.tukaani</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.tukaani</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.bookkeeper</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.apache.bookkeeper</shadedPattern>
+ </relocation>
+ </relocations>
+ <filters>
+ <filter>
+ <artifact>org.apache.pulsar:pulsar-client-original</artifact>
+ <includes>
+ <include>**</include>
+ </includes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <!-- This plugin is used to run a script after the package phase in order to rename
+ libnetty_transport_native_epoll_x86_64.so from Netty into
+ liborg_apache_pulsar_shade_netty_transport_native_epoll_x86_64.so
+ to reflect the shade that is being applied.
+ -->
+ <artifactId>exec-maven-plugin</artifactId>
+ <groupId>org.codehaus.mojo</groupId>
+ <executions>
+ <execution>
+ <id>rename-epoll-library</id>
+ <phase>package</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <executable>${project.parent.basedir}/../src/${rename.netty.native.libs}</executable>
+ <arguments>
+ <argument>${project.artifactId}</argument>
+ </arguments>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/pom.xml
similarity index 59%
copy from pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
copy to pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/pom.xml
index e8cffb9..8b9a009 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/pom.xml
@@ -31,37 +31,47 @@
<relativePath>..</relativePath>
</parent>
- <artifactId>pulsar-client-kafka-original</artifactId>
- <name>Pulsar Kafka compatibility :: API (original)</name>
+ <artifactId>pulsar-client-kafka-tests_0_8</artifactId>
+ <name>Pulsar Kafka-0.8 compatibility :: Tests</name>
- <description>Kafka client library that publishes and consumes messages on Pulsar topics</description>
+ <description>Tests to verify the correct shading configuration for the pulsar-client-kafka wrapper</description>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
- <artifactId>pulsar-client</artifactId>
+ <artifactId>pulsar-client-kafka_0_8</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>${kafka-client.version}</version>
- <exclusions>
- <exclusion>
- <groupId>net.jpountz.lz4</groupId>
- <artifactId>lz4</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.lz4</groupId>
- <artifactId>lz4-java</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-broker</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-broker</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>managed-ledger-original</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>com.beust</groupId>
+ <artifactId>jcommander</artifactId>
+ <scope>test</scope>
</dependency>
+
</dependencies>
</project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/HighLevelConsumerExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/HighLevelConsumerExample.java
new file mode 100644
index 0000000..b00e19d
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/HighLevelConsumerExample.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.pulsar.client.kafka.compat.examples.utils.Tweet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import kafka.serializer.StringDecoder;
+
+public class HighLevelConsumerExample {
+
+ static class Arguments {
+
+ @Parameter(names = { "-h", "--help" }, description = "Help message", help = true)
+ boolean help;
+
+ @Parameter(names = { "-u", "--service-url" }, description = "Service url", required = false)
+ public String serviceUrl = "pulsar://localhost:6650";
+
+ @Parameter(names = { "-t", "--topic-name" }, description = "Topic name", required = false)
+ public String topicName = "persistent://public/default/test";
+
+ @Parameter(names = { "-g", "--group-name" }, description = "Group name", required = false)
+ public String groupName = "high-level";
+
+ @Parameter(names = { "-m", "--total-messages" }, description = "total number message to publish")
+ public int totalMessages = 1;
+
+ @Parameter(names = { "-a", "--auto-commit-disable" }, description = "auto commit disable")
+ public boolean autoCommitDisable;
+ }
+
+ public static void main(String[] args) {
+
+ final Arguments arguments = new Arguments();
+ JCommander jc = new JCommander(arguments);
+ jc.setProgramName("pulsar-kafka-test");
+
+ try {
+ jc.parse(args);
+ } catch (ParameterException e) {
+ System.out.println(e.getMessage());
+ jc.usage();
+ System.exit(-1);
+ }
+
+ if (arguments.help) {
+ jc.usage();
+ System.exit(-1);
+ }
+
+ consumeMessage(arguments);
+ }
+
+ private static void consumeMessage(Arguments arguments) {
+
+ Properties properties = new Properties();
+ properties.put("zookeeper.connect", arguments.serviceUrl);
+ properties.put("group.id", arguments.groupName);
+ properties.put("consumer.id", "cons1");
+ properties.put("auto.commit.enable", Boolean.toString(!arguments.autoCommitDisable));
+ properties.put("auto.commit.interval.ms", "100");
+ properties.put("queued.max.message.chunks", "100");
+
+ ConsumerConfig conSConfig = new ConsumerConfig(properties);
+ ConsumerConnector connector = Consumer.createJavaConsumerConnector(conSConfig);
+ Map<String, Integer> topicCountMap = Collections.singletonMap(arguments.topicName, 2);
+ Map<String, List<KafkaStream<String, Tweet>>> streams = connector.createMessageStreams(topicCountMap,
+ new StringDecoder(null), new Tweet.TestDecoder());
+
+ int count = 0;
+ while (count < arguments.totalMessages || arguments.totalMessages == -1) {
+ for (int i = 0; i < streams.size(); i++) {
+ List<KafkaStream<String, Tweet>> kafkaStreams = streams.get(arguments.topicName);
+ for (KafkaStream<String, Tweet> kafkaStream : kafkaStreams) {
+ for (MessageAndMetadata<String, Tweet> record : kafkaStream) {
+ log.info("Received tweet: {}-{}", record.message().userName, record.message().message);
+ count++;
+ }
+ }
+ }
+ }
+
+ connector.shutdown();
+
+ log.info("successfully consumed message {}", count);
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(HighLevelConsumerExample.class);
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/LowLevelConsumerExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/LowLevelConsumerExample.java
new file mode 100644
index 0000000..d77617d
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/LowLevelConsumerExample.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.simple.consumer.PulsarMsgAndOffset;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Tweet;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Tweet.TestDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+
+import kafka.api.FetchRequestBuilder;
+import kafka.common.OffsetMetadataAndError;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetCommitRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.MessageAndOffset;
+
+public class LowLevelConsumerExample {
+
+ static class Arguments {
+ @Parameter(names = { "-h", "--help" }, description = "Help message", help = true)
+ boolean help;
+
+ @Parameter(names = { "-u", "--service-url" }, description = "Service url", required = false)
+ public String serviceUrl = "pulsar://localhost:6650";
+
+ @Parameter(names = { "-hu",
+ "--http-service-url" }, description = "Http ervice url to make admin api call", required = false)
+ public String httpServiceUrl = "pulsar://localhost:8080";
+
+ @Parameter(names = { "-t", "--topic-name" }, description = "Topic name", required = false)
+ public String topicName = "persistent://public/default/test";
+
+ @Parameter(names = { "-g", "--group-name" }, description = "Group name", required = false)
+ public String groupName = "low-level";
+
+ @Parameter(names = { "-m", "--total-messages" }, description = "total number message to publish")
+ public int totalMessages = 1;
+
+ @Parameter(names = { "-p",
+ "--partition-index" }, description = "Partition-index (-1 if topic is not partitioned)", required = false)
+ public int partitionIndex = -1;
+ }
+
+ public static void main(String[] args) {
+
+ final Arguments arguments = new Arguments();
+ JCommander jc = new JCommander(arguments);
+ jc.setProgramName("pulsar-kafka-test");
+
+ try {
+ jc.parse(args);
+ } catch (ParameterException e) {
+ System.out.println(e.getMessage());
+ jc.usage();
+ System.exit(-1);
+ }
+
+ if (arguments.help) {
+ jc.usage();
+ System.exit(-1);
+ }
+
+ try {
+ consumeMessage(arguments);
+ } catch (Exception e) {
+ log.error("Failed to consume message", e);
+ }
+ }
+
+ private static void consumeMessage(Arguments arguments) {
+
+ Properties properties = new Properties();
+ properties.put(SimpleConsumer.HTTP_SERVICE_URL, arguments.httpServiceUrl);
+ SimpleConsumer consumer = new SimpleConsumer(arguments.serviceUrl, 0, 0, 0, "clientId", properties);
+
+ long readOffset = kafka.api.OffsetRequest.EarliestTime();
+ kafka.api.FetchRequest fReq = new FetchRequestBuilder().clientId("c1")
+ .addFetch(arguments.topicName, arguments.partitionIndex, readOffset, 100000).build();
+ FetchResponse fetchResponse = consumer.fetch(fReq);
+
+ TestDecoder decoder = new TestDecoder();
+ int count = 0;
+ while (count < arguments.totalMessages || arguments.totalMessages == -1) {
+ // 1. Read from topic without subscription/consumer-group name.
+ for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(arguments.topicName,
+ arguments.partitionIndex)) {
+ MessageId msgIdOffset = (messageAndOffset instanceof PulsarMsgAndOffset)
+ ? ((PulsarMsgAndOffset) messageAndOffset).getFullOffset()
+ : null;
+ long currentOffset = messageAndOffset.offset();
+ if (currentOffset < readOffset) {
+ continue;
+ }
+
+ ByteBuffer payload = messageAndOffset.message().payload();
+
+ byte[] bytes = new byte[payload.limit()];
+ payload.get(bytes);
+ Tweet tweet = decoder.fromBytes(bytes);
+ log.info("Received tweet: {}-{}", tweet.userName, tweet.message);
+ count++;
+
+ TopicAndPartition topicPartition = new TopicAndPartition(arguments.topicName, arguments.partitionIndex);
+ OffsetMetadataAndError offsetError = new OffsetMetadataAndError(msgIdOffset, null, (short) 0);
+ Map<TopicAndPartition, OffsetMetadataAndError> requestInfo = Collections.singletonMap(topicPartition,
+ offsetError);
+ // 2. Commit offset for a given topic and subscription-name/consumer-name.
+ OffsetCommitRequest offsetReq = new OffsetCommitRequest(arguments.groupName, requestInfo, (short) -1, 0,
+ "c1");
+ consumer.commitOffsets(offsetReq);
+ }
+ }
+
+ consumer.close();
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(LowLevelConsumerExample.class);
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
new file mode 100644
index 0000000..360f79d
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples;
+
+import java.util.Properties;
+
+import org.apache.pulsar.client.kafka.compat.examples.utils.Tweet;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Tweet.TestEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.Partitioner;
+import kafka.producer.ProducerConfig;
+import kafka.serializer.StringEncoder;
+
+public class ProducerExample {
+
+ public static final String BROKER_URL = "metadata.broker.list";
+ public static final String PRODUCER_TYPE = "producer.type";
+ public static final String SERIALIZER_CLASS = "serializer.class";
+ public static final String KEY_SERIALIZER_CLASS = "key.serializer.class";
+ public static final String PARTITIONER_CLASS = "partitioner.class";
+ public static final String COMPRESSION_CODEC = "compression.codec";
+ public static final String QUEUE_BUFFERING_MAX_MS = "queue.buffering.max.ms";
+ public static final String QUEUE_BUFFERING_MAX_MESSAGES = "queue.buffering.max.messages";
+ public static final String QUEUE_ENQUEUE_TIMEOUT_MS = "queue.enqueue.timeout.ms";
+ public static final String BATCH_NUM_MESSAGES = "batch.num.messages";
+ public static final String CLIENT_ID = "client.id";
+
+ static class Arguments {
+
+ @Parameter(names = { "-u", "--service-url" }, description = "Service url", required = false)
+ public String serviceUrl = "pulsar://localhost:6650";
+
+ @Parameter(names = { "-t", "--topic-name" }, description = "Topic name", required = false)
+ public String topicName = "persistent://public/default/test";
+
+ @Parameter(names = { "-m", "--total-messages" }, description = "total number message to publish")
+ public int totalMessages = 1;
+
+ @Parameter(names = { "-mn", "--message-name" }, description = "Message payload value", required = false)
+ public String messageValue = "Hello-world";
+
+ @Parameter(names = { "-h", "--help" }, description = "Help message", help = true)
+ boolean help;
+ }
+
+ public static void main(String[] args) {
+
+ final Arguments arguments = new Arguments();
+ JCommander jc = new JCommander(arguments);
+ jc.setProgramName("pulsar-kafka-test");
+
+ try {
+ jc.parse(args);
+ } catch (ParameterException e) {
+ System.out.println(e.getMessage());
+ jc.usage();
+ System.exit(-1);
+ }
+
+ if (arguments.help) {
+ jc.usage();
+ System.exit(-1);
+ }
+
+ publishMessage(arguments);
+ }
+
+ private static void publishMessage(Arguments arguments) {
+ // (2) Create producer
+ Properties properties2 = new Properties();
+ properties2.put(BROKER_URL, arguments.serviceUrl);
+ properties2.put(PRODUCER_TYPE, "sync");
+ properties2.put(SERIALIZER_CLASS, TestEncoder.class.getName());
+ properties2.put(KEY_SERIALIZER_CLASS, StringEncoder.class.getName());
+ properties2.put(PARTITIONER_CLASS, TestPartitioner.class.getName());
+ properties2.put(COMPRESSION_CODEC, "gzip"); // compression: ZLIB
+ properties2.put(QUEUE_ENQUEUE_TIMEOUT_MS, "-1"); // block queue if full => -1 = true
+ properties2.put(QUEUE_BUFFERING_MAX_MESSAGES, "6000"); // queue max message
+ properties2.put(QUEUE_BUFFERING_MAX_MS, "100"); // batch delay
+ properties2.put(BATCH_NUM_MESSAGES, "500"); // batch msg
+ properties2.put(CLIENT_ID, "test");
+ ProducerConfig config = new ProducerConfig(properties2);
+ Producer<String, Tweet> producer = new Producer<>(config);
+
+ String name = "user";
+ String msg = arguments.messageValue;
+ for (int i = 0; i < arguments.totalMessages; i++) {
+ String sendMessage = msg + i;
+ Tweet tweet = new Tweet(name, sendMessage);
+ KeyedMessage<String, Tweet> message = new KeyedMessage<>(arguments.topicName, name, tweet);
+ producer.send(message);
+ }
+
+ producer.close();
+ log.info("Successfully published messages {}", arguments.totalMessages);
+
+ }
+
+ public static class TestPartitioner implements Partitioner {
+ @Override
+ public int partition(Object obj, int totalPartition) {
+ return obj.hashCode() % totalPartition;
+ }
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(ProducerExample.class);
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Tweet.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Tweet.java
new file mode 100644
index 0000000..d23411e
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_8/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Tweet.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples.utils;
+
+import java.io.Serializable;
+
+import com.google.common.base.Objects;
+
+import kafka.serializer.Decoder;
+import kafka.serializer.Encoder;
+
+public class Tweet implements Serializable {
+ private static final long serialVersionUID = 1L;
+ public String userName;
+ public String message;
+
+ public Tweet(String userName, String message) {
+ super();
+ this.userName = userName;
+ this.message = message;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(userName, message);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof Tweet) {
+ Tweet tweet = (Tweet) obj;
+ return Objects.equal(this.userName, tweet.userName) && Objects.equal(this.message, tweet.message);
+ }
+ return false;
+ }
+
+ public static class TestEncoder implements Encoder<Tweet> {
+ @Override
+ public byte[] toBytes(Tweet tweet) {
+ return (tweet.userName + "," + tweet.message).getBytes();
+ }
+ }
+
+ public static class TestDecoder implements Decoder<Tweet> {
+ @Override
+ public Tweet fromBytes(byte[] input) {
+ String[] tokens = (new String(input)).split(",");
+ return new Tweet(tokens[0], tokens[1]);
+ }
+ }
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
index e8cffb9..8f03825 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
@@ -42,6 +42,11 @@
<artifactId>pulsar-client</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/pom.xml
similarity index 56%
copy from pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
copy to pulsar-client-kafka-compat/pulsar-client-kafka_0_8/pom.xml
index e8cffb9..dbc1108 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/pom.xml
@@ -30,23 +30,76 @@
<version>2.5.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
+
+ <properties>
+ <kafka_0_8.version>0.8.1.1</kafka_0_8.version>
+ </properties>
- <artifactId>pulsar-client-kafka-original</artifactId>
- <name>Pulsar Kafka compatibility :: API (original)</name>
+ <artifactId>pulsar-client-kafka_0_8-original</artifactId>
+ <name>Pulsar Kafka compatibility 0.8 :: API (original)</name>
<description>Kafka client library that publishes and consumes messages on Pulsar topics</description>
<dependencies>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-broker</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-common</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-broker</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>managed-ledger-original</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-admin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>${kafka-client.version}</version>
+ <artifactId>kafka_2.9.2</artifactId>
+ <version>${kafka_0_8.version}</version>
<exclusions>
<exclusion>
<groupId>net.jpountz.lz4</groupId>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/ConsumerConnector.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/ConsumerConnector.java
new file mode 100644
index 0000000..1ca0754
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/ConsumerConnector.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.producer.PulsarClientKafkaConfig;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.TopicFilter;
+import kafka.serializer.Decoder;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * It replaces : kafka.javaapi.consumer.ConsumerConnector but not extending kafka-interface because its method has
+ * KafkaStream api signature and KafkaStream is a scala class which creates unresolvable dependency conflict src:
+ * https://github.com/apache/kafka/blob/0.8.2.2/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
+ */
+@Slf4j
+public class ConsumerConnector {
+
+ private final PulsarClient client;
+ private final boolean isAutoCommit;
+ private final ConsumerBuilder<byte[]> consumerBuilder;
+ private String clientId;
+ private String groupId;
+ @SuppressWarnings("rawtypes")
+ private final Set<PulsarKafkaStream> topicStreams;
+ private SubscriptionInitialPosition strategy = null;
+ private final ScheduledExecutorService executor;
+
+ public ConsumerConnector(ConsumerConfig config) {
+ checkNotNull(config, "ConsumerConfig can't be null");
+ clientId = config.clientId();
+ groupId = config.groupId();
+ isAutoCommit = config.autoCommitEnable();
+ if ("largest".equalsIgnoreCase(config.autoOffsetReset())) {
+ strategy = SubscriptionInitialPosition.Latest;
+ } else if ("smallest".equalsIgnoreCase(config.autoOffsetReset())) {
+ strategy = SubscriptionInitialPosition.Earliest;
+ }
+ String consumerId = !config.consumerId().isEmpty() ? config.consumerId().get() : null;
+ int maxMessage = config.queuedMaxMessages();
+ String serviceUrl = config.zkConnect();
+
+ Properties properties = config.props() != null && config.props().props() != null ? config.props().props()
+ : new Properties();
+ try {
+ client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).build();
+ } catch (PulsarClientException e) {
+ throw new IllegalArgumentException(
+ "Failed to create pulsar-client using url = " + serviceUrl + ", properties = " + properties, e);
+ }
+
+ topicStreams = Sets.newConcurrentHashSet();
+ consumerBuilder = client.newConsumer();
+ consumerBuilder.subscriptionName(groupId);
+ if (properties.containsKey("queued.max.message.chunks") && config.queuedMaxMessages() > 0) {
+ consumerBuilder.receiverQueueSize(maxMessage);
+ }
+ if (consumerId != null) {
+ consumerBuilder.consumerName(consumerId);
+ }
+ if (properties.containsKey("auto.commit.interval.ms") && config.autoCommitIntervalMs() > 0) {
+ consumerBuilder.acknowledgmentGroupTime(config.autoCommitIntervalMs(), TimeUnit.MILLISECONDS);
+ }
+ this.executor = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("pulsar-kafka"));
+ }
+
+ public <K, V> Map<String, List<PulsarKafkaStream<byte[], byte[]>>> createMessageStreams(
+ Map<String, Integer> topicCountMap) {
+ return createMessageStreamsByFilter(null, topicCountMap, null, null);
+ }
+
+ public <K, V> Map<String, List<PulsarKafkaStream<K, V>>> createMessageStreams(Map<String, Integer> topicCountMap,
+ Decoder<K> keyDecoder, Decoder<V> valueDecoder) {
+ return createMessageStreamsByFilter(null, topicCountMap, keyDecoder, valueDecoder);
+ }
+
+ public <K, V> Map<String, List<PulsarKafkaStream<K, V>>> createMessageStreamsByFilter(TopicFilter topicFilter,
+ Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder) {
+
+ Map<String, List<PulsarKafkaStream<K, V>>> streams = Maps.newHashMap();
+
+ topicCountMap.forEach((topic, count) -> {
+ try {
+ Consumer<byte[]> consumer = consumerBuilder.topic(topic).subscribe();
+ resetOffsets(consumer, strategy);
+ log.info("Creating stream for {}-{} with config {}", topic, groupId, consumerBuilder.toString());
+ for (int i = 0; i < count; i++) {
+ PulsarKafkaStream<K, V> stream = new PulsarKafkaStream<>(keyDecoder, valueDecoder, consumer,
+ isAutoCommit, clientId);
+ // if multiple thread-count present then client expects multiple streams reading from the same
+ // topic. so, create multiple stream using the same consumer
+ streams.computeIfAbsent(topic, key -> Lists.newArrayList()).add(stream);
+ topicStreams.add(stream);
+ }
+ } catch (PulsarClientException e) {
+ log.error("Failed to subscribe on topic {} with group-id {}, {}", topic, groupId, e.getMessage(), e);
+ throw new RuntimeException("Failed to subscribe on topic " + topic, e);
+ }
+ });
+ return streams;
+ }
+
+ public List<PulsarKafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter) {
+ throw new UnsupportedOperationException("method not supported");
+ }
+
+ public List<PulsarKafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int arg1) {
+ throw new UnsupportedOperationException("method not supported");
+ }
+
+ public <K, V> List<PulsarKafkaStream<K, V>> createMessageStreamsByFilter(TopicFilter topicFilter, int arg1,
+ Decoder<K> keyDecoder, Decoder<V> valueDecoder) {
+ throw new UnsupportedOperationException("method not supported");
+ }
+
+ @SuppressWarnings("unchecked")
+ public List<CompletableFuture<Void>> commitOffsetsAsync() {
+ return topicStreams.stream().map(stream -> (CompletableFuture<Void>) stream.commitOffsets())
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Commit the offsets of all broker partitions connected by this connector.
+ */
+ public void commitOffsets() {
+ commitOffsetsAsync();
+ }
+
+ public void commitOffsets(boolean retryOnFailure) {
+ FutureUtil.waitForAll(commitOffsetsAsync()).handle((res, ex) -> {
+ if (ex != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to commit offset {}, retrying {}", ex.getMessage(), retryOnFailure);
+ }
+ if (retryOnFailure) {
+ this.executor.schedule(() -> commitOffsets(retryOnFailure), 30, TimeUnit.SECONDS);
+ }
+ }
+ return null;
+ });
+ }
+
+ /**
+ * Shut down the connector
+ */
+ public void shutdown() {
+ if (executor != null) {
+ executor.shutdown();
+ }
+ if (topicStreams != null) {
+ topicStreams.forEach(stream -> {
+ try {
+ stream.close();
+ } catch (Exception e) {
+ log.warn("Failed to close stream {}, {}", stream, e.getMessage());
+ }
+ });
+ }
+ try {
+ client.close();
+ } catch (PulsarClientException e) {
+ log.warn("Failed to close client {}", e.getMessage());
+ }
+ }
+
+ private void resetOffsets(Consumer<byte[]> consumer, SubscriptionInitialPosition strategy) {
+ if (strategy == null) {
+ return;
+ }
+ log.info("Resetting partition {} for group-id {} and seeking to {} position", consumer.getTopic(),
+ consumer.getSubscription(), strategy);
+ try {
+ if (strategy == SubscriptionInitialPosition.Earliest) {
+ consumer.seek(MessageId.earliest);
+ } else {
+ consumer.seek(MessageId.latest);
+ }
+ } catch (PulsarClientException e) {
+ log.warn("Failed to reset offset for consumer {} to {}, {}", consumer.getTopic(), strategy,
+ e.getMessage(), e);
+ }
+ }
+
+ @VisibleForTesting
+ public ConsumerBuilder<byte[]> getConsumerBuilder() {
+ return consumerBuilder;
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/ConsumerIterator.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/ConsumerIterator.java
new file mode 100644
index 0000000..926e765
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/ConsumerIterator.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.util.Base64;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.pulsar.common.naming.TopicName;
+
+import kafka.serializer.Decoder;
+import kafka.serializer.StringDecoder;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ConsumerIterator<K, V> implements Iterator<PulsarMessageAndMetadata<K, V>>, AutoCloseable {
+
+ private final Consumer<byte[]> consumer;
+ private final ConcurrentLinkedQueue<Message<byte[]>> receivedMessages;
+ private final Optional<Decoder<K>> keyDeSerializer;
+ private final Optional<Decoder<V>> valueDeSerializer;
+ private final boolean isAutoCommit;
+ private volatile MessageId lastConsumedMessageId;
+ private static final kafka.serializer.DefaultDecoder DEFAULT_DECODER = new kafka.serializer.DefaultDecoder(null);
+
+ public ConsumerIterator(Consumer<byte[]> consumer, ConcurrentLinkedQueue<Message<byte[]>> receivedMessages,
+ Optional<Decoder<K>> keyDeSerializer, Optional<Decoder<V>> valueDeSerializer, boolean isAutoCommit) {
+ this.consumer = consumer;
+ this.receivedMessages = receivedMessages;
+ this.keyDeSerializer = keyDeSerializer;
+ this.valueDeSerializer = valueDeSerializer;
+ this.isAutoCommit = isAutoCommit;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ Message<byte[]> msg = consumer.receive(10, TimeUnit.MILLISECONDS);
+ if (msg != null) {
+ receivedMessages.offer(msg);
+ return true;
+ }
+ } catch (PulsarClientException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to receive message for {}-{}, {}", consumer.getTopic(), consumer.getSubscription(),
+ e.getMessage());
+ }
+ }
+ return false;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public PulsarMessageAndMetadata<K, V> next() {
+
+ Message<byte[]> msg = receivedMessages.poll();
+ if (msg == null) {
+ try {
+ msg = consumer.receive();
+ } catch (PulsarClientException e) {
+ log.warn("Failed to receive message for {}-{}, {}", consumer.getTopic(), consumer.getSubscription(),
+ e.getMessage(), e);
+ throw new RuntimeException(
+ "failed to receive message from " + consumer.getTopic() + "-" + consumer.getSubscription());
+ }
+ }
+
+ int partition = TopicName.getPartitionIndex(consumer.getTopic());
+ long offset = MessageIdUtils.getOffset(msg.getMessageId());
+ String key = msg.getKey();
+ byte[] value = msg.getValue();
+
+ K desKey = null;
+ V desValue = null;
+
+ if (StringUtils.isNotBlank(key)) {
+ if (keyDeSerializer.isPresent() && keyDeSerializer.get() instanceof StringDecoder) {
+ desKey = (K) key;
+ } else {
+ byte[] decodedBytes = Base64.getDecoder().decode(key);
+ desKey = keyDeSerializer.isPresent() ? keyDeSerializer.get().fromBytes(decodedBytes)
+ : (K) DEFAULT_DECODER.fromBytes(decodedBytes);
+ }
+ }
+
+ if (value != null) {
+ desValue = valueDeSerializer.isPresent() ? valueDeSerializer.get().fromBytes(msg.getData())
+ : (V) DEFAULT_DECODER.fromBytes(msg.getData());
+ }
+
+ PulsarMessageAndMetadata<K, V> msgAndMetadata = new PulsarMessageAndMetadata<>(consumer.getTopic(), partition,
+ null, offset, keyDeSerializer.orElse(null), valueDeSerializer.orElse(null), desKey, desValue);
+
+ if (isAutoCommit) {
+ // Commit the offset of previously dequeued messages
+ consumer.acknowledgeCumulativeAsync(msg);
+ }
+
+ lastConsumedMessageId = msg.getMessageId();
+ return msgAndMetadata;
+ }
+
+ protected CompletableFuture<Void> commitOffsets() {
+ MessageId msgId = lastConsumedMessageId;
+ if (msgId != null) {
+ return this.consumer.acknowledgeCumulativeAsync(msgId);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (consumer != null) {
+ consumer.close();
+ }
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerConfig.java
new file mode 100644
index 0000000..8aa0359
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerConfig.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.util.Properties;
+
+public class PulsarConsumerConfig extends kafka.consumer.ConsumerConfig {
+
+ public PulsarConsumerConfig(Properties originalProps) {
+ super(originalProps);
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
new file mode 100644
index 0000000..cdd62a4
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+/**
+ * This class replaces "kafka.consumer.Consumer" and not extending kafka-interface as it's a final scala class.
+ *
+ */
+public class PulsarKafkaConsumer {
+
+ public static ConsumerConnector createJavaConsumerConnector(PulsarConsumerConfig config) {
+ return new ConsumerConnector(config);
+ }
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaStream.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaStream.java
new file mode 100644
index 0000000..3a1231d
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaStream.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+
+import com.google.common.collect.Queues;
+
+import kafka.serializer.Decoder;
+
+/**
+ * We can't extends "kafka.consumer.KafkaStream<K,V>" because it's a scala class which brings ambiguous overriden
+ * methods that gives compilation errors.
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class PulsarKafkaStream<K, V> implements Iterable<PulsarMessageAndMetadata<K, V>>, AutoCloseable {
+
+ private final Optional<Decoder<K>> keyDeSerializer;
+ private final Optional<Decoder<V>> valueDeSerializer;
+ private final ConsumerIterator<K, V> iterator;
+
+ private final ConcurrentLinkedQueue<Message<byte[]>> receivedMessages = Queues.newConcurrentLinkedQueue();
+
+ public PulsarKafkaStream(Decoder<K> keyDecoder, Decoder<V> valueDecoder, Consumer<byte[]> consumer,
+ boolean isAutoCommit, String clientId) {
+ this.keyDeSerializer = Optional.ofNullable(keyDecoder);
+ this.valueDeSerializer = Optional.ofNullable(valueDecoder);
+ this.iterator = new ConsumerIterator<>(consumer, receivedMessages, keyDeSerializer, valueDeSerializer,
+ isAutoCommit);
+ }
+
+ @Override
+ public ConsumerIterator<K, V> iterator() {
+ return iterator;
+ }
+
+ public CompletableFuture<Void> commitOffsets() {
+ return iterator.commitOffsets();
+ }
+
+ @Override
+ public void close() throws Exception {
+ iterator.close();
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarMessageAndMetadata.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarMessageAndMetadata.java
new file mode 100644
index 0000000..2c76501
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/PulsarMessageAndMetadata.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import kafka.message.Message;
+import kafka.message.MessageAndMetadata;
+import kafka.serializer.Decoder;
+
+public class PulsarMessageAndMetadata<K, V> extends MessageAndMetadata<K, V> {
+
+ private static final long serialVersionUID = 1L;
+ private final String topic;
+ private final int partition;
+ private final long offset;
+ private final K key;
+ private final V value;
+ private final Decoder<K> keyDecoder;
+ private final Decoder<V> valueDecoder;
+
+ public PulsarMessageAndMetadata(String topic, int partition, Message rawMessage, long offset, Decoder<K> keyDecoder,
+ Decoder<V> valueDecoder, K key, V value) {
+ super(topic, partition, rawMessage, offset, keyDecoder, valueDecoder);
+ this.topic = topic;
+ this.partition = partition;
+ this.offset = offset;
+ this.keyDecoder = keyDecoder;
+ this.valueDecoder = valueDecoder;
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public String topic() {
+ return topic;
+ }
+
+ @Override
+ public int productArity() {
+ return 0;
+ }
+
+ @Override
+ public int partition() {
+ return partition;
+ }
+
+ @Override
+ public long offset() {
+ return offset;
+ }
+
+ @Override
+ public V message() {
+ return this.value;
+ }
+
+ @Override
+ public K key() {
+ return key;
+ }
+
+ @Override
+ public Decoder<V> valueDecoder() {
+ return this.valueDecoder;
+ }
+
+ @Override
+ public Decoder<K> keyDecoder() {
+ return this.keyDecoder;
+ }
+
+ @Override
+ public Message rawMessage$1() {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarClientKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarClientKafkaConfig.java
new file mode 100644
index 0000000..c71d7a7
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarClientKafkaConfig.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+
+public class PulsarClientKafkaConfig {
+
+ /// Config variables
+ public static final String AUTHENTICATION_CLASS = "pulsar.authentication.class";
+ public static final String AUTHENTICATION_PARAMS_MAP = "pulsar.authentication.params.map";
+ public static final String AUTHENTICATION_PARAMS_STRING = "pulsar.authentication.params.string";
+ public static final String USE_TLS = "pulsar.use.tls";
+ public static final String TLS_TRUST_CERTS_FILE_PATH = "pulsar.tls.trust.certs.file.path";
+ public static final String TLS_ALLOW_INSECURE_CONNECTION = "pulsar.tls.allow.insecure.connection";
+ public static final String TLS_HOSTNAME_VERIFICATION = "pulsar.tls.hostname.verification";
+
+ public static final String OPERATION_TIMEOUT_MS = "pulsar.operation.timeout.ms";
+ public static final String STATS_INTERVAL_SECONDS = "pulsar.stats.interval.seconds";
+ public static final String NUM_IO_THREADS = "pulsar.num.io.threads";
+
+ public static final String CONNECTIONS_PER_BROKER = "pulsar.connections.per.broker";
+ public static final String ADMIN_SERVICE_URL = "pulsar.admin.service.url";
+
+ public static final String USE_TCP_NODELAY = "pulsar.use.tcp.nodelay";
+
+ public static final String CONCURRENT_LOOKUP_REQUESTS = "pulsar.concurrent.lookup.requests";
+ public static final String MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION = "pulsar.max.number.rejected.request.per.connection";
+
+ public static ClientBuilder getClientBuilder(Properties properties) {
+ ClientBuilder clientBuilder = PulsarClient.builder();
+ if (properties == null) {
+ return clientBuilder;
+ }
+
+ if (properties.containsKey(AUTHENTICATION_CLASS)) {
+ String className = properties.getProperty(AUTHENTICATION_CLASS);
+ try {
+ if (properties.containsKey(AUTHENTICATION_PARAMS_STRING)) {
+ String authParamsString = (String) properties.get(AUTHENTICATION_PARAMS_STRING);
+ clientBuilder.authentication(className, authParamsString);
+ } else if (properties.containsKey(AUTHENTICATION_PARAMS_MAP)) {
+ Map<String, String> authParams = (Map<String, String>) properties.get(AUTHENTICATION_PARAMS_MAP);
+ clientBuilder.authentication(className, authParams);
+ } else {
+ @SuppressWarnings("unchecked")
+ Class<Authentication> clazz = (Class<Authentication>) Class.forName(className);
+ Authentication auth = clazz.newInstance();
+ clientBuilder.authentication(auth);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ clientBuilder.enableTls(Boolean.parseBoolean(properties.getProperty(USE_TLS, "false")));
+ clientBuilder.allowTlsInsecureConnection(
+ Boolean.parseBoolean(properties.getProperty(TLS_ALLOW_INSECURE_CONNECTION, "false")));
+ clientBuilder.enableTlsHostnameVerification(
+ Boolean.parseBoolean(properties.getProperty(TLS_HOSTNAME_VERIFICATION, "false")));
+
+ if (properties.containsKey(TLS_TRUST_CERTS_FILE_PATH)) {
+ clientBuilder.tlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH));
+ }
+
+ if (properties.containsKey(OPERATION_TIMEOUT_MS)) {
+ clientBuilder.operationTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)),
+ TimeUnit.MILLISECONDS);
+ }
+
+ if (properties.containsKey(STATS_INTERVAL_SECONDS)) {
+ clientBuilder.statsInterval(Integer.parseInt(properties.getProperty(STATS_INTERVAL_SECONDS)),
+ TimeUnit.SECONDS);
+ }
+
+ if (properties.containsKey(NUM_IO_THREADS)) {
+ clientBuilder.ioThreads(Integer.parseInt(properties.getProperty(NUM_IO_THREADS)));
+ }
+
+ if (properties.containsKey(CONNECTIONS_PER_BROKER)) {
+ clientBuilder.connectionsPerBroker(Integer.parseInt(properties.getProperty(CONNECTIONS_PER_BROKER)));
+ }
+
+ if (properties.containsKey(USE_TCP_NODELAY)) {
+ clientBuilder.enableTcpNoDelay(Boolean.parseBoolean(properties.getProperty(USE_TCP_NODELAY)));
+ }
+
+ if (properties.containsKey(CONCURRENT_LOOKUP_REQUESTS)) {
+ clientBuilder
+ .maxConcurrentLookupRequests(Integer.parseInt(properties.getProperty(CONCURRENT_LOOKUP_REQUESTS)));
+ }
+
+ if (properties.containsKey(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)) {
+ clientBuilder.maxNumberOfRejectedRequestPerConnection(
+ Integer.parseInt(properties.getProperty(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)));
+ }
+
+ return clientBuilder;
+ }
+
+ public static PulsarAdminBuilder getAdminBuilder(String serviceUrl, Properties properties) {
+ PulsarAdminBuilder adminBuilder = PulsarAdmin.builder();
+ if (properties == null) {
+ return adminBuilder;
+ }
+
+ adminBuilder.serviceHttpUrl(properties.getProperty(ADMIN_SERVICE_URL, serviceUrl));
+
+ if (properties.containsKey(AUTHENTICATION_CLASS)) {
+ String className = properties.getProperty(AUTHENTICATION_CLASS);
+ try {
+ if (properties.containsKey(AUTHENTICATION_PARAMS_STRING)) {
+ String authParamsString = (String) properties.get(AUTHENTICATION_PARAMS_STRING);
+ adminBuilder.authentication(className, authParamsString);
+ } else if (properties.containsKey(AUTHENTICATION_PARAMS_MAP)) {
+ Map<String, String> authParams = (Map<String, String>) properties.get(AUTHENTICATION_PARAMS_MAP);
+ adminBuilder.authentication(className, authParams);
+ } else {
+ @SuppressWarnings("unchecked")
+ Class<Authentication> clazz = (Class<Authentication>) Class.forName(className);
+ Authentication auth = clazz.newInstance();
+ adminBuilder.authentication(auth);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ adminBuilder.allowTlsInsecureConnection(
+ Boolean.parseBoolean(properties.getProperty(TLS_ALLOW_INSECURE_CONNECTION, "false")));
+ adminBuilder.enableTlsHostnameVerification(
+ Boolean.parseBoolean(properties.getProperty(TLS_HOSTNAME_VERIFICATION, "false")));
+
+ if (properties.containsKey(TLS_TRUST_CERTS_FILE_PATH)) {
+ adminBuilder.tlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH));
+ }
+
+ if (properties.containsKey(OPERATION_TIMEOUT_MS)) {
+ adminBuilder.connectionTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)),
+ TimeUnit.MILLISECONDS);
+ }
+
+ return adminBuilder;
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
new file mode 100644
index 0000000..bec2c35
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.util.Base64;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+
+import com.google.common.annotations.VisibleForTesting;
+
+//Questions
+/**
+ * 1. What's the auth method
+ * 2. What if message publish fails with async
+ */
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.DefaultPartitioner;
+import kafka.producer.KeyedMessage;
+import kafka.producer.Partitioner;
+import kafka.producer.ProducerConfig;
+import kafka.serializer.Encoder;
+import kafka.serializer.StringEncoder;
+import kafka.utils.VerifiableProperties;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class PulsarKafkaProducer<K, V> extends Producer<K, V> {
+
+ private final PulsarClient client;
+ private final ProducerBuilder<byte[]> pulsarProducerBuilder;
+ private final Partitioner partitioner;
+ private final Encoder<K> keySerializer;
+ private final Encoder<V> valueSerializer;
+ private final boolean isSendAsync;
+
+ public static String KAFKA_KEY_MAX_QUEUE_BUFFERING_TIME_MS = "queue.buffering.max.ms";
+ public static String KAFKA_KEY_MAX_QUEUE_BUFFERING_MESSAGES = "queue.buffering.max.messages";
+ public static String KAFKA_KEY_MAX_BATCH_MESSAGES = "batch.num.messages";
+ public static String KAFKA_KEY_REQUEST_TIMEOUT_MS = "request.timeout.ms";
+
+ private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer<byte[]>> producers = new ConcurrentHashMap<>();
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public PulsarKafkaProducer(ProducerConfig config) {
+ super((kafka.producer.Producer) null);
+ partitioner = config.partitionerClass() != null
+ ? newInstance(config.partitionerClass(), Partitioner.class, config.props())
+ : new DefaultPartitioner(config.props());
+ // kafka-config returns default serializer if client doesn't configure it
+ checkNotNull(config.keySerializerClass(), "key-serializer class can't be null");
+ checkNotNull(config.serializerClass(), "value-serializer class can't be null");
+ keySerializer = newInstance(config.keySerializerClass(), Encoder.class, config.props());
+ valueSerializer = newInstance(config.serializerClass(), Encoder.class, config.props());
+
+ Properties properties = config.props() != null && config.props().props() != null ? config.props().props()
+ : new Properties();
+ String serviceUrl = config.brokerList();
+ try {
+ client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).build();
+ } catch (PulsarClientException e) {
+ throw new IllegalArgumentException(
+ "Failed to create pulsar-client using url = " + serviceUrl + ", properties = " + properties, e);
+ }
+ pulsarProducerBuilder = client.newProducer();
+
+ // doc: https://kafka.apache.org/08/documentation.html#producerapi
+ // api-doc:
+ // https://github.com/apache/kafka/blob/0.8.2.2/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+
+ // queue.enqueue.timeout.ms: The amount of time to block before dropping messages when running in async mode and
+ // the buffer has reached queue.buffering.max.messages. If set to 0 events will be enqueued immediately or
+ // dropped if the queue is full (the producer send call will never block). If set to -1 the producer will block
+ // indefinitely and never willingly drop a send.
+ boolean blockIfQueueFull = config.queueEnqueueTimeoutMs() == -1 ? true : false;
+ // This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values
+ // are (1) async for asynchronous send and (2) sync for synchronous send. By setting the producer to async we
+ // allow batching together of requests (which is great for throughput) but open the possibility of a failure of
+ // the client machine dropping unsent data.
+ isSendAsync = "async".equalsIgnoreCase(config.producerType());
+ CompressionType compressionType = CompressionType.NONE;
+ // Valid values are "none", "gzip" and "snappy".
+ if ("gzip".equals(config.compressionCodec().name())) {
+ compressionType = CompressionType.ZLIB;
+ } else if ("snappy".equals(config.compressionCodec().name())) {
+ compressionType = CompressionType.SNAPPY;
+ }
+ long batchDelayMs = config.queueBufferingMaxMs();
+
+ if (properties.containsKey(KAFKA_KEY_MAX_QUEUE_BUFFERING_MESSAGES)) {
+ pulsarProducerBuilder.maxPendingMessages(config.queueBufferingMaxMessages());
+ }
+ if (properties.containsKey(KAFKA_KEY_MAX_BATCH_MESSAGES)) {
+ pulsarProducerBuilder.batchingMaxMessages(config.batchNumMessages());
+ }
+ if (properties.containsKey(KAFKA_KEY_MAX_QUEUE_BUFFERING_TIME_MS)) {
+ pulsarProducerBuilder.batchingMaxPublishDelay(batchDelayMs, TimeUnit.MILLISECONDS);
+ }
+ if (properties.containsKey(KAFKA_KEY_REQUEST_TIMEOUT_MS)) {
+ pulsarProducerBuilder.sendTimeout(config.requestTimeoutMs(), TimeUnit.MILLISECONDS);
+ }
+
+ pulsarProducerBuilder.blockIfQueueFull(blockIfQueueFull).compressionType(compressionType);
+
+ }
+
+ public PulsarKafkaProducer(kafka.producer.Producer<K, V> producer) {
+ this(producer.config());
+ }
+
+ @Override
+ public void send(KeyedMessage<K, V> message) {
+ org.apache.pulsar.client.api.Producer<byte[]> producer;
+
+ try {
+ producer = producers.computeIfAbsent(message.topic(), topic -> createNewProducer(topic));
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Failed to create producer for " + message.topic(), e);
+ }
+
+ TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
+ buildMessage(messageBuilder, message);
+
+ if (isSendAsync) {
+ // what if message publish fails:
+ // according to : https://kafka.apache.org/08/documentation.html#producerapi
+ // async: opens the possibility of a failure of the client machine dropping unsent data
+ messageBuilder.sendAsync().handle((res, ex) -> {
+ if (ex != null) {
+ log.warn("publish failed for {}", producer.getTopic(), ex);
+ }
+ return null;
+ });
+ } else {
+ try {
+ messageBuilder.send();
+ } catch (PulsarClientException e) {
+ log.warn("publish failed for {}", producer.getTopic(), e);
+ throw new IllegalStateException("Failed to publish message " + message.topic(), e);
+ }
+ }
+
+ }
+
+ @Override
+ public void send(List<KeyedMessage<K, V>> messages) {
+ if (messages != null) {
+ messages.forEach(this::send);
+ }
+ }
+
+ private void buildMessage(TypedMessageBuilder<byte[]> builder, KeyedMessage<K, V> message) {
+ if (message.key() != null) {
+ String key = getKey(message.topic(), message.key());
+ builder.key(key);
+ }
+
+ byte[] value = valueSerializer.toBytes(message.message());
+ builder.value(value);
+ }
+
+ private String getKey(String topic, K key) {
+ // If key is a String, we can use it as it is, otherwise, serialize to byte[] and encode in base64
+ if (keySerializer!=null && keySerializer instanceof StringEncoder) {
+ return (String) key;
+ } else {
+ byte[] keyBytes = keySerializer.toBytes(key);
+ return Base64.getEncoder().encodeToString(keyBytes);
+ }
+ }
+
+ private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String topic) {
+ try {
+ pulsarProducerBuilder.messageRoutingMode(MessageRoutingMode.CustomPartition);
+ pulsarProducerBuilder.messageRouter(new MessageRouter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+ // https://kafka.apache.org/08/documentation.html#producerapi
+ // The default partitioner is based on the hash of the key.
+ return partitioner.partition(msg.getKey(), metadata.numPartitions());
+ }
+ });
+ log.info("Creating producer for topic {} with config {}", topic, pulsarProducerBuilder.toString());
+ return pulsarProducerBuilder.clone().topic(topic).create();
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public <T> T newInstance(String key, Class<T> t, VerifiableProperties properties) {
+ Class<?> c = null;
+ try {
+ c = Class.forName(key);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("class not found for :" + key);
+ }
+ if (c == null)
+ return null;
+ Object o = newInstance(c, properties);
+ if (!t.isInstance(o)) {
+ throw new IllegalArgumentException(c.getName() + " is not an instance of " + t.getName());
+ }
+ return t.cast(o);
+ }
+
+ public static <T> T newInstance(Class<T> c, VerifiableProperties properties) {
+ try {
+ try {
+ Constructor<T> constructor = c.getConstructor(VerifiableProperties.class);
+ constructor.setAccessible(true);
+ return constructor.newInstance(properties);
+ } catch (Exception e) {
+ // Ok.. not a default implementation class
+ }
+ return c.newInstance();
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("Could not instantiate class " + c.getName(), e);
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException(
+ "Could not instantiate class " + c.getName() + " Does it have a public no-argument constructor?",
+ e);
+ } catch (NullPointerException e) {
+ throw new IllegalArgumentException("Requested class was null", e);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (producers != null) {
+ producers.forEach((topic, producer) -> {
+ try {
+ producer.close();
+ } catch (PulsarClientException e) {
+ log.warn("Failed to close producer for {}", topic, e.getMessage());
+ }
+ });
+ }
+ if (client != null) {
+ try {
+ client.close();
+ } catch (PulsarClientException e) {
+ log.warn("Failed to close pulsar-client {}", e.getMessage());
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public ProducerBuilder<byte[]> getPulsarProducerBuilder() {
+ return pulsarProducerBuilder;
+ }
+
+ @VisibleForTesting
+ public Partitioner getPartitioner() {
+ return partitioner;
+ }
+
+ @VisibleForTesting
+ public Encoder<K> getKeySerializer() {
+ return keySerializer;
+ }
+
+ @VisibleForTesting
+ public Encoder<V> getValueSerializer() {
+ return valueSerializer;
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarByteBufferMessageSet.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarByteBufferMessageSet.java
new file mode 100644
index 0000000..68ee247
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarByteBufferMessageSet.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+
+import com.google.common.collect.Queues;
+
+import kafka.message.MessageAndOffset;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class PulsarByteBufferMessageSet extends kafka.javaapi.message.ByteBufferMessageSet {
+ private final MessageAndOffsetIterator iterator;
+
+ public PulsarByteBufferMessageSet(Reader<byte[]> reader) {
+ super(Collections.emptyList());
+ this.iterator = new MessageAndOffsetIterator(reader);
+ }
+
+ @Override
+ public Iterator<MessageAndOffset> iterator() {
+ return iterator;
+ }
+
+ public static class MessageAndOffsetIterator implements Iterator<MessageAndOffset> {
+
+ private final Reader<byte[]> reader;
+ private final ConcurrentLinkedQueue<org.apache.pulsar.client.api.Message<byte[]>> receivedMessages;
+
+ public MessageAndOffsetIterator(Reader<byte[]> reader) {
+ this.reader = reader;
+ this.receivedMessages = Queues.newConcurrentLinkedQueue();
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ org.apache.pulsar.client.api.Message<byte[]> msg = reader.readNext(10, TimeUnit.MILLISECONDS);
+ if (msg != null) {
+ receivedMessages.offer(msg);
+ return true;
+ }
+ } catch (PulsarClientException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to receive message for {}, {}", reader.getTopic(), e.getMessage());
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public PulsarMsgAndOffset next() {
+
+ org.apache.pulsar.client.api.Message<byte[]> msg = receivedMessages.poll();
+ if (msg == null) {
+ try {
+ msg = reader.readNext();
+ } catch (PulsarClientException e) {
+ log.warn("Failed to receive message for {}, {}", reader.getTopic(), e.getMessage(), e);
+ throw new RuntimeException("failed to receive message from " + reader.getTopic());
+ }
+ }
+
+ String key = msg.getKey();
+ byte[] value = msg.getValue();
+
+ return new PulsarMsgAndOffset(new PulsarMessage(key, value), msg.getMessageId());
+ }
+ }
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarFetchResponse.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarFetchResponse.java
new file mode 100644
index 0000000..6f47e8b
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarFetchResponse.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.Map;
+
+import org.apache.pulsar.client.api.Reader;
+
+import kafka.javaapi.FetchResponse;
+
+public class PulsarFetchResponse extends FetchResponse {
+
+ private final Map<String, Reader<byte[]>> topicReaderMap;
+ private boolean hasError = false;
+
+ public PulsarFetchResponse(Map<String, Reader<byte[]>> topicConsumerMap, boolean hasError) {
+ super(null);
+ this.topicReaderMap = topicConsumerMap;
+ this.hasError = hasError;
+ }
+
+ @Override
+ public boolean hasError() {
+ return this.hasError;
+ }
+
+ @Override
+ public PulsarByteBufferMessageSet messageSet(String topic, int partition) {
+ return new PulsarByteBufferMessageSet(topicReaderMap.get(topic));
+ }
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarKafkaSimpleConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarKafkaSimpleConsumer.java
new file mode 100644
index 0000000..b2d39a3
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarKafkaSimpleConsumer.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.kafka.clients.producer.PulsarClientKafkaConfig;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
+
+import com.google.common.collect.Maps;
+
+import kafka.api.FetchRequest;
+import kafka.api.PartitionFetchInfo;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.ErrorMapping;
+import kafka.common.OffsetMetadataAndError;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetCommitResponse;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Note: <br/>
+ * - SimpleConsumer doesn't work well with pulsar-batch messages because client-app uses raw-offset to commit offset for
+ * a given group-id. <br/>
+ * - In order to work with partitioned-topic and batch messages: use custom api
+ * {@link #PulsarMsgAndOffset::getFullOffset()} to fetch offset with {@link MessageId} and commit offset with the same
+ * message-Id {@link PulsarOffsetMetadataAndError::PulsarOffsetMetadataAndError(messageId..)}.
+ *
+ */
+@Slf4j
+public class PulsarKafkaSimpleConsumer extends SimpleConsumer {
+
+ private final String host;
+ private final int port;
+ private final String clientId;
+ private final PulsarClient client;
+ private final PulsarAdmin admin;
+ private final Map<TopicGroup, Consumer<byte[]>> topicConsumerMap;
+ private final SubscriptionType subscriptionType;
+ public static final String SUBSCRIPTION_TYPE = "pulsar.subscription.type";
+ public static final String HTTP_SERVICE_URL = "pulsar.http.service.url";
+
+ public PulsarKafkaSimpleConsumer(String host, int port, int soTimeout, int bufferSize, String clientId) {
+ this(host, port, soTimeout, bufferSize, clientId, new Properties());
+ }
+
+ /**
+ *
+ * @param host
+ * pulsar-broker service url
+ * @param port
+ * n/a
+ * @param soTimeout
+ * n/a
+ * @param bufferSize
+ * n/a
+ * @param clientId
+ * client-id
+ * @param properties
+ * properties to retrieve authentication params by {@link PulsarClientKafkaConfig}
+ */
+ public PulsarKafkaSimpleConsumer(String host, int port, int soTimeout, int bufferSize, String clientId,
+ Properties properties) {
+ super(host, port, soTimeout, bufferSize, clientId);
+ this.host = host;
+ this.port = port;
+ this.clientId = clientId;
+ try {
+ client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(host).build();
+ } catch (PulsarClientException e) {
+ log.warn("Failed to create pulsar client for {} and properties {}", host, properties);
+ throw new RuntimeException("Failed to create pulsar client " + host, e);
+ }
+ try {
+ String url = properties.getProperty(HTTP_SERVICE_URL, host);
+ admin = PulsarClientKafkaConfig.getAdminBuilder(url, properties).build();
+ } catch (PulsarClientException e) {
+ log.warn("Failed to create pulsar admin for {} and properties {}", host, properties);
+ throw new RuntimeException("Failed to create pulsar admin " + host, e);
+ }
+ this.topicConsumerMap = new ConcurrentHashMap<>(8, 0.75f, 1);
+ this.subscriptionType = getSubscriptionType(properties);
+ }
+
+ @Override
+ public PulsarFetchResponse fetch(FetchRequest request) {
+ try {
+ Map<String, Reader<byte[]>> topicReaderMap = createTopicReaders(request);
+ return new PulsarFetchResponse(topicReaderMap, false);
+ } catch (Exception e) {
+ log.warn("Failed to process fetch request{}, {}", request, e.getMessage());
+ return new PulsarFetchResponse(null, true);
+ }
+ }
+
+ private Map<String, Reader<byte[]>> createTopicReaders(FetchRequest request) {
+ Map<String, Reader<byte[]>> topicReaderMap = Maps.newHashMap();
+ scala.collection.immutable.Map<String, scala.collection.immutable.Map<TopicAndPartition, PartitionFetchInfo>> reqInfo = request
+ .requestInfoGroupedByTopic();
+ Map<String, scala.collection.immutable.Map<TopicAndPartition, PartitionFetchInfo>> topicPartitionMap = scala.collection.JavaConverters
+ .mapAsJavaMapConverter(reqInfo).asJava();
+ for (Entry<String, scala.collection.immutable.Map<TopicAndPartition, PartitionFetchInfo>> topicPartition : topicPartitionMap
+ .entrySet()) {
+ final String topicName = topicPartition.getKey();
+ Map<TopicAndPartition, PartitionFetchInfo> topicOffsetMap = scala.collection.JavaConverters
+ .mapAsJavaMapConverter(topicPartition.getValue()).asJava();
+ if (topicOffsetMap != null && !topicOffsetMap.isEmpty()) {
+ // pulsar-kafka adapter doesn't deal with partition so, assuming only 1 topic-metadata per topic name
+ Entry<TopicAndPartition, PartitionFetchInfo> topicOffset = topicOffsetMap.entrySet().iterator().next();
+ long offset = topicOffset.getValue().offset();
+ String topic = getTopicName(topicOffset.getKey());
+ MessageId msgId = getMessageId(offset);
+ try {
+ Reader<byte[]> reader = client.newReader().readerName(clientId).topic(topic).startMessageId(msgId)
+ .create();
+ log.info("Successfully created reader for {} at msg-id {}", topic, msgId);
+ topicReaderMap.put(topicName, reader);
+ } catch (PulsarClientException e) {
+ log.warn("Failed to create reader for topic {}", topic, e);
+ throw new RuntimeException("Failed to create reader for " + topic, e);
+ }
+ }
+ }
+ return topicReaderMap;
+ }
+
+ private MessageId getMessageId(long offset) {
+ if (kafka.api.OffsetRequest.EarliestTime() == offset) {
+ return MessageId.earliest;
+ } else if (kafka.api.OffsetRequest.LatestTime() == offset) {
+ return MessageId.latest;
+ } else {
+ return MessageIdUtils.getMessageId(offset);
+ }
+ }
+
+ @Override
+ public PulsarTopicMetadataResponse send(TopicMetadataRequest request) {
+ List<String> topics = request.topics();
+ PulsarTopicMetadataResponse response = new PulsarTopicMetadataResponse(admin, host, port, topics);
+ return response;
+ }
+
+ // It's @Overriden method of: OffsetResponse getOffsetsBefore(OffsetRequest or)
+ public PulsarOffsetResponse getOffsetsBefore(PulsarOffsetRequest or) {
+ Map<TopicAndPartition, PartitionOffsetRequestInfo> request = or.getRequestInfo();
+ Map<TopicAndPartition, Long> offsetResoponse = Maps.newHashMap();
+ for (Entry<TopicAndPartition, PartitionOffsetRequestInfo> topicPartitionRequest : request.entrySet()) {
+ TopicAndPartition topic = topicPartitionRequest.getKey();
+ long time = topicPartitionRequest.getValue().time();
+ if (time != kafka.api.OffsetRequest.EarliestTime() && time != kafka.api.OffsetRequest.LatestTime()) {
+ throw new IllegalArgumentException("Time has to be from EarliestTime or LatestTime");
+ }
+ offsetResoponse.put(topic, time);
+ }
+ return new PulsarOffsetResponse(offsetResoponse);
+ }
+
+ /**
+ * <pre>
+ * Overriden method: OffsetCommitResponse commitOffsets(OffsetCommitRequest request)
+ *
+ * Note:
+ * created PulsarOffsetCommitResponse as OffsetCommitRequest doesn't provide getters
+ *
+ * </pre>
+ */
+ public OffsetCommitResponse commitOffsets(PulsarOffsetCommitRequest request) {
+
+ PulsarOffsetCommitResponse response = new PulsarOffsetCommitResponse(null);
+ for (Entry<String, MessageId> topicOffset : request.getTopicOffsetMap().entrySet()) {
+ final String topic = topicOffset.getKey();
+ final String groupId = request.getGroupId();
+ try {
+ Consumer<byte[]> consumer = getConsumer(topic, groupId);
+ consumer.acknowledgeCumulative(topicOffset.getValue());
+ } catch (Exception e) {
+ log.warn("Failed to ack message for topic {}-{}", topic, topicOffset.getValue(), e);
+ response.hasError = true;
+ TopicAndPartition topicPartition = new TopicAndPartition(topic, 0);
+ response.errors.computeIfAbsent(topicPartition, tp -> ErrorMapping.UnknownCode());
+ }
+ }
+
+ return response;
+ }
+
+ /**
+ * <pre>
+ * Overriden method: OffsetFetchResponse fetchOffsets(OffsetFetchRequest request)
+ *
+ * Note:
+ * created PulsarOffsetFetchRequest as OffsetFetchRequest doesn't have getters for any field
+ * and PulsarOffsetFetchResponse created as base-class doesn't have setters to set state
+ * @param request
+ * @return
+ *
+ * </pre>
+ */
+ public PulsarOffsetFetchResponse fetchOffsets(PulsarOffsetFetchRequest request) {
+ final String groupId = request.groupId;
+ Map<TopicAndPartition, OffsetMetadataAndError> responseMap = Maps.newHashMap();
+ PulsarOffsetFetchResponse response = new PulsarOffsetFetchResponse(responseMap);
+ for (TopicAndPartition topicMetadata : request.requestInfo) {
+ final String topicName = getTopicName(topicMetadata);
+ try {
+ PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
+ CursorStats cursor = stats.cursors != null ? stats.cursors.get(groupId) : null;
+ if (cursor != null) {
+ String readPosition = cursor.readPosition;
+ MessageId msgId = null;
+ if (readPosition != null && readPosition.contains(":")) {
+ try {
+ String[] position = readPosition.split(":");
+ msgId = new MessageIdImpl(Long.parseLong(position[0]), Long.parseLong(position[1]), -1);
+ } catch (Exception e) {
+ log.warn("Invalid read-position {} for {}-{}", readPosition, topicName, groupId);
+ }
+ }
+ msgId = msgId == null ? MessageId.earliest : msgId;
+ OffsetMetadataAndError oE = new OffsetMetadataAndError(MessageIdUtils.getOffset(msgId), null,
+ ErrorMapping.NoError());
+ responseMap.put(topicMetadata, oE);
+ }
+ } catch (Exception e) {
+ OffsetMetadataAndError oE = new OffsetMetadataAndError(0, null, ErrorMapping.UnknownCode());
+ responseMap.put(topicMetadata, oE);
+ }
+ }
+ return response;
+ }
+
+ public static String getTopicName(TopicAndPartition topicMetadata) {
+ return topicMetadata.partition() > -1
+ ? TopicName.get(topicMetadata.topic()).getPartition(topicMetadata.partition()).toString()
+ : topicMetadata.topic();
+ }
+
+ @Override
+ public void close() {
+
+ if (topicConsumerMap != null) {
+ topicConsumerMap.forEach((topic, consumer) -> {
+ try {
+ consumer.close();
+ } catch (PulsarClientException e) {
+ log.warn("Failed to close consumer for topic {}", topic, e);
+ }
+ });
+ topicConsumerMap.clear();
+ }
+
+ if (client != null) {
+ try {
+ client.close();
+ } catch (PulsarClientException e) {
+ log.warn("Failed to close pulsar-client ", e);
+ }
+ }
+
+ if (admin != null) {
+ try {
+ admin.close();
+ } catch (Exception e) {
+ log.warn("Failed to close pulsar-admin ", e);
+ }
+ }
+ }
+
+ private Consumer<byte[]> getConsumer(String topic, String groupId) {
+ TopicGroup topicGroup = new TopicGroup(topic, groupId);
+
+ return topicConsumerMap.computeIfAbsent(topicGroup, (topicName) -> {
+ try {
+ return client.newConsumer().topic(topic).subscriptionName(groupId).subscriptionType(subscriptionType)
+ .subscribe();
+ } catch (PulsarClientException e) {
+ log.error("Failed to create consumer for topic {}", topic, e);
+ throw new RuntimeException("Failed to create consumer for topic " + topic, e);
+ }
+ });
+ }
+
+ public static class TopicGroup {
+ protected String topic;
+ protected String grouoId;
+
+ public TopicGroup(String topic, String grouoId) {
+ super();
+ this.topic = topic;
+ this.grouoId = grouoId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topic, grouoId);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof TopicGroup) {
+ TopicGroup t = (TopicGroup) obj;
+ return Objects.equals(topic, t.topic) && Objects.equals(grouoId, t.grouoId);
+ }
+ return false;
+ }
+
+ }
+
+ private SubscriptionType getSubscriptionType(Properties properties) {
+ String subType = properties != null && properties.contains(SUBSCRIPTION_TYPE)
+ ? properties.getProperty(SUBSCRIPTION_TYPE)
+ : SubscriptionType.Failover.toString();
+ try {
+ return SubscriptionType.valueOf(subType);
+ } catch (IllegalArgumentException ie) {
+ return SubscriptionType.Failover;
+ }
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarMessage.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarMessage.java
new file mode 100644
index 0000000..b32e39a
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarMessage.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.nio.ByteBuffer;
+
+public class PulsarMessage extends kafka.message.Message {
+
+ private final String key;
+
+ public PulsarMessage(String key, byte[] bytes) {
+ super(bytes);
+ this.key = key;
+ }
+
+ @Override
+ public ByteBuffer key() {
+ return key != null ? ByteBuffer.wrap(key.getBytes()) : null;
+ }
+
+ @Override
+ public boolean hasKey() {
+ return key != null;
+ }
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarMsgAndOffset.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarMsgAndOffset.java
new file mode 100644
index 0000000..ebeec88
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarMsgAndOffset.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.util.MessageIdUtils;
+
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+
+public class PulsarMsgAndOffset extends MessageAndOffset {
+
+ private static final long serialVersionUID = 1L;
+ private final MessageId messageId;
+
+ public PulsarMsgAndOffset(Message message, MessageId messageId) {
+ super(message, MessageIdUtils.getOffset(messageId));
+ this.messageId = messageId;
+ }
+
+ public MessageId getFullOffset() {
+ return this.messageId;
+ }
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetCommitRequest.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetCommitRequest.java
new file mode 100644
index 0000000..ab13c9c
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetCommitRequest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.util.MessageIdUtils;
+
+import com.google.common.collect.Maps;
+
+import kafka.common.OffsetMetadataAndError;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetCommitRequest;
+
+public class PulsarOffsetCommitRequest extends OffsetCommitRequest {
+ private final String groupId;
+ private final Map<String, MessageId> topicOffsetMap = Maps.newHashMap();
+
+ public PulsarOffsetCommitRequest(String groupId, Map<TopicAndPartition, PulsarOffsetMetadataAndError> requestInfo,
+ short versionId, int correlationId, String clientId) {
+ super(groupId, Collections.emptyMap(), versionId, correlationId, clientId);
+ this.groupId = groupId;
+ for (Entry<TopicAndPartition, PulsarOffsetMetadataAndError> topicOffset : requestInfo.entrySet()) {
+ String topicName = PulsarKafkaSimpleConsumer.getTopicName(topicOffset.getKey());
+ OffsetMetadataAndError offsetMetadata = topicOffset.getValue();
+ MessageId msgId = null;
+ if (offsetMetadata instanceof PulsarOffsetMetadataAndError) {
+ msgId = ((PulsarOffsetMetadataAndError) offsetMetadata).getMessageId();
+ }
+ msgId = msgId == null ? MessageIdUtils.getMessageId(topicOffset.getValue().offset()) : msgId;
+ topicOffsetMap.put(topicName, msgId);
+ }
+ }
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public Map<String, MessageId> getTopicOffsetMap() {
+ return topicOffsetMap;
+ }
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetCommitResponse.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetCommitResponse.java
new file mode 100644
index 0000000..e32ffc5
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetCommitResponse.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.Map;
+
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetCommitResponse;
+
+public class PulsarOffsetCommitResponse extends OffsetCommitResponse {
+
+ protected boolean hasError = false;
+ protected Map<TopicAndPartition, Object> errors;
+
+ public PulsarOffsetCommitResponse(kafka.api.OffsetCommitResponse underlying) {
+ super(underlying);
+ }
+
+ public boolean hasError() {
+ return hasError;
+ }
+
+ @Override
+ public Map<TopicAndPartition, Object> errors() {
+ return errors;
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetFetchRequest.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetFetchRequest.java
new file mode 100644
index 0000000..98ab6ca
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetFetchRequest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.List;
+
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetFetchRequest;
+
+public class PulsarOffsetFetchRequest extends OffsetFetchRequest {
+ protected final String groupId;
+ protected final List<TopicAndPartition> requestInfo;
+
+ public PulsarOffsetFetchRequest(String groupId, List<TopicAndPartition> requestInfo, short versionId,
+ int correlationId, String clientId) {
+ super(groupId, requestInfo, versionId, correlationId, clientId);
+ this.groupId = groupId;
+ this.requestInfo = requestInfo;
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetFetchResponse.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetFetchResponse.java
new file mode 100644
index 0000000..cf8d60b
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetFetchResponse.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.Map;
+
+import kafka.common.OffsetMetadataAndError;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetFetchResponse;
+
+public class PulsarOffsetFetchResponse extends OffsetFetchResponse {
+
+ private final Map<TopicAndPartition, OffsetMetadataAndError> response;
+
+ public PulsarOffsetFetchResponse(Map<TopicAndPartition, OffsetMetadataAndError> response) {
+ super(null);
+ this.response = response;
+ }
+
+ @Override
+ public Map<TopicAndPartition, OffsetMetadataAndError> offsets() {
+ return response;
+ }
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetMetadataAndError.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetMetadataAndError.java
new file mode 100644
index 0000000..06e8c3d
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetMetadataAndError.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.util.MessageIdUtils;
+
+import kafka.common.OffsetMetadataAndError;
+
+public class PulsarOffsetMetadataAndError extends OffsetMetadataAndError {
+
+ private static final long serialVersionUID = 1L;
+ private final MessageId messageId;
+
+ public PulsarOffsetMetadataAndError(long offset, String metadata, short error) {
+ super(offset, metadata, error);
+ this.messageId = null;
+ }
+
+ public PulsarOffsetMetadataAndError(MessageId messageId, String metadata, short error) {
+ super(MessageIdUtils.getOffset(messageId), metadata, error);
+ this.messageId = messageId;
+ }
+
+ public MessageId getMessageId() {
+ return messageId;
+ }
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetRequest.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetRequest.java
new file mode 100644
index 0000000..bb7666e
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetRequest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.Map;
+
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetRequest;
+
+public class PulsarOffsetRequest extends OffsetRequest {
+
+ private final Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo;
+
+ public PulsarOffsetRequest(Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo, short versionId,
+ String clientId) {
+ super(requestInfo, versionId, clientId);
+ this.requestInfo = requestInfo;
+ }
+
+ public Map<TopicAndPartition, PartitionOffsetRequestInfo> getRequestInfo() {
+ return requestInfo;
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetResponse.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetResponse.java
new file mode 100644
index 0000000..f7ce44d
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarOffsetResponse.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.Map;
+
+import kafka.common.ErrorMapping;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetResponse;
+
+public class PulsarOffsetResponse extends OffsetResponse {
+
+ private final Map<TopicAndPartition, Long> offsetResoponse;
+
+ public PulsarOffsetResponse(Map<TopicAndPartition, Long> offsetResoponse) {
+ super(null);
+ this.offsetResoponse = offsetResoponse;
+ }
+
+ @Override
+ public long[] offsets(String topic, int partition) {
+ Long offset = offsetResoponse.get(new TopicAndPartition(topic, partition));
+ long[] offsets = { offset != null ? offset : 0 };
+ return offsets;
+ }
+
+ @Override
+ public boolean hasError() {
+ return false;
+ }
+
+ @Override
+ public short errorCode(String topic, int partition) {
+ return ErrorMapping.NoError();
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarPartitionMetadata.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarPartitionMetadata.java
new file mode 100644
index 0000000..f61e7d3
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarPartitionMetadata.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.Collections;
+import java.util.List;
+
+import kafka.cluster.Broker;
+import kafka.javaapi.PartitionMetadata;
+
+public class PulsarPartitionMetadata extends PartitionMetadata {
+
+ private final List<Broker> hosts;
+
+ public PulsarPartitionMetadata(String hostUrl, int port) {
+ super(null);
+ this.hosts = Collections.singletonList(new Broker(0, hostUrl, port));
+ }
+
+ @Override
+ public List<Broker> replicas() {
+ return hosts;
+ }
+
+ @Override
+ public Broker leader() {
+ return hosts.get(0);
+ }
+
+ @Override
+ public int partitionId() {
+ // it always returns partition=-1
+ return -1;
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarTopicMetadata.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarTopicMetadata.java
new file mode 100644
index 0000000..4dd6660
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarTopicMetadata.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.Collections;
+import java.util.List;
+
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+
+public class PulsarTopicMetadata extends TopicMetadata {
+
+ private final String hostUrl;
+ private final int port;
+
+ public PulsarTopicMetadata(String hostUrl, int port, String topic) {
+ super(null);
+ this.hostUrl = hostUrl;
+ this.port = port;
+ }
+
+ @Override
+ public List<PartitionMetadata> partitionsMetadata() {
+ return Collections.singletonList(new PulsarPartitionMetadata(hostUrl, port));
+ }
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarTopicMetadataResponse.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarTopicMetadataResponse.java
new file mode 100644
index 0000000..e1d39c5
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarTopicMetadataResponse.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.simple.consumer;
+
+import java.util.List;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.TopicName;
+
+import com.google.common.collect.Lists;
+
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataResponse;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class PulsarTopicMetadataResponse extends TopicMetadataResponse {
+
+ private final List<String> topics;
+ private final String hostUrl;
+ private final int port;
+ private final PulsarAdmin admin;
+
+ public PulsarTopicMetadataResponse(PulsarAdmin admin, String hostUrl, int port, List<String> topics) {
+ super(null);
+ this.hostUrl = hostUrl;
+ this.port = port;
+ this.topics = topics;
+ this.admin = admin;
+ }
+
+ @Override
+ public List<TopicMetadata> topicsMetadata() {
+ List<TopicMetadata> metadataList = Lists.newArrayList();
+ topics.forEach(topic -> {
+ try {
+ int partitions;
+ partitions = admin.topics().getPartitionedTopicMetadata(topic).partitions;
+ if (partitions > 0) {
+ for (int partition = 0; partition < partitions; partition++) {
+ String topicName = TopicName.get(topic).getPartition(partition).toString();
+ metadataList.add(new PulsarTopicMetadata(hostUrl, port, topicName));
+ }
+ } else {
+ metadataList.add(new PulsarTopicMetadata(hostUrl, port, topic));
+ }
+ } catch (PulsarAdminException e) {
+ log.error("Failed to get partitioned metadata for {}", topic, e);
+ throw new RuntimeException("Failed to get partitioned-metadata", e);
+ }
+ });
+ return metadataList;
+ }
+
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumerTest.java
new file mode 100644
index 0000000..77d7e8c
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumerTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import static org.testng.Assert.assertEquals;
+
+import java.lang.reflect.Field;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.testng.annotations.Test;
+
+import kafka.consumer.ConsumerConfig;
+
+public class PulsarKafkaConsumerTest {
+
+ @Test
+ public void testPulsarKafkaConsumerWithDefaultConfig() throws Exception {
+ // https://kafka.apache.org/08/documentation.html#consumerconfigs
+ Properties properties = new Properties();
+ properties.put("zookeeper.connect", "http://localhost:8080/");
+ properties.put("group.id", "group1");
+
+ ConsumerConfig config = new ConsumerConfig(properties);
+ ConsumerConnector connector = new ConsumerConnector(config);
+ ConsumerBuilderImpl<byte[]> consumerBuilder = (ConsumerBuilderImpl<byte[]>) connector.getConsumerBuilder();
+ Field confField = consumerBuilder.getClass().getDeclaredField("conf");
+ confField.setAccessible(true);
+ ConsumerConfigurationData conf = (ConsumerConfigurationData) confField.get(consumerBuilder);
+ assertEquals(conf.getSubscriptionName(), "group1");
+ assertEquals(conf.getReceiverQueueSize(), 1000);
+ }
+
+ @Test
+ public void testPulsarKafkaConsumerConfig() throws Exception {
+ // https://kafka.apache.org/08/documentation.html#consumerconfigs
+ Properties properties = new Properties();
+ properties.put("zookeeper.connect", "http://localhost:8080/");
+ properties.put("group.id", "group1");
+ properties.put("consumer.id", "cons1");
+ properties.put("auto.commit.enable", "true");
+ properties.put("auto.commit.interval.ms", "100");
+ properties.put("queued.max.message.chunks", "100");
+
+ ConsumerConfig config = new ConsumerConfig(properties);
+ ConsumerConnector connector = new ConsumerConnector(config);
+ ConsumerBuilderImpl<byte[]> consumerBuilder = (ConsumerBuilderImpl<byte[]>) connector.getConsumerBuilder();
+ Field confField = consumerBuilder.getClass().getDeclaredField("conf");
+ confField.setAccessible(true);
+ ConsumerConfigurationData conf = (ConsumerConfigurationData) confField.get(consumerBuilder);
+ assertEquals(conf.getSubscriptionName(), "group1");
+ assertEquals(conf.getReceiverQueueSize(), 100);
+ assertEquals(conf.getAcknowledgementsGroupTimeMicros(), TimeUnit.MILLISECONDS.toMicros(100));
+ System.out.println(conf);
+ }
+
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
new file mode 100644
index 0000000..4cccf83
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import static org.testng.Assert.assertEquals;
+
+import java.lang.reflect.Field;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.testng.annotations.Test;
+
+import kafka.producer.Partitioner;
+import kafka.producer.ProducerConfig;
+import kafka.serializer.Encoder;
+
+public class PulsarKafkaProducerTest {
+
+ private static final String BROKER_URL = "metadata.broker.list";
+ private static final String PRODUCER_TYPE = "producer.type";
+ private static final String SERIALIZER_CLASS = "serializer.class";
+ private static final String KEY_SERIALIZER_CLASS = "key.serializer.class";
+ private static final String PARTITIONER_CLASS = "partitioner.class";
+ private static final String COMPRESSION_CODEC = "compression.codec";
+ private static final String QUEUE_BUFFERING_MAX_MS = "queue.buffering.max.ms";
+ private static final String QUEUE_BUFFERING_MAX_MESSAGES = "queue.buffering.max.messages";
+ private static final String QUEUE_ENQUEUE_TIMEOUT_MS = "queue.enqueue.timeout.ms";
+ private static final String BATCH_NUM_MESSAGES = "batch.num.messages";
+ private static final String CLIENT_ID = "client.id";
+
+ @Test
+ public void testPulsarKafkaProducerWithDefaultConfig() throws Exception {
+ // https://kafka.apache.org/08/documentation.html#producerconfigs
+ Properties properties = new Properties();
+ properties.put(BROKER_URL, "http://localhost:8080/");
+
+ ProducerConfig config = new ProducerConfig(properties);
+ PulsarKafkaProducer<byte[], byte[]> producer = new PulsarKafkaProducer<>(config);
+ ProducerBuilderImpl<byte[]> producerBuilder = (ProducerBuilderImpl<byte[]>) producer.getPulsarProducerBuilder();
+ Field field = ProducerBuilderImpl.class.getDeclaredField("conf");
+ field.setAccessible(true);
+ ProducerConfigurationData conf = (ProducerConfigurationData) field.get(producerBuilder);
+ System.out.println("getMaxPendingMessages= " + conf.getMaxPendingMessages());
+ assertEquals(conf.getCompressionType(), CompressionType.NONE);
+ assertEquals(conf.isBlockIfQueueFull(), true);
+ assertEquals(conf.getMaxPendingMessages(), 1000);
+ assertEquals(conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MILLISECONDS.toMicros(1));
+ assertEquals(conf.getBatchingMaxMessages(), 1000);
+ }
+
+ @Test
+ public void testPulsarKafkaProducer() throws Exception {
+ // https://kafka.apache.org/08/documentation.html#producerconfigs
+ Properties properties = new Properties();
+ properties.put(BROKER_URL, "http://localhost:8080/");
+ properties.put(COMPRESSION_CODEC, "gzip"); // compression: ZLIB
+ properties.put(QUEUE_ENQUEUE_TIMEOUT_MS, "-1"); // block queue if full => -1 = true
+ properties.put(QUEUE_BUFFERING_MAX_MESSAGES, "6000"); // queue max message
+ properties.put(QUEUE_BUFFERING_MAX_MS, "100"); // batch delay
+ properties.put(BATCH_NUM_MESSAGES, "500"); // batch msg
+ properties.put(CLIENT_ID, "test");
+ ProducerConfig config = new ProducerConfig(properties);
+ PulsarKafkaProducer<byte[], byte[]> producer = new PulsarKafkaProducer<>(config);
+ ProducerBuilderImpl<byte[]> producerBuilder = (ProducerBuilderImpl<byte[]>) producer.getPulsarProducerBuilder();
+ Field field = ProducerBuilderImpl.class.getDeclaredField("conf");
+ field.setAccessible(true);
+ ProducerConfigurationData conf = (ProducerConfigurationData) field.get(producerBuilder);
+ assertEquals(conf.getCompressionType(), CompressionType.ZLIB);
+ assertEquals(conf.isBlockIfQueueFull(), true);
+ assertEquals(conf.getMaxPendingMessages(), 6000);
+ assertEquals(conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MILLISECONDS.toMicros(100));
+ assertEquals(conf.getBatchingMaxMessages(), 500);
+ }
+
+ @Test
+ public void testPulsarKafkaProducerWithSerializer() throws Exception {
+ Properties properties = new Properties();
+ properties.put(BROKER_URL, "http://localhost:8080/");
+ properties.put(PRODUCER_TYPE, "sync");
+ properties.put(SERIALIZER_CLASS, TestEncoder.class.getName());
+ properties.put(KEY_SERIALIZER_CLASS, TestEncoder.class.getName());
+ properties.put(PARTITIONER_CLASS, TestPartitioner.class.getName());
+ ProducerConfig config = new ProducerConfig(properties);
+ PulsarKafkaProducer<byte[], byte[]> producer = new PulsarKafkaProducer<>(config);
+ assertEquals(producer.getKeySerializer().getClass(), TestEncoder.class);
+ assertEquals(producer.getValueSerializer().getClass(), TestEncoder.class);
+ assertEquals(producer.getPartitioner().getClass(), TestPartitioner.class);
+
+ }
+
+ public static class TestEncoder implements Encoder<String> {
+ @Override
+ public byte[] toBytes(String value) {
+ return value.getBytes();
+ }
+ }
+
+ public static class TestPartitioner implements Partitioner {
+ @Override
+ public int partition(Object obj, int totalPartition) {
+ return obj.hashCode() % totalPartition;
+ }
+ }
+
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/pulsar/client/kafka/test/KafkaProducerConsumerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/pulsar/client/kafka/test/KafkaProducerConsumerTest.java
new file mode 100644
index 0000000..bbf61be
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/pulsar/client/kafka/test/KafkaProducerConsumerTest.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.ConsumerConnector;
+import org.apache.kafka.clients.consumer.PulsarKafkaStream;
+import org.apache.kafka.clients.consumer.PulsarMessageAndMetadata;
+import org.apache.kafka.clients.producer.PulsarKafkaProducer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
+
+import kafka.consumer.ConsumerConfig;
+import kafka.producer.KeyedMessage;
+import kafka.producer.Partitioner;
+import kafka.producer.ProducerConfig;
+import kafka.serializer.Decoder;
+import kafka.serializer.Encoder;
+import kafka.serializer.StringDecoder;
+import kafka.serializer.StringEncoder;
+
+public class KafkaProducerConsumerTest extends ProducerConsumerBase {
+
+ private static final String BROKER_URL = "metadata.broker.list";
+ private static final String PRODUCER_TYPE = "producer.type";
+ private static final String SERIALIZER_CLASS = "serializer.class";
+ private static final String KEY_SERIALIZER_CLASS = "key.serializer.class";
+ private static final String PARTITIONER_CLASS = "partitioner.class";
+ private static final String COMPRESSION_CODEC = "compression.codec";
+ private static final String QUEUE_BUFFERING_MAX_MS = "queue.buffering.max.ms";
+ private static final String QUEUE_BUFFERING_MAX_MESSAGES = "queue.buffering.max.messages";
+ private static final String QUEUE_ENQUEUE_TIMEOUT_MS = "queue.enqueue.timeout.ms";
+ private static final String BATCH_NUM_MESSAGES = "batch.num.messages";
+ private static final String CLIENT_ID = "client.id";
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterMethod
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testPulsarKafkaProducerWithSerializer() throws Exception {
+ final String serviceUrl = lookupUrl.toString();
+ final String topicName = "persistent://my-property/my-ns/my-topic1";
+
+ // (1) Create consumer
+ Properties properties = new Properties();
+ properties.put("zookeeper.connect", serviceUrl);
+ properties.put("group.id", "group1");
+ properties.put("consumer.id", "cons1");
+ properties.put("auto.commit.enable", "true");
+ properties.put("auto.commit.interval.ms", "100");
+ properties.put("queued.max.message.chunks", "100");
+
+ ConsumerConfig conSConfig = new ConsumerConfig(properties);
+ ConsumerConnector connector = new ConsumerConnector(conSConfig);
+ Map<String, Integer> topicCountMap = Collections.singletonMap(topicName, 2);
+ Map<String, List<PulsarKafkaStream<String, Tweet>>> streams = connector.createMessageStreams(topicCountMap,
+ new StringDecoder(null), new TestDecoder());
+
+ // (2) Create producer
+ Properties properties2 = new Properties();
+ properties2.put(BROKER_URL, serviceUrl);
+ properties2.put(PRODUCER_TYPE, "sync");
+ properties2.put(SERIALIZER_CLASS, TestEncoder.class.getName());
+ properties2.put(KEY_SERIALIZER_CLASS, StringEncoder.class.getName());
+ properties2.put(PARTITIONER_CLASS, TestPartitioner.class.getName());
+ properties2.put(COMPRESSION_CODEC, "gzip"); // compression: ZLIB
+ properties2.put(QUEUE_ENQUEUE_TIMEOUT_MS, "-1"); // block queue if full => -1 = true
+ properties2.put(QUEUE_BUFFERING_MAX_MESSAGES, "6000"); // queue max message
+ properties2.put(QUEUE_BUFFERING_MAX_MS, "100"); // batch delay
+ properties2.put(BATCH_NUM_MESSAGES, "500"); // batch msg
+ properties2.put(CLIENT_ID, "test");
+ ProducerConfig config = new ProducerConfig(properties2);
+ PulsarKafkaProducer<String, Tweet> producer = new PulsarKafkaProducer<>(config);
+
+ String name = "user";
+ String msg = "Hello World!";
+ Set<Tweet> published = Sets.newHashSet();
+ Set<Tweet> received = Sets.newHashSet();
+ int total = 10;
+ for (int i = 0; i < total; i++) {
+ String sendMessage = msg + i;
+ Tweet tweet = new Tweet(name, sendMessage);
+ KeyedMessage<String, Tweet> message = new KeyedMessage<>(topicName, name, tweet);
+ published.add(tweet);
+ producer.send(message);
+ }
+ while (received.size() < total) {
+ for (int i = 0; i < streams.size(); i++) {
+ List<PulsarKafkaStream<String, Tweet>> kafkaStreams = streams.get(topicName);
+ assertEquals(kafkaStreams.size(), 2);
+ for (PulsarKafkaStream<String, Tweet> kafkaStream : kafkaStreams) {
+ for (PulsarMessageAndMetadata<String, KafkaProducerConsumerTest.Tweet> record : kafkaStream) {
+ received.add(record.message());
+ assertEquals(record.key(), name);
+ }
+ }
+ }
+ }
+ assertEquals(published.size(), received.size());
+ published.removeAll(received);
+ assertTrue(published.isEmpty());
+ }
+
+ public static class Tweet implements Serializable {
+ private static final long serialVersionUID = 1L;
+ public String userName;
+ public String message;
+
+ public Tweet(String userName, String message) {
+ super();
+ this.userName = userName;
+ this.message = message;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(userName, message);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof Tweet) {
+ Tweet tweet = (Tweet) obj;
+ return Objects.equal(this.userName, tweet.userName) && Objects.equal(this.message, tweet.message);
+ }
+ return false;
+ }
+ }
+
+ public static class TestEncoder implements Encoder<Tweet> {
+ @Override
+ public byte[] toBytes(Tweet tweet) {
+ return (tweet.userName + "," + tweet.message).getBytes();
+ }
+ }
+
+ public static class TestDecoder implements Decoder<Tweet> {
+ @Override
+ public Tweet fromBytes(byte[] input) {
+ String[] tokens = (new String(input)).split(",");
+ return new Tweet(tokens[0], tokens[1]);
+ }
+ }
+
+ public static class TestPartitioner implements Partitioner {
+ @Override
+ public int partition(Object obj, int totalPartition) {
+ return obj.hashCode() % totalPartition;
+ }
+ }
+
+
+ @Test
+ public void testProducerConsumerWithoutSerializer() throws Exception {
+ final String serviceUrl = lookupUrl.toString();
+ final String topicName = "persistent://my-property/my-ns/my-topic1";
+ // (1) Create consumer
+ Properties properties = new Properties();
+ properties.put("zookeeper.connect", serviceUrl);
+ properties.put("group.id", "group1");
+ properties.put("consumer.id", "cons1");
+ properties.put("auto.commit.enable", "true");
+ properties.put("auto.commit.interval.ms", "100");
+ properties.put("queued.max.message.chunks", "100");
+
+ ConsumerConfig conSConfig = new ConsumerConfig(properties);
+ ConsumerConnector connector = new ConsumerConnector(conSConfig);
+ Map<String, Integer> topicCountMap = Collections.singletonMap(topicName, 2);
+ Map<String, List<PulsarKafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);
+
+ // (2) Create producer
+ Properties properties2 = new Properties();
+ properties2.put(BROKER_URL, serviceUrl);
+ properties2.put(PRODUCER_TYPE, "sync");
+ properties2.put(PARTITIONER_CLASS, TestPartitioner.class.getName());
+ properties2.put(COMPRESSION_CODEC, "gzip"); // compression: ZLIB
+ properties2.put(QUEUE_ENQUEUE_TIMEOUT_MS, "-1"); // block queue if full => -1 = true
+ properties2.put(QUEUE_BUFFERING_MAX_MESSAGES, "6000"); // queue max message
+ properties2.put(QUEUE_BUFFERING_MAX_MS, "100"); // batch delay
+ properties2.put(BATCH_NUM_MESSAGES, "500"); // batch msg
+ properties2.put(CLIENT_ID, "test");
+ ProducerConfig config = new ProducerConfig(properties2);
+ PulsarKafkaProducer<byte[], byte[]> producer = new PulsarKafkaProducer<>(config);
+
+ String name = "user";
+ String msg = "Hello World!";
+ int total = 10;
+ for (int i = 0; i < total; i++) {
+ String sendMessage = msg + i;
+ KeyedMessage<byte[], byte[]> message = new KeyedMessage<>(topicName, name.getBytes(), sendMessage.getBytes());
+ producer.send(message);
+ }
+ int count = 0;
+ while (count < total) {
+ for (int i = 0; i < streams.size(); i++) {
+ List<PulsarKafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
+ assertEquals(kafkaStreams.size(), 2);
+ for (PulsarKafkaStream<byte[], byte[]> kafkaStream : kafkaStreams) {
+ for (PulsarMessageAndMetadata<byte[], byte[]> record : kafkaStream) {
+ count++;
+ System.out.println(new String(record.message()));
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/pulsar/client/kafka/test/KafkaProducerSimpleConsumerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/pulsar/client/kafka/test/KafkaProducerSimpleConsumerTest.java
new file mode 100644
index 0000000..3010c54
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/pulsar/client/kafka/test/KafkaProducerSimpleConsumerTest.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.producer.PulsarKafkaProducer;
+import org.apache.kafka.clients.simple.consumer.PulsarKafkaSimpleConsumer;
+import org.apache.kafka.clients.simple.consumer.PulsarMsgAndOffset;
+import org.apache.kafka.clients.simple.consumer.PulsarOffsetCommitRequest;
+import org.apache.kafka.clients.simple.consumer.PulsarOffsetFetchRequest;
+import org.apache.kafka.clients.simple.consumer.PulsarOffsetMetadataAndError;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
+
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.common.OffsetMetadataAndError;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.message.MessageAndOffset;
+import kafka.producer.KeyedMessage;
+import kafka.producer.Partitioner;
+import kafka.producer.ProducerConfig;
+import kafka.serializer.Decoder;
+import kafka.serializer.Encoder;
+import kafka.serializer.StringEncoder;
+
+public class KafkaProducerSimpleConsumerTest extends ProducerConsumerBase {
+
+ private static final String BROKER_URL = "metadata.broker.list";
+ private static final String PRODUCER_TYPE = "producer.type";
+ private static final String KEY_SERIALIZER_CLASS = "key.serializer.class";
+ private static final String PARTITIONER_CLASS = "partitioner.class";
+ private static final String COMPRESSION_CODEC = "compression.codec";
+ private static final String QUEUE_BUFFERING_MAX_MS = "queue.buffering.max.ms";
+ private static final String QUEUE_BUFFERING_MAX_MESSAGES = "queue.buffering.max.messages";
+ private static final String QUEUE_ENQUEUE_TIMEOUT_MS = "queue.enqueue.timeout.ms";
+ private static final String BATCH_NUM_MESSAGES = "batch.num.messages";
+ private static final String CLIENT_ID = "client.id";
+
+ private final static int publishPartition = 1;
+
+ @DataProvider(name = "partitions")
+ public Object[][] totalPartitions() {
+ return new Object[][] { { 0 }, { 10 } };
+ }
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterMethod
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test(dataProvider="partitions")
+ public void testPulsarKafkaProducerWithSerializer(int partitions) throws Exception {
+ final String serviceUrl = lookupUrl.toString();
+ final String topicName = "persistent://my-property/my-ns/my-topic";
+ final String groupId = "group1";
+
+ int partition = -1;
+ if (partitions > 0) {
+ admin.topics().createPartitionedTopic(topicName, 10);
+ partition = publishPartition;
+ }
+
+ // create subscription
+ Consumer<byte[]> cons = pulsarClient.newConsumer().topic(topicName).subscriptionName(groupId).subscribe();
+ cons.close();
+
+ // (2) Create producer
+ Properties properties2 = new Properties();
+ properties2.put(BROKER_URL, serviceUrl);
+ properties2.put(PRODUCER_TYPE, "sync");
+ properties2.put(KEY_SERIALIZER_CLASS, StringEncoder.class.getName());
+ properties2.put(PARTITIONER_CLASS, TestPartitioner.class.getName());
+ properties2.put(COMPRESSION_CODEC, "gzip"); // compression: ZLIB
+ properties2.put(QUEUE_ENQUEUE_TIMEOUT_MS, "-1"); // block queue if full => -1 = true
+ properties2.put(QUEUE_BUFFERING_MAX_MESSAGES, "6000"); // queue max message
+ properties2.put(QUEUE_BUFFERING_MAX_MS, "100"); // batch delay
+ properties2.put(BATCH_NUM_MESSAGES, "500"); // batch msg
+ properties2.put(CLIENT_ID, "test");
+ ProducerConfig config = new ProducerConfig(properties2);
+ PulsarKafkaProducer<String, byte[]> producer = new PulsarKafkaProducer<>(config);
+
+ String name = "user";
+ String msg = "Hello World!";
+ Set<String> published = Sets.newHashSet();
+ Set<String> received = Sets.newHashSet();
+ int total = 10;
+ for (int i = 0; i < total; i++) {
+ String sendMessage = msg + i;
+ KeyedMessage<String, byte[]> message = new KeyedMessage<>(topicName, name, sendMessage.getBytes());
+ published.add(sendMessage);
+ producer.send(message);
+ }
+
+
+ // (2) Consume using simple consumer
+ PulsarKafkaSimpleConsumer consumer = new PulsarKafkaSimpleConsumer(serviceUrl, 0, 0, 0, "clientId");
+ List<String> topics = Collections.singletonList(topicName);
+ TopicMetadataRequest req = new TopicMetadataRequest(topics);
+ kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
+
+ List<TopicMetadata> metaData = resp.topicsMetadata();
+ PartitionMetadata part = metaData.get(0).partitionsMetadata().get(0);
+
+ long readOffset = kafka.api.OffsetRequest.EarliestTime();
+ FetchRequest fReq = new FetchRequestBuilder()
+ .clientId("c1")
+ .addFetch(topicName, partition, readOffset, 100000)
+ .build();
+ FetchResponse fetchResponse = consumer.fetch(fReq);
+
+ long lastOffset = 0;
+ MessageId offset = null;
+ for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topicName, partition)) {
+ long currentOffset = messageAndOffset.offset();
+ if (currentOffset < readOffset) {
+ continue;
+ }
+ offset = ((PulsarMsgAndOffset)messageAndOffset).getFullOffset();
+ lastOffset = messageAndOffset.offset();
+ ByteBuffer payload = messageAndOffset.message().payload();
+
+ byte[] bytes = new byte[payload.limit()];
+ payload.get(bytes);
+ received.add(new String(bytes, "UTF-8"));
+ }
+ lastOffset -= 1;
+
+ assertEquals(published.size(), received.size());
+ published.removeAll(received);
+ assertTrue(published.isEmpty());
+
+ TopicAndPartition topicPartition = new TopicAndPartition(topicName, partition);
+ PulsarOffsetMetadataAndError offsetError = new PulsarOffsetMetadataAndError(offset, null, (short) 0);
+ Map<TopicAndPartition, PulsarOffsetMetadataAndError> requestInfo = Collections.singletonMap(topicPartition,
+ offsetError);
+ PulsarOffsetCommitRequest offsetReq = new PulsarOffsetCommitRequest(groupId, requestInfo, (short) -1, 0, "c1");
+ consumer.commitOffsets(offsetReq);
+
+ final long expectedReadOffsetPosition = lastOffset;
+
+ retryStrategically((test) -> fetchOffset(consumer, topicPartition, groupId) == expectedReadOffsetPosition, 10, 150);
+
+ long offset1 = fetchOffset(consumer, topicPartition, groupId);
+ MessageIdImpl actualMsgId = ((MessageIdImpl)MessageIdUtils.getMessageId(offset1));
+
+ MessageIdImpl expectedMsgId = (MessageIdImpl) offset;
+ assertEquals(actualMsgId.getLedgerId(), expectedMsgId.getLedgerId());
+ assertEquals(actualMsgId.getEntryId(), expectedMsgId.getEntryId() + 1);
+ }
+
+ private long fetchOffset(PulsarKafkaSimpleConsumer consumer, TopicAndPartition topicPartition, String groupId) {
+ List<TopicAndPartition> fetchReqInfo = Collections.singletonList(topicPartition);
+ PulsarOffsetFetchRequest fetchOffsetRequest = new PulsarOffsetFetchRequest(groupId, fetchReqInfo, (short)-1, 0, "test");
+ OffsetMetadataAndError offsetResponse = consumer.fetchOffsets(fetchOffsetRequest).offsets().get(topicPartition);
+ return offsetResponse.offset();
+ }
+
+ public static class Tweet implements Serializable {
+ private static final long serialVersionUID = 1L;
+ public String userName;
+ public String message;
+
+ public Tweet(String userName, String message) {
+ super();
+ this.userName = userName;
+ this.message = message;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(userName, message);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof Tweet) {
+ Tweet tweet = (Tweet) obj;
+ return Objects.equal(this.userName, tweet.userName) && Objects.equal(this.message, tweet.message);
+ }
+ return false;
+ }
+ }
+
+ public static class TestEncoder implements Encoder<Tweet> {
+ @Override
+ public byte[] toBytes(Tweet tweet) {
+ return (tweet.userName + "," + tweet.message).getBytes();
+ }
+ }
+
+ public static class TestDecoder implements Decoder<Tweet> {
+ @Override
+ public Tweet fromBytes(byte[] input) {
+ String[] tokens = (new String(input)).split(",");
+ return new Tweet(tokens[0], tokens[1]);
+ }
+ }
+
+ public static class TestPartitioner implements Partitioner {
+ @Override
+ public int partition(Object obj, int totalPartition) {
+ //return obj.hashCode() % totalPartition;
+ return publishPartition;
+ }
+ }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 9eed128..6965378 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -317,4 +317,9 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
public ConsumerConfigurationData<T> getConf() {
return conf;
}
+
+ @Override
+ public String toString() {
+ return conf != null ? conf.toString() : null;
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index d8ad226..7976a04 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -270,4 +270,9 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
"should be set as " + MessageRoutingMode.CustomPartition);
}
}
+
+ @Override
+ public String toString() {
+ return conf != null ? conf.toString() : null;
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
index ed3fa09..626d662 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
@@ -28,7 +28,7 @@ public class TopicMessageIdImpl implements MessageId {
private final String topicName;
private final MessageId messageId;
- TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) {
+ public TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) {
this.messageId = messageId;
this.topicPartitionName = topicPartitionName;
this.topicName = topicName;