You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/08/09 11:55:17 UTC

[GitHub] [pulsar-adapters] casuallc opened a new issue, #38: kafka adaptor can not handle non-partitioned topic

casuallc opened a new issue, #38:
URL: https://github.com/apache/pulsar-adapters/issues/38

   **Reproduce**
   - create non-paritioned topic
   - send message to this topic
   
   **error**
   ![image](https://user-images.githubusercontent.com/9473606/183640375-517bcea0-fa50-48af-8881-9696fab6d558.png)
   
   **probable reason**
   PulsarKafkaConsumer -> poll
   ```
   public ConsumerRecords<K, V> poll(long timeoutMillis) {
           try {
               QueueItem item = receivedMessages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
               if (item == null) {
                   return (ConsumerRecords<K, V>) ConsumerRecords.EMPTY;
               }
   
               Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
   
               int numberOfRecords = 0;
   
               while (item != null) {
                   TopicName topicName = TopicName.get(item.consumer.getTopic());
                   String topic = topicName.getPartitionedTopicName();
                   int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
                   Message<byte[]> msg = item.message;
                   MessageId msgId = msg.getMessageId();
                   if (msgId instanceof TopicMessageIdImpl) {
                       msgId = ((TopicMessageIdImpl) msgId).getInnerMessageId();
                   }
                   long offset = MessageIdUtils.getOffset(msgId);
   
                   TopicPartition tp = new TopicPartition(topic, partition);
                   if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {
                   	log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
                   	resetOffsets(tp);
                   }
   
                  // .. other code
   
               // If no interceptor is provided, interceptors list will an empty list, original ConsumerRecords will be return.
               return applyConsumerInterceptorsOnConsume(interceptors, new ConsumerRecords<>(records));
           } catch (InterruptedException e) {
               throw new RuntimeException(e);
           }
       }
   ```
   **int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;**
   This code can not discriminate partitioned-topic or non-paritioned-topic.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pulsar.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-adapters] casuallc commented on issue #38: kafka adaptor can not handle non-partitioned topic

Posted by GitBox <gi...@apache.org>.
casuallc commented on issue #38:
URL: https://github.com/apache/pulsar-adapters/issues/38#issuecomment-1210083991

   #37 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org