You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/10/08 19:24:08 UTC
[pulsar] branch master updated: [clients][kafka] Fix topic name &
race condition on kafka wrapper (#2746)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dd4f71d [clients][kafka] Fix topic name & race condition on kafka wrapper (#2746)
dd4f71d is described below
commit dd4f71d9572313e26674209cf1ae2d1bc0611f6d
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Mon Oct 8 12:24:04 2018 -0700
[clients][kafka] Fix topic name & race condition on kafka wrapper (#2746)
*Motivation*
The current PulsarKafkaConsumer has the following defects:
- The topic name is used inconsistently as the key of the different mappings. We should always use the fully qualified topic name as the key for these mappings.
- seek should clean up the offset maps.
- The poll logic can potentially pop a message but not deliver it to the client.
*Changes*
- Fix those issues in the Kafka wrapper
- Enable the Kafka integration tests and use the standalone test suite
- Enable Kafka client logging
---
buildtools/src/main/resources/log4j2.xml | 1 +
.../clients/consumer/PulsarKafkaConsumer.java | 37 +++--
...luster-2-bookie-1-broker-unstarted-with-s3.yaml | 164 ---------------------
tests/integration/pom.xml | 11 ++
.../topologies/PulsarStandaloneTestBase.java | 1 -
tests/pom.xml | 1 +
.../pom.xml | 92 ++----------
.../integration/compat/kafka/KafkaApiTest.java | 83 ++++++-----
8 files changed, 102 insertions(+), 288 deletions(-)
diff --git a/buildtools/src/main/resources/log4j2.xml b/buildtools/src/main/resources/log4j2.xml
index 85a7c1e..a658b55 100644
--- a/buildtools/src/main/resources/log4j2.xml
+++ b/buildtools/src/main/resources/log4j2.xml
@@ -31,5 +31,6 @@
</Root>
<Logger name="org.apache.pulsar" level="info"/>
<Logger name="org.apache.bookkeeper" level="info"/>
+ <Logger name="org.apache.kafka" level="info"/>
</Loggers>
</Configuration>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 0ce91c2..cf1e716 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
@@ -62,6 +63,7 @@ import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
+@Slf4j
public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener<byte[]> {
private static final long serialVersionUID = 1L;
@@ -216,18 +218,28 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
.topic(partitionName).subscribeAsync();
int partitionIndex = i;
- TopicPartition tp = new TopicPartition(topic, partitionIndex);
- future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer));
- futures.add(future);
+ TopicPartition tp = new TopicPartition(
+ TopicName.get(topic).getPartitionedTopicName(),
+ partitionIndex);
+ futures.add(future.thenApply(consumer -> {
+ log.info("Add consumer {} for partition {}", consumer, tp);
+ consumers.putIfAbsent(tp, consumer);
+ return consumer;
+ }));
topicPartitions.add(tp);
}
} else {
// Topic has a single partition
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
.subscribeAsync();
- TopicPartition tp = new TopicPartition(topic, 0);
- future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer));
- futures.add(future);
+ TopicPartition tp = new TopicPartition(
+ TopicName.get(topic).getPartitionedTopicName(),
+ 0);
+ futures.add(future.thenApply(consumer -> {
+ log.info("Add consumer {} for partition {}", consumer, tp);
+ consumers.putIfAbsent(tp, consumer);
+ return consumer;
+ }));
topicPartitions.add(tp);
}
}
@@ -290,7 +302,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
int numberOfRecords = 0;
- while (item != null && ++numberOfRecords < MAX_RECORDS_IN_SINGLE_POLL) {
+ while (item != null) {
TopicName topicName = TopicName.get(item.consumer.getTopic());
String topic = topicName.getPartitionedTopicName();
int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
@@ -320,11 +332,15 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
// Update last offset seen by application
lastReceivedOffset.put(tp, offset);
+ if (++numberOfRecords < MAX_RECORDS_IN_SINGLE_POLL) {
+ break;
+ }
+
// Check if we have an item already available
item = receivedMessages.poll(0, TimeUnit.MILLISECONDS);
}
- if (isAutoCommit) {
+ if (isAutoCommit && !records.isEmpty()) {
// Commit the offset of previously dequeued messages
commitAsync();
}
@@ -395,7 +411,6 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
offsets.forEach((topicPartition, offsetAndMetadata) -> {
org.apache.pulsar.client.api.Consumer<byte[]> consumer = consumers.get(topicPartition);
-
lastCommittedOffset.put(topicPartition, offsetAndMetadata);
futures.add(consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset())));
});
@@ -435,6 +450,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
if (partitions.isEmpty()) {
partitions = consumers.keySet();
}
+ lastCommittedOffset.clear();
+ lastReceivedOffset.clear();
for (TopicPartition tp : partitions) {
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
@@ -456,6 +473,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
if (partitions.isEmpty()) {
partitions = consumers.keySet();
}
+ lastCommittedOffset.clear();
+ lastReceivedOffset.clear();
for (TopicPartition tp : partitions) {
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
diff --git a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml
deleted file mode 100644
index ec46571..0000000
--- a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml
+++ /dev/null
@@ -1,164 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-networks:
- pulsarnet*:
- driver: bridge
-
-zookeeper*:
- image: apachepulsar/pulsar-test-latest-version:latest
- await:
- strategy: org.apache.pulsar.tests.NoopAwaitStrategy
- env: [ZOOKEEPER_SERVERS=zookeeper]
- labels:
- cluster: test
- service: zookeeper
- entryPoint: [bin/run-local-zk.sh]
- aliases:
- - zookeeper
- beforeStop:
- - customBeforeStopAction:
- strategy: org.apache.pulsar.tests.PulsarLogsToTargetDirStopAction
- - customBeforeStopAction:
- strategy: org.apache.pulsar.tests.ZKJournalToTargetDirStopAction
- networkMode: pulsarnet*
-
-configuration-store*:
- image: apachepulsar/pulsar-test-latest-version:latest
- await:
- strategy: org.apache.pulsar.tests.NoopAwaitStrategy
- env: [ZOOKEEPER_SERVERS=configuration-store]
- labels:
- cluster: test
- service: configuration-store
- entryPoint: [bin/run-global-zk.sh]
- aliases:
- - configuration-store
- beforeStop:
- - customBeforeStopAction:
- strategy: org.apache.pulsar.tests.PulsarLogsToTargetDirStopAction
- - customBeforeStopAction:
- strategy: org.apache.pulsar.tests.ZKJournalToTargetDirStopAction
- networkMode: pulsarnet*
-
-init*:
- image: apachepulsar/pulsar-test-latest-version:latest
- await:
- strategy: org.apache.pulsar.tests.NoopAwaitStrategy
- env:
- - clusterName=test
- - zkServers=zookeeper
- - configurationStore=configuration-store:2184
- - pulsarNode=pulsar-broker1
- labels:
- cluster: test
- service: init
- entryPoint: [bin/init-cluster.sh]
- beforeStop:
- - customBeforeStopAction:
- strategy: org.apache.pulsar.tests.LogToTargetDirStopAction
- networkMode: pulsarnet*
-
-bookkeeper1*:
- image: apachepulsar/pulsar-test-latest-version:latest
- await:
- strategy: org.apache.pulsar.tests.NoopAwaitStrategy
- env:
- - clusterName=test
- - zkServers=zookeeper
- - useHostNameAsBookieID=true
- labels:
- cluster: test
- service: bookie
- entryPoint: [bin/run-bookie.sh]
- beforeStop:
- - customBeforeStopAction:
- strategy: org.apache.pulsar.tests.PulsarLogsToTargetDirStopAction
- networkMode: pulsarnet*
-
-bookkeeper2*:
- image: apachepulsar/pulsar-test-latest-version:latest
- await:
- strategy: org.apache.pulsar.tests.NoopAwaitStrategy
- env:
- - clusterName=test
- - zkServers=zookeeper
- - useHostNameAsBookieID=true
- labels:
- cluster: test
- service: bookie
- entryPoint: [bin/run-bookie.sh]
- beforeStop:
- - customBeforeStopAction:
- strategy: org.apache.pulsar.tests.PulsarLogsToTargetDirStopAction
- networkMode: pulsarnet*
-
-pulsar-broker1*:
- image: apachepulsar/pulsar-test-latest-version:latest
- await:
- strategy: org.apache.pulsar.tests.NoopAwaitStrategy
- aliases:
- - pulsar-broker1
- env:
- - zookeeperServers=zookeeper
- - configurationStoreServers=configuration-store:2184
- - clusterName=test
- - NO_AUTOSTART=true
- labels:
- cluster: test
- service: pulsar-broker
- entryPoint: [bin/run-broker.sh]
- beforeStop:
- - customBeforeStopAction:
- strategy: org.apache.pulsar.tests.PulsarLogsToTargetDirStopAction
- networkMode: pulsarnet*
-
-pulsar-proxy*:
- image: apachepulsar/pulsar-test-latest-version:latest
- await:
- strategy: org.apache.pulsar.tests.NoopAwaitStrategy
- env:
- - zookeeperServers=zookeeper
- - configurationStoreServers=configuration-store:2184
- - clusterName=test
- - NO_AUTOSTART=true
- labels:
- cluster: test
- service: pulsar-proxy
- entryPoint: [bin/run-proxy.sh]
- beforeStop:
- - customBeforeStopAction:
- strategy: org.apache.pulsar.tests.PulsarLogsToTargetDirStopAction
- networkMode: pulsarnet*
-
-s3*:
- ## use latest adobe/s3mock, for issue: https://github.com/adobe/S3Mock/issues/32
- ## TODO: https://github.com/apache/incubator-pulsar/issues/2133
- image: apachepulsar/s3mock
- await:
- strategy: org.apache.pulsar.tests.NoopAwaitStrategy
- env:
- - initialBuckets=pulsar-integtest
- labels:
- cluster: test
- service: s3
- beforeStop:
- - customBeforeStopAction:
- strategy: org.apache.pulsar.tests.LogToTargetDirStopAction
- networkMode: pulsarnet*
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 653c076..7f084b0 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -140,6 +140,17 @@
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<!-- only run tests when -DintegrationTests is specified //-->
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
index 48c7fe6..40be9ba 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
@@ -62,7 +62,6 @@ public abstract class PulsarStandaloneTestBase extends PulsarTestBase {
container = new StandaloneContainer(clusterName)
.withNetwork(network)
.withNetworkAliases(StandaloneContainer.NAME + "-" + clusterName);
- container.tailContainerLog();
container.start();
log.info("Pulsar cluster {} is up running:", clusterName);
log.info("\tBinary Service Url : {}", container.getPlainTextServiceUrl());
diff --git a/tests/pom.xml b/tests/pom.xml
index 074d4bc..5c410b1 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -34,6 +34,7 @@
<modules>
<module>docker-images</module>
<module>integration</module>
+ <module>pulsar-kafka-compat-client-test</module>
</modules>
<build>
<plugins>
diff --git a/tests/integration/pom.xml b/tests/pulsar-kafka-compat-client-test/pom.xml
similarity index 58%
copy from tests/integration/pom.xml
copy to tests/pulsar-kafka-compat-client-test/pom.xml
index 653c076..196f682 100644
--- a/tests/integration/pom.xml
+++ b/tests/pulsar-kafka-compat-client-test/pom.xml
@@ -28,37 +28,27 @@
<version>2.2.0-SNAPSHOT</version>
</parent>
- <artifactId>integration</artifactId>
+ <artifactId>pulsar-kafka-compat-client-test</artifactId>
<packaging>jar</packaging>
- <name>Apache Pulsar :: Tests :: Integration</name>
+ <name>Apache Pulsar :: Tests :: Pulsar Kafka Compat Client Tests</name>
<dependencies>
<dependency>
- <groupId>org.testng</groupId>
- <artifactId>testng</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-functions-api-examples</artifactId>
+ <groupId>org.apache.pulsar.tests</groupId>
+ <artifactId>integration</artifactId>
<version>${project.version}</version>
+ <type>test-jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client-schema</artifactId>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client</artifactId>
- <version>${project.version}</version>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
@@ -69,71 +59,16 @@
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
- <artifactId>managed-ledger-original</artifactId>
+ <artifactId>pulsar-client-kafka</artifactId>
<version>${project.version}</version>
<scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.datastax.cassandra</groupId>
- <artifactId>cassandra-driver-core</artifactId>
<exclusions>
<exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty-handler</artifactId>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>kafka</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-io-kafka</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
</dependency>
-
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>mysql</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>${mysql-jdbc.version}</version>
- <scope>runtime</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-io-jdbc</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-yaml</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-high-level-client</artifactId>
- <version>6.3.2</version>
- </dependency>
-
</dependencies>
<build>
@@ -177,9 +112,6 @@
-Dio.netty.leakDetectionLevel=advanced
</argLine>
<skipTests>false</skipTests>
- <suiteXmlFiles>
- <file>src/test/resources/pulsar.xml</file>
- </suiteXmlFiles>
<forkCount>1</forkCount>
</configuration>
</plugin>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java b/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
similarity index 88%
rename from tests/integration/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
rename to tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
index 3dd8940..fe2a4d9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
+++ b/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/KafkaApiTest.java
@@ -51,25 +51,32 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
import org.testng.annotations.Test;
-@Test(enabled = false)
@Slf4j
-public class KafkaApiTest extends PulsarTestSuite {
+public class KafkaApiTest extends PulsarStandaloneTestSuite {
+
+ private static String getPlainTextServiceUrl() {
+ return container.getPlainTextServiceUrl();
+ }
+
+ private static String getHttpServiceUrl() {
+ return container.getHttpServiceUrl();
+ }
@Test(timeOut = 30000)
public void testSimpleProducerConsumer() throws Exception {
String topic = "persistent://public/default/testSimpleProducerConsumer";
Properties producerProperties = new Properties();
- producerProperties.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
+ producerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
producerProperties.put("key.serializer", IntegerSerializer.class.getName());
producerProperties.put("value.serializer", StringSerializer.class.getName());
Producer<Integer, String> producer = new KafkaProducer<>(producerProperties);
Properties consumerProperties = new Properties();
- consumerProperties.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
+ consumerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
consumerProperties.put("group.id", "my-subscription-name");
consumerProperties.put("key.deserializer", IntegerDeserializer.class.getName());
consumerProperties.put("value.deserializer", StringDeserializer.class.getName());
@@ -110,16 +117,20 @@ public class KafkaApiTest extends PulsarTestSuite {
String topic = "testSimpleConsumer";
Properties props = new Properties();
- props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
+ props.put("bootstrap.servers", getPlainTextServiceUrl());
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
+ @Cleanup
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+ @Cleanup
+ PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
+
+ @Cleanup
org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
for (int i = 0; i < 10; i++) {
@@ -129,17 +140,21 @@ public class KafkaApiTest extends PulsarTestSuite {
AtomicInteger received = new AtomicInteger();
while (received.get() < 10) {
ConsumerRecords<String, String> records = consumer.poll(100);
- records.forEach(record -> {
- assertEquals(record.key(), Integer.toString(received.get()));
- assertEquals(record.value(), "hello-" + received.get());
-
- received.incrementAndGet();
- });
-
- consumer.commitSync();
+ if (!records.isEmpty()) {
+ records.forEach(record -> {
+ String key = Integer.toString(received.get());
+ String value = "hello-" + received.get();
+ log.info("Receive record : key = {}, value = {}, topic = {}, ptn = {}",
+ key, value, record.topic(), record.partition());
+ assertEquals(record.key(), key);
+ assertEquals(record.value(), value);
+
+ received.incrementAndGet();
+ });
+
+ consumer.commitSync();
+ }
}
-
- consumer.close();
}
@Test
@@ -147,7 +162,7 @@ public class KafkaApiTest extends PulsarTestSuite {
String topic = "testConsumerAutoCommit";
Properties props = new Properties();
- props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
+ props.put("bootstrap.servers", getPlainTextServiceUrl());
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "true");
props.put("key.deserializer", StringDeserializer.class.getName());
@@ -157,7 +172,7 @@ public class KafkaApiTest extends PulsarTestSuite {
consumer.subscribe(Arrays.asList(topic));
@Cleanup
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+ PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
for (int i = 0; i < 10; i++) {
@@ -190,7 +205,7 @@ public class KafkaApiTest extends PulsarTestSuite {
String topic = "testConsumerManualOffsetCommit";
Properties props = new Properties();
- props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
+ props.put("bootstrap.servers", getPlainTextServiceUrl());
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", StringDeserializer.class.getName());
@@ -200,7 +215,7 @@ public class KafkaApiTest extends PulsarTestSuite {
consumer.subscribe(Arrays.asList(topic));
@Cleanup
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+ PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
for (int i = 0; i < 10; i++) {
@@ -240,18 +255,18 @@ public class KafkaApiTest extends PulsarTestSuite {
// Create 8 partitions in topic
@Cleanup
- PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
+ PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(getHttpServiceUrl()).build();
admin.topics().createPartitionedTopic(topic, 8);
Properties props = new Properties();
- props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
+ props.put("bootstrap.servers", getPlainTextServiceUrl());
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "true");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
@Cleanup
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+ PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic)
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition).create();
@@ -287,10 +302,10 @@ public class KafkaApiTest extends PulsarTestSuite {
@Test
public void testConsumerSeek() throws Exception {
- String topic = "testSimpleConsumer";
+ String topic = "testConsumerSeek";
Properties props = new Properties();
- props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
+ props.put("bootstrap.servers", getPlainTextServiceUrl());
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", StringDeserializer.class.getName());
@@ -301,7 +316,7 @@ public class KafkaApiTest extends PulsarTestSuite {
consumer.subscribe(Arrays.asList(topic));
@Cleanup
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+ PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
for (int i = 0; i < 10; i++) {
@@ -344,10 +359,10 @@ public class KafkaApiTest extends PulsarTestSuite {
@Test
public void testConsumerSeekToEnd() throws Exception {
- String topic = "testSimpleConsumer";
+ String topic = "testConsumerSeekToEnd";
Properties props = new Properties();
- props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
+ props.put("bootstrap.servers", getPlainTextServiceUrl());
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", StringDeserializer.class.getName());
@@ -358,7 +373,7 @@ public class KafkaApiTest extends PulsarTestSuite {
consumer.subscribe(Arrays.asList(topic));
@Cleanup
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+ PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
org.apache.pulsar.client.api.Producer<byte[]> pulsarProducer = pulsarClient.newProducer().topic(topic).create();
for (int i = 0; i < 10; i++) {
@@ -399,13 +414,13 @@ public class KafkaApiTest extends PulsarTestSuite {
String topic = "testSimpleProducer";
@Cleanup
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+ PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("my-subscription")
.subscribe();
Properties props = new Properties();
- props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
+ props.put("bootstrap.servers", getPlainTextServiceUrl());
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
@@ -431,14 +446,14 @@ public class KafkaApiTest extends PulsarTestSuite {
String topic = "testProducerCallback";
@Cleanup
- PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+ PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("my-subscription")
.subscribe();
Properties props = new Properties();
- props.put("bootstrap.servers", pulsarCluster.getPlainTextServiceUrl());
+ props.put("bootstrap.servers", getPlainTextServiceUrl());
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());