Posted to commits@pulsar.apache.org by si...@apache.org on 2018/10/08 19:24:08 UTC

[pulsar] branch master updated: [clients][kafka] Fix topic name & race condition on kafka wrapper (#2746)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dd4f71d   [clients][kafka] Fix topic name & race condition on kafka wrapper (#2746)
dd4f71d is described below

commit dd4f71d9572313e26674209cf1ae2d1bc0611f6d
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Mon Oct 8 12:24:04 2018 -0700

     [clients][kafka] Fix topic name & race condition on kafka wrapper (#2746)
    
    *Motivation*
    
    The current PulsarKafkaConsumer has the following defects:
    
    - The topic name is used inconsistently as the key of the different internal mappings; the fully qualified topic name should always be used as the map key (see the sketch after this list).
    - seek should clean up the offset maps.
    - The poll logic can pop a message off the internal queue without delivering it to the client.
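
    As a rough illustration of the first and third points, the sketch below shows the intended behavior: map keys built from the fully qualified base topic name, and a drain loop that delivers every dequeued message before enforcing the per-poll record cap. It is a hypothetical helper, not the committed code; the class name, the drain method, and the cap value are assumptions.

        import java.util.ArrayList;
        import java.util.List;
        import java.util.concurrent.BlockingQueue;
        import java.util.concurrent.TimeUnit;

        import org.apache.kafka.common.TopicPartition;
        import org.apache.pulsar.common.naming.TopicName;

        class PollSketch {

            // Assumed cap for illustration; the wrapper defines its own constant.
            private static final int MAX_RECORDS_IN_SINGLE_POLL = 1000;

            // Normalize any topic form ("my-topic" or
            // "persistent://public/default/my-topic-partition-3") to the fully
            // qualified base name, so every mapping uses the same key.
            static TopicPartition keyFor(String anyTopicForm, int partitionIndex) {
                String fqtn = TopicName.get(anyTopicForm).getPartitionedTopicName();
                return new TopicPartition(fqtn, partitionIndex);
            }

            // Add every dequeued item to the result before checking the cap, so a
            // popped message is never dropped when the cap is reached.
            static <T> List<T> drain(BlockingQueue<T> queue, long timeoutMs) throws InterruptedException {
                List<T> out = new ArrayList<>();
                T item = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
                while (item != null) {
                    out.add(item);
                    if (out.size() >= MAX_RECORDS_IN_SINGLE_POLL) {
                        break;
                    }
                    item = queue.poll(0, TimeUnit.MILLISECONDS);
                }
                return out;
            }
        }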
    
    *Changes*
    
    - Fix those issues in the Kafka wrapper.
    - Enable the Kafka integration tests and run them with the standalone test suite.
    - Enable Kafka client logging.
---
 buildtools/src/main/resources/log4j2.xml           |   1 +
 .../clients/consumer/PulsarKafkaConsumer.java      |  37 +++--
 ...luster-2-bookie-1-broker-unstarted-with-s3.yaml | 164 ---------------------
 tests/integration/pom.xml                          |  11 ++
 .../topologies/PulsarStandaloneTestBase.java       |   1 -
 tests/pom.xml                                      |   1 +
 .../pom.xml                                        |  92 ++----------
 .../integration/compat/kafka/KafkaApiTest.java     |  83 ++++++-----
 8 files changed, 102 insertions(+), 288 deletions(-)

diff --git a/buildtools/src/main/resources/log4j2.xml b/buildtools/src/main/resources/log4j2.xml
index 85a7c1e..a658b55 100644
--- a/buildtools/src/main/resources/log4j2.xml
+++ b/buildtools/src/main/resources/log4j2.xml
@@ -31,5 +31,6 @@
         </Root>
         <Logger name="org.apache.pulsar" level="info"/>
         <Logger name="org.apache.bookkeeper" level="info"/>
+        <Logger name="org.apache.kafka" level="info"/>
     </Loggers>
 </Configuration>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 0ce91c2..cf1e716 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
@@ -62,6 +63,7 @@ import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 
+@Slf4j
 public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener<byte[]> {
 
     private static final long serialVersionUID = 1L;
@@ -216,18 +218,28 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                         CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
                                 .topic(partitionName).subscribeAsync();
                         int partitionIndex = i;
-                        TopicPartition tp = new TopicPartition(topic, partitionIndex);
-                        future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer));
-                        futures.add(future);
+                        TopicPartition tp = new TopicPartition(
+                            TopicName.get(topic).getPartitionedTopicName(),
+                            partitionIndex);
+                        futures.add(future.thenApply(consumer -> {
+                            log.info("Add consumer {} for partition {}", consumer, tp);
+                            consumers.putIfAbsent(tp, consumer);
+                            return consumer;
+                        }));
                         topicPartitions.add(tp);
                     }
                 } else {
                     // Topic has a single partition
                     CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
                             .subscribeAsync();
-                    TopicPartition tp = new TopicPartition(topic, 0);
-                    future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer));
-                    futures.add(future);
+                    TopicPartition tp = new TopicPartition(
+                        TopicName.get(topic).getPartitionedTopicName(),
+                        0);
+                    futures.add(future.thenApply(consumer -> {
+                        log.info("Add consumer {} for partition {}", consumer, tp);
+                        consumers.putIfAbsent(tp, consumer);
+                        return consumer;
+                    }));
                     topicPartitions.add(tp);
                 }
             }
@@ -290,7 +302,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
 
             int numberOfRecords = 0;
 
-            while (item != null && ++numberOfRecords < MAX_RECORDS_IN_SINGLE_POLL) {
+            while (item != null) {
                 TopicName topicName = TopicName.get(item.consumer.getTopic());
                 String topic = topicName.getPartitionedTopicName();
                 int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
@@ -320,11 +332,15 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                 // Update last offset seen by application
                 lastReceivedOffset.put(tp, offset);
 
+                if (++numberOfRecords < MAX_RECORDS_IN_SINGLE_POLL) {
+                    break;
+                }
+
                 // Check if we have an item already available
                 item = receivedMessages.poll(0, TimeUnit.MILLISECONDS);
             }
 
-            if (isAutoCommit) {
+            if (isAutoCommit && !records.isEmpty()) {
                 // Commit the offset of previously dequeued messages
                 commitAsync();
             }
@@ -395,7 +411,6 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
 
         offsets.forEach((topicPartition, offsetAndMetadata) -> {
             org.apache.pulsar.client.api.Consumer<byte[]> consumer = consumers.get(topicPartition);
-
             lastCommittedOffset.put(topicPartition, offsetAndMetadata);
             futures.add(consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset())));
         });
@@ -435,6 +450,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
         if (partitions.isEmpty()) {
             partitions = consumers.keySet();
         }
+        lastCommittedOffset.clear();
+        lastReceivedOffset.clear();
 
         for (TopicPartition tp : partitions) {
             org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
@@ -456,6 +473,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
         if (partitions.isEmpty()) {
             partitions = consumers.keySet();
         }
+        lastCommittedOffset.clear();
+        lastReceivedOffset.clear();
 
         for (TopicPartition tp : partitions) {
             org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
diff --git a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml
deleted file mode 100644
index ec46571..0000000
--- a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml
+++ /dev/null
@@ -1,164 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-networks:
-  pulsarnet*:
-    driver: bridge
-
-zookeeper*:
-  image: apachepulsar/pulsar-test-latest-version:latest
-  await:
-    strategy: org.apache.pulsar.tests.NoopAwaitStrategy
-  env: [ZOOKEEPER_SERVERS=zookeeper]
-  labels:
-    cluster: test
-    service: zookeeper
-  entryPoint: [bin/run-local-zk.sh]
-  aliases:
-    - zookeeper
-  beforeStop:
-    - customBeforeStopAction:
-        strategy: org.apache.pulsar.tests.PulsarLogsToTargetDirStopAction
-    - customBeforeStopAction:
-        strategy: org.apache.pulsar.tests.ZKJournalToTargetDirStopAction
-  networkMode: pulsarnet*
-
-configuration-store*:
-  image: apachepulsar/pulsar-test-latest-version:latest
-  await:
-    strategy: org.apache.pulsar.tests.NoopAwaitStrategy
-  env: [ZOOKEEPER_SERVERS=configuration-store]
-  labels:
-    cluster: test
-    service: configuration-store
-  entryPoint: [bin/run-global-zk.sh]
-  aliases:
-    - configuration-store
-  beforeStop:
-    - customBeforeStopAction:
-        strategy: org.apache.pulsar.tests.PulsarLogsToTargetDirStopAction
-    - customBeforeStopAction:
-        strategy: org.apache.pulsar.tests.ZKJournalToTargetDirStopAction
-  networkMode: pulsarnet*
-
-init*:
-  image: apachepulsar/pulsar-test-latest-version:latest
-  await:
-    strategy: org.apache.pulsar.tests.NoopAwaitStrategy
-  env:
-    - clusterName=test
-    - zkServers=zookeeper
-    - configurationStore=configuration-store:2184
-    - pulsarNode=pulsar-broker1
-  labels:
-    cluster: test
-    service: init
-  entryPoint: [bin/init-cluster.sh]
-  beforeStop:
-    - customBeforeStopAction:
-        strategy: org.apache.pulsar.tests.LogToTargetDirStopAction
-  networkMode: pulsarnet*
-
-bookkeeper1*:
-  image: apachepulsar/pulsar-test-latest-version:latest
-  await:
-    strategy: org.apache.pulsar.tests.NoopAwaitStrategy
-  env:
-    - clusterName=test
-    - zkServers=zookeeper
-    - useHostNameAsBookieID=true
-  labels:
-    cluster: test
-    service: bookie
-  entryPoint: [bin/run-bookie.sh]
-  beforeStop:
-    - customBeforeStopAction:
-        strategy: org.apache.pulsar.tests.PulsarLogsToTargetDirStopAction
-  networkMode: pulsarnet*
-
-bookkeeper2*:
-  image: apachepulsar/pulsar-test-latest-version:latest
-  await:
-    strategy: org.apache.pulsar.tests.NoopAwaitStrategy
-  env:
-    - clusterName=test
-    - zkServers=zookeeper
-    - useHostNameAsBookieID=true
-  labels:
-    cluster: test
-    service: bookie
-  entryPoint: [bin/run-bookie.sh]
-  beforeStop:
-    - customBeforeStopAction:
-        strategy: org.apache.pulsar.tests.PulsarLogsToTargetDirStopAction
-  networkMode: pulsarnet*
-
-pulsar-broker1*:
-  image: apachepulsar/pulsar-test-latest-version:latest
-  await:
-    strategy: org.apache.pulsar.tests.NoopAwaitStrategy
-  aliases:
-    - pulsar-broker1
-  env:
-    - zookeeperServers=zookeeper
-    - configurationStoreServers=configuration-store:2184
-    - clusterName=test
-    - NO_AUTOSTART=true
-  labels:
-    cluster: test
-    service: pulsar-broker
-  entryPoint: [bin/run-broker.sh]
-  beforeStop:
-    - customBeforeStopAction:
-        strategy: org.apache.pulsar.tests.PulsarLogsToTargetDirStopAction
-  networkMode: pulsarnet*
-
-pulsar-proxy*:
-  image: apachepulsar/pulsar-test-latest-version:latest
-  await:
-    strategy: org.apache.pulsar.tests.NoopAwaitStrategy
-  env:
-    - zookeeperServers=zookeeper
-    - configurationStoreServers=configuration-store:2184
-    - clusterName=test
-    - NO_AUTOSTART=true
-  labels:
-    cluster: test
-    service: pulsar-proxy
-  entryPoint: [bin/run-proxy.sh]
-  beforeStop:
-    - customBeforeStopAction:
-        strategy: org.apache.pulsar.tests.PulsarLogsToTargetDirStopAction
-  networkMode: pulsarnet*
-
-s3*:
-  ## use latest adobe/s3mock, for issue: https://github.com/adobe/S3Mock/issues/32
-  ## TODO: https://github.com/apache/incubator-pulsar/issues/2133
-  image: apachepulsar/s3mock
-  await:
-    strategy: org.apache.pulsar.tests.NoopAwaitStrategy
-  env:
-    - initialBuckets=pulsar-integtest
-  labels:
-    cluster: test
-    service: s3
-  beforeStop:
-    - customBeforeStopAction:
-        strategy: org.apache.pulsar.tests.LogToTargetDirStopAction
-  networkMode: pulsarnet*
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 653c076..7f084b0 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -140,6 +140,17 @@
     <plugins>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
           <!-- only run tests when -DintegrationTests is specified //-->
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
index 48c7fe6..40be9ba 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
@@ -62,7 +62,6 @@ public abstract class PulsarStandaloneTestBase extends PulsarTestBase {
         container = new StandaloneContainer(clusterName)
             .withNetwork(network)
             .withNetworkAliases(StandaloneContainer.NAME + "-" + clusterName);
-        container.tailContainerLog();
         container.start();
         log.info("Pulsar cluster {} is up running:", clusterName);
         log.info("\tBinary Service Url : {}", container.getPlainTextServiceUrl());
diff --git a/tests/pom.xml b/tests/pom.xml
index 074d4bc..5c410b1 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -34,6 +34,7 @@
   <modules>
     <module>docker-images</module>
     <module>integration</module>
+    <module>pulsar-kafka-compat-client-test</module>
   </modules>
   <build>
     <plugins>
diff --git a/tests/integration/pom.xml b/tests/pulsar-kafka-compat-client-test/pom.xml
similarity index 58%
copy from tests/integration/pom.xml
copy to tests/pulsar-kafka-compat-client-test/pom.xml
index 653c076..196f682 100644
--- a/tests/integration/pom.xml
+++ b/tests/pulsar-kafka-compat-client-test/pom.xml
@@ -28,37 +28,27 @@
     <version>2.2.0-SNAPSHOT</version>
   </parent>
 
-  <artifactId>integration</artifactId>
+  <artifactId>pulsar-kafka-compat-client-test</artifactId>
   <packaging>jar</packaging>
-  <name>Apache Pulsar :: Tests :: Integration</name>
+  <name>Apache Pulsar :: Tests :: Pulsar Kafka Compat Client Tests</name>
 
   <dependencies>
     <dependency>
-      <groupId>org.testng</groupId>
-      <artifactId>testng</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.google.code.gson</groupId>
-      <artifactId>gson</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-functions-api-examples</artifactId>
+      <groupId>org.apache.pulsar.tests</groupId>
+      <artifactId>integration</artifactId>
       <version>${project.version}</version>
+      <type>test-jar</type>
       <scope>test</scope>
       <exclusions>
         <exclusion>
-          <groupId>org.apache.pulsar</groupId>
-          <artifactId>pulsar-client-schema</artifactId>
+          <groupId>org.apache.kafka</groupId>
+          <artifactId>kafka-clients</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
     <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-client</artifactId>
-      <version>${project.version}</version>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
@@ -69,71 +59,16 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pulsar</groupId>
-      <artifactId>managed-ledger-original</artifactId>
+      <artifactId>pulsar-client-kafka</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.datastax.cassandra</groupId>
-      <artifactId>cassandra-driver-core</artifactId>
       <exclusions>
         <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty-handler</artifactId>
+          <groupId>org.apache.kafka</groupId>
+          <artifactId>kafka-clients</artifactId>
         </exclusion>
       </exclusions>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.testcontainers</groupId>
-      <artifactId>kafka</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-io-kafka</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
     </dependency>
-
-    <dependency>
-      <groupId>org.testcontainers</groupId>
-      <artifactId>mysql</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>mysql</groupId>
-      <artifactId>mysql-connector-java</artifactId>
-      <version>${mysql-jdbc.version}</version>
-      <scope>runtime</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-io-jdbc</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.dataformat</groupId>
-      <artifactId>jackson-dataformat-yaml</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-  	  <groupId>org.elasticsearch.client</groupId>
-  	  <artifactId>elasticsearch-rest-high-level-client</artifactId>
-  	  <version>6.3.2</version>
-  	</dependency>
-
   </dependencies>
 
   <build>
@@ -177,9 +112,6 @@
               -Dio.netty.leakDetectionLevel=advanced
               </argLine>
               <skipTests>false</skipTests>
-              <suiteXmlFiles>
-                <file>src/test/resources/pulsar.xml</file>
-              </suiteXmlFiles>
               <forkCount>1</forkCount>
             </configuration>
           </plugin>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java b/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
similarity index 88%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
rename to tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
index 3dd8940..fe2a4d9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
+++ b/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
@@ -51,25 +51,32 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
 import org.testng.annotations.Test;
 
-@Test(enabled = false)
 @Slf4j
-public class KafkaApiTest extends PulsarTestSuite {
+public class KafkaApiTest extends PulsarStandaloneTestSuite {
+
+    private static String getPlainTextServiceUrl() {
+        return container.getPlainTextServiceUrl();
+    }
+
+    private static String getHttpServiceUrl() {
+        return container.getHttpServiceUrl();
+    }
 
     @Test(timeOut = 30000)
     public void testSimpleProducerConsumer() throws Exception {
         String topic = "persistent://public/default/testSimpleProducerConsumer";
 
         Properties producerProperties = new Properties();
-        producerProperties.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
+        producerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
         producerProperties.put("key.serializer", IntegerSerializer.class.getName());
         producerProperties.put("value.serializer", StringSerializer.class.getName());
         Producer<Integer, String> producer = new KafkaProducer<>(producerProperties);
 
         Properties consumerProperties = new Properties();
-        consumerProperties.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
+        consumerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
         consumerProperties.put("group.id", "my-subscription-name");
         consumerProperties.put("key.deserializer", IntegerDeserializer.class.getName());
         consumerProperties.put("value.deserializer", StringDeserializer.class.getName());
@@ -110,16 +117,20 @@ public class KafkaApiTest extends PulsarTestSuite {
         String topic = "testSimpleConsumer";
 
         Properties props = new Properties();
-        props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
+        props.put("bootstrap.servers", getPlainTextServiceUrl());
         props.put("group.id", "my-subscription-name");
         props.put("enable.auto.commit", "false");
         props.put("key.deserializer", StringDeserializer.class.getName());
         props.put("value.deserializer", StringDeserializer.class.getName());
 
+        @Cleanup
         Consumer<String, String> consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList(topic));
 
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
+
+        @Cleanup
         org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
 
         for (int i = 0; i < 10; i++) {
@@ -129,17 +140,21 @@ public class KafkaApiTest extends PulsarTestSuite {
         AtomicInteger received = new AtomicInteger();
         while (received.get() < 10) {
             ConsumerRecords<String, String> records = consumer.poll(100);
-            records.forEach(record -> {
-                assertEquals(record.key(), Integer.toString(received.get()));
-                assertEquals(record.value(), "hello-" + received.get());
-
-                received.incrementAndGet();
-            });
-
-            consumer.commitSync();
+            if (!records.isEmpty()) {
+                records.forEach(record -> {
+                    String key = Integer.toString(received.get());
+                    String value = "hello-" + received.get();
+                    log.info("Receive record : key = {}, value = {}, topic = {}, ptn = {}",
+                        key, value, record.topic(), record.partition());
+                    assertEquals(record.key(), key);
+                    assertEquals(record.value(), value);
+
+                    received.incrementAndGet();
+                });
+
+                consumer.commitSync();
+            }
         }
-
-        consumer.close();
     }
 
     @Test
@@ -147,7 +162,7 @@ public class KafkaApiTest extends PulsarTestSuite {
         String topic = "testConsumerAutoCommit";
 
         Properties props = new Properties();
-        props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
+        props.put("bootstrap.servers", getPlainTextServiceUrl());
         props.put("group.id", "my-subscription-name");
         props.put("enable.auto.commit", "true");
         props.put("key.deserializer", StringDeserializer.class.getName());
@@ -157,7 +172,7 @@ public class KafkaApiTest extends PulsarTestSuite {
         consumer.subscribe(Arrays.asList(topic));
 
         @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
         org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
 
         for (int i = 0; i < 10; i++) {
@@ -190,7 +205,7 @@ public class KafkaApiTest extends PulsarTestSuite {
         String topic = "testConsumerManualOffsetCommit";
 
         Properties props = new Properties();
-        props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
+        props.put("bootstrap.servers", getPlainTextServiceUrl());
         props.put("group.id", "my-subscription-name");
         props.put("enable.auto.commit", "false");
         props.put("key.deserializer", StringDeserializer.class.getName());
@@ -200,7 +215,7 @@ public class KafkaApiTest extends PulsarTestSuite {
         consumer.subscribe(Arrays.asList(topic));
 
         @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
         org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
 
         for (int i = 0; i < 10; i++) {
@@ -240,18 +255,18 @@ public class KafkaApiTest extends PulsarTestSuite {
 
         // Create 8 partitions in topic
         @Cleanup
-        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
+        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(getHttpServiceUrl()).build();
         admin.topics().createPartitionedTopic(topic, 8);
 
         Properties props = new Properties();
-        props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
+        props.put("bootstrap.servers", getPlainTextServiceUrl());
         props.put("group.id", "my-subscription-name");
         props.put("enable.auto.commit", "true");
         props.put("key.deserializer", StringDeserializer.class.getName());
         props.put("value.deserializer", StringDeserializer.class.getName());
 
         @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
         org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic)
                 .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition).create();
 
@@ -287,10 +302,10 @@ public class KafkaApiTest extends PulsarTestSuite {
 
     @Test
     public void testConsumerSeek() throws Exception {
-        String topic = "testSimpleConsumer";
+        String topic = "testConsumerSeek";
 
         Properties props = new Properties();
-        props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
+        props.put("bootstrap.servers", getPlainTextServiceUrl());
         props.put("group.id", "my-subscription-name");
         props.put("enable.auto.commit", "false");
         props.put("key.deserializer", StringDeserializer.class.getName());
@@ -301,7 +316,7 @@ public class KafkaApiTest extends PulsarTestSuite {
         consumer.subscribe(Arrays.asList(topic));
 
         @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
         org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
 
         for (int i = 0; i < 10; i++) {
@@ -344,10 +359,10 @@ public class KafkaApiTest extends PulsarTestSuite {
 
     @Test
     public void testConsumerSeekToEnd() throws Exception {
-        String topic = "testSimpleConsumer";
+        String topic = "testConsumerSeekToEnd";
 
         Properties props = new Properties();
-        props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
+        props.put("bootstrap.servers", getPlainTextServiceUrl());
         props.put("group.id", "my-subscription-name");
         props.put("enable.auto.commit", "false");
         props.put("key.deserializer", StringDeserializer.class.getName());
@@ -358,7 +373,7 @@ public class KafkaApiTest extends PulsarTestSuite {
         consumer.subscribe(Arrays.asList(topic));
 
         @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
         org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
 
         for (int i = 0; i < 10; i++) {
@@ -399,13 +414,13 @@ public class KafkaApiTest extends PulsarTestSuite {
         String topic = "testSimpleProducer";
 
         @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
         org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer = pulsarClient.newConsumer().topic(topic)
                 .subscriptionName("my-subscription")
                 .subscribe();
 
         Properties props = new Properties();
-        props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
+        props.put("bootstrap.servers", getPlainTextServiceUrl());
 
         props.put("key.serializer", IntegerSerializer.class.getName());
         props.put("value.serializer", StringSerializer.class.getName());
@@ -431,14 +446,14 @@ public class KafkaApiTest extends PulsarTestSuite {
         String topic = "testProducerCallback";
 
         @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
         org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer = pulsarClient.newConsumer()
                 .topic(topic)
                 .subscriptionName("my-subscription")
                 .subscribe();
 
         Properties props = new Properties();
-        props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
+        props.put("bootstrap.servers", getPlainTextServiceUrl());
 
         props.put("key.serializer", IntegerSerializer.class.getName());
         props.put("value.serializer", StringSerializer.class.getName());