Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/02/03 20:35:32 UTC

[GitHub] [pinot] mathieudruart commented on pull request #8017: Fetch Pulsar offsets from Consumer interface instead of Reader

mathieudruart commented on pull request #8017:
URL: https://github.com/apache/pinot/pull/8017#issuecomment-1029377970


   Hi @KKcorps 
   
   We have tested your PR and it seems to miss messages. The issue appears to be in the method **getNextStreamParitionMsgOffsetAtIndex**: if the message is part of a Pulsar batch (BatchMessageIdImpl), you add 1 to the entry id every time. That doesn't seem correct, because all messages inside a Pulsar batch share the same entry id, so the next message id within the batch has the same entry id and only the batch index incremented. The entry id should advance only after the last message of the batch.
   We tried this version of the method, and it seems to get all messages correctly:
   
   ```
     @Override
     public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) {
       MessageIdImpl currentMessageId = MessageIdImpl.convertToMessageIdImpl(_messageList.get(index).getMessageId());
       MessageId nextMessageId;

       long currentLedgerId = currentMessageId.getLedgerId();
       long currentEntryId = currentMessageId.getEntryId();
       int currentPartitionIndex = currentMessageId.getPartitionIndex();

       if (currentMessageId instanceof BatchMessageIdImpl) {
         BatchMessageIdImpl currentBatchMessageId = (BatchMessageIdImpl) currentMessageId;
         int currentBatchIndex = currentBatchMessageId.getBatchIndex();
         int currentBatchSize = currentBatchMessageId.getBatchSize();

         if (currentBatchIndex < currentBatchSize - 1) {
           // Still inside the batch: same entry id, next batch index.
           nextMessageId = new BatchMessageIdImpl(currentLedgerId, currentEntryId,
                   currentPartitionIndex, currentBatchIndex + 1, currentBatchSize,
                   currentBatchMessageId.getAcker());
         } else {
           // Last message of the batch: move to the first message of the next entry.
           nextMessageId = new BatchMessageIdImpl(currentLedgerId, currentEntryId + 1,
                   currentPartitionIndex, 0, currentBatchSize, currentBatchMessageId.getAcker());
         }
       } else {
         // Non-batched message: the next message is the next entry.
         nextMessageId =
                 DefaultImplementation.newMessageId(currentLedgerId, currentEntryId + 1,
                         currentPartitionIndex);
       }
       return new MessageIdStreamOffset(nextMessageId);
     }
   ```
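
To make the advance rule easier to check in isolation, here is a minimal, dependency-free sketch of the same logic. The `Id` class and the `next` method are hypothetical stand-ins invented for this illustration, not Pulsar or Pinot classes, and the sketch assumes (as the method above does) that the entry following a batch starts at batch index 0.

```java
public class NextOffsetSketch {

  /** Hypothetical stand-in for a Pulsar message id; batchIndex is -1 for non-batched messages. */
  static final class Id {
    final long ledgerId;
    final long entryId;
    final int partitionIndex;
    final int batchIndex;
    final int batchSize;

    Id(long ledgerId, long entryId, int partitionIndex, int batchIndex, int batchSize) {
      this.ledgerId = ledgerId;
      this.entryId = entryId;
      this.partitionIndex = partitionIndex;
      this.batchIndex = batchIndex;
      this.batchSize = batchSize;
    }

    boolean isBatched() {
      return batchIndex >= 0;
    }

    @Override
    public String toString() {
      return ledgerId + ":" + entryId + ":" + partitionIndex + ":" + batchIndex;
    }
  }

  /** Computes the id expected to follow {@code current}, mirroring the rule described above. */
  static Id next(Id current) {
    if (current.isBatched() && current.batchIndex < current.batchSize - 1) {
      // Still inside the batch: same entry id, next batch index.
      return new Id(current.ledgerId, current.entryId, current.partitionIndex,
          current.batchIndex + 1, current.batchSize);
    }
    if (current.isBatched()) {
      // Last message of the batch: first message of the next entry (assumed to be batched too).
      return new Id(current.ledgerId, current.entryId + 1, current.partitionIndex,
          0, current.batchSize);
    }
    // Non-batched message: simply the next entry.
    return new Id(current.ledgerId, current.entryId + 1, current.partitionIndex, -1, 0);
  }

  public static void main(String[] args) {
    // Walk through a batch of three messages stored in entry 42 of ledger 7.
    Id id = new Id(7L, 42L, 0, 0, 3);
    for (int i = 0; i < 3; i++) {
      System.out.println(id);
      id = next(id);
    }
    System.out.println(id);
  }
}
```

Running `main` prints `7:42:0:0`, `7:42:0:1`, `7:42:0:2`, then `7:43:0:0`: the entry id changes only once the last message of the batch has been passed, whereas adding 1 to the entry id on every message would jump to entry 43 right after the first message.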


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


