You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/08 05:26:15 UTC
[pulsar] branch master updated: Fix kafka_0_9 Consumer partition
topic throw error (#7179)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d8ab1a2 Fix kafka_0_9 Consumer partition topic throw error (#7179)
d8ab1a2 is described below
commit d8ab1a24b36143c91bf483116acc9b74e1a23f3a
Author: liudezhi <33...@users.noreply.github.com>
AuthorDate: Mon Jun 8 13:26:04 2020 +0800
Fix kafka_0_9 Consumer partition topic throw error (#7179)
Master Issue: #7178
## Motivation
When using pulsar-client-kafka_0_9 version to consume partition topic, will throw java.lang.ClassCastException,
because
```
org.apache.pulsar.client.impl.TopicMessageIdImpl cannot be cast to org.apache.pulsar.client.impl.MessageIdImpl
```
## Modifications
BrokerService
```java
public void subscribe(List<String> topics, ConsumerRebalanceListener callback) {
List<CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>>> futures = new ArrayList<>();
List<TopicPartition> topicPartitions = new ArrayList<>();
try {
for (String topic : topics) {
// Create individual subscription on each partition, that way we can keep using the
// acknowledgeCumulative()
int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get();
ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties);
consumerBuilder.subscriptionType(SubscriptionType.Failover);
consumerBuilder.messageListener(this);
consumerBuilder.subscriptionName(groupId);
//consumerBuilder.topics(topics);
if (numberOfPartitions > 1) {
// Subscribe to each partition
consumerBuilder.consumerName(ConsumerName.generateRandomName());
for (int i = 0; i < numberOfPartitions; i++) {
String partitionName = TopicName.get(topic).getPartition(i).toString();
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
.topic(partitionName).subscribeAsync();
int partitionIndex = i;
TopicPartition tp = new TopicPartition(
TopicName.get(topic).getPartitionedTopicName(),
partitionIndex);
futures.add(future.thenApply(consumer -> {
log.info("Add consumer {} for partition {}", consumer, tp);
consumers.putIfAbsent(tp, consumer);
return consumer;
}));
topicPartitions.add(tp);
```
We should remove consumerBuilder.topics(topics)。
---
.../main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java | 1 -
1 file changed, 1 deletion(-)
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 6d3c383..0a3526b 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -218,7 +218,6 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
consumerBuilder.subscriptionType(SubscriptionType.Failover);
consumerBuilder.messageListener(this);
consumerBuilder.subscriptionName(groupId);
- consumerBuilder.topics(topics);
if (numberOfPartitions > 1) {
// Subscribe to each partition
consumerBuilder.consumerName(ConsumerName.generateRandomName());