Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/06/05 06:48:20 UTC

[GitHub] [pulsar] liudezhi2098 opened a new pull request #7179: fix kafka_0_9 Consumer partition topic throw error

liudezhi2098 opened a new pull request #7179:
URL: https://github.com/apache/pulsar/pull/7179


   Master Issue: #7178
   ## Motivation
   
   When the pulsar-client-kafka_0_9 client is used to consume a partitioned topic, it throws a java.lang.ClassCastException:
   ```
   org.apache.pulsar.client.impl.TopicMessageIdImpl cannot be cast to org.apache.pulsar.client.impl.MessageIdImpl
   ```
   ## Modifications
   BrokerService
   ```java
    public void subscribe(List<String> topics, ConsumerRebalanceListener callback) {
           List<CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>>> futures = new ArrayList<>();
   
           List<TopicPartition> topicPartitions = new ArrayList<>();
           try {
               for (String topic : topics) {
                   // Create individual subscription on each partition, that way we can keep using the
                   // acknowledgeCumulative()
                   int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get();
   
                   ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties);
                   consumerBuilder.subscriptionType(SubscriptionType.Failover);
                   consumerBuilder.messageListener(this);
                   consumerBuilder.subscriptionName(groupId);
                   //consumerBuilder.topics(topics);
                   if (numberOfPartitions > 1) {
                       // Subscribe to each partition
                       consumerBuilder.consumerName(ConsumerName.generateRandomName());
                       for (int i = 0; i < numberOfPartitions; i++) {
                           String partitionName = TopicName.get(topic).getPartition(i).toString();
                           CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
                                   .topic(partitionName).subscribeAsync();
                           int partitionIndex = i;
                           TopicPartition tp = new TopicPartition(
                                   TopicName.get(topic).getPartitionedTopicName(),
                                   partitionIndex);
                           futures.add(future.thenApply(consumer -> {
                               log.info("Add consumer {} for partition {}", consumer, tp);
                               consumers.putIfAbsent(tp, consumer);
                               return consumer;
                           }));
                           topicPartitions.add(tp);
   ```
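   We should remove the consumerBuilder.topics(topics) call (shown commented out above), so that each cloned builder subscribes only to its own partition.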
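
The exception from the Motivation section can be reproduced in isolation. The sketch below is only an illustration: the classes are hypothetical stand-ins that borrow the names from the error message, not the real org.apache.pulsar.client.impl classes, and it assumes (as the message implies) that TopicMessageIdImpl wraps a message id and is not a subclass of MessageIdImpl. It shows the failure mode, not the change made in this pull request.

```java
// Hypothetical stand-ins for the types named in the exception message.
// They are NOT the real org.apache.pulsar.client.impl classes.
class MessageIdCastSketch {

    interface MessageId {}

    /** Stand-in for a plain message id, as a single-topic consumer would produce. */
    static class MessageIdImpl implements MessageId {
        final long ledgerId;
        final long entryId;

        MessageIdImpl(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    /** Stand-in for a topic-tagged id: it wraps an inner id instead of extending MessageIdImpl. */
    static class TopicMessageIdImpl implements MessageId {
        final String topicPartitionName;
        final MessageId inner;

        TopicMessageIdImpl(String topicPartitionName, MessageId inner) {
            this.topicPartitionName = topicPartitionName;
            this.inner = inner;
        }
    }

    /** Blind downcast: works for a plain id, throws ClassCastException for a wrapper. */
    static long entryIdOf(MessageId id) {
        return ((MessageIdImpl) id).entryId;
    }

    /** Returns true if the blind downcast throws ClassCastException for this id. */
    static boolean castFails(MessageId id) {
        try {
            entryIdOf(id);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        MessageId plain = new MessageIdImpl(7L, 42L);
        MessageId wrapped = new TopicMessageIdImpl("my-topic-partition-0", plain);

        System.out.println("plain id cast fails:   " + castFails(plain));
        System.out.println("wrapped id cast fails: " + castFails(wrapped));
    }
}
```

In this sketch the cast succeeds only for the plain id, which is why code that assumes MessageIdImpl needs every per-partition consumer to hand back plain ids.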


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on pull request #7179: fix kafka_0_9 Consumer partition topic throw error

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #7179:
URL: https://github.com/apache/pulsar/pull/7179#issuecomment-640312028


   /pulsarbot run-failure-checks





[GitHub] [pulsar] codelipenghui merged pull request #7179: fix kafka_0_9 Consumer partition topic throw error

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #7179:
URL: https://github.com/apache/pulsar/pull/7179


   

