Posted to commits@pulsar.apache.org by "avinash-fk (via GitHub)" <gi...@apache.org> on 2023/05/04 08:51:42 UTC

[GitHub] [pulsar-adapters] avinash-fk opened a new issue, #48: Bug in doCommitOffsets of PulsarKafkaConsumer [Ver: 2.11.0]

avinash-fk opened a new issue, #48:
URL: https://github.com/apache/pulsar-adapters/issues/48

   **Describe the bug**
   
   The 2.11.0 version of pulsar-client-kafka-compat has a bug that causes commitSync to fail every time. It appears to have been introduced in this PR: https://github.com/apache/pulsar-adapters/pull/37.
   
   ```java
   if (consumer instanceof MultiTopicsConsumerImpl) {
       msgId = new TopicMessageIdImpl(topicPartition.topic(), tp.topic(), msgId);
   }
   ``` 
   
   MultiTopicsConsumerImpl keeps a map of its consumers, keyed by the topic name for a non-partitioned topic and by the topic partition name for a partitioned topic.
   
   TopicMessageIdImpl takes the partition name as its first argument, but here the topic name is passed. For a partitioned topic, the value should have been suffixed with -partition-<partition-number>. Because only the bare topic name is passed, and MultiTopicsConsumerImpl has no consumer registered under that name, the acknowledgement fails with:
   
   ```java
   Caused by: org.apache.pulsar.client.api.PulsarClientException$NotConnectedException: Not connected to broker
   	at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.doAcknowledge(MultiTopicsConsumerImpl.java:503)
   	at org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:650)
   	at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:616)
   	at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:601)
   	at org.apache.kafka.clients.consumer.PulsarKafkaConsumer.lambda$doCommitOffsets$9(PulsarKafkaConsumer.java:526)
   	at java.util.Collections$SingletonMap.forEach(Collections.java:4912)
   	at org.apache.kafka.clients.consumer.PulsarKafkaConsumer.doCommitOffsets(PulsarKafkaConsumer.java:517)
   ```
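
   To make the key mismatch concrete, here is a minimal, self-contained sketch. It does not use the Pulsar classes: the `PartitionKeyLookup` class, its helper methods, and the example topic name are all hypothetical stand-ins, and the map only imitates the consumer map described above, assuming partitions are keyed as `<topic>-partition-<n>`.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical illustration only; not the actual MultiTopicsConsumerImpl code.
   public class PartitionKeyLookup {
       // Suffix assumed for partitions of a partitioned topic, as described in the report.
       static final String PARTITION_SUFFIX = "-partition-";

       // Builds the per-partition name, e.g. "my-topic-partition-2".
       static String partitionName(String topic, int partition) {
           return topic + PARTITION_SUFFIX + partition;
       }

       // Stand-in for the consumer map: one entry per partition, keyed by partition name.
       static Map<String, String> consumersFor(String topic, int numPartitions) {
           Map<String, String> consumers = new HashMap<>();
           for (int i = 0; i < numPartitions; i++) {
               String name = partitionName(topic, i);
               consumers.put(name, "consumer-for-" + name);
           }
           return consumers;
       }

       public static void main(String[] args) {
           String topic = "persistent://public/default/my-topic"; // example name, assumed
           Map<String, String> consumers = consumersFor(topic, 3);

           // Looking up by the bare topic name finds nothing: this mirrors the failing case.
           System.out.println(consumers.containsKey(topic)); // false

           // Looking up by the partition name finds the consumer.
           System.out.println(consumers.containsKey(partitionName(topic, 1))); // true
       }
   }
   ```

   If the analysis above is right, the lookup in doCommitOffsets would need the partition name (topic plus suffix) rather than the bare topic name whenever the topic is partitioned.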

