Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/04/11 08:30:58 UTC

[pulsar-adapters] branch master updated: Feature: Run Kafka streams app with Pulsar (#10)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git


The following commit(s) were added to refs/heads/master by this push:
     new e3f72a4  Feature: Run Kafka streams app with Pulsar (#10)
e3f72a4 is described below

commit e3f72a4e6735219492de69ef98e76d16e8b73e0c
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Sun Apr 11 01:30:50 2021 -0700

    Feature: Run Kafka streams app with Pulsar (#10)
---
 examples/kafka-streams/pom.xml                     |  130 ++
 .../kafkastreams/pulsar/example/LineSplit.java     |  102 ++
 .../kafka/kafkastreams/pulsar/example/README.md    |  136 ++
 .../kafkastreams/pulsar/example/WordCount.java     |   96 ++
 examples/pom.xml                                   |    1 +
 pom.xml                                            |    7 +-
 .../pulsar-client-kafka-shaded/pom.xml             |   42 +-
 .../pulsar-client-kafka/pom.xml                    |   15 +
 .../apache/kafka/clients/admin/PulsarAdmin.java    | 1399 ++++++++++++++++++++
 .../clients/admin/PulsarKafkaAdminClient.java      |  362 +++++
 .../consumer/PulsarConsumerCoordinator.java        |  137 ++
 .../clients/consumer/PulsarKafkaConsumer.java      |  137 +-
 .../clients/producer/PulsarKafkaProducer.java      |   16 +-
 .../kafka/compat/PulsarClientKafkaConfig.java      |   37 +
 14 files changed, 2569 insertions(+), 48 deletions(-)

diff --git a/examples/kafka-streams/pom.xml b/examples/kafka-streams/pom.xml
new file mode 100644
index 0000000..b90bc0a
--- /dev/null
+++ b/examples/kafka-streams/pom.xml
@@ -0,0 +1,130 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.pulsar.examples</groupId>
+        <artifactId>pulsar-adapters-examples</artifactId>
+        <version>2.8.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>kafka-streams</artifactId>
+    <name>Pulsar Examples :: Kafka Streams</name>
+
+    <properties>
+        <maven.compiler.target>1.8</maven.compiler.target>
+        <maven.compiler.source>1.8</maven.compiler.source>
+    </properties>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-client-kafka</artifactId>
+      <version>2.8.0-SNAPSHOT</version>
+    </dependency>
+
+    <!-- Apache Kafka dependencies -->
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-streams</artifactId>
+      <version>${kafka-client.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.kafka</groupId>
+          <artifactId>kafka-clients</artifactId>
+        </exclusion>
+      </exclusions>
+
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-buffer</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-transport-native-epoll</artifactId>
+      <classifier>linux-x86_64</classifier>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-transport-native-unix-common</artifactId>
+      <classifier>linux-x86_64</classifier>
+    </dependency>
+
+  </dependencies>
+
+    <build>
+      <plugins>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-shade-plugin</artifactId>
+          <executions>
+            <execution>
+              <id>pulsar-kafka-streams-examples</id>
+              <phase>package</phase>
+              <goals>
+                <goal>shade</goal>
+              </goals>
+              <configuration>
+                <shadedArtifactAttached>false</shadedArtifactAttached>
+                <createDependencyReducedPom>false</createDependencyReducedPom>
+                <transformers>
+                  <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                    <mainClass>org.apache.kafka.kafkastreams.pulsar.example.LineSplit</mainClass>
+                  </transformer>
+                </transformers>
+                <finalName>pulsar-kafka-streams-examples</finalName>
+                <artifactSet>
+                  <includes>
+                    <include>com.google.guava:guava</include>
+                    <include>io.netty:netty-codec-http</include>
+                    <include>io.netty:netty-transport-native-epoll</include>
+                    <include>io.netty:netty</include>
+                    <include>io.netty:netty-all</include>
+                  </includes>
+                </artifactSet>
+                <relocations>
+                  <relocation>
+                    <pattern>com.google</pattern>
+                    <shadedPattern>org.apache.pulsar.shade.com.google</shadedPattern>
+                  </relocation>
+                  <relocation>
+                    <pattern>io.netty</pattern>
+                    <shadedPattern>org.apache.pulsar.shade.io.netty</shadedPattern>
+                  </relocation>
+                </relocations>
+              </configuration>
+            </execution>
+          </executions>
+        </plugin>
+        </plugins>
+
+    </build>
+
+</project>
diff --git a/examples/kafka-streams/src/main/java/org/apache/kafka/kafkastreams/pulsar/example/LineSplit.java b/examples/kafka-streams/src/main/java/org/apache/kafka/kafkastreams/pulsar/example/LineSplit.java
new file mode 100644
index 0000000..97c80f9
--- /dev/null
+++ b/examples/kafka-streams/src/main/java/org/apache/kafka/kafkastreams/pulsar/example/LineSplit.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.kafkastreams.pulsar.example;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * In this example, we implement a simple LineSplit program using the high-level Streams DSL
+ * that reads from a source topic "streams-plaintext-input", where the values of messages represent lines of text;
+ * the code splits each text line into words and then writes them back into a sink topic "streams-linesplit-output", where
+ * each record represents a single word.
+ */
+public class LineSplit {
+
+    /*
+    To run:
+    mvn clean package
+    mvn exec:java -Dexec.mainClass=org.apache.kafka.kafkastreams.pulsar.example.LineSplit
+
+    It needs a running Pulsar (standalone).
+
+    To consume the output topic:
+    bin/pulsar-client consume streams-linesplit-output -s test -p Earliest -n 0
+
+    To produce to the input topic:
+    bin/pulsar-client produce streams-plaintext-input --messages "I sent a few words"
+    */
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
+
+        // kafka would do something like
+        // props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put("bootstrap.servers", "pulsar://localhost:6650");
+        props.put("pulsar.admin.url", "http://localhost:8080");
+
+        props.put("key.serializer", StringSerializer.class.getName());
+        props.put("value.serializer", StringSerializer.class.getName());
+        props.put("key.deserializer", StringDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
+
+        props.put("group.id", "linesplit-subscription-name");
+        props.put("enable.auto.commit", "true");
+
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.<String, String>stream("streams-plaintext-input")
+                .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
+                .to("streams-linesplit-output");
+
+        final Topology topology = builder.build();
+
+        final KafkaStreams streams = new KafkaStreams(topology, props);
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        // attach shutdown handler to catch control-c
+        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
+            @Override
+            public void run() {
+                streams.close();
+                latch.countDown();
+            }
+        });
+
+        try {
+            streams.start();
+            latch.await();
+        } catch (Throwable e) {
+            System.exit(1);
+        }
+        System.exit(0);
+    }
+}
diff --git a/examples/kafka-streams/src/main/java/org/apache/kafka/kafkastreams/pulsar/example/README.md b/examples/kafka-streams/src/main/java/org/apache/kafka/kafkastreams/pulsar/example/README.md
new file mode 100644
index 0000000..c44c7e1
--- /dev/null
+++ b/examples/kafka-streams/src/main/java/org/apache/kafka/kafkastreams/pulsar/example/README.md
@@ -0,0 +1,136 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+## Apache Kafka Streams for Pulsar
+
+This page describes how to use [Kafka Streams](https://kafka.apache.org/27/documentation/streams/) with Pulsar topics.
+
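+The examples below show that a Kafka Streams application can be pointed at Pulsar largely through configuration: instead of a Kafka bootstrap broker list, the client is given a Pulsar broker URL, plus the Pulsar admin endpoint as an extra property. The following is only a minimal sketch, assuming a local standalone Pulsar; the class name and application id are placeholders, while the topic names and connection properties are the ones used by the `LineSplit` example in this directory.
+
+```java
+import java.util.Properties;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+
+public class PulsarStreamsSketch {
+    public static void main(String[] args) {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-sketch"); // placeholder app id
+        // Point the Kafka client at the Pulsar broker instead of a Kafka bootstrap server.
+        props.put("bootstrap.servers", "pulsar://localhost:6650");
+        // The adapter also needs the Pulsar admin (HTTP) endpoint.
+        props.put("pulsar.admin.url", "http://localhost:8080");
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
+        // The topology itself is plain Kafka Streams DSL; topic names map to Pulsar topics.
+        StreamsBuilder builder = new StreamsBuilder();
+        builder.<String, String>stream("streams-plaintext-input").to("streams-linesplit-output");
+
+        new KafkaStreams(builder.build(), props).start();
+    }
+}
+```
+
+See `LineSplit.java` and `WordCount.java` in this directory for the full set of properties (serializers, `group.id`, and so on) used by the examples.
+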
+## Example
+
+### LineSplit
+
+This Kafka Streams job consumes from a Pulsar topic and splits lines into words in a streaming fashion.
+The job writes the words to another Pulsar topic.
+
+The steps to run the example:
+
+1. Start Pulsar Standalone.
+
+   You can follow the [instructions](https://pulsar.apache.org/docs/en/standalone/) to start a Pulsar standalone locally.
+
+    ```shell
+    $ bin/pulsar standalone
+    ```
+
+2. Create the topics to use later. The easiest way to do so is to produce a message to each topic:
+
+   ```shell
+   $ bin/pulsar-client produce streams-plaintext-input --messages "tada"
+   $ bin/pulsar-client produce streams-linesplit-output --messages "tada"
+   ```
+
+3. Build the examples.
+
+    ```shell
+    $ cd ${PULSAR_ADAPTORS_HOME}/examples/kafka-streams
+    $ mvn clean package
+    ```
+
+4. Run the example.
+
+    ```shell
+    $ mvn exec:java -Dexec.mainClass=org.apache.kafka.kafkastreams.pulsar.example.LineSplit
+    ```
+
+5. Produce messages to topic `streams-plaintext-input`.
+
+    ```shell
+    $ bin/pulsar-client produce streams-plaintext-input --messages "I produced a few words"
+    ```
+
+6. Check that a consumer sees the words in the output topic `streams-linesplit-output`, e.g.:
+
+    ```shell
+    $ bin/pulsar-client consume streams-linesplit-output -s test -p Earliest -n 0  
+    ```
+   outputs something like
+   ```text
+   ----- got message -----
+   key:[null], properties:[pulsar.partition.id=0], content:I
+   ----- got message -----
+   key:[null], properties:[pulsar.partition.id=0], content:produced
+   ----- got message -----
+   key:[null], properties:[pulsar.partition.id=0], content:a
+   ----- got message -----
+   key:[null], properties:[pulsar.partition.id=0], content:few
+   ----- got message -----
+   key:[null], properties:[pulsar.partition.id=0], content:words
+   ```
+
+### WordCount
+
+This Kafka Streams job consumes from a Pulsar topic and splits lines into words in a streaming fashion.
+The job writes the words and their counts to another Pulsar topic.
+
+The steps to run the example:
+
+1. Start Pulsar Standalone.
+
+   You can follow the [instructions](https://pulsar.apache.org/docs/en/standalone/) to start a Pulsar standalone locally.
+
+    ```shell
+    $ bin/pulsar standalone
+    ```
+
+2. Build the examples.
+
+    ```shell
+    $ cd ${PULSAR_ADAPTORS_HOME}/examples/kafka-streams
+    $ mvn clean package
+    ```
+
+3. Run the example.
+
+    ```shell
+    $ mvn exec:java -Dexec.mainClass=org.apache.kafka.kafkastreams.pulsar.example.WordCount
+    ```
+
+4. Produce messages to topic `streams-plaintext-input`.
+
+    ```shell
+    $ bin/pulsar-client produce streams-plaintext-input --messages "a a a a b c"
+    ```
+
+5. Check that a consumer sees the word counts in the output topic `streams-wordcount-output`, e.g.:
+
+    ```shell
+    $ bin/pulsar-client consume streams-wordcount-output -s test -p Earliest -n 0  
+    ```
+   outputs something like
+   ```text
+   ----- got message -----
+   key:[YQ==], properties:[pulsar.partition.id=0], content:
+   ----- got message -----
+   key:[Yg==], properties:[pulsar.partition.id=0], content:
+   ----- got message -----
+   key:[Yw==], properties:[pulsar.partition.id=0], content:
+   ```
+   (The CLI consumer does not decode the String keys and Long values of this byte[]/byte[] topic: the keys are shown base64-encoded and the serialized Long counts are not printable; one option is to read the topic programmatically, as in the hedged sketch below.)
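+
+Since the CLI does not decode them, one way to inspect the output is to read the topic through the Kafka consumer API provided by `pulsar-client-kafka`. This is only a hedged sketch: the class name and `group.id` are placeholders, and it assumes the shaded consumer honors the standard `key.deserializer`/`value.deserializer` settings and `poll(Duration)` API.
+
+```java
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+public class WordCountOutputReader {
+    public static void main(String[] args) {
+        Properties props = new Properties();
+        // Same connection properties as the examples above (local standalone Pulsar assumed).
+        props.put("bootstrap.servers", "pulsar://localhost:6650");
+        props.put("pulsar.admin.url", "http://localhost:8080");
+        props.put("group.id", "wordcount-output-reader"); // placeholder subscription name
+        props.put("enable.auto.commit", "true");
+        // WordCount writes with Produced.with(Serdes.String(), Serdes.Long()),
+        // so decode keys as Strings and values as Longs.
+        props.put("key.deserializer", StringDeserializer.class.getName());
+        props.put("value.deserializer", LongDeserializer.class.getName());
+
+        try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props)) {
+            consumer.subscribe(Collections.singletonList("streams-wordcount-output"));
+            while (true) {
+                ConsumerRecords<String, Long> records = consumer.poll(Duration.ofSeconds(1));
+                for (ConsumerRecord<String, Long> record : records) {
+                    System.out.printf("%s -> %d%n", record.key(), record.value());
+                }
+            }
+        }
+    }
+}
+```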
diff --git a/examples/kafka-streams/src/main/java/org/apache/kafka/kafkastreams/pulsar/example/WordCount.java b/examples/kafka-streams/src/main/java/org/apache/kafka/kafkastreams/pulsar/example/WordCount.java
new file mode 100644
index 0000000..55d5e49
--- /dev/null
+++ b/examples/kafka-streams/src/main/java/org/apache/kafka/kafkastreams/pulsar/example/WordCount.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.kafkastreams.pulsar.example;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * In this example, we implement a simple WordCount program using the high-level Streams DSL
+ * that reads from a source topic "streams-plaintext-input", where the values of messages represent lines of text,
+ * splits each text line into words, computes the word occurrence histogram, and writes the continuously updated histogram
+ * into a topic "streams-wordcount-output", where each record is an updated count of a single word.
+ */
+public class WordCount {
+
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
+
+        // kafka would do something like
+        // props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put("bootstrap.servers", "pulsar://localhost:6650");
+        props.put("pulsar.admin.url", "http://localhost:8080");
+
+        props.put("key.serializer", StringSerializer.class.getName());
+        props.put("value.serializer", StringSerializer.class.getName());
+        props.put("key.deserializer", StringDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
+
+        props.put("group.id", "streams-wordcount-subscription-name");
+        props.put("enable.auto.commit", "true");
+
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.<String, String>stream("streams-plaintext-input")
+                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
+                .groupBy((key, value) -> value)
+                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
+                .toStream()
+                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
+
+        final Topology topology = builder.build();
+        final KafkaStreams streams = new KafkaStreams(topology, props);
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        // attach shutdown handler to catch control-c
+        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
+            @Override
+            public void run() {
+                streams.close();
+                latch.countDown();
+            }
+        });
+
+        try {
+            streams.start();
+            latch.await();
+        } catch (Throwable e) {
+            System.exit(1);
+        }
+        System.exit(0);
+    }
+}
diff --git a/examples/pom.xml b/examples/pom.xml
index cfe3a4c..55382c4 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -35,6 +35,7 @@
   <modules>
     <module>flink</module>
     <module>spark</module>
+    <module>kafka-streams</module>
   </modules>
 
 </project>
diff --git a/pom.xml b/pom.xml
index 0c5ace5..df4d9fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,7 +79,7 @@
     <pulsar.version>2.8.0-SNAPSHOT</pulsar.version>
     <flink.version>1.7.2</flink.version>
     <storm.version>2.0.0</storm.version>
-    <kafka-client.version>2.3.0</kafka-client.version>
+    <kafka-client.version>2.7.0</kafka-client.version>
     <kafka_0_8.version>0.8.1.1</kafka_0_8.version>
 
     <maven.compiler.source>1.8</maven.compiler.source>
@@ -319,6 +319,11 @@
         <type>test-jar</type>
         <version>${pulsar.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.pulsar</groupId>
+        <artifactId>pulsar-functions-utils</artifactId>
+        <version>${pulsar.version}</version>
+      </dependency>
 
       <dependency>
         <groupId>org.apache.logging.log4j</groupId>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml
index d2cd4a3..4d847ca 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml
@@ -72,14 +72,31 @@
                   <include>commons-codec:commons-codec</include>
                   <include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
                   <include>commons-collections:commons-collections</include>
-                  <include>org.asynchttpclient:*</include>
+                  <!--
+                    Do not shade AHC
+                    <include>org.asynchttpclient:*</include>
+
+                    shading it results in errors like:
+                    java.lang.NumberFormatException: null
+                    at java.lang.Integer.parseInt (Integer.java:614)
+                    at java.lang.Integer.parseInt (Integer.java:770)
+                    at org.apache.pulsar.shade.org.asynchttpclient.config.AsyncHttpClientConfigHelper$Config.getInt (AsyncHttpClientConfigHelper.java:85)
+                    at org.apache.pulsar.shade.org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultMaxRedirects (AsyncHttpClientConfigDefaults.java:138)
+                    at org.apache.pulsar.shade.org.asynchttpclient.DefaultAsyncHttpClientConfig$Builder.<init> (DefaultAsyncHttpClientConfig.java:702)
+                    at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.<init> (AsyncHttpConnector.java:97)
+                    at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnectorProvider.getConnector (AsyncHttpConnectorProvider.java:52)
+                    at org.apache.pulsar.client.admin.internal.PulsarAdminImpl.<init> (PulsarAdminImpl.java:196)
+                    at org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl.build (PulsarAdminBuilderImpl.java:47)
+
+                    similar to https://github.com/playframework/play-ws/issues/229
+                    -->
                   <include>io.netty:netty-codec-http</include>
                   <include>io.netty:netty-transport-native-epoll</include>
                   <include>org.reactivestreams:reactive-streams</include>
                   <include>com.typesafe.netty:netty-reactive-streams</include>
                   <include>org.javassist:javassist</include>
                   <include>com.google.protobuf:protobuf-java</include>
-                  <include>com.google.guava:guava</include>
+                  <include>com.google.guava:*</include>
                   <include>com.google.code.gson:gson</include>
                   <include>com.fasterxml.jackson.core</include>
                   <include>com.fasterxml.jackson.module</include>
@@ -129,14 +146,25 @@
                   <pattern>org.apache.kafka.clients.consumer.PulsarKafkaConsumer</pattern>
                   <shadedPattern>org.apache.kafka.clients.consumer.KafkaConsumer</shadedPattern>
                 </relocation>
-
-                <!-- General relocation rules for Pulsar client dependencies -->
-
                 <relocation>
-                  <pattern>org.asynchttpclient</pattern>
-                  <shadedPattern>org.apache.pulsar.shade.org.asynchttpclient</shadedPattern>
+                  <pattern>org.apache.kafka.clients.admin.KafkaAdminClient</pattern>
+                  <shadedPattern>org.apache.kafka.clients.admin.OriginalKafkaAdminClient</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.admin.PulsarKafkaAdminClient</pattern>
+                  <shadedPattern>org.apache.kafka.clients.admin.KafkaAdminClient</shadedPattern>
                 </relocation>
                 <relocation>
+                  <pattern>org.apache.kafka.clients.admin.Admin.class</pattern>
+                  <shadedPattern>org.apache.kafka.clients.admin.OriginalAdmin.class</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.admin.PulsarAdmin</pattern>
+                  <shadedPattern>org.apache.kafka.clients.admin.Admin</shadedPattern>
+                </relocation>
+
+                <!-- General relocation rules for Pulsar client dependencies -->
+                <relocation>
                   <pattern>org.apache.commons</pattern>
                   <shadedPattern>org.apache.pulsar.shade.org.apache.commons</shadedPattern>
                 </relocation>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
index 0adfa0d..ba33c14 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
@@ -43,11 +43,26 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-client-admin</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-functions-utils</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
     </dependency>
 
     <dependency>
+      <groupId>org.asynchttpclient</groupId>
+      <artifactId>async-http-client</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>${kafka-client.version}</version>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarAdmin.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarAdmin.java
new file mode 100644
index 0000000..e899bfc
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarAdmin.java
@@ -0,0 +1,1399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.FeatureUpdateFailedException;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
+import org.apache.kafka.common.requests.LeaveGroupResponse;
+
+/**
+ * A copy of Kafka's Admin interface (from v2.7) to shade and replace.
+ * The only point of doing so is to deal with static methods that cannot be overridden,
+ * forcing them to provide an instance of PulsarKafkaAdminClient instead of KafkaAdminClient.
+ * Specifically, this is needed for use with KafkaStreams / DefaultKafkaClientSupplier.
+ */
+
+/**
+ * The administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations and ACLs.
+ * <p>
+ * The minimum broker version required is 0.10.0.0. Methods with stricter requirements will specify the minimum broker
+ * version required.
+ * <p>
+ * This client was introduced in 0.11.0.0 and the API is still evolving. We will try to evolve the API in a compatible
+ * manner, but we reserve the right to make breaking changes in minor releases, if necessary. We will update the
+ * {@code InterfaceStability} annotation and this notice once the API is considered stable.
+ */
+@InterfaceStability.Evolving
+public interface PulsarAdmin extends AutoCloseable {
+
+    /**
+     * Create a new Admin with the given configuration.
+     *
+     * @param props The configuration.
+     * @return The new KafkaAdminClient.
+     */
+    static Admin create(Properties props) {
+        return PulsarKafkaAdminClient.create(props);
+    }
+
+    /**
+     * Create a new Admin with the given configuration.
+     *
+     * @param conf The configuration.
+     * @return The new KafkaAdminClient.
+     */
+    static Admin create(Map<String, Object> conf) {
+        return PulsarKafkaAdminClient.create(conf);
+    }
+
+    /**
+     * Close the Admin and release all associated resources.
+     * <p>
+     * See {@link #close(long, TimeUnit)}
+     */
+    @Override
+    default void close() {
+        close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Close the Admin and release all associated resources.
+     * <p>
+     * The close operation has a grace period during which current operations will be allowed to
+     * complete, specified by the given duration and time unit.
+     * New operations will not be accepted during the grace period. Once the grace period is over,
+     * all operations that have not yet been completed will be aborted with a {@link org.apache.kafka.common.errors.TimeoutException}.
+     *
+     * @param duration The duration to use for the wait time.
+     * @param unit     The time unit to use for the wait time.
+     * @deprecated Since 2.2. Use {@link #close(Duration)} or {@link #close()}.
+     */
+    @Deprecated
+    default void close(long duration, TimeUnit unit) {
+        close(Duration.ofMillis(unit.toMillis(duration)));
+    }
+
+    /**
+     * Close the Admin client and release all associated resources.
+     * <p>
+     * The close operation has a grace period during which current operations will be allowed to
+     * complete, specified by the given duration.
+     * New operations will not be accepted during the grace period. Once the grace period is over,
+     * all operations that have not yet been completed will be aborted with a {@link org.apache.kafka.common.errors.TimeoutException}.
+     *
+     * @param timeout The time to use for the wait time.
+     */
+    void close(Duration timeout);
+
+    /**
+     * Create a batch of new topics with the default options.
+     * <p>
+     * This is a convenience method for {@link #createTopics(Collection, CreateTopicsOptions)} with default options.
+     * See the overload for more details.
+     * <p>
+     * This operation is supported by brokers with version 0.10.1.0 or higher.
+     *
+     * @param newTopics The new topics to create.
+     * @return The CreateTopicsResult.
+     */
+    default CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
+        return createTopics(newTopics, new CreateTopicsOptions());
+    }
+
+    /**
+     * Create a batch of new topics.
+     * <p>
+     * This operation is not transactional so it may succeed for some topics while fail for others.
+     * <p>
+     * It may take several seconds after {@link CreateTopicsResult} returns
+     * success for all the brokers to become aware that the topics have been created.
+     * During this time, {@link #listTopics()} and {@link #describeTopics(Collection)}
+     * may not return information about the new topics.
+     * <p>
+     * This operation is supported by brokers with version 0.10.1.0 or higher. The validateOnly option is supported
+     * from version 0.10.2.0.
+     *
+     * @param newTopics The new topics to create.
+     * @param options   The options to use when creating the new topics.
+     * @return The CreateTopicsResult.
+     */
+    CreateTopicsResult createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options);
+
+    /**
+     * This is a convenience method for {@link #deleteTopics(Collection, DeleteTopicsOptions)}
+     * with default options. See the overload for more details.
+     * <p>
+     * This operation is supported by brokers with version 0.10.1.0 or higher.
+     *
+     * @param topics The topic names to delete.
+     * @return The DeleteTopicsResult.
+     */
+    default DeleteTopicsResult deleteTopics(Collection<String> topics) {
+        return deleteTopics(topics, new DeleteTopicsOptions());
+    }
+
+    /**
+     * Delete a batch of topics.
+     * <p>
+     * This operation is not transactional so it may succeed for some topics while fail for others.
+     * <p>
+     * It may take several seconds after the {@link DeleteTopicsResult} returns
+     * success for all the brokers to become aware that the topics are gone.
+     * During this time, {@link #listTopics()} and {@link #describeTopics(Collection)}
+     * may continue to return information about the deleted topics.
+     * <p>
+     * If delete.topic.enable is false on the brokers, deleteTopics will mark
+     * the topics for deletion, but not actually delete them. The futures will
+     * return successfully in this case.
+     * <p>
+     * This operation is supported by brokers with version 0.10.1.0 or higher.
+     *
+     * @param topics  The topic names to delete.
+     * @param options The options to use when deleting the topics.
+     * @return The DeleteTopicsResult.
+     */
+    DeleteTopicsResult deleteTopics(Collection<String> topics, DeleteTopicsOptions options);
+
+    /**
+     * List the topics available in the cluster with the default options.
+     * <p>
+     * This is a convenience method for {@link #listTopics(ListTopicsOptions)} with default options.
+     * See the overload for more details.
+     *
+     * @return The ListTopicsResult.
+     */
+    default ListTopicsResult listTopics() {
+        return listTopics(new ListTopicsOptions());
+    }
+
+    /**
+     * List the topics available in the cluster.
+     *
+     * @param options The options to use when listing the topics.
+     * @return The ListTopicsResult.
+     */
+    ListTopicsResult listTopics(ListTopicsOptions options);
+
+    /**
+     * Describe some topics in the cluster, with the default options.
+     * <p>
+     * This is a convenience method for {@link #describeTopics(Collection, DescribeTopicsOptions)} with
+     * default options. See the overload for more details.
+     *
+     * @param topicNames The names of the topics to describe.
+     * @return The DescribeTopicsResult.
+     */
+    default DescribeTopicsResult describeTopics(Collection<String> topicNames) {
+        return describeTopics(topicNames, new DescribeTopicsOptions());
+    }
+
+    /**
+     * Describe some topics in the cluster.
+     *
+     * @param topicNames The names of the topics to describe.
+     * @param options    The options to use when describing the topic.
+     * @return The DescribeTopicsResult.
+     */
+    DescribeTopicsResult describeTopics(Collection<String> topicNames, DescribeTopicsOptions options);
+
+    /**
+     * Get information about the nodes in the cluster, using the default options.
+     * <p>
+     * This is a convenience method for {@link #describeCluster(DescribeClusterOptions)} with default options.
+     * See the overload for more details.
+     *
+     * @return The DescribeClusterResult.
+     */
+    default DescribeClusterResult describeCluster() {
+        return describeCluster(new DescribeClusterOptions());
+    }
+
+    /**
+     * Get information about the nodes in the cluster.
+     *
+     * @param options The options to use when getting information about the cluster.
+     * @return The DescribeClusterResult.
+     */
+    DescribeClusterResult describeCluster(DescribeClusterOptions options);
+
+    /**
+     * This is a convenience method for {@link #describeAcls(AclBindingFilter, DescribeAclsOptions)} with
+     * default options. See the overload for more details.
+     * <p>
+     * This operation is supported by brokers with version 0.11.0.0 or higher.
+     *
+     * @param filter The filter to use.
+     * @return The DeleteAclsResult.
+     */
+    default DescribeAclsResult describeAcls(AclBindingFilter filter) {
+        return describeAcls(filter, new DescribeAclsOptions());
+    }
+
+    /**
+     * Lists access control lists (ACLs) according to the supplied filter.
+     * <p>
+     * Note: it may take some time for changes made by {@code createAcls} or {@code deleteAcls} to be reflected
+     * in the output of {@code describeAcls}.
+     * <p>
+     * This operation is supported by brokers with version 0.11.0.0 or higher.
+     *
+     * @param filter  The filter to use.
+     * @param options The options to use when listing the ACLs.
+     * @return The DeleteAclsResult.
+     */
+    DescribeAclsResult describeAcls(AclBindingFilter filter, DescribeAclsOptions options);
+
+    /**
+     * This is a convenience method for {@link #createAcls(Collection, CreateAclsOptions)} with
+     * default options. See the overload for more details.
+     * <p>
+     * This operation is supported by brokers with version 0.11.0.0 or higher.
+     *
+     * @param acls The ACLs to create
+     * @return The CreateAclsResult.
+     */
+    default CreateAclsResult createAcls(Collection<AclBinding> acls) {
+        return createAcls(acls, new CreateAclsOptions());
+    }
+
+    /**
+     * Creates access control lists (ACLs) which are bound to specific resources.
+     * <p>
+     * This operation is not transactional so it may succeed for some ACLs while fail for others.
+     * <p>
+     * If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
+     * no changes will be made.
+     * <p>
+     * This operation is supported by brokers with version 0.11.0.0 or higher.
+     *
+     * @param acls    The ACLs to create
+     * @param options The options to use when creating the ACLs.
+     * @return The CreateAclsResult.
+     */
+    CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options);
+
+    /**
+     * This is a convenience method for {@link #deleteAcls(Collection, DeleteAclsOptions)} with default options.
+     * See the overload for more details.
+     * <p>
+     * This operation is supported by brokers with version 0.11.0.0 or higher.
+     *
+     * @param filters The filters to use.
+     * @return The DeleteAclsResult.
+     */
+    default DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters) {
+        return deleteAcls(filters, new DeleteAclsOptions());
+    }
+
+    /**
+     * Deletes access control lists (ACLs) according to the supplied filters.
+     * <p>
+     * This operation is not transactional so it may succeed for some ACLs while fail for others.
+     * <p>
+     * This operation is supported by brokers with version 0.11.0.0 or higher.
+     *
+     * @param filters The filters to use.
+     * @param options The options to use when deleting the ACLs.
+     * @return The DeleteAclsResult.
+     */
+    DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options);
+
+
+    /**
+     * Get the configuration for the specified resources with the default options.
+     * <p>
+     * This is a convenience method for {@link #describeConfigs(Collection, DescribeConfigsOptions)} with default options.
+     * See the overload for more details.
+     * <p>
+     * This operation is supported by brokers with version 0.11.0.0 or higher.
+     *
+     * @param resources The resources (topic and broker resource types are currently supported)
+     * @return The DescribeConfigsResult
+     */
+    default DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources) {
+        return describeConfigs(resources, new DescribeConfigsOptions());
+    }
+
+    /**
+     * Get the configuration for the specified resources.
+     * <p>
+     * The returned configuration includes default values and the isDefault() method can be used to distinguish them
+     * from user supplied values.
+     * <p>
+     * The value of config entries where isSensitive() is true is always {@code null} so that sensitive information
+     * is not disclosed.
+     * <p>
+     * Config entries where isReadOnly() is true cannot be updated.
+     * <p>
+     * This operation is supported by brokers with version 0.11.0.0 or higher.
+     *
+     * @param resources The resources (topic and broker resource types are currently supported)
+     * @param options   The options to use when describing configs
+     * @return The DescribeConfigsResult
+     */
+    DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options);
+
+    /**
+     * Update the configuration for the specified resources with the default options.
+     * <p>
+     * This is a convenience method for {@link #alterConfigs(Map, AlterConfigsOptions)} with default options.
+     * See the overload for more details.
+     * <p>
+     * This operation is supported by brokers with version 0.11.0.0 or higher.
+     *
+     * @param configs The resources with their configs (topic is the only resource type with configs that can
+     *                be updated currently)
+     * @return The AlterConfigsResult
+     * @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map)}.
+     */
+    @Deprecated
+    default AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs) {
+        return alterConfigs(configs, new AlterConfigsOptions());
+    }
+
+    /**
+     * Update the configuration for the specified resources with the default options.
+     * <p>
+     * Updates are not transactional so they may succeed for some resources while fail for others. The configs for
+     * a particular resource are updated atomically.
+     * <p>
+     * This operation is supported by brokers with version 0.11.0.0 or higher.
+     *
+     * @param configs The resources with their configs (topic is the only resource type with configs that can
+     *                be updated currently)
+     * @param options The options to use when describing configs
+     * @return The AlterConfigsResult
+     * @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map, AlterConfigsOptions)}.
+     */
+    @Deprecated
+    AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options);
+
+    /**
+     * Incrementally updates the configuration for the specified resources with default options.
+     * <p>
+     * This is a convenience method for {@link #incrementalAlterConfigs(Map, AlterConfigsOptions)} with default options.
+     * See the overload for more details.
+     * <p>
+     * This operation is supported by brokers with version 2.3.0 or higher.
+     *
+     * @param configs The resources with their configs
+     * @return The AlterConfigsResult
+     */
+    default AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs) {
+        return incrementalAlterConfigs(configs, new AlterConfigsOptions());
+    }
+
+    /**
+     * Incrementally update the configuration for the specified resources.
+     * <p>
+     * Updates are not transactional so they may succeed for some resources while fail for others. The configs for
+     * a particular resource are updated atomically.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+     * the returned {@link AlterConfigsResult}:
+     * <ul>
+     * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     * if the authenticated user didn't have alter access to the cluster.</li>
+     * <li>{@link org.apache.kafka.common.errors.TopicAuthorizationException}
+     * if the authenticated user didn't have alter access to the Topic.</li>
+     * <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
+     * if the Topic doesn't exist.</li>
+     * <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+     * if the request details are invalid. e.g., a configuration key was specified more than once for a resource</li>
+     * </ul>
+     * <p>
+     * This operation is supported by brokers with version 2.3.0 or higher.
+     *
+     * @param configs The resources with their configs
+     * @param options The options to use when altering configs
+     * @return The AlterConfigsResult
+     */
+    AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource,
+            Collection<AlterConfigOp>> configs, AlterConfigsOptions options);
+
+    /**
+     * Change the log directory for the specified replicas. If the replica does not exist on the broker, the result
+     * shows REPLICA_NOT_AVAILABLE for the given replica and the replica will be created in the given log directory on the
+     * broker when it is created later. If the replica already exists on the broker, the replica will be moved to the given
+     * log directory if it is not already there. For detailed result, inspect the returned {@link AlterReplicaLogDirsResult} instance.
+     * <p>
+     * This operation is not transactional so it may succeed for some replicas while fail for others.
+     * <p>
+     * This is a convenience method for {@link #alterReplicaLogDirs(Map, AlterReplicaLogDirsOptions)} with default options.
+     * See the overload for more details.
+     * <p>
+     * This operation is supported by brokers with version 1.1.0 or higher.
+     *
+     * @param replicaAssignment     The replicas with their log directory absolute path
+     * @return                      The AlterReplicaLogDirsResult
+     * @throws InterruptedException Interrupted while joining I/O thread
+     */
+    default AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
+        return alterReplicaLogDirs(replicaAssignment, new AlterReplicaLogDirsOptions());
+    }
+
+    /**
+     * Change the log directory for the specified replicas. If the replica does not exist on the broker, the result
+     * shows REPLICA_NOT_AVAILABLE for the given replica and the replica will be created in the given log directory on the
+     * broker when it is created later. If the replica already exists on the broker, the replica will be moved to the given
+     * log directory if it is not already there. For detailed result, inspect the returned {@link AlterReplicaLogDirsResult} instance.
+     * <p>
+     * This operation is not transactional so it may succeed for some replicas while fail for others.
+     * <p>
+     * This operation is supported by brokers with version 1.1.0 or higher.
+     *
+     * @param replicaAssignment     The replicas with their log directory absolute path
+     * @param options               The options to use when changing replica dir
+     * @return                      The AlterReplicaLogDirsResult
+     * @throws InterruptedException Interrupted while joining I/O thread
+     */
+    AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment,
+                                                  AlterReplicaLogDirsOptions options);
+
+    /**
+     * Query the information of all log directories on the given set of brokers
+     * <p>
+     * This is a convenience method for {@link #describeLogDirs(Collection, DescribeLogDirsOptions)} with default options.
+     * See the overload for more details.
+     * <p>
+     * This operation is supported by brokers with version 1.0.0 or higher.
+     *
+     * @param brokers A list of brokers
+     * @return The DescribeLogDirsResult
+     */
+    default DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers) {
+        return describeLogDirs(brokers, new DescribeLogDirsOptions());
+    }
+
+    /**
+     * Query the information of all log directories on the given set of brokers
+     * <p>
+     * This operation is supported by brokers with version 1.0.0 or higher.
+     *
+     * @param brokers A list of brokers
+     * @param options The options to use when querying log dir info
+     * @return The DescribeLogDirsResult
+     */
+    DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options);
+
+    /**
+     * Query the replica log directory information for the specified replicas.
+     * <p>
+     * This is a convenience method for {@link #describeReplicaLogDirs(Collection, DescribeReplicaLogDirsOptions)}
+     * with default options. See the overload for more details.
+     * <p>
+     * This operation is supported by brokers with version 1.0.0 or higher.
+     *
+     * @param replicas The replicas to query
+     * @return The DescribeReplicaLogDirsResult
+     */
+    default DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas) {
+        return describeReplicaLogDirs(replicas, new DescribeReplicaLogDirsOptions());
+    }
+
+    /**
+     * Query the replica log directory information for the specified replicas.
+     * <p>
+     * This operation is supported by brokers with version 1.0.0 or higher.
+     *
+     * @param replicas The replicas to query
+     * @param options  The options to use when querying replica log dir info
+     * @return The DescribeReplicaLogDirsResult
+     */
+    DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options);
+
+    /**
+     * Increase the number of partitions of the topics given as the keys of {@code newPartitions}
+     * according to the corresponding values. <strong>If partitions are increased for a topic that has a key,
+     * the partition logic or ordering of the messages will be affected.</strong>
+     * <p>
+     * This is a convenience method for {@link #createPartitions(Map, CreatePartitionsOptions)} with default options.
+     * See the overload for more details.
+     *
+     * @param newPartitions The topics which should have new partitions created, and corresponding parameters
+     *                      for the created partitions.
+     * @return The CreatePartitionsResult.
+     */
+    default CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions) {
+        return createPartitions(newPartitions, new CreatePartitionsOptions());
+    }
+
+    /**
+     * Increase the number of partitions of the topics given as the keys of {@code newPartitions}
+     * according to the corresponding values. <strong>If partitions are increased for a topic that has a key,
+     * the partition logic or ordering of the messages will be affected.</strong>
+     * <p>
+     * This operation is not transactional so it may succeed for some topics while fail for others.
+     * <p>
+     * It may take several seconds after this method returns
+     * success for all the brokers to become aware that the partitions have been created.
+     * During this time, {@link #describeTopics(Collection)}
+     * may not return information about the new partitions.
+     * <p>
+     * This operation is supported by brokers with version 1.0.0 or higher.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+     * {@link CreatePartitionsResult#values() values()} method of the returned {@link CreatePartitionsResult}
+     * <ul>
+     * <li>{@link org.apache.kafka.common.errors.AuthorizationException}
+     * if the authenticated user is not authorized to alter the topic</li>
+     * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     * if the request was not completed within the given {@link CreatePartitionsOptions#timeoutMs()}.</li>
+     * <li>{@link org.apache.kafka.common.errors.ReassignmentInProgressException}
+     * if a partition reassignment is currently in progress</li>
+     * <li>{@link org.apache.kafka.common.errors.BrokerNotAvailableException}
+     * if the requested {@link NewPartitions#assignments()} contain a broker that is currently unavailable.</li>
+     * <li>{@link org.apache.kafka.common.errors.InvalidReplicationFactorException}
+     * if no {@link NewPartitions#assignments()} are given and it is impossible for the broker to assign
+     * replicas with the topic's replication factor.</li>
+     * <li>Subclasses of {@link org.apache.kafka.common.KafkaException}
+     * if the request is invalid in some way.</li>
+     * </ul>
+     *
+     * @param newPartitions The topics which should have new partitions created, and corresponding parameters
+     *                      for the created partitions.
+     * @param options       The options to use when creating the new partitions.
+     * @return The CreatePartitionsResult.
+     */
+    CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions,
+                                            CreatePartitionsOptions options);
+
+    /**
+     * Delete records whose offset is smaller than the given offset of the corresponding partition.
+     * <p>
+     * This is a convenience method for {@link #deleteRecords(Map, DeleteRecordsOptions)} with default options.
+     * See the overload for more details.
+     * <p>
+     * This operation is supported by brokers with version 0.11.0.0 or higher.
+     *
+     * @param recordsToDelete The topic partitions and related offsets from which records deletion starts.
+     * @return The DeleteRecordsResult.
+     */
+    default DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete) {
+        return deleteRecords(recordsToDelete, new DeleteRecordsOptions());
+    }
+
+    /**
+     * Delete records whose offset is smaller than the given offset of the corresponding partition.
+     * <p>
+     * This operation is supported by brokers with version 0.11.0.0 or higher.
+     *
+     * @param recordsToDelete The topic partitions and related offsets from which records deletion starts.
+     * @param options         The options to use when deleting records.
+     * @return The DeleteRecordsResult.
+     */
+    DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete,
+                                      DeleteRecordsOptions options);
+
+    /**
+     * Create a Delegation Token.
+     * <p>
+     * This is a convenience method for {@link #createDelegationToken(CreateDelegationTokenOptions)} with default options.
+     * See the overload for more details.
+     *
+     * @return The CreateDelegationTokenResult.
+     */
+    default CreateDelegationTokenResult createDelegationToken() {
+        return createDelegationToken(new CreateDelegationTokenOptions());
+    }
+
+
+    /**
+     * Create a Delegation Token.
+     * <p>
+     * This operation is supported by brokers with version 1.1.0 or higher.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+     * {@link CreateDelegationTokenResult#delegationToken() delegationToken()} method of the returned {@link CreateDelegationTokenResult}
+     * <ul>
+     * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     * If the request is sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+     * <li>{@link org.apache.kafka.common.errors.InvalidPrincipalTypeException}
+     * if the renewer's principal type is not supported.</li>
+     * <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+     * if the delegation token feature is disabled.</li>
+     * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     * if the request was not completed within the given {@link CreateDelegationTokenOptions#timeoutMs()}.</li>
+     * </ul>
+     *
+     * @param options The options to use when creating delegation token.
+     * @return The CreateDelegationTokenResult.
+     */
+    CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options);
+
+
+    /**
+     * Renew a Delegation Token.
+     * <p>
+     * This is a convenience method for {@link #renewDelegationToken(byte[], RenewDelegationTokenOptions)} with default options.
+     * See the overload for more details.
+     *
+     * @param hmac HMAC of the Delegation token
+     * @return The RenewDelegationTokenResult.
+     */
+    default RenewDelegationTokenResult renewDelegationToken(byte[] hmac) {
+        return renewDelegationToken(hmac, new RenewDelegationTokenOptions());
+    }
+
+    /**
+     * Renew a Delegation Token.
+     * <p>
+     * This operation is supported by brokers with version 1.1.0 or higher.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+     * {@link RenewDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@link RenewDelegationTokenResult}
+     * <ul>
+     * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     * If the request is sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+     * <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+     * if the delegation token feature is disabled.</li>
+     * <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException}
+     * if the delegation token is not found on server.</li>
+     * <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
+     * if the authenticated user is not owner/renewer of the token.</li>
+     * <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException}
+     * if the delegation token is expired.</li>
+     * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     * if the request was not completed within the given {@link RenewDelegationTokenOptions#timeoutMs()}.</li>
+     * </ul>
+     *
+     * @param hmac    HMAC of the Delegation token
+     * @param options The options to use when renewing delegation token.
+     * @return The RenewDelegationTokenResult.
+     */
+    RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options);
+
+    /**
+     * Expire a Delegation Token.
+     * <p>
+     * This is a convenience method for {@link #expireDelegationToken(byte[], ExpireDelegationTokenOptions)} with default options.
+     * This will expire the token immediately. See the overload for more details.
+     *
+     * @param hmac HMAC of the Delegation token
+     * @return The ExpireDelegationTokenResult.
+     */
+    default ExpireDelegationTokenResult expireDelegationToken(byte[] hmac) {
+        return expireDelegationToken(hmac, new ExpireDelegationTokenOptions());
+    }
+
+    /**
+     * Expire a Delegation Token.
+     * <p>
+     * This operation is supported by brokers with version 1.1.0 or higher.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+     * {@link ExpireDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@link ExpireDelegationTokenResult}
+     * <ul>
+     * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     * If the request is sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+     * <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+     * if the delegation token feature is disabled.</li>
+     * <li>{@link org.apache.kafka.common.errors.DelegationTokenNotFoundException}
+     * if the delegation token is not found on server.</li>
+     * <li>{@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException}
+     * if the authenticated user is not owner/renewer of the requested token.</li>
+     * <li>{@link org.apache.kafka.common.errors.DelegationTokenExpiredException}
+     * if the delegation token is expired.</li>
+     * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     * if the request was not completed within the given {@link ExpireDelegationTokenOptions#timeoutMs()}.</li>
+     * </ul>
+     *
+     * @param hmac    HMAC of the Delegation token
+     * @param options The options to use when expiring delegation token.
+     * @return The ExpireDelegationTokenResult.
+     */
+    ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options);
+
+    /**
+     * Describe the Delegation Tokens.
+     * <p>
+     * This is a convenience method for {@link #describeDelegationToken(DescribeDelegationTokenOptions)} with default options.
+     * This will return all the tokens owned by the user as well as tokens for which the user has Describe permission. See the overload for more details.
+     *
+     * @return The DescribeDelegationTokenResult.
+     */
+    default DescribeDelegationTokenResult describeDelegationToken() {
+        return describeDelegationToken(new DescribeDelegationTokenOptions());
+    }
+
+    /**
+     * Describe the Delegation Tokens.
+     * <p>
+     * This operation is supported by brokers with version 1.1.0 or higher.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
+     * {@link DescribeDelegationTokenResult#delegationTokens() delegationTokens()} method of the returned {@link DescribeDelegationTokenResult}
+     * <ul>
+     * <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     * If the request is sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels.</li>
+     * <li>{@link org.apache.kafka.common.errors.DelegationTokenDisabledException}
+     * if the delegation token feature is disabled.</li>
+     * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     * if the request was not completed within the given {@link DescribeDelegationTokenOptions#timeoutMs()}.</li>
+     * </ul>
+     *
+     * @param options The options to use when describing delegation tokens.
+     * @return The DescribeDelegationTokenResult.
+     */
+    DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options);
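+
+    // Token lifecycle sketch (assumes the upstream Kafka API shape). Note that the
+    // PulsarKafkaAdminClient added in this change does not support these operations:
+    //
+    //   DelegationToken token = admin.createDelegationToken().delegationToken().get();
+    //   admin.renewDelegationToken(token.hmac()).expiryTimestamp().get();
+    //   admin.expireDelegationToken(token.hmac()).expiryTimestamp().get();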
+
+    /**
+     * Describe some group IDs in the cluster.
+     *
+     * @param groupIds The IDs of the groups to describe.
+     * @param options  The options to use when describing the groups.
+     * @return The DescribeConsumerGroupsResult.
+     */
+    DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds,
+                                                        DescribeConsumerGroupsOptions options);
+
+    /**
+     * Describe some group IDs in the cluster, with the default options.
+     * <p>
+     * This is a convenience method for {@link #describeConsumerGroups(Collection, DescribeConsumerGroupsOptions)}
+     * with default options. See the overload for more details.
+     *
+     * @param groupIds The IDs of the groups to describe.
+     * @return The DescribeConsumerGroupsResult.
+     */
+    default DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds) {
+        return describeConsumerGroups(groupIds, new DescribeConsumerGroupsOptions());
+    }
+
+    /**
+     * List the consumer groups available in the cluster.
+     *
+     * @param options The options to use when listing the consumer groups.
+     * @return The ListConsumerGroupsResult.
+     */
+    ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);
+
+    /**
+     * List the consumer groups available in the cluster with the default options.
+     * <p>
+     * This is a convenience method for {@link #listConsumerGroups(ListConsumerGroupsOptions)} with default options.
+     * See the overload for more details.
+     *
+     * @return The ListConsumerGroupsResult.
+     */
+    default ListConsumerGroupsResult listConsumerGroups() {
+        return listConsumerGroups(new ListConsumerGroupsOptions());
+    }
+
+    /**
+     * List the consumer group offsets available in the cluster.
+     *
+     * @param options The options to use when listing the consumer group offsets.
+     * @return The ListConsumerGroupOffsetsResult.
+     */
+    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);
+
+    /**
+     * List the consumer group offsets available in the cluster with the default options.
+     * <p>
+     * This is a convenience method for {@link #listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with default options.
+     *
+     * @return The ListConsumerGroupOffsetsResult.
+     */
+    default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
+        return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
+    }
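+
+    // Usage sketch (assumes the upstream Kafka API shape; the group id is a placeholder).
+    // Note that the PulsarKafkaAdminClient added in this change does not support this operation:
+    //
+    //   Map<TopicPartition, OffsetAndMetadata> offsets =
+    //           admin.listConsumerGroupOffsets("my-group").partitionsToOffsetAndMetadata().get();
+    //   offsets.forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));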
+
+    /**
+     * Delete consumer groups from the cluster.
+     *
+     * @param options The options to use when deleting a consumer group.
+     * @return The DeleteConsumerGroupsResult.
+     */
+    DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options);
+
+    /**
+     * Delete consumer groups from the cluster with the default options.
+     *
+     * @return The DeleteConsumerGroupsResult.
+     */
+    default DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds) {
+        return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
+    }
+
+    /**
+     * Delete committed offsets for a set of partitions in a consumer group. This will
+     * succeed at the partition level only if the group is not actively subscribed
+     * to the corresponding topic.
+     *
+     * @param options The options to use when deleting offsets in a consumer group.
+     * @return The DeleteConsumerGroupOffsetsResult.
+     */
+    DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String groupId,
+                                                                Set<TopicPartition> partitions,
+                                                                DeleteConsumerGroupOffsetsOptions options);
+
+    /**
+     * Delete committed offsets for a set of partitions in a consumer group with the default
+     * options. This will succeed at the partition level only if the group is not actively
+     * subscribed to the corresponding topic.
+     *
+     * @return The DeleteConsumerGroupOffsetsResult.
+     */
+    default DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String groupId, Set<TopicPartition> partitions) {
+        return deleteConsumerGroupOffsets(groupId, partitions, new DeleteConsumerGroupOffsetsOptions());
+    }
+
+    /**
+     * Elect the preferred replica as leader for topic partitions.
+     * <p>
+     * This is a convenience method for {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}
+     * with preferred election type and default options.
+     * <p>
+     * This operation is supported by brokers with version 2.2.0 or higher.
+     *
+     * @param partitions The partitions for which the preferred leader should be elected.
+     * @return The ElectPreferredLeadersResult.
+     * @deprecated Since 2.4.0. Use {@link #electLeaders(ElectionType, Set)}.
+     */
+    @Deprecated
+    default ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions) {
+        return electPreferredLeaders(partitions, new ElectPreferredLeadersOptions());
+    }
+
+    /**
+     * Elect the preferred replica as leader for topic partitions.
+     * <p>
+     * This is a convenience method for {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}
+     * with preferred election type.
+     * <p>
+     * This operation is supported by brokers with version 2.2.0 or higher.
+     *
+     * @param partitions The partitions for which the preferred leader should be elected.
+     * @param options    The options to use when electing the preferred leaders.
+     * @return The ElectPreferredLeadersResult.
+     * @deprecated Since 2.4.0. Use {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}.
+     */
+    @Deprecated
+    default ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions,
+                                                              ElectPreferredLeadersOptions options) {
+        final ElectLeadersOptions newOptions = new ElectLeadersOptions();
+        newOptions.timeoutMs(options.timeoutMs());
+        final Set<TopicPartition> topicPartitions = partitions == null ? null : new HashSet<>(partitions);
+
+        return new ElectPreferredLeadersResult(electLeaders(ElectionType.PREFERRED, topicPartitions, newOptions));
+    }
+
+    /**
+     * Elect a replica as leader for topic partitions.
+     * <p>
+     * This is a convenience method for {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}
+     * with default options.
+     *
+     * @param electionType The type of election to conduct.
+     * @param partitions   The topics and partitions for which to conduct elections.
+     * @return The ElectLeadersResult.
+     */
+    default ElectLeadersResult electLeaders(ElectionType electionType, Set<TopicPartition> partitions) {
+        return electLeaders(electionType, partitions, new ElectLeadersOptions());
+    }
+
+    /**
+     * Elect a replica as leader for the given {@code partitions}, or for all partitions if the argument
+     * to {@code partitions} is null.
+     * <p>
+     * This operation is not transactional so it may succeed for some partitions while fail for others.
+     * <p>
+     * It may take several seconds after this method returns success for all the brokers in the cluster
+     * to become aware that the partitions have new leaders. During this time,
+     * {@link #describeTopics(Collection)} may not return information about the partitions'
+     * new leaders.
+     * <p>
+     * This operation is supported by brokers with version 2.2.0 or later if preferred election is used;
+     * otherwise the brokers must be 2.4.0 or higher.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the future obtained
+     * from the returned {@link ElectLeadersResult}:
+     * <ul>
+     * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     * if the authenticated user didn't have alter access to the cluster.</li>
+     * <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
+     * if the topic or partition did not exist within the cluster.</li>
+     * <li>{@link org.apache.kafka.common.errors.InvalidTopicException}
+     * if the topic was already queued for deletion.</li>
+     * <li>{@link org.apache.kafka.common.errors.NotControllerException}
+     * if the request was sent to a broker that was not the controller for the cluster.</li>
+     * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     * if the request timed out before the election was complete.</li>
+     * <li>{@link org.apache.kafka.common.errors.LeaderNotAvailableException}
+     * if the preferred leader was not alive or not in the ISR.</li>
+     * </ul>
+     *
+     * @param electionType The type of election to conduct.
+     * @param partitions   The topics and partitions for which to conduct elections.
+     * @param options      The options to use when electing the leaders.
+     * @return The ElectLeadersResult.
+     */
+    ElectLeadersResult electLeaders(
+            ElectionType electionType,
+            Set<TopicPartition> partitions,
+            ElectLeadersOptions options);
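+
+    // Usage sketch (assumes the upstream Kafka API shape; the topic partition is a placeholder):
+    //
+    //   admin.electLeaders(ElectionType.PREFERRED,
+    //           Collections.singleton(new TopicPartition("my-topic", 0)))
+    //        .partitions().get();   // map of partition -> optional error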
+
+
+    /**
+     * Change the reassignments for one or more partitions.
+     * Providing an empty Optional (e.g. via {@link Optional#empty()}) will <b>revert</b> the reassignment for the associated partition.
+     *
+     * This is a convenience method for {@link #alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)}
+     * with default options.  See the overload for more details.
+     */
+    default AlterPartitionReassignmentsResult alterPartitionReassignments(
+            Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments) {
+        return alterPartitionReassignments(reassignments, new AlterPartitionReassignmentsOptions());
+    }
+
+    /**
+     * Change the reassignments for one or more partitions.
+     * Providing an empty Optional (e.g. via {@link Optional#empty()}) will <b>revert</b> the reassignment for the associated partition.
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+     * the returned {@code AlterPartitionReassignmentsResult}:</p>
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have alter access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
+     *   If the topic or partition does not exist within the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   if the request timed out before the controller could record the new assignments.</li>
+     *   <li>{@link org.apache.kafka.common.errors.InvalidReplicaAssignmentException}
+     *   If the specified assignment was not valid.</li>
+     *   <li>{@link org.apache.kafka.common.errors.NoReassignmentInProgressException}
+     *   If there was an attempt to cancel a reassignment for a partition which was not being reassigned.</li>
+     * </ul>
+     *
+     * @param reassignments   The reassignments to add, modify, or remove. See {@link NewPartitionReassignment}.
+     * @param options         The options to use.
+     * @return                The result.
+     */
+    AlterPartitionReassignmentsResult alterPartitionReassignments(
+            Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
+            AlterPartitionReassignmentsOptions options);
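+
+    // Usage sketch (assumes the upstream Kafka API shape; replica ids are placeholders):
+    //
+    //   Map<TopicPartition, Optional<NewPartitionReassignment>> moves = new HashMap<>();
+    //   moves.put(new TopicPartition("my-topic", 0),
+    //           Optional.of(new NewPartitionReassignment(Arrays.asList(1, 2, 3))));
+    //   moves.put(new TopicPartition("my-topic", 1), Optional.empty());   // cancel an in-flight move
+    //   admin.alterPartitionReassignments(moves).all().get();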
+
+
+    /**
+     * List all of the current partition reassignments
+     *
+     * This is a convenience method for {@link #listPartitionReassignments(ListPartitionReassignmentsOptions)}
+     * with default options. See the overload for more details.
+     */
+    default ListPartitionReassignmentsResult listPartitionReassignments() {
+        return listPartitionReassignments(new ListPartitionReassignmentsOptions());
+    }
+
+    /**
+     * List the current reassignments for the given partitions
+     *
+     * This is a convenience method for {@link #listPartitionReassignments(Set, ListPartitionReassignmentsOptions)}
+     * with default options. See the overload for more details.
+     */
+    default ListPartitionReassignmentsResult listPartitionReassignments(Set<TopicPartition> partitions) {
+        return listPartitionReassignments(partitions, new ListPartitionReassignmentsOptions());
+    }
+
+    /**
+     * List the current reassignments for the given partitions
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+     * the returned {@code ListPartitionReassignmentsResult}:</p>
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user doesn't have alter access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
+     *   If a given topic or partition does not exist.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the controller could list the current reassignments.</li>
+     * </ul>
+     *
+     * @param partitions      The topic partitions to list reassignments for.
+     * @param options         The options to use.
+     * @return                The result.
+     */
+    default ListPartitionReassignmentsResult listPartitionReassignments(
+            Set<TopicPartition> partitions,
+            ListPartitionReassignmentsOptions options) {
+        return listPartitionReassignments(Optional.of(partitions), options);
+    }
+
+    /**
+     * List all of the current partition reassignments
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+     * the returned {@code ListPartitionReassignmentsResult}:</p>
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user doesn't have alter access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
+     *   If a given topic or partition does not exist.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the controller could list the current reassignments.</li>
+     * </ul>
+     *
+     * @param options         The options to use.
+     * @return                The result.
+     */
+    default ListPartitionReassignmentsResult listPartitionReassignments(ListPartitionReassignmentsOptions options) {
+        return listPartitionReassignments(Optional.empty(), options);
+    }
+
+    /**
+     * @param partitions the partitions we want to get reassignment for, or an empty optional if we want to get the reassignments for all partitions in the cluster
+     * @param options         The options to use.
+     * @return                The result.
+     */
+    ListPartitionReassignmentsResult listPartitionReassignments(Optional<Set<TopicPartition>> partitions,
+                                                                ListPartitionReassignmentsOptions options);
+
+    /**
+     * Remove members from the consumer group by given member identities.
+     * <p>
+     * For possible error codes, refer to {@link LeaveGroupResponse}.
+     *
+     * @param groupId The ID of the group to remove member from.
+     * @param options The options to carry removing members' information.
+     * @return The MembershipChangeResult.
+     */
+    RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId, RemoveMembersFromConsumerGroupOptions options);
+
+    /**
+     * <p>Alters offsets for the specified group. In order to succeed, the group must be empty.
+     *
+     * <p>This is a convenience method for {@link #alterConsumerGroupOffsets(String, Map, AlterConsumerGroupOffsetsOptions)} with default options.
+     * See the overload for more details.
+     *
+     * @param groupId The group for which to alter offsets.
+     * @param offsets A map of offsets by partition with associated metadata.
+     * @return The AlterOffsetsResult.
+     */
+    default AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
+        return alterConsumerGroupOffsets(groupId, offsets, new AlterConsumerGroupOffsetsOptions());
+    }
+
+    /**
+     * <p>Alters offsets for the specified group. In order to succeed, the group must be empty.
+     *
+     * <p>This operation is not transactional so it may succeed for some partitions while fail for others.
+     *
+     * @param groupId The group for which to alter offsets.
+     * @param offsets A map of offsets by partition with associated metadata. Partitions not specified in the map are ignored.
+     * @param options The options to use when altering the offsets.
+     * @return The AlterOffsetsResult.
+     */
+    AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterConsumerGroupOffsetsOptions options);
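+
+    // Usage sketch (assumes the upstream Kafka API shape; group id and offset are placeholders):
+    //
+    //   admin.alterConsumerGroupOffsets("my-group",
+    //           Collections.singletonMap(new TopicPartition("my-topic", 0), new OffsetAndMetadata(100L)))
+    //        .all().get();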
+
+    /**
+     * <p>List offsets for the specified partitions and OffsetSpec. This operation makes it possible to find
+     * the beginning offset, the end offset, or the offset matching a timestamp for each partition.
+     *
+     * <p>This is a convenience method for {@link #listOffsets(Map, ListOffsetsOptions)}
+     *
+     * @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up.
+     * @return The ListOffsetsResult.
+     */
+    default ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {
+        return listOffsets(topicPartitionOffsets, new ListOffsetsOptions());
+    }
+
+    /**
+     * <p>List offsets for the specified partitions. This operation makes it possible to find
+     * the beginning offset, the end offset, or the offset matching a timestamp for each partition.
+     *
+     * @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up.
+     * @param options The options to use when retrieving the offsets
+     * @return The ListOffsetsResult.
+     */
+    ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options);
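+
+    // Usage sketch (assumes the upstream Kafka API shape; the topic name is a placeholder).
+    // Note that the Pulsar-backed implementation added in this change ignores the OffsetSpec and
+    // always resolves the id of the last message in the topic/partition:
+    //
+    //   TopicPartition tp = new TopicPartition("my-topic", 0);
+    //   long end = admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.latest()))
+    //                   .partitionResult(tp).get().offset();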
+
+    /**
+     * Describes all entities matching the provided filter that have at least one client quota configuration
+     * value defined.
+     * <p>
+     * This is a convenience method for {@link #describeClientQuotas(ClientQuotaFilter, DescribeClientQuotasOptions)}
+     * with default options. See the overload for more details.
+     * <p>
+     * This operation is supported by brokers with version 2.6.0 or higher.
+     *
+     * @param filter the filter to apply to match entities
+     * @return the DescribeClientQuotasResult containing the result
+     */
+    default DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter) {
+        return describeClientQuotas(filter, new DescribeClientQuotasOptions());
+    }
+
+    /**
+     * Describes all entities matching the provided filter that have at least one client quota configuration
+     * value defined.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the future from the
+     * returned {@link DescribeClientQuotasResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have describe access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+     *   If the request details are invalid. e.g., an invalid entity type was specified.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe could finish.</li>
+     * </ul>
+     * <p>
+     * This operation is supported by brokers with version 2.6.0 or higher.
+     *
+     * @param filter the filter to apply to match entities
+     * @param options the options to use
+     * @return the DescribeClientQuotasResult containing the result
+     */
+    DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions options);
+
+    /**
+     * Alters client quota configurations with the specified alterations.
+     * <p>
+     * This is a convenience method for {@link #alterClientQuotas(Collection, AlterClientQuotasOptions)}
+     * with default options. See the overload for more details.
+     * <p>
+     * This operation is supported by brokers with version 2.6.0 or higher.
+     *
+     * @param entries the alterations to perform
+     * @return the AlterClientQuotasResult containing the result
+     */
+    default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries) {
+        return alterClientQuotas(entries, new AlterClientQuotasOptions());
+    }
+
+    /**
+     * Alters client quota configurations with the specified alterations.
+     * <p>
+     * Alterations for a single entity are atomic, but atomicity across entities is not guaranteed. The resulting
+     * per-entity error code should be evaluated to resolve the success or failure of all updates.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+     * the returned {@link AlterClientQuotasResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have alter access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+     *   If the request details are invalid. e.g., a configuration key was specified more than once for an entity.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the alterations could finish. It cannot be guaranteed whether the update
+     *   succeeded or not.</li>
+     * </ul>
+     * <p>
+     * This operation is supported by brokers with version 2.6.0 or higher.
+     *
+     * @param entries the alterations to perform
+     * @param options the options to use
+     * @return the AlterClientQuotasResult containing the result
+     */
+    AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
+
+    /**
+     * Describe all SASL/SCRAM credentials.
+     *
+     * <p>This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)}
+     *
+     * @return The DescribeUserScramCredentialsResult.
+     */
+    default DescribeUserScramCredentialsResult describeUserScramCredentials() {
+        return describeUserScramCredentials(null, new DescribeUserScramCredentialsOptions());
+    }
+
+    /**
+     * Describe SASL/SCRAM credentials for the given users.
+     *
+     * <p>This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)}
+     *
+     * @param users the users for which credentials are to be described; all users' credentials are described if null
+     *              or empty.
+     * @return The DescribeUserScramCredentialsResult.
+     */
+    default DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users) {
+        return describeUserScramCredentials(users, new DescribeUserScramCredentialsOptions());
+    }
+
+    /**
+     * Describe SASL/SCRAM credentials.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the futures from the
+     * returned {@link DescribeUserScramCredentialsResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have describe access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.ResourceNotFoundException}
+     *   If the user did not exist/had no SCRAM credentials.</li>
+     *   <li>{@link org.apache.kafka.common.errors.DuplicateResourceException}
+     *   If the user was requested to be described more than once in the original request.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could finish.</li>
+     * </ul>
+     * <p>
+     * This operation is supported by brokers with version 2.7.0 or higher.
+     *
+     * @param users the users for which credentials are to be described; all users' credentials are described if null
+     *              or empty.
+     * @param options The options to use when describing the credentials
+     * @return The DescribeUserScramCredentialsResult.
+     */
+    DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options);
+
+    /**
+     * Alter SASL/SCRAM credentials for the given users.
+     *
+     * <p>This is a convenience method for {@link #alterUserScramCredentials(List, AlterUserScramCredentialsOptions)}
+     *
+     * @param alterations the alterations to be applied
+     * @return The AlterUserScramCredentialsResult.
+     */
+    default AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations) {
+        return alterUserScramCredentials(alterations, new AlterUserScramCredentialsOptions());
+    }
+
+    /**
+     * Alter SASL/SCRAM credentials.
+     *
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on any of the futures from the
+     * returned {@link AlterUserScramCredentialsResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.NotControllerException}
+     *   If the request is not sent to the Controller broker.</li>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have alter access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+     *   If the user authenticated with a delegation token.</li>
+     *   <li>{@link org.apache.kafka.common.errors.UnsupportedSaslMechanismException}
+     *   If the requested SCRAM mechanism is unrecognized or otherwise unsupported.</li>
+     *   <li>{@link org.apache.kafka.common.errors.UnacceptableCredentialException}
+     *   If the username is empty or the requested number of iterations is too small or too large.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe could finish.</li>
+     * </ul>
+     * <p>
+     * This operation is supported by brokers with version 2.7.0 or higher.
+     *
+     * @param alterations the alterations to be applied
+     * @param options The options to use when altering the credentials
+     * @return The AlterUserScramCredentialsResult.
+     */
+    AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
+                                                              AlterUserScramCredentialsOptions options);
+    /**
+     * Describes finalized as well as supported features.
+     * <p>
+     * This is a convenience method for {@link #describeFeatures(DescribeFeaturesOptions)} with default options.
+     * See the overload for more details.
+     *
+     * @return the {@link DescribeFeaturesResult} containing the result
+     */
+    default DescribeFeaturesResult describeFeatures() {
+        return describeFeatures(new DescribeFeaturesOptions());
+    }
+
+    /**
+     * Describes finalized as well as supported features. By default, the request is issued to any
+     * broker. It can optionally be directed only to the controller via the DescribeFeaturesOptions
+     * parameter. This is particularly useful if the user requires strongly consistent reads of
+     * finalized features.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the future from the
+     * returned {@link DescribeFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could finish.</li>
+     * </ul>
+     * <p>
+     *
+     * @param options the options to use
+     * @return the {@link DescribeFeaturesResult} containing the result
+     */
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+    /**
+     * Applies specified updates to finalized features. This operation is not transactional so some
+     * updates may succeed while the rest may fail.
+     * <p>
+     * The API takes in a map of finalized feature names to {@link FeatureUpdate} that needs to be
+     * applied. Each entry in the map specifies the finalized feature to be added or updated or
+     * deleted, along with the new max feature version level value. This request is issued only to
+     * the controller since the API is only served by the controller. The return value contains an
+     * error code for each supplied {@link FeatureUpdate}, and the code indicates if the update
+     * succeeded or failed in the controller.
+     * <ul>
+     * <li>Downgrade of feature version level is not a regular operation/intent. It is only allowed
+     * in the controller if the {@link FeatureUpdate} has the allowDowngrade flag set. Setting this
+     * flag conveys user intent to attempt downgrade of a feature max version level. Note that
+     * despite the allowDowngrade flag being set, certain downgrades may be rejected by the
+     * controller if it is deemed impossible.</li>
+     * <li>Deletion of a finalized feature version is not a regular operation/intent. It could be
+     * done by setting the allowDowngrade flag to true in the {@link FeatureUpdate} and setting
+     * the max version level to a value less than 1.</li>
+     * </ul>
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the futures
+     * obtained from the returned {@link UpdateFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have alter access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+     *   If the request details are invalid. e.g., a non-existing finalized feature is attempted
+     *   to be deleted or downgraded.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the updates could finish. It cannot be guaranteed whether
+     *   the updates succeeded or not.</li>
+     *   <li>{@link FeatureUpdateFailedException}
+     *   This means there was an unexpected error encountered when the update was applied on
+     *   the controller. There is no guarantee on whether the update succeeded or failed. The best
+     *   way to find out is to issue a {@link Admin#describeFeatures(DescribeFeaturesOptions)}
+     *   request to the controller to get the latest features.</li>
+     * </ul>
+     * <p>
+     * This operation is supported by brokers with version 2.7.0 or higher.
+     *
+     * @param featureUpdates the map of finalized feature name to {@link FeatureUpdate}
+     * @param options the options to use
+     * @return the {@link UpdateFeaturesResult} containing the result
+     */
+    UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);
+
+    /**
+     * Get the metrics kept by the adminClient
+     */
+    Map<MetricName, ? extends Metric> metrics();
+}
\ No newline at end of file
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarKafkaAdminClient.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarKafkaAdminClient.java
new file mode 100644
index 0000000..8196a39
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarKafkaAdminClient.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Mostly no-op implementation of the Kafka {@code Admin} interface.
+ * Only {@link #listOffsets} is backed by the Pulsar admin API; {@link #deleteRecords}
+ * completes successfully without deleting anything (Pulsar retention/expiry applies instead),
+ * and all other operations throw {@link UnsupportedOperationException}.
+ */
+@Slf4j
+public class PulsarKafkaAdminClient implements Admin {
+
+    private final Properties properties;
+    private final PulsarAdmin admin;
+
+    private final ConcurrentHashMap<String, Boolean> partitionedTopics = new ConcurrentHashMap<>();
+
+    private PulsarKafkaAdminClient(AdminClientConfig config) {
+        properties = new Properties();
+        log.info("config originals: {}", config.originals());
+        config.originals().forEach((k, v) -> {
+            log.info("Setting k = {} v = {}", k, v);
+            // properties do not allow null values
+            if (k != null && v != null) {
+                properties.put(k, v);
+            }
+        });
+
+        PulsarAdminBuilder builder = PulsarClientKafkaConfig.getAdminBuilder(properties);
+        try {
+            admin = builder.build();
+        } catch (PulsarClientException e) {
+            log.error("Could not create Pulsar Admin", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    static PulsarKafkaAdminClient createInternal(AdminClientConfig config) {
+        return new PulsarKafkaAdminClient(config);
+    }
+
+    public static Admin create(Properties props) {
+        return PulsarKafkaAdminClient.createInternal(new AdminClientConfig(props, true));
+    }
+
+    public static Admin create(Map<String, Object> conf) {
+        return PulsarKafkaAdminClient.createInternal(new AdminClientConfig(conf, true));
+    }
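+
+    // Construction sketch: the client is created through the same factory methods as the
+    // Kafka AdminClient. The connection settings are translated by
+    // PulsarClientKafkaConfig.getAdminBuilder(properties); the URL below is only a
+    // placeholder and not necessarily the property that builder reads.
+    //
+    //   Properties props = new Properties();
+    //   props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:8080");
+    //   Admin admin = PulsarKafkaAdminClient.create(props);
+    //   ...
+    //   admin.close(Duration.ofSeconds(10));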
+
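+    // Caches whether a topic is partitioned. Failed lookups are not cached
+    // (computeIfAbsent stores nothing when the mapping function returns null),
+    // so the metadata query is retried on the next call.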
+    private boolean isPartitionedTopic(String topic) {
+        Boolean res = partitionedTopics.computeIfAbsent(topic, t -> {
+            try {
+                int numPartitions = admin.topics()
+                        .getPartitionedTopicMetadata(t)
+                        .partitions;
+                return numPartitions > 0;
+            } catch (PulsarAdminException e) {
+                log.error("getPartitionedTopicMetadata failed", e);
+                return null;
+            }
+        });
+        if (res == null) {
+            throw new RuntimeException("Could not get topic metadata");
+        }
+        return res;
+    }
+
+    public <K, V> Map<K, KafkaFutureImpl<V>> execute(Collection<K> param,
+                                                     java.util.function.BiConsumer<K, KafkaFutureImpl<V>> func) {
+        // create one future per key and hand each (key, future) pair to the supplied callback
+        final Map<K, KafkaFutureImpl<V>> futures
+                = new HashMap<>();
+        for (K value : param) {
+            KafkaFutureImpl<V> future = new KafkaFutureImpl<>();
+            futures.put(value, future);
+            func.accept(value, future);
+        }
+        return futures;
+    }
+
+    public <K, K2, V> Map<K, KafkaFutureImpl<V>> execute(Map<K, K2> param,
+                                 java.util.function.BiConsumer<Map.Entry<K, K2>, KafkaFutureImpl<V>> func) {
+        // create one future per map entry and hand each (entry, future) pair to the supplied callback
+        final Map<K, KafkaFutureImpl<V>> futures
+                = new HashMap<>(param.size());
+        for (Map.Entry<K, K2> value : param.entrySet()) {
+            KafkaFutureImpl<V> future = new KafkaFutureImpl<>();
+            futures.put(value.getKey(), future);
+            func.accept(value, future);
+        }
+        return futures;
+    }
+
+    @Override
+    public void close(Duration duration) {
+        admin.close();
+    }
+
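+    // Resolves offsets through the Pulsar admin API: for every requested topic/partition the
+    // id of the last message is fetched and converted to an offset; the requested OffsetSpec
+    // is effectively ignored.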
+    @Override
+    public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets,
+                                         ListOffsetsOptions listOffsetsOptions) {
+        final Map<TopicPartition, KafkaFutureImpl<ListOffsetsResult.ListOffsetsResultInfo>> futures =
+            execute(topicPartitionOffsets, (entry, future) -> {
+                TopicPartition topicPartition = entry.getKey();
+                String topicName = isPartitionedTopic(topicPartition.topic())
+                        ? topicPartition.topic() + TopicName.PARTITIONED_TOPIC_SUFFIX + topicPartition.partition()
+                        : topicPartition.topic();
+                admin.topics()
+                        .getLastMessageIdAsync(topicName)
+                        .whenComplete((msgId, ex) -> {
+                            if (ex == null) {
+                                long offset = FunctionCommon.getSequenceId(msgId);
+                                future.complete(new ListOffsetsResult.ListOffsetsResultInfo(
+                                        offset,
+                                        System.currentTimeMillis(),
+                                        Optional.empty()));
+                            } else {
+                                log.error("Admin failed to get lastMessageId for topic " + topicName, ex);
+                                future.completeExceptionally(ex);
+                            }
+                        });
+            });
+        return new ListOffsetsResult(new HashMap<>(futures));
+    }
+
+    @Override
+    public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> map, DeleteRecordsOptions deleteRecordsOptions) {
+        final Map<TopicPartition, KafkaFutureImpl<DeletedRecords>> futures =
+                execute(map, (entry, future) -> {
+                    // nothing to do: deleting messages before an offset is not supported here,
+                    // rely on Pulsar retention/expiry instead
+                    future.complete(new DeletedRecords(entry.getValue().beforeOffset()));
+                });
+
+        return new DeleteRecordsResult(new HashMap<>(futures));
+    }
+
+    @Override
+    public CreateTopicsResult createTopics(Collection<NewTopic> collection, CreateTopicsOptions createTopicsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public DeleteTopicsResult deleteTopics(Collection<String> collection, DeleteTopicsOptions deleteTopicsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public ListTopicsResult listTopics(ListTopicsOptions listTopicsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public DescribeTopicsResult describeTopics(Collection<String> collection, DescribeTopicsOptions describeTopicsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public DescribeClusterResult describeCluster(DescribeClusterOptions describeClusterOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public DescribeAclsResult describeAcls(AclBindingFilter aclBindingFilter, DescribeAclsOptions describeAclsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public CreateAclsResult createAcls(Collection<AclBinding> collection, CreateAclsOptions createAclsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public DeleteAclsResult deleteAcls(Collection<AclBindingFilter> collection, DeleteAclsOptions deleteAclsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public DescribeConfigsResult describeConfigs(Collection<ConfigResource> collection, DescribeConfigsOptions describeConfigsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> map, AlterConfigsOptions alterConfigsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> map, AlterConfigsOptions alterConfigsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> map, AlterReplicaLogDirsOptions alterReplicaLogDirsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public DescribeLogDirsResult describeLogDirs(Collection<Integer> collection, DescribeLogDirsOptions describeLogDirsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> collection, DescribeReplicaLogDirsOptions describeReplicaLogDirsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public CreatePartitionsResult createPartitions(Map<String, NewPartitions> map, CreatePartitionsOptions createPartitionsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions createDelegationTokenOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public RenewDelegationTokenResult renewDelegationToken(byte[] bytes, RenewDelegationTokenOptions renewDelegationTokenOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public ExpireDelegationTokenResult expireDelegationToken(byte[] bytes, ExpireDelegationTokenOptions expireDelegationTokenOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions describeDelegationTokenOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> collection, DescribeConsumerGroupsOptions describeConsumerGroupsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions listConsumerGroupsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String s, ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> collection, DeleteConsumerGroupsOptions deleteConsumerGroupsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String s, Set<TopicPartition> set, DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public ElectLeadersResult electLeaders(ElectionType electionType, Set<TopicPartition> set, ElectLeadersOptions electLeadersOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public AlterPartitionReassignmentsResult alterPartitionReassignments(Map<TopicPartition, Optional<NewPartitionReassignment>> map, AlterPartitionReassignmentsOptions alterPartitionReassignmentsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public ListPartitionReassignmentsResult listPartitionReassignments(Optional<Set<TopicPartition>> optional, ListPartitionReassignmentsOptions listPartitionReassignmentsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String s, RemoveMembersFromConsumerGroupOptions removeMembersFromConsumerGroupOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String s, Map<TopicPartition, OffsetAndMetadata> map, AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter clientQuotaFilter, DescribeClientQuotasOptions describeClientQuotasOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> collection, AlterClientQuotasOptions alterClientQuotasOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> list, DescribeUserScramCredentialsOptions describeUserScramCredentialsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> list, AlterUserScramCredentialsOptions alterUserScramCredentialsOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions describeFeaturesOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> map, UpdateFeaturesOptions updateFeaturesOptions) {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+
+    @Override
+    public Map<MetricName, ? extends Metric> metrics() {
+        throw new UnsupportedOperationException("Operation is not supported");
+    }
+}
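All of the admin operations stubbed above fail fast by throwing UnsupportedOperationException from the method call itself rather than returning a failed future. A minimal hedged sketch of defensive handling on the caller side (the group id is a placeholder and the Admin instance is assumed to be this adapter's implementation):

    import org.apache.kafka.clients.admin.Admin;

    import java.util.Collections;

    public class AdminStubSketch {
        static void tryDeleteGroup(Admin admin) {
            try {
                // Stubbed in this adapter: throws before any KafkaFuture is created.
                admin.deleteConsumerGroups(Collections.singletonList("my-group"));
            } catch (UnsupportedOperationException uoe) {
                System.err.println("Not supported by the Pulsar Kafka admin adapter: " + uoe.getMessage());
            }
        }
    }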
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerCoordinator.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerCoordinator.java
new file mode 100644
index 0000000..81f7c49
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerCoordinator.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import com.google.api.client.util.Maps;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.consumer.internals.PartitionAssignorAdapter;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Mostly no-op implementation of a consumer coordinator, providing just enough
+ * functionality for Kafka Streams to invoke all of its callbacks.
+ * Unlike the coordinator in Kafka Streams, it does not actually coordinate anything;
+ * the name is kept as a loose reference to where that functionality lives in Kafka Streams (KS).
+ */
+@Slf4j
+public class PulsarConsumerCoordinator {
+
+    // Used to encode "task assignment info" to force
+    // Kafka Streams to create new tasks after subscription
+    private static class AssignmentInfo {
+        // v2 is the first assignment version to support partitions per host;
+        // later versions add nothing applicable/useful here but would require more placeholder data to be encoded
+        private final int usedVersion = 2;
+        private final List<TopicPartition> partitions;
+
+        public AssignmentInfo(final List<TopicPartition> partitions) {
+            this.partitions = partitions;
+        }
+
+        @SneakyThrows
+        public ByteBuffer encode() {
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+            try (final DataOutputStream out = new DataOutputStream(baos)) {
+                out.writeInt(usedVersion); // version
+                encodeActiveAndStandbyTaskAssignment(out, partitions);
+                encodePartitionsByHost(out);
+
+                out.flush();
+                out.close();
+
+                return ByteBuffer.wrap(baos.toByteArray());
+            }
+        }
+
+        private void encodeActiveAndStandbyTaskAssignment(final DataOutputStream out,
+                                                          final List<TopicPartition> partitions) throws IOException {
+
+            int lastId = 0;
+            final Map<String, Integer> topicGroupIds = new HashMap<>();
+            // encode active tasks
+            // the number of assigned partitions must be the same as the number of active tasks
+            out.writeInt(partitions.size());
+            for (TopicPartition p : partitions) {
+                final int topicGroupId;
+                if (topicGroupIds.containsKey(p.topic())) {
+                    topicGroupId = topicGroupIds.get(p.topic());
+                } else {
+                    topicGroupId = lastId;
+                    lastId++;
+                    topicGroupIds.put(p.topic(), topicGroupId);
+                }
+                out.writeInt(topicGroupId);
+                out.writeInt(p.partition());
+            }
+
+            // encode standby tasks
+            out.writeInt(0);
+        }
+
+        private void encodePartitionsByHost(final DataOutputStream out) throws IOException {
+            // encode partitions by host
+            out.writeInt(1);
+            writeHostInfo(out, "fakeHost", 9999);
+            writeTopicPartitions(out, partitions);
+        }
+
+        private void writeHostInfo(final DataOutputStream out, String host, int port) throws IOException {
+            out.writeUTF(host);
+            out.writeInt(port);
+        }
+
+        private void writeTopicPartitions(final DataOutputStream out,
+                                          final List<TopicPartition> partitions) throws IOException {
+            out.writeInt(partitions.size());
+            for (final TopicPartition partition : partitions) {
+                out.writeUTF(partition.topic());
+                out.writeInt(partition.partition());
+            }
+        }
+    }
+
+    public static void invokePartitionsAssigned(final String groupId, final ConsumerConfig config, final List<TopicPartition> assignedPartitions) {
+        final List<ConsumerPartitionAssignor> assignors = PartitionAssignorAdapter.getAssignorInstances(
+                config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
+                config.originals());
+
+        // Give the assignor a chance to update internal state based on the received assignment
+        ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata(groupId);
+
+        ByteBuffer bbInfo = new AssignmentInfo(assignedPartitions).encode();
+        ConsumerPartitionAssignor.Assignment assignment =
+                new ConsumerPartitionAssignor.Assignment(assignedPartitions, bbInfo);
+
+        for (ConsumerPartitionAssignor assignor : assignors) {
+            assignor.onAssignment(assignment, groupMetadata);
+        }
+    }
+
+}
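PulsarKafkaConsumer invokes this coordinator right after its Pulsar consumers are created (see the subscribe() change below). A minimal hedged sketch that drives it directly, only to illustrate the call shape; the service URL, group id, topic and partition are placeholders, and the deserializer settings are included because ConsumerConfig requires them:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.PulsarConsumerCoordinator;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.util.Collections;
    import java.util.Properties;

    public class CoordinatorSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "pulsar://localhost:6650"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            // With the default partition.assignment.strategy this call is effectively a no-op;
            // when Kafka Streams plugs in its StreamsPartitionAssignor, onAssignment() is
            // intended to receive the AssignmentInfo user data encoded above.
            PulsarConsumerCoordinator.invokePartitionsAssigned(
                    "my-group",
                    new ConsumerConfig(props),
                    Collections.singletonList(new TopicPartition("my-topic", 0)));
        }
    }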
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 06b1c6e..9ef4e96 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -18,28 +18,8 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Base64;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
@@ -48,25 +28,50 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.util.MessageIdUtils;
 import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
 import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.MessageIdUtils;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+
+import java.time.Duration;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 @Slf4j
 public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener<byte[]> {
@@ -96,6 +101,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
 
     private final Properties properties;
 
+    private final ConsumerConfig consumerCfg;
+
     private static class QueueItem {
         final org.apache.pulsar.client.api.Consumer<byte[]> consumer;
         final Message<byte[]> message;
@@ -140,7 +147,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
 
     @SuppressWarnings("unchecked")
     private PulsarKafkaConsumer(ConsumerConfig consumerConfig, Schema<K> keySchema, Schema<V> valueSchema) {
-
+        this.consumerCfg = consumerConfig;
         if (keySchema == null) {
             Deserializer<K> kafkaKeyDeserializer = consumerConfig.getConfiguredInstance(
                     ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
@@ -161,7 +168,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
             consumerConfig.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
         }
 
-        groupId = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG);
+        // Kafka removes the group id for the restore consumer but still sets a client id
+        groupId = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG) == null
+                ? consumerConfig.getString(ConsumerConfig.CLIENT_ID_CONFIG)
+                : consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG);
+        Preconditions.checkNotNull(groupId, "groupId cannot be null");
         isAutoCommit = consumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
         strategy = getStrategy(consumerConfig.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         log.info("Offset reset strategy has been assigned value {}", strategy);
@@ -179,7 +190,14 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                 ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class);
 
         this.properties = new Properties();
-        consumerConfig.originals().forEach((k, v) -> properties.put(k, v));
+        log.info("config originals: {}", consumerConfig.originals());
+        consumerConfig.originals().forEach((k, v) -> {
+            log.info("Setting k = {} v = {}", k, v);
+            // properties do not allow null values
+            if (k != null && v != null) {
+                properties.put(k, v);
+            }
+        });
         ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
         // Since this client instance is going to be used just for the consumers, we can enable Nagle to group
         // all the acknowledgments sent to broker within a short time frame
@@ -217,7 +235,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
 
     @Override
     public Set<TopicPartition> assignment() {
-        throw new UnsupportedOperationException("Cannot access the partitions assignements");
+        return consumers.keySet();
     }
 
     /**
@@ -260,7 +278,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                                 .topic(partitionName).subscribeAsync();
                         int partitionIndex = i;
                         TopicPartition tp = new TopicPartition(
-                            TopicName.get(topic).getPartitionedTopicName(),
+                            TopicName.get(topic).getLocalName(),
                             partitionIndex);
                         futures.add(future.thenApply(consumer -> {
                             log.info("Add consumer {} for partition {}", consumer, tp);
@@ -274,7 +292,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                     CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
                             .subscribeAsync();
                     TopicPartition tp = new TopicPartition(
-                        TopicName.get(topic).getPartitionedTopicName(),
+                        TopicName.get(topic).getLocalName(),
                         0);
                     futures.add(future.thenApply(consumer -> {
                         log.info("Add consumer {} for partition {}", consumer, tp);
@@ -288,6 +306,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
             
             // Wait for all consumers to be ready
             futures.forEach(CompletableFuture::join);
+            PulsarConsumerCoordinator.invokePartitionsAssigned(groupId, consumerCfg, Lists.newArrayList(consumers.keySet()));
 
             // Notify the listener is now owning all topics/partitions
             if (callback != null) {
@@ -310,7 +329,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
 
     @Override
     public void assign(Collection<TopicPartition> partitions) {
-        throw new UnsupportedOperationException("Cannot manually assign partitions");
+        Set<String> topics = partitions.stream().map(p -> p.topic()).collect(Collectors.toSet());
+        subscribe(topics);
+        // Do not throw an exception here, so that Kafka Streams can keep using the consumer.
+        //log.error("Tried to assign partitions {}. The operation is not supported", partitions,
+        //        new UnsupportedOperationException("not supported"));
     }
 
     @Override
@@ -349,7 +372,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
 
             while (item != null) {
                 TopicName topicName = TopicName.get(item.consumer.getTopic());
-                String topic = topicName.getPartitionedTopicName();
+                String topic = topicName.getLocalName();
                 int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
                 Message<byte[]> msg = item.message;
                 MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
@@ -641,6 +664,19 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     }
 
     @Override
+    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
+        return partitions.stream()
+                .map(x -> new AbstractMap.SimpleEntry<>(x, committed(x)))
+                .filter(entry -> entry.getValue() != null)
+                .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
+    }
+
+    @Override
+    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions, Duration duration) {
+        return committed(partitions);
+    }
+
+    @Override
     public Map<MetricName, ? extends Metric> metrics() {
         throw new UnsupportedOperationException();
     }
@@ -665,19 +701,32 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
         throw new UnsupportedOperationException();
     }
 
+    // There is no isPaused() on the Pulsar consumer.
+    // Returning all assigned partitions works well enough because Kafka Streams
+    // appears to resume all paused consumers anyway.
     @Override
     public Set<TopicPartition> paused() {
-        throw new UnsupportedOperationException();
+        return consumers.keySet();
     }
 
     @Override
     public void pause(Collection<TopicPartition> partitions) {
-        throw new UnsupportedOperationException();
+        partitions.forEach(p -> {
+            org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(p);
+            if (c != null) {
+                c.pause();
+            }
+        });
     }
 
     @Override
     public void resume(Collection<TopicPartition> partitions) {
-        throw new UnsupportedOperationException();
+        partitions.forEach(p -> {
+            org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(p);
+            if (c != null) {
+                c.resume();
+            }
+        });
     }
 
     @Override
@@ -711,6 +760,16 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     }
 
     @Override
+    public ConsumerGroupMetadata groupMetadata() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void enforceRebalance() {
+        log.info("enforceRebalance() is called but ignored");
+    }
+
+    @Override
     public void close() {
         close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
     }
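Taken together, the consumer changes above let a plain Kafka-style consumption loop run against Pulsar, including the calls Kafka Streams relies on (assignment(), paused(), pause(), resume() and committed(Set)). A hedged usage sketch, assuming the adapter's documented convention that bootstrap.servers carries the Pulsar service URL and group.id becomes the subscription name; the URL and topic are placeholders:

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class ConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "pulsar://localhost:6650"); // Pulsar service URL
            props.put("group.id", "my-subscription");                  // becomes the Pulsar subscription
            props.put("enable.auto.commit", "true");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());

            // With the shaded pulsar-client-kafka artifact this KafkaConsumer is the Pulsar-backed one.
            try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(r ->
                        System.out.printf("%s-%d@%d: %s%n", r.topic(), r.partition(), r.offset(), r.value()));

                // Newly supported by this patch: inspecting, pausing and resuming assigned partitions.
                consumer.pause(consumer.assignment());
                consumer.resume(consumer.paused());
            }
        }
    }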
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 13f97b5..f6da38e 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Metric;
@@ -193,6 +194,11 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
     }
 
     @Override
+    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, ConsumerGroupMetadata consumerGroupMetadata) throws ProducerFencedException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public void commitTransaction() throws ProducerFencedException {
         throw new UnsupportedOperationException();
     }
@@ -366,7 +372,15 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
 
         TopicPartition tp = new TopicPartition(topic, partition);
         TypedMessageBuilderImpl<byte[]> mb = (TypedMessageBuilderImpl<byte[]>) msgBuilder;
-        return new RecordMetadata(tp, offset, 0L, mb.getPublishTime(), 0L, mb.hasKey() ? mb.getKey().length() : 0, size);
+
+        long publishTime = 0L;
+        try {
+            // there is currently no hasPublishTime(), so getPublishTime() may throw if the publish time is unset
+            publishTime = mb.getPublishTime();
+        } catch (IllegalStateException ise) {
+            logger.debug("could not get publish time");
+        }
+        return new RecordMetadata(tp, offset, 0L, publishTime, 0L, mb.hasKey() ? mb.getKey().length() : 0, size);
     }
 
     private static final Logger logger = LoggerFactory.getLogger(PulsarKafkaProducer.class);
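On the producer side this patch only adds the newer sendOffsetsToTransaction overload (still unsupported, as the adapter has no transactions) and makes RecordMetadata construction tolerate messages without a publish time. Plain publishing keeps working through the familiar API; a hedged sketch under the same placeholder assumptions as the consumer example above:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class ProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "pulsar://localhost:6650"); // Pulsar service URL
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            // With the shaded pulsar-client-kafka artifact this KafkaProducer is the Pulsar-backed one.
            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("my-topic", "key", "hello pulsar"), (metadata, e) -> {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        // With this patch, the metadata timestamp falls back to 0 when the
                        // underlying message builder has no publish time set.
                        System.out.println("sent to " + metadata.topic() + "-" + metadata.partition());
                    }
                });
            }
        }
    }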
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
index 8ba5ede..eebd95b 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
@@ -22,6 +22,10 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -121,4 +125,37 @@ public class PulsarClientKafkaConfig {
 
         return clientBuilder;
     }
+
+    @SneakyThrows
+    public static PulsarAdminBuilder getAdminBuilder(Properties properties) {
+        String serviceUrl = properties.getProperty("pulsar.admin.url");
+
+        String authPluginClassName = properties.getProperty("authPlugin");
+        String authParams = properties.getProperty("authParams");
+        boolean tlsAllowInsecureConnection =
+                Boolean.parseBoolean(properties.getProperty("tlsAllowInsecureConnection", "false"));
+
+        boolean tlsEnableHostnameVerification =
+                Boolean.parseBoolean(properties.getProperty("tlsEnableHostnameVerification", "false"));
+        final String tlsTrustCertsFilePath = properties.getProperty("tlsTrustCertsFilePath");
+
+        boolean useKeyStoreTls = Boolean
+                .parseBoolean(properties.getProperty("useKeyStoreTls", "false"));
+        String tlsTrustStoreType = properties.getProperty("tlsTrustStoreType", "JKS");
+        String tlsTrustStorePath = properties.getProperty("tlsTrustStorePath");
+        String tlsTrustStorePassword = properties.getProperty("tlsTrustStorePassword");
+
+        PulsarAdminBuilder adminBuilder = PulsarAdmin.builder()
+                .serviceHttpUrl(serviceUrl)
+                .allowTlsInsecureConnection(tlsAllowInsecureConnection)
+                .enableTlsHostnameVerification(tlsEnableHostnameVerification)
+                .tlsTrustCertsFilePath(tlsTrustCertsFilePath)
+                .useKeyStoreTls(useKeyStoreTls)
+                .tlsTrustStoreType(tlsTrustStoreType)
+                .tlsTrustStorePath(tlsTrustStorePath)
+                .tlsTrustStorePassword(tlsTrustStorePassword)
+                .authentication(authPluginClassName, authParams);
+
+        return adminBuilder;
+    }
 }
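The property names read by getAdminBuilder() come straight from the Kafka-style Properties handed to the client, so an application only needs to add the Pulsar admin (HTTP) endpoint and, if needed, auth/TLS settings next to its existing configuration. A hedged sketch with placeholder URL and token:

    import org.apache.pulsar.client.admin.PulsarAdmin;
    import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;

    import java.util.Properties;

    public class AdminBuilderSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("pulsar.admin.url", "http://localhost:8080"); // admin (HTTP) endpoint, placeholder
            props.setProperty("authPlugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken");
            props.setProperty("authParams", "token:REPLACE_WITH_JWT");
            props.setProperty("tlsAllowInsecureConnection", "false");

            try (PulsarAdmin admin = PulsarClientKafkaConfig.getAdminBuilder(props).build()) {
                System.out.println(admin.clusters().getClusters());
            }
        }
    }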