Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/08 05:26:15 UTC

[pulsar] branch master updated: Fix kafka_0_9 Consumer partition topic throw error (#7179)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d8ab1a2  Fix kafka_0_9 Consumer partition topic throw error (#7179)
d8ab1a2 is described below

commit d8ab1a24b36143c91bf483116acc9b74e1a23f3a
Author: liudezhi <33...@users.noreply.github.com>
AuthorDate: Mon Jun 8 13:26:04 2020 +0800

    Fix kafka_0_9 Consumer partition topic throw error (#7179)
    
    Master Issue: #7178
    ## Motivation
    
    Consuming a partitioned topic with the pulsar-client-kafka_0_9 consumer throws java.lang.ClassCastException,
    with the message:
    ```
    org.apache.pulsar.client.impl.TopicMessageIdImpl cannot be cast to org.apache.pulsar.client.impl.MessageIdImpl
    ```
    ## Modifications
    PulsarKafkaConsumer
    ```java
     public void subscribe(List<String> topics, ConsumerRebalanceListener callback) {
            List<CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>>> futures = new ArrayList<>();
    
            List<TopicPartition> topicPartitions = new ArrayList<>();
            try {
                for (String topic : topics) {
                    // Create individual subscription on each partition, that way we can keep using the
                    // acknowledgeCumulative()
                    int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get();
    
                    ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties);
                    consumerBuilder.subscriptionType(SubscriptionType.Failover);
                    consumerBuilder.messageListener(this);
                    consumerBuilder.subscriptionName(groupId);
                    //consumerBuilder.topics(topics);
                    if (numberOfPartitions > 1) {
                        // Subscribe to each partition
                        consumerBuilder.consumerName(ConsumerName.generateRandomName());
                        for (int i = 0; i < numberOfPartitions; i++) {
                            String partitionName = TopicName.get(topic).getPartition(i).toString();
                            CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
                                    .topic(partitionName).subscribeAsync();
                            int partitionIndex = i;
                            TopicPartition tp = new TopicPartition(
                                    TopicName.get(topic).getPartitionedTopicName(),
                                    partitionIndex);
                            futures.add(future.thenApply(consumer -> {
                                log.info("Add consumer {} for partition {}", consumer, tp);
                                consumers.putIfAbsent(tp, consumer);
                                return consumer;
                            }));
                            topicPartitions.add(tp);
    ```
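    We should remove the consumerBuilder.topics(topics) call (shown commented out above), so that each cloned builder subscribes only to its own partition.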
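    
    The ClassCastException from the Motivation can be illustrated in isolation. The sketch below is
    only an illustration: MessageId, MessageIdImpl and TopicMessageIdImpl here are simplified stand-ins
    written for this example, not the real Pulsar classes, and CastSketch and directCastFails are
    hypothetical names. It assumes, as the exception message implies, that TopicMessageIdImpl is not a
    subclass of MessageIdImpl, so code that casts every incoming message id to MessageIdImpl fails as
    soon as a consumer hands back a TopicMessageIdImpl.
    
    ```java
    // Stand-in types, NOT the real Pulsar classes. They only mirror the assumed
    // relationship: both implement MessageId, but neither extends the other.
    interface MessageId {}
    
    class MessageIdImpl implements MessageId {
        final long ledgerId;
        final long entryId;
    
        MessageIdImpl(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }
    
    class TopicMessageIdImpl implements MessageId {
        final String topicPartitionName;
        final MessageId innerMessageId; // wraps the per-partition id
    
        TopicMessageIdImpl(String topicPartitionName, MessageId innerMessageId) {
            this.topicPartitionName = topicPartitionName;
            this.innerMessageId = innerMessageId;
        }
    }
    
    class CastSketch {
        // Returns true when a direct cast to MessageIdImpl throws ClassCastException.
        static boolean directCastFails(MessageId id) {
            try {
                MessageIdImpl impl = (MessageIdImpl) id;
                return false;
            } catch (ClassCastException e) {
                return true;
            }
        }
    
        public static void main(String[] args) {
            MessageId plainId = new MessageIdImpl(1L, 2L);
            MessageId wrappedId = new TopicMessageIdImpl("my-topic-partition-0", new MessageIdImpl(1L, 2L));
    
            System.out.println("plain id cast fails:   " + directCastFails(plainId));
            System.out.println("wrapped id cast fails: " + directCastFails(wrappedId));
        }
    }
    ```
    
    Under that assumption, the cast succeeds for the plain id and fails for the wrapped one, which is
    consistent with the error disappearing once each cloned builder subscribes to a single partition.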
---
 .../main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 6d3c383..0a3526b 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -218,7 +218,6 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                 consumerBuilder.subscriptionType(SubscriptionType.Failover);
                 consumerBuilder.messageListener(this);
                 consumerBuilder.subscriptionName(groupId);
-                consumerBuilder.topics(topics);
                 if (numberOfPartitions > 1) {
                     // Subscribe to each partition
                     consumerBuilder.consumerName(ConsumerName.generateRandomName());