You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/04/11 08:30:58 UTC
[pulsar-adapters] branch master updated: Feature: Run Kafka streams
app with Pulsar (#10)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git
The following commit(s) were added to refs/heads/master by this push:
new e3f72a4 Feature: Run Kafka streams app with Pulsar (#10)
e3f72a4 is described below
commit e3f72a4e6735219492de69ef98e76d16e8b73e0c
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Sun Apr 11 01:30:50 2021 -0700
Feature: Run Kafka streams app with Pulsar (#10)
---
examples/kafka-streams/pom.xml | 130 ++
.../kafkastreams/pulsar/example/LineSplit.java | 102 ++
.../kafka/kafkastreams/pulsar/example/README.md | 136 ++
.../kafkastreams/pulsar/example/WordCount.java | 96 ++
examples/pom.xml | 1 +
pom.xml | 7 +-
.../pulsar-client-kafka-shaded/pom.xml | 42 +-
.../pulsar-client-kafka/pom.xml | 15 +
.../apache/kafka/clients/admin/PulsarAdmin.java | 1399 ++++++++++++++++++++
.../clients/admin/PulsarKafkaAdminClient.java | 362 +++++
.../consumer/PulsarConsumerCoordinator.java | 137 ++
.../clients/consumer/PulsarKafkaConsumer.java | 137 +-
.../clients/producer/PulsarKafkaProducer.java | 16 +-
.../kafka/compat/PulsarClientKafkaConfig.java | 37 +
14 files changed, 2569 insertions(+), 48 deletions(-)
diff --git a/examples/kafka-streams/pom.xml b/examples/kafka-streams/pom.xml
new file mode 100644
index 0000000..b90bc0a
--- /dev/null
+++ b/examples/kafka-streams/pom.xml
@@ -0,0 +1,130 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.pulsar.examples</groupId>
+ <artifactId>pulsar-adapters-examples</artifactId>
+ <version>2.8.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>kafka-streams</artifactId>
+ <name>Pulsar Examples :: Kafka Streams</name>
+
+ <properties>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ <maven.compiler.source>1.8</maven.compiler.source>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-kafka</artifactId>
+ <version>2.8.0-SNAPSHOT</version>
+ </dependency>
+
+ <!-- Apache Kafka dependencies -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-streams</artifactId>
+ <version>${kafka-client.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </exclusion>
+ </exclusions>
+
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-epoll</artifactId>
+ <classifier>linux-x86_64</classifier>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-unix-common</artifactId>
+ <classifier>linux-x86_64</classifier>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>pulsar-kafka-streams-examples</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.spark.streaming.receiver.example.SparkStreamingPulsarReceiverExample</mainClass>
+ </transformer>
+ </transformers>
+ <finalName>pulsar-kafka-streams-examples</finalName>
+ <artifactSet>
+ <includes>
+ <include>com.google.guava:guava</include>
+ <include>io.netty:netty-codec-http</include>
+ <include>io.netty:netty-transport-native-epoll</include>
+ <include>io.netty:netty</include>
+ <include>io.netty:netty-all</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>com.google</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.google</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.netty</pattern>
+ <shadedPattern>org.apache.pulsar.shade.io.netty</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+
+ </build>
+
+</project>
diff --git a/examples/kafka-streams/src/main/java/org/apache/kafka/kafkastreams/pulsar/example/LineSplit.java b/examples/kafka-streams/src/main/java/org/apache/kafka/kafkastreams/pulsar/example/LineSplit.java
new file mode 100644
index 0000000..97c80f9
--- /dev/null
+++ b/examples/kafka-streams/src/main/java/org/apache/kafka/kafkastreams/pulsar/example/LineSplit.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.kafkastreams.pulsar.example;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * In this example, we implement a simple LineSplit program using the high-level Streams DSL
+ * that reads from a source topic "streams-plaintext-input", where the values of messages represent lines of text;
+ * the code split each text line in string into words and then write back into a sink topic "streams-linesplit-output" where
+ * each record represents a single word.
+ */
+public class LineSplit {
+
+ /*
+ to run:
+ mvn clean package
+ mvn exec:java -Dexec.mainClass=org.apache.kafka.kafkastreams.pulsar.example.LineSplit
+
+ it need running pulsar (standalone)
+
+ topic to read from:
+ bin/pulsar-client consume streams-linesplit-output -s test -p Earliest -n 0
+
+ topic to produce to:
+ bin/pulsar-client produce streams-plaintext-input --messages "I sent a few words"
+ */
+ public static void main(String[] args) throws Exception {
+ Properties props = new Properties();
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
+
+ // kafka would do something like
+ // props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put("bootstrap.servers", "pulsar://localhost:6650");
+ props.put("pulsar.admin.url", "http://localhost:8080");
+
+ props.put("key.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+ props.put("key.deserializer", StringSerializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+
+ props.put("group.id", "linesplit-subscription-name");
+ props.put("enable.auto.commit", "true");
+
+ props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ builder.<String, String>stream("streams-plaintext-input")
+ .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
+ .to("streams-linesplit-output");
+
+ final Topology topology = builder.build();
+
+ final KafkaStreams streams = new KafkaStreams(topology, props);
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ // attach shutdown handler to catch control-c
+ Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
+ @Override
+ public void run() {
+ streams.close();
+ latch.countDown();
+ }
+ });
+
+ try {
+ streams.start();
+ latch.await();
+ } catch (Throwable e) {
+ System.exit(1);
+ }
+ System.exit(0);
+ }
+}
diff --git a/examples/kafka-streams/src/main/java/org/apache/kafka/kafkastreams/pulsar/example/README.md b/examples/kafka-streams/src/main/java/org/apache/kafka/kafkastreams/pulsar/example/README.md
new file mode 100644
index 0000000..c44c7e1
--- /dev/null
+++ b/examples/kafka-streams/src/main/java/org/apache/kafka/kafkastreams/pulsar/example/README.md
@@ -0,0 +1,136 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+## Apache Kafka Streams for Pulsar
+
+This page describes how to use the [Kafka Streams](https://kafka.apache.org/27/documentation/streams/) with Pulsar topics.
+
+## Example
+
+### LineSplit
+
+This Kafka Streams job is consuming from a Pulsar topic, splitting lines into words in a streaming fashion.
+The job writes the words to another Pulsar topic.
+
+The steps to run the example:
+
+1. Start Pulsar Standalone.
+
+ You can follow the [instructions](https://pulsar.apache.org/docs/en/standalone/) to start a Pulsar standalone locally.
+
+ ```shell
+ $ bin/pulsar standalone
+ ```
+
+2. Create topics to use later. The easiest way to do so is to produce a message to the topic:
+
+ ```shell
+ $ bin/pulsar-client produce streams-plaintext-input --messages "tada"
+ $ bin/pulsar-client produce streams-linesplit-output --messages "tada"
+ ```
+
+3. Build the examples.
+
+ ```shell
+ $ cd ${PULSAR_ADAPTORS_HOME}/examples/kafka-streams
+ $ mvn clean package
+ ```
+
+4. Run the example.
+
+ ```shell
+ $ mvn exec:java -Dexec.mainClass=org.apache.kafka.kafkastreams.pulsar.example.LineSplit
+ ```
+
+5. Produce messages to topic `streams-plaintext-input`.
+
+ ```shell
+ $ bin/pulsar-client produce streams-plaintext-input --messages "I produced a few words"
+ ```
+
+6. You can check that consumer sees the words in output topic `streams-linesplit-output`, e.g.:
+
+ ```shell
+ $ bin/pulsar-client consume streams-linesplit-output -s test -p Earliest -n 0
+ ```
+ outputs something like
+ ```text
+ ----- got message -----
+ key:[null], properties:[pulsar.partition.id=0], content:I
+ ----- got message -----
+ key:[null], properties:[pulsar.partition.id=0], content:produced
+ ----- got message -----
+ key:[null], properties:[pulsar.partition.id=0], content:a
+ ----- got message -----
+ key:[null], properties:[pulsar.partition.id=0], content:few
+ ----- got message -----
+ key:[null], properties:[pulsar.partition.id=0], content:words
+ ```
+
+### WordCount
+
+This Kafka Streams job is consuming from a Pulsar topic, splitting lines into words in a streaming fashion.
+The job writes the words and counts to another Pulsar topic.
+
+The steps to run the example:
+
+1. Start Pulsar Standalone.
+
+ You can follow the [instructions](https://pulsar.apache.org/docs/en/standalone/) to start a Pulsar standalone locally.
+
+ ```shell
+ $ bin/pulsar standalone
+ ```
+
+2. Build the examples.
+
+ ```shell
+ $ cd ${PULSAR_ADAPTORS_HOME}/examples/kafka-streams
+ $ mvn clean package
+ ```
+
+3. Run the example.
+
+ ```shell
+ $ mvn exec:java -Dexec.mainClass=org.apache.kafka.kafkastreams.pulsar.example.WordCount
+ ```
+
+6. Produce messages to topic `streams-plaintext-input`.
+
+ ```shell
+ $ bin/pulsar-client produce streams-plaintext-input --messages "a a a a b c"
+ ```
+
+6. You can check that consumer sees the words in output topic `streams-wordcount-output`, e.g.:
+
+ ```shell
+ $ bin/pulsar-client consume streams-wordcount-output -s test -p Earliest -n 0
+ ```
+ outputs something like
+ ```text
+ ----- got message -----
+ key:[YQ==], properties:[pulsar.partition.id=0], content:
+ ----- got message -----
+ key:[Yg==], properties:[pulsar.partition.id=0], content:
+ ----- got message -----
+ key:[Yw==], properties:[pulsar.partition.id=0], content:
+ ```
+ (not sure how to force it to decode String key and Long value from byte[]/byte[] topic)
diff --git a/examples/kafka-streams/src/main/java/org/apache/kafka/kafkastreams/pulsar/example/WordCount.java b/examples/kafka-streams/src/main/java/org/apache/kafka/kafkastreams/pulsar/example/WordCount.java
new file mode 100644
index 0000000..55d5e49
--- /dev/null
+++ b/examples/kafka-streams/src/main/java/org/apache/kafka/kafkastreams/pulsar/example/WordCount.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.kafkastreams.pulsar.example;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * In this example, we implement a simple WordCount program using the high-level Streams DSL
+ * that reads from a source topic "streams-plaintext-input", where the values of messages represent lines of text,
+ * split each text line into words and then compute the word occurence histogram, write the continuous updated histogram
+ * into a topic "streams-wordcount-output" where each record is an updated count of a single word.
+ */
+public class WordCount {
+
+ public static void main(String[] args) throws Exception {
+ Properties props = new Properties();
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
+
+ // kafka would do something like
+ // props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put("bootstrap.servers", "pulsar://localhost:6650");
+ props.put("pulsar.admin.url", "http://localhost:8080");
+
+ props.put("key.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+ props.put("key.deserializer", StringSerializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+
+ props.put("group.id", "streams-wordcount-subscription-name");
+ props.put("enable.auto.commit", "true");
+
+ props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ builder.<String, String>stream("streams-plaintext-input")
+ .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
+ .groupBy((key, value) -> value)
+ .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
+ .toStream()
+ .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
+
+ final Topology topology = builder.build();
+ final KafkaStreams streams = new KafkaStreams(topology, props);
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ // attach shutdown handler to catch control-c
+ Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
+ @Override
+ public void run() {
+ streams.close();
+ latch.countDown();
+ }
+ });
+
+ try {
+ streams.start();
+ latch.await();
+ } catch (Throwable e) {
+ System.exit(1);
+ }
+ System.exit(0);
+ }
+}
diff --git a/examples/pom.xml b/examples/pom.xml
index cfe3a4c..55382c4 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -35,6 +35,7 @@
<modules>
<module>flink</module>
<module>spark</module>
+ <module>kafka-streams</module>
</modules>
</project>
diff --git a/pom.xml b/pom.xml
index 0c5ace5..df4d9fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,7 +79,7 @@
<pulsar.version>2.8.0-SNAPSHOT</pulsar.version>
<flink.version>1.7.2</flink.version>
<storm.version>2.0.0</storm.version>
- <kafka-client.version>2.3.0</kafka-client.version>
+ <kafka-client.version>2.7.0</kafka-client.version>
<kafka_0_8.version>0.8.1.1</kafka_0_8.version>
<maven.compiler.source>1.8</maven.compiler.source>
@@ -319,6 +319,11 @@
<type>test-jar</type>
<version>${pulsar.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions-utils</artifactId>
+ <version>${pulsar.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml
index d2cd4a3..4d847ca 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml
@@ -72,14 +72,31 @@
<include>commons-codec:commons-codec</include>
<include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
<include>commons-collections:commons-collections</include>
- <include>org.asynchttpclient:*</include>
+ <!--
+ Do not shade AHC
+ <include>org.asynchttpclient:*</include>
+
+ it results in stuff like
+ java.lang.NumberFormatException: null
+ at java.lang.Integer.parseInt (Integer.java:614)
+ at java.lang.Integer.parseInt (Integer.java:770)
+ at org.apache.pulsar.shade.org.asynchttpclient.config.AsyncHttpClientConfigHelper$Config.getInt (AsyncHttpClientConfigHelper.java:85)
+ at org.apache.pulsar.shade.org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultMaxRedirects (AsyncHttpClientConfigDefaults.java:138)
+ at org.apache.pulsar.shade.org.asynchttpclient.DefaultAsyncHttpClientConfig$Builder.<init> (DefaultAsyncHttpClientConfig.java:702)
+ at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.<init> (AsyncHttpConnector.java:97)
+ at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnectorProvider.getConnector (AsyncHttpConnectorProvider.java:52)
+ at org.apache.pulsar.client.admin.internal.PulsarAdminImpl.<init> (PulsarAdminImpl.java:196)
+ at org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl.build (PulsarAdminBuilderImpl.java:47)
+
+ similar to https://github.com/playframework/play-ws/issues/229
+ -->
<include>io.netty:netty-codec-http</include>
<include>io.netty:netty-transport-native-epoll</include>
<include>org.reactivestreams:reactive-streams</include>
<include>com.typesafe.netty:netty-reactive-streams</include>
<include>org.javassist:javassist</include>
<include>com.google.protobuf:protobuf-java</include>
- <include>com.google.guava:guava</include>
+ <include>com.google.guava:*</include>
<include>com.google.code.gson:gson</include>
<include>com.fasterxml.jackson.core</include>
<include>com.fasterxml.jackson.module</include>
@@ -129,14 +146,25 @@
<pattern>org.apache.kafka.clients.consumer.PulsarKafkaConsumer</pattern>
<shadedPattern>org.apache.kafka.clients.consumer.KafkaConsumer</shadedPattern>
</relocation>
-
- <!-- General relocation rules for Pulsar client dependencies -->
-
<relocation>
- <pattern>org.asynchttpclient</pattern>
- <shadedPattern>org.apache.pulsar.shade.org.asynchttpclient</shadedPattern>
+ <pattern>org.apache.kafka.clients.admin.KafkaAdminClient</pattern>
+ <shadedPattern>org.apache.kafka.clients.admin.OriginalKafkaAdminClient</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.admin.PulsarKafkaAdminClient</pattern>
+ <shadedPattern>org.apache.kafka.clients.admin.KafkaAdminClient</shadedPattern>
</relocation>
<relocation>
+ <pattern>org.apache.kafka.clients.admin.Admin.class</pattern>
+ <shadedPattern>org.apache.kafka.clients.admin.OriginalAdmin.class</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.admin.PulsarAdmin</pattern>
+ <shadedPattern>org.apache.kafka.clients.admin.Admin</shadedPattern>
+ </relocation>
+
+ <!-- General relocation rules for Pulsar client dependencies -->
+ <relocation>
<pattern>org.apache.commons</pattern>
<shadedPattern>org.apache.pulsar.shade.org.apache.commons</shadedPattern>
</relocation>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
index 0adfa0d..ba33c14 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
@@ -43,11 +43,26 @@
</dependency>
<dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-admin</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions-utils</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
+ <groupId>org.asynchttpclient</groupId>
+ <artifactId>async-http-client</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-client.version}</version>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarAdmin.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarAdmin.java
new file mode 100644
index 0000000..e899bfc
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarAdmin.java
@@ -0,0 +1,1399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.FeatureUpdateFailedException;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
+import org.apache.kafka.common.requests.LeaveGroupResponse;
+
+/**
+ * A copy of kafka's admin interface (from v.2.7) to shade and replace.
+ * The only point of doing so is to deal with static methods that cannot be overridden
+ * and force them provide instance of PulsarKafkaAdminClient instead of KafkaAdminClient.
+ * Specifically this is needed for use with KafkaStreams / DefaultKafkaClientSupplier.
+ */
+
+/**
+ * The administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations and ACLs.
+ * <p>
+ * The minimum broker version required is 0.10.0.0. Methods with stricter requirements will specify the minimum broker
+ * version required.
+ * <p>
+ * This client was introduced in 0.11.0.0 and the API is still evolving. We will try to evolve the API in a compatible
+ * manner, but we reserve the right to make breaking changes in minor releases, if necessary. We will update the
+ * {@code InterfaceStability} annotation and this notice once the API is considered stable.
+ */
+@InterfaceStability.Evolving
+public interface PulsarAdmin extends AutoCloseable {
+
+ /**
+ * Create a new Admin with the given configuration.
+ *
+ * @param props The configuration.
+ * @return The new KafkaAdminClient.
+ */
+ static Admin create(Properties props) {
+ return PulsarKafkaAdminClient.create(props);
+ }
+
+ /**
+ * Create a new Admin with the given configuration.
+ *
+ * @param conf The configuration.
+ * @return The new KafkaAdminClient.
+ */
+ static Admin create(Map<String, Object> conf) {
+ return PulsarKafkaAdminClient.create(conf);
+ }
+
+ /**
+ * Close the Admin and release all associated resources.
+ * <p>
+ * See {@link #close(long, TimeUnit)}
+ */
+ @Override
+ default void close() {
+ close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Close the Admin and release all associated resources.
+ * <p>
+ * The close operation has a grace period during which current operations will be allowed to
+ * complete, specified by the given duration and time unit.
+ * New operations will not be accepted during the grace period. Once the grace period is over,
+ * all operations that have not yet been completed will be aborted with a {@link org.apache.kafka.common.errors.TimeoutException}.
+ *
+ * @param duration The duration to use for the wait time.
+ * @param unit The time unit to use for the wait time.
+ * @deprecated Since 2.2. Use {@link #close(Duration)} or {@link #close()}.
+ */
+ @Deprecated
+ default void close(long duration, TimeUnit unit) {
+ close(Duration.ofMillis(unit.toMillis(duration)));
+ }
+
+ /**
+ * Close the Admin client and release all associated resources.
+ * <p>
+ * The close operation has a grace period during which current operations will be allowed to
+ * complete, specified by the given duration.
+ * New operations will not be accepted during the grace period. Once the grace period is over,
+ * all operations that have not yet been completed will be aborted with a {@link org.apache.kafka.common.errors.TimeoutException}.
+ *
+ * @param timeout The time to use for the wait time.
+ */
+ void close(Duration timeout);
+
+ /**
+ * Create a batch of new topics with the default options.
+ * <p>
+ * This is a convenience method for {@link #createTopics(Collection, CreateTopicsOptions)} with default options.
+ * See the overload for more details.
+ * <p>
+ * This operation is supported by brokers with version 0.10.1.0 or higher.
+ *
+ * @param newTopics The new topics to create.
+ * @return The CreateTopicsResult.
+ */
+ default CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
+ return createTopics(newTopics, new CreateTopicsOptions());
+ }
+
+ /**
+ * Create a batch of new topics.
+ * <p>
+ * This operation is not transactional so it may succeed for some topics while fail for others.
+ * <p>
+ * It may take several seconds after {@link CreateTopicsResult} returns
+ * success for all the brokers to become aware that the topics have been created.
+ * During this time, {@link #listTopics()} and {@link #describeTopics(Collection)}
+ * may not return information about the new topics.
+ * <p>
+ * This operation is supported by brokers with version 0.10.1.0 or higher. The validateOnly option is supported
+ * from version 0.10.2.0.
+ *
+ * @param newTopics The new topics to create.
+ * @param options The options to use when creating the new topics.
+ * @return The CreateTopicsResult.
+ */
+ CreateTopicsResult createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options);
+
+ /**
+ * This is a convenience method for {@link #deleteTopics(Collection, DeleteTopicsOptions)}
+ * with default options. See the overload for more details.
+ * <p>
+ * This operation is supported by brokers with version 0.10.1.0 or higher.
+ *
+ * @param topics The topic names to delete.
+ * @return The DeleteTopicsResult.
+ */
+ default DeleteTopicsResult deleteTopics(Collection<String> topics) {
+ return deleteTopics(topics, new DeleteTopicsOptions());
+ }
+
+ /**
+ * Delete a batch of topics.
+ * <p>
+ * This operation is not transactional so it may succeed for some topics while fail for others.
+ * <p>
+ * It may take several seconds after the {@link DeleteTopicsResult} returns
+ * success for all the brokers to become aware that the topics are gone.
+ * During this time, {@link #listTopics()} and {@link #describeTopics(Collection)}
+ * may continue to return information about the deleted topics.
+ * <p>
+ * If delete.topic.enable is false on the brokers, deleteTopics will mark
+ * the topics for deletion, but not actually delete them. The futures will
+ * return successfully in this case.
+ * <p>
+ * This operation is supported by brokers with version 0.10.1.0 or higher.
+ *
+ * @param topics The topic names to delete.
+ * @param options The options to use when deleting the topics.
+ * @return The DeleteTopicsResult.
+ */
+ DeleteTopicsResult deleteTopics(Collection<String> topics, DeleteTopicsOptions options);
+
+ /**
+ * List the topics available in the cluster with the default options.
+ * <p>
+ * This is a convenience method for {@link #listTopics(ListTopicsOptions)} with default options.
+ * See the overload for more details.
+ *
+ * @return The ListTopicsResult.
+ */
+ default ListTopicsResult listTopics() {
+ return listTopics(new ListTopicsOptions());
+ }
+
+ /**
+ * List the topics available in the cluster.
+ *
+ * @param options The options to use when listing the topics.
+ * @return The ListTopicsResult.
+ */
+ ListTopicsResult listTopics(ListTopicsOptions options);
+
+ /**
+ * Describe some topics in the cluster, with the default options.
+ * <p>
+ * This is a convenience method for {@link #describeTopics(Collection, DescribeTopicsOptions)} with
+ * default options. See the overload for more details.
+ *
+ * @param topicNames The names of the topics to describe.
+ * @return The DescribeTopicsResult.
+ */
+ default DescribeTopicsResult describeTopics(Collection<String> topicNames) {
+ return describeTopics(topicNames, new DescribeTopicsOptions());
+ }
+
+ /**
+ * Describe some topics in the cluster.
+ *
+ * @param topicNames The names of the topics to describe.
+ * @param options The options to use when describing the topic.
+ * @return The DescribeTopicsResult.
+ */
+ DescribeTopicsResult describeTopics(Collection<String> topicNames, DescribeTopicsOptions options);
+
+ /**
+ * Get information about the nodes in the cluster, using the default options.
+ * <p>
+ * This is a convenience method for {@link #describeCluster(DescribeClusterOptions)} with default options.
+ * See the overload for more details.
+ *
+ * @return The DescribeClusterResult.
+ */
+ default DescribeClusterResult describeCluster() {
+ return describeCluster(new DescribeClusterOptions());
+ }
+
+ /**
+ * Get information about the nodes in the cluster.
+ *
+ * @param options The options to use when getting information about the cluster.
+ * @return The DescribeClusterResult.
+ */
+ DescribeClusterResult describeCluster(DescribeClusterOptions options);
+
+ /**
+ * This is a convenience method for {@link #describeAcls(AclBindingFilter, DescribeAclsOptions)} with
+ * default options. See the overload for more details.
+ * <p>
+ * This operation is supported by brokers with version 0.11.0.0 or higher.
+ *
+ * @param filter The filter to use.
+ * @return The DeleteAclsResult.
+ */
+ default DescribeAclsResult describeAcls(AclBindingFilter filter) {
+ return describeAcls(filter, new DescribeAclsOptions());
+ }
+
+ /**
+ * Lists access control lists (ACLs) according to the supplied filter.
+ * <p>
+ * Note: it may take some time for changes made by {@code createAcls} or {@code deleteAcls} to be reflected
+ * in the output of {@code describeAcls}.
+ * <p>
+ * This operation is supported by brokers with version 0.11.0.0 or higher.
+ *
+ * @param filter The filter to use.
+ * @param options The options to use when listing the ACLs.
+ * @return The DeleteAclsResult.
+ */
+ DescribeAclsResult describeAcls(AclBindingFilter filter, DescribeAclsOptions options);
+
+ /**
+ * This is a convenience method for {@link #createAcls(Collection, CreateAclsOptions)} with
+ * default options. See the overload for more details.
+ * <p>
+ * This operation is supported by brokers with version 0.11.0.0 or higher.
+ *
+ * @param acls The ACLs to create
+ * @return The CreateAclsResult.
+ */
+ default CreateAclsResult createAcls(Collection<AclBinding> acls) {
+ return createAcls(acls, new CreateAclsOptions());
+ }
+
+ /**
+ * Creates access control lists (ACLs) which are bound to specific resources.
+ * <p>
+ * This operation is not transactional so it may succeed for some ACLs while fail for others.
+ * <p>
+ * If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
+ * no changes will be made.
+ * <p>
+ * This operation is supported by brokers with version 0.11.0.0 or higher.
+ *
+ * @param acls The ACLs to create
+ * @param options The options to use when creating the ACLs.
+ * @return The CreateAclsResult.
+ */
+ CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options);
+
+ /**
+ * This is a convenience method for {@link #deleteAcls(Collection, DeleteAclsOptions)} with default options.
+ * See the overload for more details.
+ * <p>
+ * This operation is supported by brokers with version 0.11.0.0 or higher.
+ *
+ * @param filters The filters to use.
+ * @return The DeleteAclsResult.
+ */
+ default DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters) {
+ return deleteAcls(filters, new DeleteAclsOptions());
+ }
+
+ /**
+ * Deletes access control lists (ACLs) according to the supplied filters.
+ * <p>
+ * This operation is not transactional so it may succeed for some ACLs while fail for others.
+ * <p>
+ * This operation is supported by brokers with version 0.11.0.0 or higher.
+ *
+ * @param filters The filters to use.
+ * @param options The options to use when deleting the ACLs.
+ * @return The DeleteAclsResult.
+ */
+ DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options);
+
+
+ /**
+ * Get the configuration for the specified resources with the default options.
+ * <p>
+ * This is a convenience method for {@link #describeConfigs(Collection, DescribeConfigsOptions)} with default options.
+ * See the overload for more details.
+ * <p>
+ * This operation is supported by brokers with version 0.11.0.0 or higher.
+ *
+ * @param resources The resources (topic and broker resource types are currently supported)
+ * @return The DescribeConfigsResult
+ */
+ default DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources) {
+ return describeConfigs(resources, new DescribeConfigsOptions());
+ }
+
+ /**
+ * Get the configuration for the specified resources.
+ * <p>
+ * The returned configuration includes default values and the isDefault() method can be used to distinguish them
+ * from user supplied values.
+ * <p>
+ * The value of config entries where isSensitive() is true is always {@code null} so that sensitive information
+ * is not disclosed.
+ * <p>
+ * Config entries where isReadOnly() is true cannot be updated.
+ * <p>
+ * This operation is supported by brokers with version 0.11.0.0 or higher.
+ *
+ * @param resources The resources (topic and broker resource types are currently supported)
+ * @param options The options to use when describing configs
+ * @return The DescribeConfigsResult
+ */
+ DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options);
+
+ /**
+ * Update the configuration for the specified resources with the default options.
+ * <p>
+ * This is a convenience method for {@link #alterConfigs(Map, AlterConfigsOptions)} with default options.
+ * See the overload for more details.
+ * <p>
+ * This operation is supported by brokers with version 0.11.0.0 or higher.
+ *
+ * @param configs The resources with their configs (topic is the only resource type with configs that can
+ * be updated currently)
+ * @return The AlterConfigsResult
+ * @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map)}.
+ */
+ @Deprecated
+ default AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs) {
+ return alterConfigs(configs, new AlterConfigsOptions());
+ }
+
+ /**
+ * Update the configuration for the specified resources with the default options.
+ * <p>
+ * Updates are not transactional so they may succeed for some resources while fail for others. The configs for
+ * a particular resource are updated atomically.
+ * <p>
+ * This operation is supported by brokers with version 0.11.0.0 or higher.
+ *
+ * @param configs The resources with their configs (topic is the only resource type with configs that can
+ * be updated currently)
+ * @param options The options to use when describing configs
+ * @return The AlterConfigsResult
+ * @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map, AlterConfigsOptions)}.
+ */
+ @Deprecated
+ AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options);
+
+ /**
+ * Incrementally updates the configuration for the specified resources with default options.
+ * <p>
+ * This is a convenience method for {@link #incrementalAlterConfigs(Map, AlterConfigsOptions)} with default options.
+ * See the overload for more details.
+ * <p>
+ * This operation is supported by brokers with version 2.3.0 or higher.
+ *
+ * @param configs The resources with their configs
+ * @return The AlterConfigsResult
+ */
+ default AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs) {
+ return incrementalAlterConfigs(configs, new AlterConfigsOptions());
+ }
+
+ /**
+ * Incrementally update the configuration for the specified resources.
+ * <p>
+ * Updates are not transactional so they may succeed for some resources while fail for others. The configs for
+ * a particular resource are updated atomically.
+ * <p>
+ * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+ * the returned {@link AlterConfigsResult}:
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+ * if the authenticated user didn't have alter access to the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.TopicAuthorizationException}
+ * if the authenticated user didn't have alter access to the Topic.</li>
+ * <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
+ * if the Topic doesn't exist.</li>
+ * <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+ * if the request details are invalid. e.g., a configuration key was specified more than once for a resource</li>
+ * </ul>
+ * <p>
+ * This operation is supported by brokers with version 2.3.0 or higher.
+ *
+ * @param configs The resources with their configs
+ * @param options The options to use when altering configs
+ * @return The AlterConfigsResult
+ */
+ AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource,
+ Collection<AlterConfigOp>> configs, AlterConfigsOptions options);
+
+ /**
+ * Change the log directory for the specified replicas. If the replica does not exist on the broker, the result
+ * shows REPLICA_NOT_AVAILABLE for the given replica and the replica will be created in the given log directory on the
+ * broker when it is created later. If the replica already exists on the broker, the replica will be moved to the given
+ * log directory if it is not already there. For detailed result, inspect the returned {@link AlterReplicaLogDirsResult} instance.
+ * <p>
+ * This operation is not transactional so it may succeed for some replicas while fail for others.
+ * <p>
+ * This is a convenience method for {@link #alterReplicaLogDirs(Map, AlterReplicaLogDirsOptions)} with default options.
+ * See the overload for more details.
+ * <p>
+ * This operation is supported by brokers with version 1.1.0 or higher.
+ *
+ * @param replicaAssignment The replicas with their log directory absolute path
+ * @return The AlterReplicaLogDirsResult
+ * @throws InterruptedException Interrupted while joining I/O thread
+ */
+ default AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
+ return alterReplicaLogDirs(replicaAssignment, new AlterReplicaLogDirsOptions());
+ }
+
+ /**
+ * Change the log directory for the specified replicas. If the replica does not exist on the broker, the result
+ * shows REPLICA_NOT_AVAILABLE for the given replica and the replica will be created in the given log directory on the
+ * broker when it is created later. If the replica already exists on the broker, the replica will be moved to the given
+ * log directory if it is not already there. For detailed result, inspect the returned {@link AlterReplicaLogDirsResult} instance.
+ * <p>
+ * This operation is not transactional so it may succeed for some replicas while fail for others.
+ * <p>
+ * This operation is supported by brokers with version 1.1.0 or higher.
+ *
+ * @param replicaAssignment The replicas with their log directory absolute path
+ * @param options The options to use when changing replica dir
+ * @return The AlterReplicaLogDirsResult
+ * @throws InterruptedException Interrupted while joining I/O thread
+ */
+ AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment,
+ AlterReplicaLogDirsOptions options);
+
+ /**
+ * Query the information of all log directories on the given set of brokers
+ * <p>
+ * This is a convenience method for {@link #describeLogDirs(Collection, DescribeLogDirsOptions)} with default options.
+ * See the overload for more details.
+ * <p>
+ * This operation is supported by brokers with version 1.0.0 or higher.
+ *
+ * @param brokers A list of brokers
+ * @return The DescribeLogDirsResult
+ */
+ default DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers) {
+ return describeLogDirs(brokers, new DescribeLogDirsOptions());
+ }
+
+ /**
+ * Query the information of all log directories on the given set of brokers
+ * <p>
+ * This operation is supported by brokers with version 1.0.0 or higher.
+ *
+ * @param brokers A list of brokers
+ * @param options The options to use when querying log dir info
+ * @return The DescribeLogDirsResult
+ */
+ DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options);
+
+ /**
+ * Query the replica log directory information for the specified replicas.
+ * <p>
+ * This is a convenience method for {@link #describeReplicaLogDirs(Collection, DescribeReplicaLogDirsOptions)}
+ * with default options. See the overload for more details.
+ * <p>
+ * This operation is supported by brokers with version 1.0.0 or higher.
+ *
+ * @param replicas The replicas to query
+ * @return The DescribeReplicaLogDirsResult
+ */
+ default DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas) {
+ return describeReplicaLogDirs(replicas, new DescribeReplicaLogDirsOptions());
+ }
+
+ /**
+ * Query the replica log directory information for the specified replicas.
+ * <p>
+ * This operation is supported by brokers with version 1.0.0 or higher.
+ *
+ * @param replicas The replicas to query
+ * @param options The options to use when querying replica log dir info
+ * @return The DescribeReplicaLogDirsResult
+ */
+ DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options);
+
+ /**
+ * Increase the number of partitions of the topics given as the keys of {@code newPartitions}
+ * according to the corresponding values. <strong>If partitions are increased for a topic that has a key,
+ * the partition logic or ordering of the messages will be affected.</strong>
+ * <p>
+ * This is a convenience method for {@link #createPartitions(Map, CreatePartitionsOptions)} with default options.
+ * See the overload for more details.
+ *
+ * @param newPartitions The topics which should have new partitions created, and corresponding parameters
+ * for the created partitions.
+ * @return The CreatePartitionsResult.
+ */
+ default CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions) {
+ return createPartitions(newPartitions, new CreatePartitionsOptions());
+ }
+
+ /**
+ * Increase the number of partitions of the topics given as the keys of {@code newPartitions}
+ * according to the corresponding values. <strong>If partitions are increased for a topic that has a key,
+ * the partition logic or ordering of the messages will be affected.</strong>
+ * <p>
+ * This operation is not transactional so it may succeed for some topics while fail for others.
+ * <p>
+ * It may take several seconds after this method returns
+ * success for all the brokers to become aware that the partitions have been created.
+ * During this time, {@link #describeTopics(Collection)}
+ * may not return information about the new partitions.
+ * <p>
+ * This operation is supported by brokers with version 1.0.0 or higher.
+ * <p>
+ * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+ * {@link CreatePartitionsResult#values() values()} method of the returned {@link CreatePartitionsResult}
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.AuthorizationException}
+ * if the authenticated user is not authorized to alter the topic</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * if the request was not completed in within the given {@link CreatePartitionsOptions#timeoutMs()}.</li>
+ * <li>{@link org.apache.kafka.common.errors.ReassignmentInProgressException}
+ * if a partition reassignment is currently in progress</li>
+ * <li>{@link org.apache.kafka.common.errors.BrokerNotAvailableException}
+ * if the requested {@link NewPartitions#assignments()} contain a broker that is currently unavailable.</li>
+ * <li>{@link org.apache.kafka.common.errors.InvalidReplicationFactorException}
+ * if no {@link NewPartitions#assignments()} are given and it is impossible for the broker to assign
+ * replicas with the topics replication factor.</li>
+ * <li>Subclasses of {@link org.apache.kafka.common.KafkaException}
+ * if the request is invalid in some way.</li>
+ * </ul>
+ *
+ * @param newPartitions The topics which should have new partitions created, and corresponding parameters
+ * for the created partitions.
+ * @param options The options to use when creating the new partitions.
+ * @return The CreatePartitionsResult.
+ */
+ CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions,
+ CreatePartitionsOptions options);
+
+ /**
+ * Delete records whose offset is smaller than the given offset of the corresponding partition.
+ * <p>
+ * This is a convenience method for {@link #deleteRecords(Map, DeleteRecordsOptions)} with default options.
+ * See the overload for more details.
+ * <p>
+ * This operation is supported by brokers with version 0.11.0.0 or higher.
+ *
+ * @param recordsToDelete The topic partitions and related offsets from which records deletion starts.
+ * @return The DeleteRecordsResult.
+ */
+ default DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete) {
+ return deleteRecords(recordsToDelete, new DeleteRecordsOptions());
+ }
+
+ /**
+ * Delete records whose offset is smaller than the given offset of the corresponding partition.
+ * <p>
+ * This operation is supported by brokers with version 0.11.0.0 or higher.
+ *
+ * @param recordsToDelete The topic partitions and related offsets from which records deletion starts.
+ * @param options The options to use when deleting records.
+ * @return The DeleteRecordsResult.
+ */
+ DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete,
+ DeleteRecordsOptions options);
+
+ /**
+ * Create a Delegation Token.
+ * <p>
+ * This is a convenience method for {@link #createDelegationToken(CreateDelegationTokenOptions)} with default options.
+ * See the overload for more details.
+ *
+ * @return The CreateDelegationTokenResult.
+ */
+ default CreateDelegationTokenResult createDelegationToken() {
+ return createDelegationToken(new CreateDelegationTokenOptions());
+ }
+
+
+ /**
+ * Create a Delegation Token.
+ * <p>
+ * This operation is supported by brokers with version 1.1.0 or higher.
+ * <p>
+ * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+ * {@link CreateDelegationTokenResult#delegationToken() delegationToken()} method of the returned {@link CreateDelegationTokenResult}
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+ * If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+ * <li>{@link org.apache.kafka.common.errors.InvalidPrincipalTypeException}
+ * if the renewers principal type is not supported.</li>
+ * <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+ * if the delegation token feature is disabled.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * if the request was not completed in within the given {@link CreateDelegationTokenOptions#timeoutMs()}.</li>
+ * </ul>
+ *
+ * @param options The options to use when creating delegation token.
+ * @return The DeleteRecordsResult.
+ */
+ CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options);
+
+
+ /**
+ * Renew a Delegation Token.
+ * <p>
+ * This is a convenience method for {@link #renewDelegationToken(byte[], RenewDelegationTokenOptions)} with default options.
+ * See the overload for more details.
+ *
+ * @param hmac HMAC of the Delegation token
+ * @return The RenewDelegationTokenResult.
+ */
+ default RenewDelegationTokenResult renewDelegationToken(byte[] hmac) {
+ return renewDelegationToken(hmac, new RenewDelegationTokenOptions());
+ }
+
+ /**
+ * Renew a Delegation Token.
+ * <p>
+ * This operation is supported by brokers with version 1.1.0 or higher.
+ * <p>
+ * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+ * {@link RenewDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@link RenewDelegationTokenResult}
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+ * If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+ * <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+ * if the delegation token feature is disabled.</li>
+ * <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException}
+ * if the delegation token is not found on server.</li>
+ * <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
+ * if the authenticated user is not owner/renewer of the token.</li>
+ * <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException}
+ * if the delegation token is expired.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * if the request was not completed in within the given {@link RenewDelegationTokenOptions#timeoutMs()}.</li>
+ * </ul>
+ *
+ * @param hmac HMAC of the Delegation token
+ * @param options The options to use when renewing delegation token.
+ * @return The RenewDelegationTokenResult.
+ */
+ RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options);
+
+ /**
+ * Expire a Delegation Token.
+ * <p>
+ * This is a convenience method for {@link #expireDelegationToken(byte[], ExpireDelegationTokenOptions)} with default options.
+ * This will expire the token immediately. See the overload for more details.
+ *
+ * @param hmac HMAC of the Delegation token
+ * @return The ExpireDelegationTokenResult.
+ */
+ default ExpireDelegationTokenResult expireDelegationToken(byte[] hmac) {
+ return expireDelegationToken(hmac, new ExpireDelegationTokenOptions());
+ }
+
+ /**
+ * Expire a Delegation Token.
+ * <p>
+ * This operation is supported by brokers with version 1.1.0 or higher.
+ * <p>
+ * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+ * {@link ExpireDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@link ExpireDelegationTokenResult}
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+ * If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+ * <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+ * if the delegation token feature is disabled.</li>
+ * <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException}
+ * if the delegation token is not found on server.</li>
+ * <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
+ * if the authenticated user is not owner/renewer of the requested token.</li>
+ * <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException}
+ * if the delegation token is expired.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * if the request was not completed in within the given {@link ExpireDelegationTokenOptions#timeoutMs()}.</li>
+ * </ul>
+ *
+ * @param hmac HMAC of the Delegation token
+ * @param options The options to use when expiring delegation token.
+ * @return The ExpireDelegationTokenResult.
+ */
+ ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options);
+
+ /**
+ * Describe the Delegation Tokens.
+ * <p>
+ * This is a convenience method for {@link #describeDelegationToken(DescribeDelegationTokenOptions)} with default options.
+ * This will return all the user owned tokens and tokens where user have Describe permission. See the overload for more details.
+ *
+ * @return The DescribeDelegationTokenResult.
+ */
+ default DescribeDelegationTokenResult describeDelegationToken() {
+ return describeDelegationToken(new DescribeDelegationTokenOptions());
+ }
+
+ /**
+ * Describe the Delegation Tokens.
+ * <p>
+ * This operation is supported by brokers with version 1.1.0 or higher.
+ * <p>
+ * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+ * {@link DescribeDelegationTokenResult#delegationTokens() delegationTokens()} method of the returned {@link DescribeDelegationTokenResult}
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+ * If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+ * <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+ * if the delegation token feature is disabled.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * if the request was not completed in within the given {@link DescribeDelegationTokenOptions#timeoutMs()}.</li>
+ * </ul>
+ *
+ * @param options The options to use when describing delegation tokens.
+ * @return The DescribeDelegationTokenResult.
+ */
+ DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options);
+
+ /**
+ * Describe some group IDs in the cluster.
+ *
+ * @param groupIds The IDs of the groups to describe.
+ * @param options The options to use when describing the groups.
+ * @return The DescribeConsumerGroupResult.
+ */
+ DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds,
+ DescribeConsumerGroupsOptions options);
+
+ /**
+ * Describe some group IDs in the cluster, with the default options.
+ * <p>
+ * This is a convenience method for {@link #describeConsumerGroups(Collection, DescribeConsumerGroupsOptions)}
+ * with default options. See the overload for more details.
+ *
+ * @param groupIds The IDs of the groups to describe.
+ * @return The DescribeConsumerGroupResult.
+ */
+ default DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds) {
+ return describeConsumerGroups(groupIds, new DescribeConsumerGroupsOptions());
+ }
+
+ /**
+ * List the consumer groups available in the cluster.
+ *
+ * @param options The options to use when listing the consumer groups.
+ * @return The ListGroupsResult.
+ */
+ ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);
+
+ /**
+ * List the consumer groups available in the cluster with the default options.
+ * <p>
+ * This is a convenience method for {@link #listConsumerGroups(ListConsumerGroupsOptions)} with default options.
+ * See the overload for more details.
+ *
+ * @return The ListGroupsResult.
+ */
+ default ListConsumerGroupsResult listConsumerGroups() {
+ return listConsumerGroups(new ListConsumerGroupsOptions());
+ }
+
+ /**
+ * List the consumer group offsets available in the cluster.
+ *
+ * @param options The options to use when listing the consumer group offsets.
+ * @return The ListGroupOffsetsResult
+ */
+ ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);
+
+ /**
+ * List the consumer group offsets available in the cluster with the default options.
+ * <p>
+ * This is a convenience method for {@link #listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with default options.
+ *
+ * @return The ListGroupOffsetsResult.
+ */
+ default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
+ return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
+ }
+
+ /**
+ * Delete consumer groups from the cluster.
+ *
+ * @param options The options to use when deleting a consumer group.
+ * @return The DeletConsumerGroupResult.
+ */
+ DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options);
+
+ /**
+ * Delete consumer groups from the cluster with the default options.
+ *
+ * @return The DeleteConsumerGroupResult.
+ */
+ default DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds) {
+ return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
+ }
+
+ /**
+ * Delete committed offsets for a set of partitions in a consumer group. This will
+ * succeed at the partition level only if the group is not actively subscribed
+ * to the corresponding topic.
+ *
+ * @param options The options to use when deleting offsets in a consumer group.
+ * @return The DeleteConsumerGroupOffsetsResult.
+ */
+ DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String groupId,
+ Set<TopicPartition> partitions,
+ DeleteConsumerGroupOffsetsOptions options);
+
+ /**
+ * Delete committed offsets for a set of partitions in a consumer group with the default
+ * options. This will succeed at the partition level only if the group is not actively
+ * subscribed to the corresponding topic.
+ *
+ * @return The DeleteConsumerGroupOffsetsResult.
+ */
+ default DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String groupId, Set<TopicPartition> partitions) {
+ return deleteConsumerGroupOffsets(groupId, partitions, new DeleteConsumerGroupOffsetsOptions());
+ }
+
+ /**
+ * Elect the preferred replica as leader for topic partitions.
+ * <p>
+ * This is a convenience method for {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}
+ * with preferred election type and default options.
+ * <p>
+ * This operation is supported by brokers with version 2.2.0 or higher.
+ *
+ * @param partitions The partitions for which the preferred leader should be elected.
+ * @return The ElectPreferredLeadersResult.
+ * @deprecated Since 2.4.0. Use {@link #electLeaders(ElectionType, Set)}.
+ */
+ @Deprecated
+ default ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions) {
+ return electPreferredLeaders(partitions, new ElectPreferredLeadersOptions());
+ }
+
+ /**
+ * Elect the preferred replica as leader for topic partitions.
+ * <p>
+ * This is a convenience method for {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}
+ * with preferred election type.
+ * <p>
+ * This operation is supported by brokers with version 2.2.0 or higher.
+ *
+ * @param partitions The partitions for which the preferred leader should be elected.
+ * @param options The options to use when electing the preferred leaders.
+ * @return The ElectPreferredLeadersResult.
+ * @deprecated Since 2.4.0. Use {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}.
+ */
+ @Deprecated
+ default ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions,
+ ElectPreferredLeadersOptions options) {
+ final ElectLeadersOptions newOptions = new ElectLeadersOptions();
+ newOptions.timeoutMs(options.timeoutMs());
+ final Set<TopicPartition> topicPartitions = partitions == null ? null : new HashSet<>(partitions);
+
+ return new ElectPreferredLeadersResult(electLeaders(ElectionType.PREFERRED, topicPartitions, newOptions));
+ }
+
+ /**
+ * Elect a replica as leader for topic partitions.
+ * <p>
+ * This is a convenience method for {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}
+ * with default options.
+ *
+ * @param electionType The type of election to conduct.
+ * @param partitions The topics and partitions for which to conduct elections.
+ * @return The ElectLeadersResult.
+ */
+ default ElectLeadersResult electLeaders(ElectionType electionType, Set<TopicPartition> partitions) {
+ return electLeaders(electionType, partitions, new ElectLeadersOptions());
+ }
+
+ /**
+ * Elect a replica as leader for the given {@code partitions}, or for all partitions if the argument
+ * to {@code partitions} is null.
+ * <p>
+ * This operation is not transactional so it may succeed for some partitions while fail for others.
+ * <p>
+ * It may take several seconds after this method returns success for all the brokers in the cluster
+ * to become aware that the partitions have new leaders. During this time,
+ * {@link #describeTopics(Collection)} may not return information about the partitions'
+ * new leaders.
+ * <p>
+ * This operation is supported by brokers with version 2.2.0 or later if preferred election is use;
+ * otherwise the brokers most be 2.4.0 or higher.
+ * <p>
+ * The following exceptions can be anticipated when calling {@code get()} on the future obtained
+ * from the returned {@link ElectLeadersResult}:
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+ * if the authenticated user didn't have alter access to the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
+ * if the topic or partition did not exist within the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.InvalidTopicException}
+ * if the topic was already queued for deletion.</li>
+ * <li>{@link org.apache.kafka.common.errors.NotControllerException}
+ * if the request was sent to a broker that was not the controller for the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * if the request timed out before the election was complete.</li>
+ * <li>{@link org.apache.kafka.common.errors.LeaderNotAvailableException}
+ * if the preferred leader was not alive or not in the ISR.</li>
+ * </ul>
+ *
+ * @param electionType The type of election to conduct.
+ * @param partitions The topics and partitions for which to conduct elections.
+ * @param options The options to use when electing the leaders.
+ * @return The ElectLeadersResult.
+ */
+ ElectLeadersResult electLeaders(
+ ElectionType electionType,
+ Set<TopicPartition> partitions,
+ ElectLeadersOptions options);
+
+
+ /**
+ * Change the reassignments for one or more partitions.
+ * Providing an empty Optional (e.g via {@link Optional#empty()}) will <bold>revert</bold> the reassignment for the associated partition.
+ *
+ * This is a convenience method for {@link #alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)}
+ * with default options. See the overload for more details.
+ */
+ default AlterPartitionReassignmentsResult alterPartitionReassignments(
+ Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments) {
+ return alterPartitionReassignments(reassignments, new AlterPartitionReassignmentsOptions());
+ }
+
+ /**
+ * Change the reassignments for one or more partitions.
+ * Providing an empty Optional (e.g via {@link Optional#empty()}) will <bold>revert</bold> the reassignment for the associated partition.
+ *
+ * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+ * the returned {@code AlterPartitionReassignmentsResult}:</p>
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+ * If the authenticated user didn't have alter access to the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
+ * If the topic or partition does not exist within the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * if the request timed out before the controller could record the new assignments.</li>
+ * <li>{@link org.apache.kafka.common.errors.InvalidReplicaAssignmentException}
+ * If the specified assignment was not valid.</li>
+ * <li>{@link org.apache.kafka.common.errors.NoReassignmentInProgressException}
+ * If there was an attempt to cancel a reassignment for a partition which was not being reassigned.</li>
+ * </ul>
+ *
+ * @param reassignments The reassignments to add, modify, or remove. See {@link NewPartitionReassignment}.
+ * @param options The options to use.
+ * @return The result.
+ */
+ AlterPartitionReassignmentsResult alterPartitionReassignments(
+ Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
+ AlterPartitionReassignmentsOptions options);
+
+
+ /**
+ * List all of the current partition reassignments
+ *
+ * This is a convenience method for {@link #listPartitionReassignments(ListPartitionReassignmentsOptions)}
+ * with default options. See the overload for more details.
+ */
+ default ListPartitionReassignmentsResult listPartitionReassignments() {
+ return listPartitionReassignments(new ListPartitionReassignmentsOptions());
+ }
+
+ /**
+ * List the current reassignments for the given partitions
+ *
+ * This is a convenience method for {@link #listPartitionReassignments(Set, ListPartitionReassignmentsOptions)}
+ * with default options. See the overload for more details.
+ */
+ default ListPartitionReassignmentsResult listPartitionReassignments(Set<TopicPartition> partitions) {
+ return listPartitionReassignments(partitions, new ListPartitionReassignmentsOptions());
+ }
+
+ /**
+ * List the current reassignments for the given partitions
+ *
+ * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+ * the returned {@code ListPartitionReassignmentsResult}:</p>
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+ * If the authenticated user doesn't have alter access to the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
+ * If a given topic or partition does not exist.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * If the request timed out before the controller could list the current reassignments.</li>
+ * </ul>
+ *
+ * @param partitions The topic partitions to list reassignments for.
+ * @param options The options to use.
+ * @return The result.
+ */
+ default ListPartitionReassignmentsResult listPartitionReassignments(
+ Set<TopicPartition> partitions,
+ ListPartitionReassignmentsOptions options) {
+ return listPartitionReassignments(Optional.of(partitions), options);
+ }
+
+ /**
+ * List all of the current partition reassignments
+ *
+ * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+ * the returned {@code ListPartitionReassignmentsResult}:</p>
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+ * If the authenticated user doesn't have alter access to the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
+ * If a given topic or partition does not exist.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * If the request timed out before the controller could list the current reassignments.</li>
+ * </ul>
+ *
+ * @param options The options to use.
+ * @return The result.
+ */
+ default ListPartitionReassignmentsResult listPartitionReassignments(ListPartitionReassignmentsOptions options) {
+ return listPartitionReassignments(Optional.empty(), options);
+ }
+
+ /**
+ * @param partitions the partitions we want to get reassignment for, or an empty optional if we want to get the reassignments for all partitions in the cluster
+ * @param options The options to use.
+ * @return The result.
+ */
+ ListPartitionReassignmentsResult listPartitionReassignments(Optional<Set<TopicPartition>> partitions,
+ ListPartitionReassignmentsOptions options);
+
+ /**
+ * Remove members from the consumer group by given member identities.
+ * <p>
+ * For possible error codes, refer to {@link LeaveGroupResponse}.
+ *
+ * @param groupId The ID of the group to remove member from.
+ * @param options The options to carry removing members' information.
+ * @return The MembershipChangeResult.
+ */
+ RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId, RemoveMembersFromConsumerGroupOptions options);
+
+ /**
+ * <p>Alters offsets for the specified group. In order to succeed, the group must be empty.
+ *
+ * <p>This is a convenience method for {@link #alterConsumerGroupOffsets(String, Map, AlterConsumerGroupOffsetsOptions)} with default options.
+ * See the overload for more details.
+ *
+ * @param groupId The group for which to alter offsets.
+ * @param offsets A map of offsets by partition with associated metadata.
+ * @return The AlterOffsetsResult.
+ */
+ default AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
+ return alterConsumerGroupOffsets(groupId, offsets, new AlterConsumerGroupOffsetsOptions());
+ }
+
+ /**
+ * <p>Alters offsets for the specified group. In order to succeed, the group must be empty.
+ *
+ * <p>This operation is not transactional so it may succeed for some partitions while fail for others.
+ *
+ * @param groupId The group for which to alter offsets.
+ * @param offsets A map of offsets by partition with associated metadata. Partitions not specified in the map are ignored.
+ * @param options The options to use when altering the offsets.
+ * @return The AlterOffsetsResult.
+ */
+ AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterConsumerGroupOffsetsOptions options);
+
+ /**
+ * <p>List offset for the specified partitions and OffsetSpec. This operation enables to find
+ * the beginning offset, end offset as well as the offset matching a timestamp in partitions.
+ *
+ * <p>This is a convenience method for {@link #listOffsets(Map, ListOffsetsOptions)}
+ *
+ * @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up.
+ * @return The ListOffsetsResult.
+ */
+ default ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {
+ return listOffsets(topicPartitionOffsets, new ListOffsetsOptions());
+ }
+
+ /**
+ * <p>List offset for the specified partitions. This operation enables to find
+ * the beginning offset, end offset as well as the offset matching a timestamp in partitions.
+ *
+ * @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up.
+ * @param options The options to use when retrieving the offsets
+ * @return The ListOffsetsResult.
+ */
+ ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options);
+
+ /**
+ * Describes all entities matching the provided filter that have at least one client quota configuration
+ * value defined.
+ * <p>
+ * This is a convenience method for {@link #describeClientQuotas(ClientQuotaFilter, DescribeClientQuotasOptions)}
+ * with default options. See the overload for more details.
+ * <p>
+ * This operation is supported by brokers with version 2.6.0 or higher.
+ *
+ * @param filter the filter to apply to match entities
+ * @return the DescribeClientQuotasResult containing the result
+ */
+ default DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter) {
+ return describeClientQuotas(filter, new DescribeClientQuotasOptions());
+ }
+
+ /**
+ * Describes all entities matching the provided filter that have at least one client quota configuration
+ * value defined.
+ * <p>
+ * The following exceptions can be anticipated when calling {@code get()} on the future from the
+ * returned {@link DescribeClientQuotasResult}:
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+ * If the authenticated user didn't have describe access to the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+ * If the request details are invalid. e.g., an invalid entity type was specified.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * If the request timed out before the describe could finish.</li>
+ * </ul>
+ * <p>
+ * This operation is supported by brokers with version 2.6.0 or higher.
+ *
+ * @param filter the filter to apply to match entities
+ * @param options the options to use
+ * @return the DescribeClientQuotasResult containing the result
+ */
+ DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions options);
+
+ /**
+ * Alters client quota configurations with the specified alterations.
+ * <p>
+ * This is a convenience method for {@link #alterClientQuotas(Collection, AlterClientQuotasOptions)}
+ * with default options. See the overload for more details.
+ * <p>
+ * This operation is supported by brokers with version 2.6.0 or higher.
+ *
+ * @param entries the alterations to perform
+ * @return the AlterClientQuotasResult containing the result
+ */
+ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries) {
+ return alterClientQuotas(entries, new AlterClientQuotasOptions());
+ }
+
+ /**
+ * Alters client quota configurations with the specified alterations.
+ * <p>
+ * Alterations for a single entity are atomic, but across entities is not guaranteed. The resulting
+ * per-entity error code should be evaluated to resolve the success or failure of all updates.
+ * <p>
+ * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+ * the returned {@link AlterClientQuotasResult}:
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+ * If the authenticated user didn't have alter access to the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+ * If the request details are invalid. e.g., a configuration key was specified more than once for an entity.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * If the request timed out before the alterations could finish. It cannot be guaranteed whether the update
+ * succeed or not.</li>
+ * </ul>
+ * <p>
+ * This operation is supported by brokers with version 2.6.0 or higher.
+ *
+ * @param entries the alterations to perform
+ * @return the AlterClientQuotasResult containing the result
+ */
+ AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
+
+ /**
+ * Describe all SASL/SCRAM credentials.
+ *
+ * <p>This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)}
+ *
+ * @return The DescribeUserScramCredentialsResult.
+ */
+ default DescribeUserScramCredentialsResult describeUserScramCredentials() {
+ return describeUserScramCredentials(null, new DescribeUserScramCredentialsOptions());
+ }
+
+ /**
+ * Describe SASL/SCRAM credentials for the given users.
+ *
+ * <p>This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)}
+ *
+ * @param users the users for which credentials are to be described; all users' credentials are described if null
+ * or empty.
+ * @return The DescribeUserScramCredentialsResult.
+ */
+ default DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users) {
+ return describeUserScramCredentials(users, new DescribeUserScramCredentialsOptions());
+ }
+
+ /**
+ * Describe SASL/SCRAM credentials.
+ * <p>
+ * The following exceptions can be anticipated when calling {@code get()} on the futures from the
+ * returned {@link DescribeUserScramCredentialsResult}:
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+ * If the authenticated user didn't have describe access to the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.ResourceNotFoundException}
+ * If the user did not exist/had no SCRAM credentials.</li>
+ * <li>{@link org.apache.kafka.common.errors.DuplicateResourceException}
+ * If the user was requested to be described more than once in the original request.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * If the request timed out before the describe operation could finish.</li>
+ * </ul>
+ * <p>
+ * This operation is supported by brokers with version 2.7.0 or higher.
+ *
+ * @param users the users for which credentials are to be described; all users' credentials are described if null
+ * or empty.
+ * @param options The options to use when describing the credentials
+ * @return The DescribeUserScramCredentialsResult.
+ */
+ DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options);
+
+ /**
+ * Alter SASL/SCRAM credentials for the given users.
+ *
+ * <p>This is a convenience method for {@link #alterUserScramCredentials(List, AlterUserScramCredentialsOptions)}
+ *
+ * @param alterations the alterations to be applied
+ * @return The AlterUserScramCredentialsResult.
+ */
+ default AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations) {
+ return alterUserScramCredentials(alterations, new AlterUserScramCredentialsOptions());
+ }
+
+ /**
+ * Alter SASL/SCRAM credentials.
+ *
+ * <p>
+ * The following exceptions can be anticipated when calling {@code get()} any of the futures from the
+ * returned {@link AlterUserScramCredentialsResult}:
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.NotControllerException}
+ * If the request is not sent to the Controller broker.</li>
+ * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+ * If the authenticated user didn't have alter access to the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+ * If the user authenticated with a delegation token.</li>
+ * <li>{@link org.apache.kafka.common.errors.UnsupportedSaslMechanismException}
+ * If the requested SCRAM mechanism is unrecognized or otherwise unsupported.</li>
+ * <li>{@link org.apache.kafka.common.errors.UnacceptableCredentialException}
+ * If the username is empty or the requested number of iterations is too small or too large.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * If the request timed out before the describe could finish.</li>
+ * </ul>
+ * <p>
+ * This operation is supported by brokers with version 2.7.0 or higher.
+ *
+ * @param alterations the alterations to be applied
+ * @param options The options to use when altering the credentials
+ * @return The AlterUserScramCredentialsResult.
+ */
+ AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
+ AlterUserScramCredentialsOptions options);
+ /**
+ * Describes finalized as well as supported features.
+ * <p>
+ * This is a convenience method for {@link #describeFeatures(DescribeFeaturesOptions)} with default options.
+ * See the overload for more details.
+ *
+ * @return the {@link DescribeFeaturesResult} containing the result
+ */
+ default DescribeFeaturesResult describeFeatures() {
+ return describeFeatures(new DescribeFeaturesOptions());
+ }
+
+ /**
+ * Describes finalized as well as supported features. By default, the request is issued to any
+ * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions
+ * parameter. This is particularly useful if the user requires strongly consistent reads of
+ * finalized features.
+ * <p>
+ * The following exceptions can be anticipated when calling {@code get()} on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * If the request timed out before the describe operation could finish.</li>
+ * </ul>
+ * <p>
+ *
+ * @param options the options to use
+ * @return the {@link DescribeFeaturesResult} containing the result
+ */
+ DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+ /**
+ * Applies specified updates to finalized features. This operation is not transactional so some
+ * updates may succeed while the rest may fail.
+ * <p>
+ * The API takes in a map of finalized feature names to {@link FeatureUpdate} that needs to be
+ * applied. Each entry in the map specifies the finalized feature to be added or updated or
+ * deleted, along with the new max feature version level value. This request is issued only to
+ * the controller since the API is only served by the controller. The return value contains an
+ * error code for each supplied {@link FeatureUpdate}, and the code indicates if the update
+ * succeeded or failed in the controller.
+ * <ul>
+ * <li>Downgrade of feature version level is not a regular operation/intent. It is only allowed
+ * in the controller if the {@link FeatureUpdate} has the allowDowngrade flag set. Setting this
+ * flag conveys user intent to attempt downgrade of a feature max version level. Note that
+ * despite the allowDowngrade flag being set, certain downgrades may be rejected by the
+ * controller if it is deemed impossible.</li>
+ * <li>Deletion of a finalized feature version is not a regular operation/intent. It could be
+ * done by setting the allowDowngrade flag to true in the {@link FeatureUpdate}, and, setting
+ * the max version level to a value less than 1.</li>
+ * </ul>
+ * <p>
+ * The following exceptions can be anticipated when calling {@code get()} on the futures
+ * obtained from the returned {@link UpdateFeaturesResult}:
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+ * If the authenticated user didn't have alter access to the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+ * If the request details are invalid. e.g., a non-existing finalized feature is attempted
+ * to be deleted or downgraded.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * If the request timed out before the updates could finish. It cannot be guaranteed whether
+ * the updates succeeded or not.</li>
+ * <li>{@link FeatureUpdateFailedException}
+ * This means there was an unexpected error encountered when the update was applied on
+ * the controller. There is no guarantee on whether the update succeeded or failed. The best
+ * way to find out is to issue a {@link Admin#describeFeatures(DescribeFeaturesOptions)}
+ * request to the controller to get the latest features.</li>
+ * </ul>
+ * <p>
+ * This operation is supported by brokers with version 2.7.0 or higher.
+
+ * @param featureUpdates the map of finalized feature name to {@link FeatureUpdate}
+ * @param options the options to use
+ * @return the {@link UpdateFeaturesResult} containing the result
+ */
+ UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);
+
+ /**
+ * Get the metrics kept by the adminClient
+ */
+ Map<MetricName, ? extends Metric> metrics();
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarKafkaAdminClient.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarKafkaAdminClient.java
new file mode 100644
index 0000000..8196a39
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarKafkaAdminClient.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Mostly noop implementation of AdminClient
+ */
+@Slf4j
+public class PulsarKafkaAdminClient implements Admin {
+
+ private final Properties properties;
+ private final PulsarAdmin admin;
+
+ private final ConcurrentHashMap<String, Boolean> partitionedTopics = new ConcurrentHashMap<>();
+
+ private PulsarKafkaAdminClient(AdminClientConfig config) {
+ properties = new Properties();
+ log.info("config originals: {}", config.originals());
+ config.originals().forEach((k, v) -> {
+ log.info("Setting k = {} v = {}", k, v);
+ // properties do not allow null values
+ if (k != null && v != null) {
+ properties.put(k, v);
+ }
+ });
+
+ PulsarAdminBuilder builder = PulsarClientKafkaConfig.getAdminBuilder(properties);
+ try {
+ admin = builder.build();
+ } catch (PulsarClientException e) {
+ log.error("Could not create Pulsar Admin", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ static PulsarKafkaAdminClient createInternal(AdminClientConfig config) {
+ return new PulsarKafkaAdminClient(config);
+ }
+
+ public static Admin create(Properties props) {
+ return PulsarKafkaAdminClient.createInternal(new AdminClientConfig(props, true));
+ }
+
+ public static Admin create(Map<String, Object> conf) {
+ return PulsarKafkaAdminClient.createInternal(new AdminClientConfig(conf, true));
+ }
+
+ private boolean isPartitionedTopic(String topic) {
+ Boolean res = partitionedTopics.computeIfAbsent(topic, t -> {
+ try {
+ int numPartitions = admin.topics()
+ .getPartitionedTopicMetadata(t)
+ .partitions;
+ return numPartitions > 0;
+ } catch (PulsarAdminException e) {
+ log.error("getPartitionedTopicMetadata failed", e);
+ return null;
+ }
+ });
+ if (res == null) {
+ throw new RuntimeException("Could not get topic metadata");
+ }
+ return res;
+ }
+
+ public <K, V> Map<K, KafkaFutureImpl<V>> execute(Collection<K> param,
+ java.util.function.BiConsumer<K, KafkaFutureImpl<V>> func) {
+ // preparing topics list for asking metadata about them
+ final Map<K, KafkaFutureImpl<V>> futures
+ = new HashMap<>();
+ for (K value : param) {
+ KafkaFutureImpl<V> future = new KafkaFutureImpl<>();
+ futures.put(value, future);
+ func.accept(value, future);
+ }
+ return futures;
+ }
+
+ public <K, K2, V> Map<K, KafkaFutureImpl<V>> execute(Map<K, K2> param,
+ java.util.function.BiConsumer<Map.Entry<K, K2>, KafkaFutureImpl<V>> func) {
+ // preparing topics list for asking metadata about them
+ final Map<K, KafkaFutureImpl<V>> futures
+ = new HashMap<>(param.size());
+ for (Map.Entry<K, K2> value : param.entrySet()) {
+ KafkaFutureImpl<V> future = new KafkaFutureImpl<>();
+ futures.put(value.getKey(), future);
+ func.accept(value, future);
+ }
+ return futures;
+ }
+
+ @Override
+ public void close(Duration duration) {
+ admin.close();
+ }
+
+ @Override
+ public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets,
+ ListOffsetsOptions listOffsetsOptions) {
+ final Map<TopicPartition, KafkaFutureImpl<ListOffsetsResult.ListOffsetsResultInfo>> futures =
+ execute(topicPartitionOffsets, (entry, future) -> {
+ TopicPartition topicPartition = entry.getKey();
+ String topicName = isPartitionedTopic(topicPartition.topic())
+ ? topicPartition.topic() + TopicName.PARTITIONED_TOPIC_SUFFIX + topicPartition.partition()
+ : topicPartition.topic();
+ admin.topics()
+ .getLastMessageIdAsync(topicName)
+ .whenComplete((msgId, ex) -> {
+ if (ex == null) {
+ long offset = FunctionCommon.getSequenceId(msgId);
+ future.complete(new ListOffsetsResult.ListOffsetsResultInfo(
+ offset,
+ System.currentTimeMillis(),
+ Optional.empty()));
+ } else {
+ log.error("Admin failed to get lastMessageId for topic " + topicName, ex);
+ future.completeExceptionally(ex);
+ }
+ });
+ });
+ return new ListOffsetsResult(new HashMap<>(futures));
+ }
+
+ @Override
+ public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> map, DeleteRecordsOptions deleteRecordsOptions) {
+ final Map<TopicPartition, KafkaFutureImpl<DeletedRecords>> futures =
+ execute(map, (entry, future) -> {
+ // nothing to do, cannot delete messages before offset, let pulsar expire stuff
+ future.complete(new DeletedRecords(entry.getValue().beforeOffset()));
+ });
+
+ return new DeleteRecordsResult(new HashMap<>(futures));
+ }
+
+ @Override
+ public CreateTopicsResult createTopics(Collection<NewTopic> collection, CreateTopicsOptions createTopicsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public DeleteTopicsResult deleteTopics(Collection<String> collection, DeleteTopicsOptions deleteTopicsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public ListTopicsResult listTopics(ListTopicsOptions listTopicsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public DescribeTopicsResult describeTopics(Collection<String> collection, DescribeTopicsOptions describeTopicsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public DescribeClusterResult describeCluster(DescribeClusterOptions describeClusterOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public DescribeAclsResult describeAcls(AclBindingFilter aclBindingFilter, DescribeAclsOptions describeAclsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public CreateAclsResult createAcls(Collection<AclBinding> collection, CreateAclsOptions createAclsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public DeleteAclsResult deleteAcls(Collection<AclBindingFilter> collection, DeleteAclsOptions deleteAclsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public DescribeConfigsResult describeConfigs(Collection<ConfigResource> collection, DescribeConfigsOptions describeConfigsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> map, AlterConfigsOptions alterConfigsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> map, AlterConfigsOptions alterConfigsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> map, AlterReplicaLogDirsOptions alterReplicaLogDirsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public DescribeLogDirsResult describeLogDirs(Collection<Integer> collection, DescribeLogDirsOptions describeLogDirsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> collection, DescribeReplicaLogDirsOptions describeReplicaLogDirsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public CreatePartitionsResult createPartitions(Map<String, NewPartitions> map, CreatePartitionsOptions createPartitionsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions createDelegationTokenOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public RenewDelegationTokenResult renewDelegationToken(byte[] bytes, RenewDelegationTokenOptions renewDelegationTokenOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public ExpireDelegationTokenResult expireDelegationToken(byte[] bytes, ExpireDelegationTokenOptions expireDelegationTokenOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions describeDelegationTokenOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> collection, DescribeConsumerGroupsOptions describeConsumerGroupsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions listConsumerGroupsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String s, ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> collection, DeleteConsumerGroupsOptions deleteConsumerGroupsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String s, Set<TopicPartition> set, DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public ElectLeadersResult electLeaders(ElectionType electionType, Set<TopicPartition> set, ElectLeadersOptions electLeadersOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public AlterPartitionReassignmentsResult alterPartitionReassignments(Map<TopicPartition, Optional<NewPartitionReassignment>> map, AlterPartitionReassignmentsOptions alterPartitionReassignmentsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public ListPartitionReassignmentsResult listPartitionReassignments(Optional<Set<TopicPartition>> optional, ListPartitionReassignmentsOptions listPartitionReassignmentsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String s, RemoveMembersFromConsumerGroupOptions removeMembersFromConsumerGroupOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String s, Map<TopicPartition, OffsetAndMetadata> map, AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter clientQuotaFilter, DescribeClientQuotasOptions describeClientQuotasOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> collection, AlterClientQuotasOptions alterClientQuotasOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> list, DescribeUserScramCredentialsOptions describeUserScramCredentialsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> list, AlterUserScramCredentialsOptions alterUserScramCredentialsOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions describeFeaturesOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> map, UpdateFeaturesOptions updateFeaturesOptions) {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+
+ @Override
+ public Map<MetricName, ? extends Metric> metrics() {
+ throw new UnsupportedOperationException("Operation is not supported");
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerCoordinator.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerCoordinator.java
new file mode 100644
index 0000000..81f7c49
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerCoordinator.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import com.google.api.client.util.Maps;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.consumer.internals.PartitionAssignorAdapter;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Mostly noop implementation of coordinator to include minimal functionality that
+ * Kafka Streams expect to call all the callbacks.
+ * Unlike in Kafka Streams, it does not actually coordinate anything,
+ * named to keep lose reference to where that functionality is in KS.
+ */
+@Slf4j
+public class PulsarConsumerCoordinator {
+
+ // Used to encode "task assignment info" to force
+ // kafka streams to create new tasks after subscription
+ private static class AssignmentInfo {
+ // v.2 is the first to support partitions per host
+ // the rest is not applicable/useful but requires more placeholder data encoded
+ private final int usedVersion = 2;
+ private final List<TopicPartition> partitions;
+
+ public AssignmentInfo(final List<TopicPartition> partitions) {
+ this.partitions = partitions;
+ }
+
+ @SneakyThrows
+ public ByteBuffer encode() {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ try (final DataOutputStream out = new DataOutputStream(baos)) {
+ out.writeInt(usedVersion); // version
+ encodeActiveAndStandbyTaskAssignment(out, partitions);
+ encodePartitionsByHost(out);
+
+ out.flush();
+ out.close();
+
+ return ByteBuffer.wrap(baos.toByteArray());
+ }
+ }
+
+ private void encodeActiveAndStandbyTaskAssignment(final DataOutputStream out,
+ final List<TopicPartition> partitions) throws IOException {
+
+ int lastId = 0;
+ final Map<String, Integer> topicGroupIds = new HashMap<>();
+ // encode active tasks
+ // the number of assigned partitions must be the same as number of active tasks
+ out.writeInt(partitions.size());
+ for (TopicPartition p : partitions) {
+ final int topicGroupId;
+ if (topicGroupIds.containsKey(p.topic())) {
+ topicGroupId = topicGroupIds.get(p.topic());
+ } else {
+ topicGroupId = lastId;
+ lastId++;
+ topicGroupIds.put(p.topic(), topicGroupId);
+ }
+ out.writeInt(topicGroupId);
+ out.writeInt(p.partition());
+ }
+
+ // encode standby tasks
+ out.writeInt(0);
+ }
+
+ private void encodePartitionsByHost(final DataOutputStream out) throws IOException {
+ // encode partitions by host
+ out.writeInt(1);
+ writeHostInfo(out, "fakeHost", 9999);
+ writeTopicPartitions(out, partitions);
+ }
+
+ private void writeHostInfo(final DataOutputStream out, String host, int port) throws IOException {
+ out.writeUTF(host);
+ out.writeInt(port);
+ }
+
+ private void writeTopicPartitions(final DataOutputStream out,
+ final List<TopicPartition> partitions) throws IOException {
+ out.writeInt(partitions.size());
+ for (final TopicPartition partition : partitions) {
+ out.writeUTF(partition.topic());
+ out.writeInt(partition.partition());
+ }
+ }
+ }
+
+ public static void invokePartitionsAssigned(final String groupId, final ConsumerConfig config, final List<TopicPartition> assignedPartitions) {
+ final List<ConsumerPartitionAssignor> assignors = PartitionAssignorAdapter.getAssignorInstances(
+ config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
+ config.originals());
+
+ // Give the assignor a chance to update internal state based on the received assignment
+ ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata(groupId);
+
+ ByteBuffer bbInfo = new AssignmentInfo(assignedPartitions).encode();
+ ConsumerPartitionAssignor.Assignment assignment =
+ new ConsumerPartitionAssignor.Assignment(assignedPartitions, bbInfo);
+
+ for (ConsumerPartitionAssignor assignor : assignors) {
+ assignor.onAssignment(assignment, groupMetadata);
+ }
+ }
+
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 06b1c6e..9ef4e96 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -18,28 +18,8 @@
*/
package org.apache.kafka.clients.consumer;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Base64;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
@@ -48,25 +28,50 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+
+import java.time.Duration;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
@Slf4j
public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener<byte[]> {
@@ -96,6 +101,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
private final Properties properties;
+ private final ConsumerConfig consumerCfg;
+
private static class QueueItem {
final org.apache.pulsar.client.api.Consumer<byte[]> consumer;
final Message<byte[]> message;
@@ -140,7 +147,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
@SuppressWarnings("unchecked")
private PulsarKafkaConsumer(ConsumerConfig consumerConfig, Schema<K> keySchema, Schema<V> valueSchema) {
-
+ this.consumerCfg = consumerConfig;
if (keySchema == null) {
Deserializer<K> kafkaKeyDeserializer = consumerConfig.getConfiguredInstance(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
@@ -161,7 +168,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
consumerConfig.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
}
- groupId = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG);
+ // kafka removes group id for the restore consumer but adds client id
+ groupId = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG) == null
+ ? consumerConfig.getString(ConsumerConfig.CLIENT_ID_CONFIG)
+ : consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG);
+ Preconditions.checkNotNull(groupId, "groupId cannot be null");
isAutoCommit = consumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
strategy = getStrategy(consumerConfig.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
log.info("Offset reset strategy has been assigned value {}", strategy);
@@ -179,7 +190,14 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class);
this.properties = new Properties();
- consumerConfig.originals().forEach((k, v) -> properties.put(k, v));
+ log.info("config originals: {}", consumerConfig.originals());
+ consumerConfig.originals().forEach((k, v) -> {
+ log.info("Setting k = {} v = {}", k, v);
+ // properties do not allow null values
+ if (k != null && v != null) {
+ properties.put(k, v);
+ }
+ });
ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
// Since this client instance is going to be used just for the consumers, we can enable Nagle to group
// all the acknowledgments sent to broker within a short time frame
@@ -217,7 +235,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
@Override
public Set<TopicPartition> assignment() {
- throw new UnsupportedOperationException("Cannot access the partitions assignements");
+ return consumers.keySet();
}
/**
@@ -260,7 +278,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
.topic(partitionName).subscribeAsync();
int partitionIndex = i;
TopicPartition tp = new TopicPartition(
- TopicName.get(topic).getPartitionedTopicName(),
+ TopicName.get(topic).getLocalName(),
partitionIndex);
futures.add(future.thenApply(consumer -> {
log.info("Add consumer {} for partition {}", consumer, tp);
@@ -274,7 +292,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
.subscribeAsync();
TopicPartition tp = new TopicPartition(
- TopicName.get(topic).getPartitionedTopicName(),
+ TopicName.get(topic).getLocalName(),
0);
futures.add(future.thenApply(consumer -> {
log.info("Add consumer {} for partition {}", consumer, tp);
@@ -288,6 +306,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
// Wait for all consumers to be ready
futures.forEach(CompletableFuture::join);
+ PulsarConsumerCoordinator.invokePartitionsAssigned(groupId, consumerCfg, Lists.newArrayList(consumers.keySet()));
// Notify the listener is now owning all topics/partitions
if (callback != null) {
@@ -310,7 +329,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
@Override
public void assign(Collection<TopicPartition> partitions) {
- throw new UnsupportedOperationException("Cannot manually assign partitions");
+ Set<String> topics = partitions.stream().map(p -> p.topic()).collect(Collectors.toSet());
+ subscribe(topics);
+ // not throwing exception to let KafkaStreams use the consumer.
+ //log.error("Tried to assign partitions {}. The operation is not supported", partitions,
+ // new UnsupportedOperationException("not supported"));
}
@Override
@@ -349,7 +372,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
while (item != null) {
TopicName topicName = TopicName.get(item.consumer.getTopic());
- String topic = topicName.getPartitionedTopicName();
+ String topic = topicName.getLocalName();
int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
Message<byte[]> msg = item.message;
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
@@ -641,6 +664,19 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
@Override
+ public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
+ return partitions.stream()
+ .map(x -> new AbstractMap.SimpleEntry<>(x, committed(x)))
+ .filter(entry -> entry.getValue() != null)
+ .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
+ }
+
+ @Override
+ public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions, Duration duration) {
+ return committed(partitions);
+ }
+
+ @Override
public Map<MetricName, ? extends Metric> metrics() {
throw new UnsupportedOperationException();
}
@@ -665,19 +701,32 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
throw new UnsupportedOperationException();
}
+ // there is no isPaused() in pulsar consumer
+ // This works well enough because KafkaStreams
+ // seem to resume all paused consumers
@Override
public Set<TopicPartition> paused() {
- throw new UnsupportedOperationException();
+ return consumers.keySet();
}
@Override
public void pause(Collection<TopicPartition> partitions) {
- throw new UnsupportedOperationException();
+ partitions.forEach(p -> {
+ org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(p);
+ if (c != null) {
+ c.pause();
+ }
+ });
}
@Override
public void resume(Collection<TopicPartition> partitions) {
- throw new UnsupportedOperationException();
+ partitions.forEach(p -> {
+ org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(p);
+ if (c != null) {
+ c.resume();
+ }
+ });
}
@Override
@@ -711,6 +760,16 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
@Override
+ public ConsumerGroupMetadata groupMetadata() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void enforceRebalance() {
+ log.info("enforceRebalance() is called but ignored");
+ }
+
+ @Override
public void close() {
close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 13f97b5..f6da38e 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Metric;
@@ -193,6 +194,11 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
}
@Override
+ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, ConsumerGroupMetadata consumerGroupMetadata) throws ProducerFencedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void commitTransaction() throws ProducerFencedException {
throw new UnsupportedOperationException();
}
@@ -366,7 +372,15 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
TopicPartition tp = new TopicPartition(topic, partition);
TypedMessageBuilderImpl<byte[]> mb = (TypedMessageBuilderImpl<byte[]>) msgBuilder;
- return new RecordMetadata(tp, offset, 0L, mb.getPublishTime(), 0L, mb.hasKey() ? mb.getKey().length() : 0, size);
+
+ long publishTime = 0L;
+ try {
+ // there is no hasPublishTime() currently
+ publishTime = mb.getPublishTime();
+ } catch (IllegalStateException ise) {
+ logger.debug("could not get publish time");
+ }
+ return new RecordMetadata(tp, offset, 0L, publishTime, 0L, mb.hasKey() ? mb.getKey().length() : 0, size);
}
private static final Logger logger = LoggerFactory.getLogger(PulsarKafkaProducer.class);
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
index 8ba5ede..eebd95b 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
@@ -22,6 +22,10 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
@@ -121,4 +125,37 @@ public class PulsarClientKafkaConfig {
return clientBuilder;
}
+
+ @SneakyThrows
+ public static PulsarAdminBuilder getAdminBuilder(Properties properties) {
+ String serviceUrl = properties.getProperty("pulsar.admin.url");
+
+ String authPluginClassName = properties.getProperty("authPlugin");
+ String authParams = properties.getProperty("authParams");
+ boolean tlsAllowInsecureConnection =
+ Boolean.parseBoolean(properties.getProperty("tlsAllowInsecureConnection", "false"));
+
+ boolean tlsEnableHostnameVerification =
+ Boolean.parseBoolean(properties.getProperty("tlsEnableHostnameVerification", "false"));
+ final String tlsTrustCertsFilePath = properties.getProperty("tlsTrustCertsFilePath");
+
+ boolean useKeyStoreTls = Boolean
+ .parseBoolean(properties.getProperty("useKeyStoreTls", "false"));
+ String tlsTrustStoreType = properties.getProperty("tlsTrustStoreType", "JKS");
+ String tlsTrustStorePath = properties.getProperty("tlsTrustStorePath");
+ String tlsTrustStorePassword = properties.getProperty("tlsTrustStorePassword");
+
+ PulsarAdminBuilder adminBuilder = PulsarAdmin.builder()
+ .serviceHttpUrl(serviceUrl)
+ .allowTlsInsecureConnection(tlsAllowInsecureConnection)
+ .enableTlsHostnameVerification(tlsEnableHostnameVerification)
+ .tlsTrustCertsFilePath(tlsTrustCertsFilePath)
+ .useKeyStoreTls(useKeyStoreTls)
+ .tlsTrustStoreType(tlsTrustStoreType)
+ .tlsTrustStorePath(tlsTrustStorePath)
+ .tlsTrustStorePassword(tlsTrustStorePassword)
+ .authentication(authPluginClassName, authParams);
+
+ return adminBuilder;
+ }
}