You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/06 03:10:49 UTC
[pulsar] branch master updated: Add support of pulsar-kafka-adapter
for kafka-0.9 api (#4886)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f6ed037 Add support of pulsar-kafka-adapter for kafka-0.9 api (#4886)
f6ed037 is described below
commit f6ed0377fb47705a2a339bfcc72e8a519b266fb0
Author: Xiaobing Fang <bi...@qq.com>
AuthorDate: Tue Aug 6 11:10:43 2019 +0800
Add support of pulsar-kafka-adapter for kafka-0.9 api (#4886)
Fixes #4791
**Motivation**
Currently the Pulsar Kafka wrapper is using Kafka 0.10.x version. However, there are users who use legacy-kafka version in their system and willing to move to pulsar. This PR provides pulsar-kafka adapter for kafka-api-version 0.9.X. So, this adapter can help users in their migration process from kafka-0.9 to pulsar.
---
pulsar-client-kafka-compat/pom.xml | 3 +
.../pulsar-client-kafka-shaded_0_9/pom.xml | 271 +++++++++++++++++
.../kafka/compat/examples/ProducerAvroExample.java | 1 +
.../kafka/compat/examples/ProducerExample.java | 1 +
.../{ => pulsar-client-kafka-tests_0_9}/pom.xml | 44 ++-
.../compat/examples/ConsumerAvroExample.java} | 39 ++-
.../kafka/compat/examples/ConsumerExample.java} | 44 +--
.../kafka/compat/examples/ProducerAvroExample.java | 1 +
.../kafka/compat/examples/ProducerExample.java | 2 +-
.../client/kafka/compat/examples/utils/Bar.java | 30 ++
.../client/kafka/compat/examples/utils/Foo.java | 35 +++
.../clients/consumer/PulsarKafkaConsumer.java | 2 +-
.../clients/producer/PulsarKafkaProducer.java | 2 +-
.../{ => pulsar-client-kafka_0_9}/pom.xml | 45 ++-
.../clients/consumer/PulsarKafkaConsumer.java | 328 ++++++---------------
.../clients/producer/PulsarKafkaProducer.java | 131 +++-----
.../client/kafka/compat/KafkaMessageRouter.java} | 36 +--
.../kafka/compat/PulsarClientKafkaConfig.java | 117 ++++++++
.../kafka/compat/PulsarConsumerKafkaConfig.java | 72 +++++
.../client/kafka/compat/PulsarKafkaSchema.java | 77 +++++
.../kafka/compat/PulsarProducerKafkaConfig.java | 65 ++++
.../clients/producer/PulsarKafkaProducerTest.java | 192 ++++++++++++
.../apache/pulsar/client/util}/MessageIdUtils.java | 2 +-
23 files changed, 1143 insertions(+), 397 deletions(-)
diff --git a/pulsar-client-kafka-compat/pom.xml b/pulsar-client-kafka-compat/pom.xml
index f25274e..569647f 100644
--- a/pulsar-client-kafka-compat/pom.xml
+++ b/pulsar-client-kafka-compat/pom.xml
@@ -38,7 +38,10 @@
<modules>
<module>pulsar-client-kafka</module>
+ <module>pulsar-client-kafka_0_9</module>
<module>pulsar-client-kafka-shaded</module>
+ <module>pulsar-client-kafka-shaded_0_9</module>
<module>pulsar-client-kafka-tests</module>
+ <module>pulsar-client-kafka-tests_0_9</module>
</modules>
</project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-shaded_0_9/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded_0_9/pom.xml
new file mode 100644
index 0000000..a6a7ba3
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded_0_9/pom.xml
@@ -0,0 +1,271 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-kafka-compat</artifactId>
+ <version>2.5.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>pulsar-client-kafka_0_9</artifactId>
+ <name>Pulsar Kafka compatibility 0.9 :: API</name>
+
+ <description>Drop-in replacement for Kafka client library that publishes and consumes
+ messages on Pulsar topics</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-kafka_0_9-original</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+ <artifactSet>
+ <includes>
+ <include>org.apache.kafka:kafka-clients</include>
+ <include>org.apache.pulsar:pulsar-client-kafka_0_9-original</include>
+ <include>org.apache.pulsar:pulsar-client-original</include>
+ <include>org.apache.commons:commons-lang3</include>
+ <include>commons-codec:commons-codec</include>
+ <include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
+ <include>commons-collections:commons-collections</include>
+ <include>org.asynchttpclient:*</include>
+ <include>io.netty:netty-codec-http</include>
+ <include>io.netty:netty-transport-native-epoll</include>
+ <include>org.reactivestreams:reactive-streams</include>
+ <include>com.typesafe.netty:netty-reactive-streams</include>
+ <include>org.javassist:javassist</include>
+ <include>com.google.protobuf:protobuf-java</include>
+ <include>com.google.guava:guava</include>
+ <include>com.google.code.gson:gson</include>
+ <include>com.fasterxml.jackson.core</include>
+ <include>com.fasterxml.jackson.module</include>
+ <include>com.fasterxml.jackson.dataformat</include>
+ <include>io.netty:netty</include>
+ <include>io.netty:netty-*</include>
+ <include>org.apache.pulsar:pulsar-common</include>
+ <include>org.apache.bookkeeper:circe-checksum</include>
+ <include>com.yahoo.datasketches:sketches-core</include>
+ <include>org.apache.httpcomponents:httpclient</include>
+ <include>commons-logging:commons-logging</include>
+ <include>org.apache.httpcomponents:httpcore</include>
+ <include>org.eclipse.jetty:*</include>
+ <include>com.yahoo.datasketches:*</include>
+ <include>commons-*:*</include>
+ <include>org.yaml:snakeyaml</include>
+ <include>org.objenesis:*</include>
+
+ <include>org.apache.avro:*</include>
+ <!-- Avro transitive dependencies-->
+ <include>org.codehaus.jackson:jackson-core-asl</include>
+ <include>org.codehaus.jackson:jackson-mapper-asl</include>
+ <include>com.thoughtworks.paranamer:paranamer</include>
+ <include>org.xerial.snappy:snappy-java</include>
+ <include>org.apache.commons:commons-compress</include>
+ <include>org.tukaani:xz</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>commons-logging:commons-logging</artifact>
+ <includes>
+ <include>**</include>
+ </includes>
+ </filter>
+ </filters>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.kafka.clients.producer.KafkaProducer</pattern>
+ <shadedPattern>org.apache.kafka.clients.producer.OriginalKafkaProducer</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.producer.PulsarKafkaProducer</pattern>
+ <shadedPattern>org.apache.kafka.clients.producer.KafkaProducer</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.consumer.KafkaConsumer</pattern>
+ <shadedPattern>org.apache.kafka.clients.consumer.OriginalKafkaConsumer</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.consumer.PulsarKafkaConsumer</pattern>
+ <shadedPattern>org.apache.kafka.clients.consumer.KafkaConsumer</shadedPattern>
+ </relocation>
+
+ <!-- General relocation rules for Pulsar client dependencies -->
+
+ <relocation>
+ <pattern>org.asynchttpclient</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.asynchttpclient</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.apache.commons</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.google</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.fasterxml.jackson</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.netty</pattern>
+ <shadedPattern>org.apache.pulsar.shade.io.netty</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.yahoo.datasketches</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.yahoo.datasketches</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.yahoo.sketches</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.yahoo.sketches</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.http</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.apache.http</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.eclipse.jetty</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.eclipse</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.reactivestreams</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.reactivestreams</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.typesafe</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.typesafe</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.yahoo.memory</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.yahoo.memory</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.objenesis</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.objenesis</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.yaml</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.yaml</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.avro</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.apache.avro</shadedPattern>
+ <excludes>
+ <exclude>org.apache.avro.reflect.AvroAlias</exclude>
+ <exclude>org.apache.avro.reflect.AvroDefault</exclude>
+ <exclude>org.apache.avro.reflect.AvroEncode</exclude>
+ <exclude>org.apache.avro.reflect.AvroIgnore</exclude>
+ <exclude>org.apache.avro.reflect.AvroMeta</exclude>
+ <exclude>org.apache.avro.reflect.AvroName</exclude>
+ <exclude>org.apache.avro.reflect.AvroSchema</exclude>
+ <exclude>org.apache.avro.reflect.Nullable</exclude>
+ <exclude>org.apache.avro.reflect.Stringable</exclude>
+ <exclude>org.apache.avro.reflect.Union</exclude>
+ </excludes>
+ </relocation>
+ <!--Avro transitive dependencies-->
+ <relocation>
+ <pattern>org.codehaus.jackson</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.codehaus.jackson</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.thoughtworks.paranamer</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.thoughtworks.paranamer</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.xerial.snappy</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.xerial.snappy</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.apache.commons</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.tukaani</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.tukaani</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.bookkeeper</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.apache.bookkeeper</shadedPattern>
+ </relocation>
+ </relocations>
+ <filters>
+ <filter>
+ <artifact>org.apache.pulsar:pulsar-client-original</artifact>
+ <includes>
+ <include>**</include>
+ </includes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <!-- This plugin is used to run a script after the package phase in order to rename
+ libnetty_transport_native_epoll_x86_64.so from Netty into
+ liborg_apache_pulsar_shade_netty_transport_native_epoll_x86_64.so
+ to reflect the shade that is being applied.
+ -->
+ <artifactId>exec-maven-plugin</artifactId>
+ <groupId>org.codehaus.mojo</groupId>
+ <executions>
+ <execution>
+ <id>rename-epoll-library</id>
+ <phase>package</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <executable>${project.parent.basedir}/../src/${rename.netty.native.libs}</executable>
+ <arguments>
+ <argument>${project.artifactId}</argument>
+ </arguments>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
index aa5e29a..5d9fcfc 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
@@ -61,6 +61,7 @@ public class ProducerAvroExample {
log.info("Message {} sent successfully", i);
}
+ producer.flush();
producer.close();
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
index a95413c..34e008e 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
@@ -45,6 +45,7 @@ public class ProducerExample {
log.info("Message {} sent successfully", i);
}
+ producer.flush();
producer.close();
}
diff --git a/pulsar-client-kafka-compat/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/pom.xml
similarity index 52%
copy from pulsar-client-kafka-compat/pom.xml
copy to pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/pom.xml
index f25274e..c63c8dc 100644
--- a/pulsar-client-kafka-compat/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/pom.xml
@@ -26,19 +26,45 @@
<parent>
<groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar</artifactId>
+ <artifactId>pulsar-client-kafka-compat</artifactId>
<version>2.5.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
- <artifactId>pulsar-client-kafka-compat</artifactId>
- <name>Pulsar Kafka compatibility</name>
+ <artifactId>pulsar-client-kafka_0_9-tests</artifactId>
+ <name>Pulsar Kafka compatibility 0.9 :: Tests</name>
- <packaging>pom</packaging>
+ <description>Tests to verify the correct shading configuration for the pulsar-client-kafka wrapper</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-kafka_0_9</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-broker</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-broker</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>managed-ledger-original</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ </dependencies>
- <modules>
- <module>pulsar-client-kafka</module>
- <module>pulsar-client-kafka-shaded</module>
- <module>pulsar-client-kafka-tests</module>
- </modules>
</project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerAvroExample.java
similarity index 62%
copy from pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
copy to pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerAvroExample.java
index aa5e29a..3e39b8d 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerAvroExample.java
@@ -18,11 +18,11 @@
*/
package org.apache.pulsar.client.kafka.compat.examples;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.kafka.compat.examples.utils.Bar;
@@ -30,17 +30,20 @@ import org.apache.pulsar.client.kafka.compat.examples.utils.Foo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.Properties;
-public class ProducerAvroExample {
+public class ConsumerAvroExample {
+
public static void main(String[] args) {
String topic = "persistent://public/default/test-avro";
Properties props = new Properties();
props.put("bootstrap.servers", "pulsar://localhost:6650");
-
- props.put("key.serializer", IntegerSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
+ props.put("group.id", "my-subscription-name");
+ props.put("enable.auto.commit", "false");
+ props.put("key.deserializer", IntegerDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
@@ -53,16 +56,20 @@ public class ProducerAvroExample {
foo.setField2("field2");
foo.setField3(3);
+ @SuppressWarnings("resource")
+ Consumer<Foo, Bar> consumer = new KafkaConsumer<>(props, fooSchema, barSchema);
+ consumer.subscribe(Arrays.asList(topic));
- Producer<Foo, Bar> producer = new KafkaProducer<>(props, fooSchema, barSchema);
+ while (true) {
+ ConsumerRecords<Foo, Bar> records = consumer.poll(100);
+ records.forEach(record -> {
+ log.info("Received record: {}", record);
+ });
- for (int i = 0; i < 10; i++) {
- producer.send(new ProducerRecord<Foo, Bar>(topic, i, foo, bar));
- log.info("Message {} sent successfully", i);
+ // Commit last offset
+ consumer.commitSync();
}
-
- producer.close();
}
- private static final Logger log = LoggerFactory.getLogger(ProducerExample.class);
+ private static final Logger log = LoggerFactory.getLogger(ConsumerExample.class);
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerExample.java
similarity index 51%
copy from pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
copy to pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerExample.java
index a95413c..983d7b7 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerExample.java
@@ -18,35 +18,43 @@
*/
package org.apache.pulsar.client.kafka.compat.examples;
+import java.util.Arrays;
import java.util.Properties;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ProducerExample {
+public class ConsumerExample {
+
public static void main(String[] args) {
String topic = "persistent://public/default/test";
Properties props = new Properties();
props.put("bootstrap.servers", "pulsar://localhost:6650");
-
- props.put("key.serializer", IntegerSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
-
- Producer<Integer, String> producer = new KafkaProducer<>(props);
-
- for (int i = 0; i < 10; i++) {
- producer.send(new ProducerRecord<Integer, String>(topic, i, Integer.toString(i)));
- log.info("Message {} sent successfully", i);
+ props.put("group.id", "my-subscription-name");
+ props.put("enable.auto.commit", "false");
+ props.put("key.deserializer", IntegerDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+
+ @SuppressWarnings("resource")
+ Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
+ consumer.subscribe(Arrays.asList(topic));
+
+ while (true) {
+ ConsumerRecords<Integer, String> records = consumer.poll(100);
+ records.forEach(record -> {
+ log.info("Received record: {}", record);
+ });
+
+ // Commit last offset
+ consumer.commitSync();
}
-
- producer.close();
}
- private static final Logger log = LoggerFactory.getLogger(ProducerExample.class);
+ private static final Logger log = LoggerFactory.getLogger(ConsumerExample.class);
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
similarity index 99%
copy from pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
copy to pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
index aa5e29a..5d9fcfc 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
@@ -61,6 +61,7 @@ public class ProducerAvroExample {
log.info("Message {} sent successfully", i);
}
+ producer.flush();
producer.close();
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
similarity index 98%
copy from pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
copy to pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
index a95413c..f089b26 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
@@ -34,7 +34,6 @@ public class ProducerExample {
Properties props = new Properties();
props.put("bootstrap.servers", "pulsar://localhost:6650");
-
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
@@ -45,6 +44,7 @@ public class ProducerExample {
log.info("Message {} sent successfully", i);
}
+ producer.flush();
producer.close();
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Bar.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Bar.java
new file mode 100644
index 0000000..8120900
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Bar.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples.utils;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+@Data
+@ToString
+@EqualsAndHashCode
+public class Bar {
+ private boolean field1;
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Foo.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Foo.java
new file mode 100644
index 0000000..d584f51
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Foo.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples.utils;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.avro.reflect.Nullable;
+
+@Data
+@ToString
+@EqualsAndHashCode
+public class Foo {
+ @Nullable
+ private String field1;
+ @Nullable
+ private String field2;
+ private int field3;
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 15e23a2..06b1c6e 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -60,7 +60,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
+import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 14dd78b..cacca60 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -57,7 +57,7 @@ import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
import org.apache.pulsar.client.kafka.compat.KafkaProducerInterceptorWrapper;
-import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
+import org.apache.pulsar.client.util.MessageIdUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pulsar-client-kafka-compat/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/pom.xml
similarity index 52%
copy from pulsar-client-kafka-compat/pom.xml
copy to pulsar-client-kafka-compat/pulsar-client-kafka_0_9/pom.xml
index f25274e..0740e78 100644
--- a/pulsar-client-kafka-compat/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/pom.xml
@@ -26,19 +26,46 @@
<parent>
<groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar</artifactId>
+ <artifactId>pulsar-client-kafka-compat</artifactId>
<version>2.5.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
- <artifactId>pulsar-client-kafka-compat</artifactId>
- <name>Pulsar Kafka compatibility</name>
+ <properties>
+ <kafka_0_9.version>0.9.0.1</kafka_0_9.version>
+ </properties>
- <packaging>pom</packaging>
+ <artifactId>pulsar-client-kafka_0_9-original</artifactId>
+ <name>Pulsar Kafka compatibility 0.9 :: API (original)</name>
+
+ <description>Kafka client library that publishes and consumes messages on Pulsar topics</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka_0_9.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>lz4</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.lz4</groupId>
+ <artifactId>lz4-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
- <modules>
- <module>pulsar-client-kafka</module>
- <module>pulsar-client-kafka-shaded</module>
- <module>pulsar-client-kafka-tests</module>
- </modules>
</project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
similarity index 72%
copy from pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
copy to pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 15e23a2..6d3c383 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -18,11 +18,8 @@
*/
package org.apache.kafka.clients.consumer;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.Base64;
-import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -45,7 +42,7 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.client.api.MessageListener;
@@ -60,11 +57,11 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
@@ -88,14 +85,13 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
private final Set<TopicPartition> unpolledPartitions = new HashSet<>();
private final SubscriptionInitialPosition strategy;
- private List<ConsumerInterceptor<K, V>> interceptors;
-
private volatile boolean closed = false;
private final int maxRecordsInSinglePoll;
private final Properties properties;
+
private static class QueueItem {
final org.apache.pulsar.client.api.Consumer<byte[]> consumer;
final Message<byte[]> message;
@@ -117,7 +113,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
public PulsarKafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(configs),
- new PulsarKafkaSchema<K>(keyDeserializer), new PulsarKafkaSchema<V>(valueDeserializer));
+ new PulsarKafkaSchema<>(keyDeserializer), new PulsarKafkaSchema<>(valueDeserializer));
}
public PulsarKafkaConsumer(Map<String, Object> configs, Schema<K> keySchema, Schema<V> valueSchema) {
@@ -168,18 +164,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
String serviceUrl = consumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
- // If MAX_POLL_RECORDS_CONFIG is provided then use the config, else use default value.
- if(consumerConfig.values().containsKey(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)){
- maxRecordsInSinglePoll = consumerConfig.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
- } else {
- maxRecordsInSinglePoll = 1000;
- }
-
- interceptors = (List) consumerConfig.getConfiguredInstances(
- ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class);
+ // there is not this config in kafka 0.9, so use default value.
+ maxRecordsInSinglePoll = 1000;
this.properties = new Properties();
- consumerConfig.originals().forEach((k, v) -> properties.put(k, v));
+ consumerConfig.originals().forEach(properties::put);
ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
// Since this client instance is going to be used just for the consumers, we can enable Nagle to group
// all the acknowledgments sent to broker within a short time frame
@@ -192,26 +181,10 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
private SubscriptionInitialPosition getStrategy(final String strategy) {
- switch(strategy) {
- case "earliest":
- return SubscriptionInitialPosition.Earliest;
- default:
- return SubscriptionInitialPosition.Latest;
- }
- }
-
- @Override
- public void received(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> msg) {
- // Block listener thread if the application is slowing down
- try {
- receivedMessages.put(new QueueItem(consumer, msg));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- if (closed) {
- // Consumer was closed and the thread was interrupted. Nothing to worry about here
- } else {
- throw new RuntimeException(e);
- }
+ if ("earliest".equals(strategy)) {
+ return SubscriptionInitialPosition.Earliest;
+ } else {
+ return SubscriptionInitialPosition.Latest;
}
}
@@ -220,24 +193,18 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
throw new UnsupportedOperationException("Cannot access the partitions assignements");
}
- /**
- * Get the current subscription. Will return the same topics used in the most recent call to
- * {@link #subscribe(Collection, ConsumerRebalanceListener)}, or an empty set if no such call has been made.
- *
- * @return The set of topics currently subscribed to
- */
@Override
public Set<String> subscription() {
return consumers.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet());
}
@Override
- public void subscribe(Collection<String> topics) {
+ public void subscribe(List<String> topics) {
subscribe(topics, null);
}
@Override
- public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
+ public void subscribe(List<String> topics, ConsumerRebalanceListener callback) {
List<CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>>> futures = new ArrayList<>();
List<TopicPartition> topicPartitions = new ArrayList<>();
@@ -251,6 +218,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
consumerBuilder.subscriptionType(SubscriptionType.Failover);
consumerBuilder.messageListener(this);
consumerBuilder.subscriptionName(groupId);
+ consumerBuilder.topics(topics);
if (numberOfPartitions > 1) {
// Subscribe to each partition
consumerBuilder.consumerName(ConsumerName.generateRandomName());
@@ -260,8 +228,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
.topic(partitionName).subscribeAsync();
int partitionIndex = i;
TopicPartition tp = new TopicPartition(
- TopicName.get(topic).getPartitionedTopicName(),
- partitionIndex);
+ TopicName.get(topic).getPartitionedTopicName(),
+ partitionIndex);
futures.add(future.thenApply(consumer -> {
log.info("Add consumer {} for partition {}", consumer, tp);
consumers.putIfAbsent(tp, consumer);
@@ -274,8 +242,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
.subscribeAsync();
TopicPartition tp = new TopicPartition(
- TopicName.get(topic).getPartitionedTopicName(),
- 0);
+ TopicName.get(topic).getPartitionedTopicName(),
+ 0);
futures.add(future.thenApply(consumer -> {
log.info("Add consumer {} for partition {}", consumer, tp);
consumers.putIfAbsent(tp, consumer);
@@ -285,7 +253,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
}
unpolledPartitions.addAll(topicPartitions);
-
+
// Wait for all consumers to be ready
futures.forEach(CompletableFuture::join);
@@ -309,17 +277,12 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
@Override
- public void assign(Collection<TopicPartition> partitions) {
+ public void assign(List<TopicPartition> list) {
throw new UnsupportedOperationException("Cannot manually assign partitions");
}
@Override
- public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
- throw new UnsupportedOperationException("Cannot subscribe with topic name pattern");
- }
-
- @Override
- public void subscribe(Pattern pattern) {
+ public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) {
throw new UnsupportedOperationException("Cannot subscribe with topic name pattern");
}
@@ -334,7 +297,6 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
});
}
- @SuppressWarnings("unchecked")
@Override
public ConsumerRecords<K, V> poll(long timeoutMillis) {
try {
@@ -357,8 +319,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
TopicPartition tp = new TopicPartition(topic, partition);
if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {
- log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
- resetOffsets(tp);
+ log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
+ resetOffsets(tp);
}
K key = getKey(topic, msg);
@@ -367,17 +329,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
V value = valueSchema.decode(msg.getData());
- TimestampType timestampType = TimestampType.LOG_APPEND_TIME;
- long timestamp = msg.getPublishTime();
-
- if (msg.getEventTime() > 0) {
- // If we have Event time, use that in preference
- timestamp = msg.getEventTime();
- timestampType = TimestampType.CREATE_TIME;
- }
-
- ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(topic, partition, offset, timestamp,
- timestampType, -1, msg.hasKey() ? msg.getKey().length() : 0, msg.getData().length, key, value);
+ ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(topic, partition, offset, key, value);
records.computeIfAbsent(tp, k -> new ArrayList<>()).add(consumerRecord);
@@ -398,39 +350,13 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
commitAsync();
}
- // If no interceptor is provided, interceptors list will an empty list, original ConsumerRecords will be return.
- return applyConsumerInterceptorsOnConsume(interceptors, new ConsumerRecords<>(records));
+ return new ConsumerRecords<>(records);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
- public ConsumerRecords<K, V> poll(Duration duration) {
- return poll(duration.toMillis());
- }
-
- @SuppressWarnings("unchecked")
- private K getKey(String topic, Message<byte[]> msg) {
- if (!msg.hasKey()) {
- return null;
- }
-
- if (keySchema instanceof PulsarKafkaSchema) {
- PulsarKafkaSchema<K> pulsarKafkaSchema = (PulsarKafkaSchema) keySchema;
- Deserializer<K> kafkaDeserializer = pulsarKafkaSchema.getKafkaDeserializer();
- if (kafkaDeserializer instanceof StringDeserializer) {
- return (K) msg.getKey();
- }
- pulsarKafkaSchema.setTopic(topic);
- }
- // Assume base64 encoding
- byte[] data = Base64.getDecoder().decode(msg.getKey());
- return keySchema.decode(data);
-
- }
-
- @Override
public void commitSync() {
try {
doCommitOffsets(getCurrentOffsetsMap()).get();
@@ -440,11 +366,6 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
@Override
- public void commitSync(Duration duration) {
- commitSync();
- }
-
- @Override
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
try {
doCommitOffsets(offsets).get();
@@ -454,11 +375,6 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
@Override
- public void commitSync(Map<TopicPartition, OffsetAndMetadata> map, Duration duration) {
- commitSync(map);
- }
-
- @Override
public void commitAsync() {
doCommitOffsets(getCurrentOffsetsMap());
}
@@ -483,7 +399,6 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
- applyConsumerInterceptorsOnCommit(interceptors, offsets);
offsets.forEach((topicPartition, offsetAndMetadata) -> {
org.apache.pulsar.client.api.Consumer<byte[]> consumer = consumers.get(topicPartition);
lastCommittedOffset.put(topicPartition, offsetAndMetadata);
@@ -503,43 +418,6 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
return offsets;
}
- /**
- * Apply all onConsume methods in a list of ConsumerInterceptors.
- * Catch any exception during the process.
- *
- * @param interceptors Interceptors provided.
- * @param consumerRecords ConsumerRecords returned by calling {@link this#poll(long)}.
- * @return ConsumerRecords after applying all ConsumerInterceptor in interceptors list.
- */
- private ConsumerRecords applyConsumerInterceptorsOnConsume(List<ConsumerInterceptor<K, V>> interceptors, ConsumerRecords consumerRecords) {
- ConsumerRecords processedConsumerRecords = consumerRecords;
- for (ConsumerInterceptor interceptor : interceptors) {
- try {
- processedConsumerRecords = interceptor.onConsume(processedConsumerRecords);
- } catch (Exception e) {
- log.warn("Error executing onConsume for interceptor {}.", interceptor.getClass().getCanonicalName(), e);
- }
- }
- return processedConsumerRecords;
- }
-
- /**
- * Apply all onCommit methods in a list of ConsumerInterceptors.
- * Catch any exception during the process.
- *
- * @param interceptors Interceptors provided.
- * @param offsets Offsets need to be commit.
- */
- private void applyConsumerInterceptorsOnCommit(List<ConsumerInterceptor<K, V>> interceptors, Map<TopicPartition, OffsetAndMetadata> offsets) {
- for (ConsumerInterceptor interceptor : interceptors) {
- try {
- interceptor.onCommit(offsets);
- } catch (Exception e) {
- log.warn("Error executing onCommit for interceptor {}.", interceptor.getClass().getCanonicalName(), e);
- }
- }
- }
-
@Override
public void seek(TopicPartition partition, long offset) {
MessageId msgId = MessageIdUtils.getMessageId(offset);
@@ -556,20 +434,15 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
@Override
- public void seek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
- seek(topicPartition, offsetAndMetadata.offset());
- }
-
- @Override
- public void seekToBeginning(Collection<TopicPartition> partitions) {
+ public void seekToBeginning(TopicPartition... partitions) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
- if (partitions.isEmpty()) {
- partitions = consumers.keySet();
+ if (partitions.length == 0) {
+ partitions = consumers.keySet().toArray(new TopicPartition[0]);
}
lastCommittedOffset.clear();
lastReceivedOffset.clear();
-
+
for (TopicPartition tp : partitions) {
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
if (c == null) {
@@ -584,11 +457,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
@Override
- public void seekToEnd(Collection<TopicPartition> partitions) {
+ public void seekToEnd(TopicPartition... partitions) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
- if (partitions.isEmpty()) {
- partitions = consumers.keySet();
+ if (partitions.length == 0) {
+ partitions = consumers.keySet().toArray(new TopicPartition[0]);
}
lastCommittedOffset.clear();
lastReceivedOffset.clear();
@@ -616,42 +489,17 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
@Override
- public long position(TopicPartition topicPartition, Duration duration) {
- return position(topicPartition);
- }
-
- private SubscriptionInitialPosition resetOffsets(final TopicPartition partition) {
- log.info("Resetting partition {} and seeking to {} position", partition, strategy);
- if (strategy == SubscriptionInitialPosition.Earliest) {
- seekToBeginning(Collections.singleton(partition));
- } else {
- seekToEnd(Collections.singleton(partition));
- }
- return strategy;
- }
-
- @Override
public OffsetAndMetadata committed(TopicPartition partition) {
return lastCommittedOffset.get(partition);
}
@Override
- public OffsetAndMetadata committed(TopicPartition topicPartition, Duration duration) {
- return committed(topicPartition);
- }
-
- @Override
public Map<MetricName, ? extends Metric> metrics() {
throw new UnsupportedOperationException();
}
@Override
- public List<PartitionInfo> partitionsFor(String topic) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List<PartitionInfo> partitionsFor(String s, Duration duration) {
+ public List<PartitionInfo> partitionsFor(String s) {
throw new UnsupportedOperationException();
}
@@ -661,62 +509,17 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
@Override
- public Map<String, List<PartitionInfo>> listTopics(Duration duration) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Set<TopicPartition> paused() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void pause(Collection<TopicPartition> partitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void resume(Collection<TopicPartition> partitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map, Duration duration) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection, Duration duration) {
+ public void pause(TopicPartition... topicPartitions) {
throw new UnsupportedOperationException();
}
@Override
- public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection, Duration duration) {
+ public void resume(TopicPartition... topicPartitions) {
throw new UnsupportedOperationException();
}
@Override
public void close() {
- close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void close(long timeout, TimeUnit unit) {
try {
closed = true;
@@ -724,20 +527,71 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
commitAsync();
}
- client.closeAsync().get(timeout, unit);
+ client.closeAsync().get(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
}
@Override
- public void close(Duration duration) {
- close(duration.toMillis(), TimeUnit.MILLISECONDS);
+ public void wakeup() {
+ throw new UnsupportedOperationException();
}
+ /**
+ * This method is called whenever a new message is received.
+ * <p>
+ * Messages are guaranteed to be delivered in order and from the same thread for a single consumer
+ * <p>
+ * This method will only be called once for each message, unless either application or broker crashes.
+ * <p>
+ * Application is responsible of handling any exception that could be thrown while processing the message.
+ *
+ * @param consumer the consumer that received the message
+ * @param msg
+ */
@Override
- public void wakeup() {
- throw new UnsupportedOperationException();
+ public void received(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> msg) {
+ // Block listener thread if the application is slowing down
+ try {
+ receivedMessages.put(new QueueItem(consumer, msg));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ if (closed) {
+ // Consumer was closed and the thread was interrupted. Nothing to worry about here
+ } else {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private SubscriptionInitialPosition resetOffsets(final TopicPartition partition) {
+ log.info("Resetting partition {} and seeking to {} position", partition, strategy);
+ if (strategy == SubscriptionInitialPosition.Earliest) {
+ seekToBeginning(partition);
+ } else {
+ seekToEnd(partition);
+ }
+ return strategy;
}
+ @SuppressWarnings("unchecked")
+ private K getKey(String topic, Message<byte[]> msg) {
+ if (!msg.hasKey()) {
+ return null;
+ }
+
+ if (keySchema instanceof PulsarKafkaSchema) {
+ PulsarKafkaSchema<K> pulsarKafkaSchema = (PulsarKafkaSchema) keySchema;
+ Deserializer<K> kafkaDeserializer = pulsarKafkaSchema.getKafkaDeserializer();
+ if (kafkaDeserializer instanceof StringDeserializer) {
+ return (K) msg.getKey();
+ }
+ pulsarKafkaSchema.setTopic(topic);
+ }
+ // Assume base64 encoding
+ byte[] data = Base64.getDecoder().decode(msg.getKey());
+ return keySchema.decode(data);
+
+ }
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
similarity index 75%
copy from pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
copy to pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 14dd78b..b853459 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -19,13 +19,13 @@
package org.apache.kafka.clients.producer;
import java.nio.charset.StandardCharsets;
-import java.time.Duration;
import java.util.Base64;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -34,30 +34,25 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
-
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
-import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
-import org.apache.pulsar.client.kafka.compat.KafkaProducerInterceptorWrapper;
-import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
+import org.apache.pulsar.client.util.MessageIdUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,10 +69,10 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
private final Partitioner partitioner;
private volatile Cluster cluster = Cluster.empty();
- private List<ProducerInterceptor<K, V>> interceptors;
-
private final Properties properties;
+ private static final Logger logger = LoggerFactory.getLogger(PulsarKafkaProducer.class);
+
public PulsarKafkaProducer(Map<String, Object> configs) {
this(new ProducerConfig(configs), null, null);
}
@@ -128,7 +123,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
partitioner.configure(producerConfig.originals());
this.properties = new Properties();
- producerConfig.originals().forEach((k, v) -> properties.put(k, v));
+ producerConfig.originals().forEach(properties::put);
long keepAliveIntervalMs = Long.parseLong(properties.getProperty(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "30000"));
@@ -137,9 +132,13 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
// Support Kafka's ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG in ms.
// If passed in value is greater than Integer.MAX_VALUE in second will throw IllegalArgumentException.
int keepAliveInterval = Math.toIntExact(keepAliveIntervalMs / 1000);
- client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).keepAliveInterval(keepAliveInterval, TimeUnit.SECONDS).build();
+ client = PulsarClientKafkaConfig.getClientBuilder(properties)
+ .serviceUrl(serviceUrl)
+ .keepAliveInterval(keepAliveInterval, TimeUnit.SECONDS)
+ .build();
} catch (ArithmeticException e) {
- String errorMessage = String.format("Invalid value %d for 'connections.max.idle.ms'. Please use a value smaller than %d000 milliseconds.", keepAliveIntervalMs, Integer.MAX_VALUE);
+ String errorMessage = String.format("Invalid value %d for 'connections.max.idle.ms'. " +
+ "Please use a value smaller than %d000 milliseconds.", keepAliveIntervalMs, Integer.MAX_VALUE);
logger.error(errorMessage);
throw new IllegalArgumentException(errorMessage);
} catch (PulsarClientException e) {
@@ -170,47 +169,20 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
// Kafka, on the other hand, still blocks for "max.block.ms" time and then gives error.
boolean shouldBlockPulsarProducer = sendTimeoutMillis > 0;
pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer);
-
- interceptors = (List) producerConfig.getConfiguredInstances(
- ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
- }
-
- @Override
- public void initTransactions() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void beginTransaction() throws ProducerFencedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, String s) throws ProducerFencedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void commitTransaction() throws ProducerFencedException {
- throw new UnsupportedOperationException();
}
- @Override
- public void abortTransaction() throws ProducerFencedException {
- throw new UnsupportedOperationException();
- }
@Override
- public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
- return send(record, null);
+ public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
+ return send(producerRecord, null);
}
@Override
- public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+ public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, Callback callback) {
org.apache.pulsar.client.api.Producer<byte[]> producer;
try {
- producer = producers.computeIfAbsent(record.topic(), topic -> createNewProducer(topic));
+ producer = producers.computeIfAbsent(producerRecord.topic(), topic -> createNewProducer(topic));
} catch (Exception e) {
if (callback != null) {
callback.onCompletion(null, e);
@@ -220,12 +192,11 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
return future;
}
- TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
- int messageSize = buildMessage(messageBuilder, record);
+ TypedMessageBuilder<byte[]> messageBuilder = buildMessage(producer, producerRecord);
CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
messageBuilder.sendAsync().thenAccept((messageId) -> {
- future.complete(getRecordMetadata(record.topic(), messageBuilder, messageId, messageSize));
+ future.complete(getRecordMetadata(producerRecord.topic(), messageId));
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
@@ -251,7 +222,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
}
@Override
- public List<PartitionInfo> partitionsFor(String topic) {
+ public List<PartitionInfo> partitionsFor(String s) {
throw new UnsupportedOperationException();
}
@@ -275,44 +246,41 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
}
}
- @Override
- public void close(Duration duration) {
- close(duration.toMillis(), TimeUnit.MILLISECONDS);
- }
-
private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String topic) {
try {
// Add the partitions info for the new topic
- synchronized (this){
- cluster = cluster.withPartitions(readPartitionsInfo(topic));
- }
- List<org.apache.pulsar.client.api.ProducerInterceptor> wrappedInterceptors = interceptors.stream()
- .map(interceptor -> new KafkaProducerInterceptorWrapper(interceptor, keySchema, valueSchema, topic))
- .collect(Collectors.toList());
+ cluster = addPartitionsInfo(cluster, topic);
return pulsarProducerBuilder.clone()
.topic(topic)
- .intercept(wrappedInterceptors.toArray(new org.apache.pulsar.client.api.ProducerInterceptor[wrappedInterceptors.size()]))
.create();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
- private Map<TopicPartition, PartitionInfo> readPartitionsInfo(String topic) {
+ /**
+ * Add the partitions info for the new topic.
+ * Need to ensure the atomicity of the update operation.
+ */
+ private synchronized Cluster addPartitionsInfo(Cluster cluster, String topic) {
List<String> partitions = client.getPartitionsForTopic(topic).join();
-
- Map<TopicPartition, PartitionInfo> partitionsInfo = new HashMap<>();
-
+ // read partitions info
+ Set<PartitionInfo> partitionsInfo = new HashSet<>();
for (int i = 0; i < partitions.size(); i++) {
- TopicPartition tp = new TopicPartition(topic, i);
- PartitionInfo pi = new PartitionInfo(topic, i, null, null, null);
- partitionsInfo.put(tp, pi);
+ partitionsInfo.add(new PartitionInfo(topic, i, null, null, null));
}
-
- return partitionsInfo;
+ // create cluster with new partitions info
+ Set<PartitionInfo> combinedPartitions = new HashSet<>();
+ if (cluster.partitionsForTopic(topic) != null) {
+ combinedPartitions.addAll(cluster.partitionsForTopic(topic));
+ }
+ combinedPartitions.addAll(partitionsInfo);
+ return new Cluster(cluster.nodes(), combinedPartitions, new HashSet(cluster.unauthorizedTopics()));
}
- private int buildMessage(TypedMessageBuilder<byte[]> builder, ProducerRecord<K, V> record) {
+ private TypedMessageBuilder<byte[]> buildMessage(org.apache.pulsar.client.api.Producer<byte[]> producer, ProducerRecord<K, V> record) {
+ TypedMessageBuilder<byte[]> builder = producer.newMessage();
+
byte[] keyBytes = null;
if (record.key() != null) {
String key = getKey(record.topic(), record.key());
@@ -320,10 +288,6 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
builder.key(key);
}
- if (record.timestamp() != null) {
- builder.eventTime(record.timestamp());
- }
-
if (valueSchema instanceof PulsarKafkaSchema) {
((PulsarKafkaSchema<V>) valueSchema).setTopic(record.topic());
}
@@ -338,8 +302,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
int partition = partitioner.partition(record.topic(), record.key(), keyBytes, record.value(), value, cluster);
builder.property(KafkaMessageRouter.PARTITION_ID, Integer.toString(partition));
}
-
- return value.length;
+ return builder;
}
private String getKey(String topic, K key) {
@@ -354,8 +317,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
return Base64.getEncoder().encodeToString(keyBytes);
}
- private RecordMetadata getRecordMetadata(String topic, TypedMessageBuilder<byte[]> msgBuilder, MessageId messageId,
- int size) {
+ private RecordMetadata getRecordMetadata(String topic, MessageId messageId) {
MessageIdImpl msgId = (MessageIdImpl) messageId;
// Combine ledger id and entry id to form offset
@@ -363,9 +325,6 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
int partition = msgId.getPartitionIndex();
TopicPartition tp = new TopicPartition(topic, partition);
- TypedMessageBuilderImpl<byte[]> mb = (TypedMessageBuilderImpl<byte[]>) msgBuilder;
- return new RecordMetadata(tp, offset, 0L, mb.getPublishTime(), 0L, mb.hasKey() ? mb.getKey().length() : 0, size);
+ return new RecordMetadata(tp, offset, 0L);
}
-
- private static final Logger logger = LoggerFactory.getLogger(PulsarKafkaProducer.class);
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaMessageRouter.java
similarity index 50%
copy from pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java
copy to pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaMessageRouter.java
index d8f7680..d3fb915 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaMessageRouter.java
@@ -18,27 +18,27 @@
*/
package org.apache.pulsar.client.kafka.compat;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.MessageIdImpl;
+import java.util.concurrent.ThreadLocalRandom;
-public class MessageIdUtils {
- public static final long getOffset(MessageId messageId) {
- MessageIdImpl msgId = (MessageIdImpl) messageId;
- long ledgerId = msgId.getLedgerId();
- long entryId = msgId.getEntryId();
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.impl.RoundRobinPartitionMessageRouterImpl;
- // Combine ledger id and entry id to form offset
- // Use less than 32 bits to represent entry id since it will get
- // rolled over way before overflowing the max int range
- long offset = (ledgerId << 28) | entryId;
- return offset;
- }
+public class KafkaMessageRouter extends RoundRobinPartitionMessageRouterImpl {
+
+ public static final String PARTITION_ID = "pulsar.partition.id";
- public static final MessageId getMessageId(long offset) {
- // Demultiplex ledgerId and entryId from offset
- long ledgerId = offset >>> 28;
- long entryId = offset & 0x0F_FF_FF_FFL;
+ public KafkaMessageRouter(long maxBatchingDelayMs) {
+ super(HashingScheme.JavaStringHash, ThreadLocalRandom.current().nextInt(), true, maxBatchingDelayMs);
+ }
- return new MessageIdImpl(ledgerId, entryId, -1);
+ @Override
+ public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+ if (msg.hasProperty(PARTITION_ID)) {
+ return Integer.parseInt(msg.getProperty(PARTITION_ID));
+ } else {
+ return super.choosePartition(msg, metadata);
+ }
}
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
new file mode 100644
index 0000000..c38d93d
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+
+public class PulsarClientKafkaConfig {
+
+ /// Config variables
+ public static final String AUTHENTICATION_CLASS = "pulsar.authentication.class";
+ public static final String AUTHENTICATION_PARAMS_MAP = "pulsar.authentication.params.map";
+ public static final String AUTHENTICATION_PARAMS_STRING = "pulsar.authentication.params.string";
+ public static final String USE_TLS = "pulsar.use.tls";
+ public static final String TLS_TRUST_CERTS_FILE_PATH = "pulsar.tls.trust.certs.file.path";
+ public static final String TLS_ALLOW_INSECURE_CONNECTION = "pulsar.tls.allow.insecure.connection";
+ public static final String TLS_HOSTNAME_VERIFICATION = "pulsar.tls.hostname.verification";
+
+ public static final String OPERATION_TIMEOUT_MS = "pulsar.operation.timeout.ms";
+ public static final String STATS_INTERVAL_SECONDS = "pulsar.stats.interval.seconds";
+ public static final String NUM_IO_THREADS = "pulsar.num.io.threads";
+
+ public static final String CONNECTIONS_PER_BROKER = "pulsar.connections.per.broker";
+
+ public static final String USE_TCP_NODELAY = "pulsar.use.tcp.nodelay";
+
+ public static final String CONCURRENT_LOOKUP_REQUESTS = "pulsar.concurrent.lookup.requests";
+ public static final String MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION = "pulsar.max.number.rejected.request.per.connection";
+
+ public static ClientBuilder getClientBuilder(Properties properties) {
+ ClientBuilder clientBuilder = PulsarClient.builder();
+
+ if (properties.containsKey(AUTHENTICATION_CLASS)) {
+ String className = properties.getProperty(AUTHENTICATION_CLASS);
+ try {
+ if (properties.containsKey(AUTHENTICATION_PARAMS_STRING)) {
+ String authParamsString = (String) properties.get(AUTHENTICATION_PARAMS_STRING);
+ clientBuilder.authentication(className, authParamsString);
+ } else if (properties.containsKey(AUTHENTICATION_PARAMS_MAP)) {
+ Map<String, String> authParams = (Map<String, String>) properties.get(AUTHENTICATION_PARAMS_MAP);
+ clientBuilder.authentication(className, authParams);
+ } else {
+ @SuppressWarnings("unchecked")
+ Class<Authentication> clazz = (Class<Authentication>) Class.forName(className);
+ Authentication auth = clazz.newInstance();
+ clientBuilder.authentication(auth);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ clientBuilder.enableTls(Boolean.parseBoolean(properties.getProperty(USE_TLS, "false")));
+ clientBuilder.allowTlsInsecureConnection(
+ Boolean.parseBoolean(properties.getProperty(TLS_ALLOW_INSECURE_CONNECTION, "false")));
+ clientBuilder.enableTlsHostnameVerification(
+ Boolean.parseBoolean(properties.getProperty(TLS_HOSTNAME_VERIFICATION, "false")));
+
+ if (properties.containsKey(TLS_TRUST_CERTS_FILE_PATH)) {
+ clientBuilder.tlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH));
+ }
+
+ if (properties.containsKey(OPERATION_TIMEOUT_MS)) {
+ clientBuilder.operationTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)),
+ TimeUnit.MILLISECONDS);
+ }
+
+ if (properties.containsKey(STATS_INTERVAL_SECONDS)) {
+ clientBuilder.statsInterval(Integer.parseInt(properties.getProperty(STATS_INTERVAL_SECONDS)),
+ TimeUnit.SECONDS);
+ }
+
+ if (properties.containsKey(NUM_IO_THREADS)) {
+ clientBuilder.ioThreads(Integer.parseInt(properties.getProperty(NUM_IO_THREADS)));
+ }
+
+ if (properties.containsKey(CONNECTIONS_PER_BROKER)) {
+ clientBuilder.connectionsPerBroker(Integer.parseInt(properties.getProperty(CONNECTIONS_PER_BROKER)));
+ }
+
+ if (properties.containsKey(USE_TCP_NODELAY)) {
+ clientBuilder.enableTcpNoDelay(Boolean.parseBoolean(properties.getProperty(USE_TCP_NODELAY)));
+ }
+
+ if (properties.containsKey(CONCURRENT_LOOKUP_REQUESTS)) {
+ clientBuilder.maxConcurrentLookupRequests(Integer.parseInt(properties.getProperty(CONCURRENT_LOOKUP_REQUESTS)));
+ }
+
+ if (properties.containsKey(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)) {
+ clientBuilder.maxNumberOfRejectedRequestPerConnection(
+ Integer.parseInt(properties.getProperty(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)));
+ }
+
+ return clientBuilder;
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
new file mode 100644
index 0000000..a527827
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+
+public class PulsarConsumerKafkaConfig {
+
+ /// Config variables
+ public static final String CONSUMER_NAME = "pulsar.consumer.name";
+ public static final String RECEIVER_QUEUE_SIZE = "pulsar.consumer.receiver.queue.size";
+ public static final String ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS = "pulsar.consumer.acknowledgments.group.time.millis";
+ public static final String TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS = "pulsar.consumer.total.receiver.queue.size.across.partitions";
+ public static final String SUBSCRIPTION_TOPICS_MODE = "pulsar.consumer.subscription.topics.mode";
+
+ public static ConsumerBuilder<byte[]> getConsumerBuilder(PulsarClient client, Properties properties) {
+ ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer();
+
+ if (properties.containsKey(CONSUMER_NAME)) {
+ consumerBuilder.consumerName(properties.getProperty(CONSUMER_NAME));
+ }
+
+ if (properties.containsKey(RECEIVER_QUEUE_SIZE)) {
+ consumerBuilder.receiverQueueSize(Integer.parseInt(properties.getProperty(RECEIVER_QUEUE_SIZE)));
+ }
+
+ if (properties.containsKey(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)) {
+ consumerBuilder.maxTotalReceiverQueueSizeAcrossPartitions(
+ Integer.parseInt(properties.getProperty(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)));
+ }
+
+ if (properties.containsKey(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)) {
+ consumerBuilder.acknowledgmentGroupTime(
+ Long.parseLong(properties.getProperty(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)), TimeUnit.MILLISECONDS);
+ }
+
+ if (properties.containsKey(SUBSCRIPTION_TOPICS_MODE)) {
+ RegexSubscriptionMode mode;
+ try {
+ mode = RegexSubscriptionMode.valueOf(properties.getProperty(SUBSCRIPTION_TOPICS_MODE));
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Illegal subscription mode, valid values are: "
+ + Arrays.asList(RegexSubscriptionMode.values()));
+ }
+ consumerBuilder.subscriptionTopicsMode(mode);
+ }
+
+ return consumerBuilder;
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
new file mode 100644
index 0000000..807f482
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class PulsarKafkaSchema<T> implements Schema<T> {
+
+ private final Serializer<T> kafkaSerializer;
+
+ private final Deserializer<T> kafkaDeserializer;
+
+ private String topic;
+
+ public PulsarKafkaSchema(Serializer<T> serializer) {
+ this(serializer, null);
+ }
+
+ public PulsarKafkaSchema(Deserializer<T> deserializer) {
+ this(null, deserializer);
+ }
+
+ public PulsarKafkaSchema(Serializer<T> serializer, Deserializer<T> deserializer) {
+ this.kafkaSerializer = serializer;
+ this.kafkaDeserializer = deserializer;
+ }
+
+ public Serializer<T> getKafkaSerializer() {
+ return kafkaSerializer;
+ }
+
+ public Deserializer<T> getKafkaDeserializer() {
+ return kafkaDeserializer;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ @Override
+ public byte[] encode(T message) {
+ checkArgument(kafkaSerializer != null, "Kafka serializer is not initialized yet");
+ return kafkaSerializer.serialize(this.topic, message);
+ }
+
+ @Override
+ public T decode(byte[] message) {
+ checkArgument(kafkaDeserializer != null, "Kafka deserializer is not initialized yet");
+ return kafkaDeserializer.deserialize(this.topic, message);
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo() {
+ return Schema.BYTES.getSchemaInfo();
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
new file mode 100644
index 0000000..5a9a651
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import java.util.Properties;
+
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+
+public class PulsarProducerKafkaConfig {
+
+ /// Config variables
+ public static final String PRODUCER_NAME = "pulsar.producer.name";
+ public static final String INITIAL_SEQUENCE_ID = "pulsar.producer.initial.sequence.id";
+
+ public static final String MAX_PENDING_MESSAGES = "pulsar.producer.max.pending.messages";
+ public static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = "pulsar.producer.max.pending.messages.across.partitions";
+ public static final String BATCHING_ENABLED = "pulsar.producer.batching.enabled";
+ public static final String BATCHING_MAX_MESSAGES = "pulsar.producer.batching.max.messages";
+
+ public static ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client, Properties properties) {
+ ProducerBuilder<byte[]> producerBuilder = client.newProducer();
+
+ if (properties.containsKey(PRODUCER_NAME)) {
+ producerBuilder.producerName(properties.getProperty(PRODUCER_NAME));
+ }
+
+ if (properties.containsKey(INITIAL_SEQUENCE_ID)) {
+ producerBuilder.initialSequenceId(Long.parseLong(properties.getProperty(INITIAL_SEQUENCE_ID)));
+ }
+
+ if (properties.containsKey(MAX_PENDING_MESSAGES)) {
+ producerBuilder.maxPendingMessages(Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES)));
+ }
+
+ if (properties.containsKey(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)) {
+ producerBuilder.maxPendingMessagesAcrossPartitions(
+ Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)));
+ }
+
+ producerBuilder.enableBatching(Boolean.parseBoolean(properties.getProperty(BATCHING_ENABLED, "true")));
+
+ if (properties.containsKey(BATCHING_MAX_MESSAGES)) {
+ producerBuilder.batchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES)));
+ }
+
+ return producerBuilder;
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
new file mode 100644
index 0000000..4d12418
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.anyVararg;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import org.apache.avro.reflect.Nullable;
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.Assert;
+import org.testng.IObjectFactory;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+@PrepareForTest({PulsarClientKafkaConfig.class, PulsarProducerKafkaConfig.class})
+@PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.kafka.clients.producer.ProducerInterceptor"})
+public class PulsarKafkaProducerTest {
+
+ @Data
+ @ToString
+ @EqualsAndHashCode
+ public static class Foo {
+ @Nullable
+ private String field1;
+ @Nullable
+ private String field2;
+ private int field3;
+ }
+
+ @Data
+ @ToString
+ @EqualsAndHashCode
+ public static class Bar {
+ private boolean field1;
+ }
+
+ @ObjectFactory
+ // Necessary to make PowerMockito.mockStatic work with TestNG.
+ public IObjectFactory getObjectFactory() {
+ return new org.powermock.modules.testng.PowerMockObjectFactory();
+ }
+
+ @Test
+ public void testPulsarKafkaProducer() {
+ ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
+ ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
+ doAnswer(invocation -> {
+ Assert.assertEquals((int)invocation.getArguments()[0], 1000000, "Send time out is suppose to be 1000.");
+ return mockProducerBuilder;
+ }).when(mockProducerBuilder).sendTimeout(anyInt(), any(TimeUnit.class));
+ doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
+ doAnswer(invocation -> {
+ Assert.assertEquals((int)invocation.getArguments()[0], 1000, "Keep alive interval is suppose to be 1000.");
+ return mockClientBuilder;
+ }).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
+
+ PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
+ PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
+ when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
+ when(PulsarProducerKafkaConfig.getProducerBuilder(any(), any())).thenReturn(mockProducerBuilder);
+
+ Properties properties = new Properties();
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
+ properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
+ properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
+
+ new PulsarKafkaProducer<>(properties);
+
+ verify(mockClientBuilder, times(1)).keepAliveInterval(1000, TimeUnit.SECONDS);
+ verify(mockProducerBuilder, times(1)).sendTimeout(1000000, TimeUnit.MILLISECONDS);
+ }
+
+ @Test
+ public void testPulsarKafkaSendAvro() throws PulsarClientException {
+ // Arrange
+ PulsarClient mockClient = mock(PulsarClient.class);
+ ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
+ org.apache.pulsar.client.api.Producer mockProducer = mock(org.apache.pulsar.client.api.Producer.class);
+ ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
+ CompletableFuture mockPartitionFuture = new CompletableFuture();
+ CompletableFuture mockSendAsyncFuture = new CompletableFuture();
+ TypedMessageBuilder mockTypedMessageBuilder = mock(TypedMessageBuilderImpl.class);
+
+ mockPartitionFuture.complete(new ArrayList<>());
+ mockSendAsyncFuture.complete(new MessageIdImpl(1, 1, 1));
+ doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
+ doReturn(mockClientBuilder).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
+ doReturn(mockClient).when(mockClientBuilder).build();
+ doReturn(mockPartitionFuture).when(mockClient).getPartitionsForTopic(anyString());
+ doReturn(mockProducerBuilder).when(mockProducerBuilder).topic(anyString());
+ doReturn(mockProducerBuilder).when(mockProducerBuilder).clone();
+ doReturn(mockProducer).when(mockProducerBuilder).create();
+ doReturn(mockTypedMessageBuilder).when(mockProducer).newMessage();
+ doReturn(mockSendAsyncFuture).when(mockTypedMessageBuilder).sendAsync();
+ PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
+ PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
+ when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
+ when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);
+
+ Properties properties = new Properties();
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
+ properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
+ properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
+
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ // Act
+ PulsarKafkaProducer<Foo, Bar> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, fooSchema, barSchema);
+
+ Bar bar = new Bar();
+ bar.setField1(true);
+
+ Foo foo = new Foo();
+ foo.setField1("field1");
+ foo.setField2("field2");
+ foo.setField3(3);
+
+ pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1, foo, bar));
+
+ // Verify
+ verify(mockTypedMessageBuilder).sendAsync();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid value 2147483648000 for 'connections.max.idle.ms'. Please use a value smaller than 2147483647000 milliseconds.")
+ public void testPulsarKafkaProducerKeepAliveIntervalIllegalArgumentException() {
+ Properties properties = new Properties();
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
+ properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, Long.toString((Integer.MAX_VALUE + 1L) * 1000));
+
+ new PulsarKafkaProducer<>(properties);
+ }
+
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java
similarity index 97%
rename from pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java
rename to pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java
index d8f7680..34920c3 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.kafka.compat;
+package org.apache.pulsar.client.util;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;