Posted to issues@camel.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2019/03/22 12:15:00 UTC

[jira] [Work logged] (CAMEL-13339) Partition revoke implemented to save offset state using KafkaConsumer.position API results in message loss

     [ https://issues.apache.org/jira/browse/CAMEL-13339?focusedWorklogId=217202&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-217202 ]

ASF GitHub Bot logged work on CAMEL-13339:
------------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Mar/19 12:14
            Start Date: 22/Mar/19 12:14
    Worklog Time Spent: 10m 
      Work Description: viswaramamoorthy commented on pull request #2838: CAMEL-13339 - Fix to use last processed offset maintained by Camel KafkaConsumer to avoid message loss upon partition revoke
URL: https://github.com/apache/camel/pull/2838
 
 
   The message processing loop now saves the last processed offset for each partition in a map, and that saved offset is used when the Kafka broker triggers a partition revoke.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 217202)
            Time Spent: 10m
    Remaining Estimate: 0h

> Partition revoke implemented to save offset state using KafkaConsumer.position API results in message loss
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-13339
>                 URL: https://issues.apache.org/jira/browse/CAMEL-13339
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.23.0
>            Reporter: Viswa Ramamoorthy
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> The current implementation of onPartitionsRevoked in org.apache.camel.component.kafka.KafkaConsumer.KafkaFetchRecords uses org.apache.kafka.clients.consumer.KafkaConsumer.position(partition). This approach causes message loss when multiple processes listen to the same topic in a point-to-point (queue-like) messaging setup.
>  
> The issue appears when a partition is assigned and then revoked in quick succession. Suppose that upon assignment, at the beginning of processing, the offset is set to 0, and no poll happens for this partition (perhaps because an earlier poll brought in a batch of records that are still being processed). Suppose the partition is then revoked before the next poll.
> In this case, onPartitionsRevoked uses org.apache.kafka.clients.consumer.KafkaConsumer.position(partition) to save the offset state, so 0 is saved in the StateRepository implementation. When the same partition is assigned again, StateRepository.getState returns 0. Since the Camel KafkaConsumer treats this value as the last processed offset, it adds 1 to it and moves the pointer to offset 1. As a result, the message at offset 0 is never processed.
>  
> Two fixes might be needed:
>  # a) onPartitionsRevoked should use the last processed offset when saving the offset state (possibly by storing the 'last processed offset' for each topic/partition in an in-memory map)
>  # b) Currently, onPartitionsRevoked only saves the offset state when a StateRepository implementation is configured. Ideally it should call KafkaFetchRecords.commitOffset, so that the commitSync call goes through when a partition is revoked and no StateRepository implementation is configured
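
The scenario and fix (a) described above can be illustrated with a minimal, self-contained Java sketch. It does not use the Kafka or Camel APIs. The class name LastProcessedOffsets, its methods, and the defaultStart parameter are hypothetical names chosen for illustration, not code from camel-kafka or from the pull request. The only behaviour taken from the issue text is that the saved state is treated as the "last processed offset" and that consumption resumes at that value plus 1.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of camel-kafka) that tracks the last
 * processed offset per topic/partition, as suggested in fix (a).
 */
class LastProcessedOffsets {
    // key: "topic/partition", value: offset of the last record that was processed
    private final Map<String, Long> lastProcessed = new HashMap<>();

    /** Called from the message processing loop after a record has been handled. */
    void recordProcessed(String topic, int partition, long offset) {
        lastProcessed.put(key(topic, partition), offset);
    }

    /**
     * Offset to persist when the partition is revoked. Returns null when no
     * record from this partition was processed, so nothing should be saved.
     * The entry is removed so a later revoke cannot re-save a stale offset.
     */
    Long offsetToSaveOnRevoke(String topic, int partition) {
        return lastProcessed.remove(key(topic, partition));
    }

    /**
     * Resume rule described in the issue: the saved state is read as the
     * last processed offset, so consumption continues at savedState + 1.
     * defaultStart is an assumed fallback for when no state was saved.
     */
    static long resumeOffset(Long savedState, long defaultStart) {
        return savedState == null ? defaultStart : savedState + 1;
    }

    private static String key(String topic, int partition) {
        return topic + "/" + partition;
    }
}
```

With this rule, saving the consumer position 0 for a partition that was never polled gives resumeOffset(0L, 0L) == 1, which skips the message at offset 0. Saving only what offsetToSaveOnRevoke returns leaves no state for such a partition, so resumeOffset(null, 0L) == 0 and the message at offset 0 is still processed.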



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)