Posted to dev@twill.apache.org by "Chengfeng Mao (JIRA)" <ji...@apache.org> on 2016/11/19 00:19:58 UTC

[jira] [Updated] (TWILL-199) Return next offset and handle offset error in KafkaConsumer.MessageCallback

     [ https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chengfeng Mao updated TWILL-199:
--------------------------------
    Summary: Return next offset and handle offset error in KafkaConsumer.MessageCallback  (was: Support inconsistent Kafka offset correction in KafkaConsumer.MessageCallback)

> Return next offset and handle offset error in KafkaConsumer.MessageCallback
> ---------------------------------------------------------------------------
>
>                 Key: TWILL-199
>                 URL: https://issues.apache.org/jira/browse/TWILL-199
>             Project: Apache Twill
>          Issue Type: Improvement
>            Reporter: Chengfeng Mao
>
> The offset of a given Kafka message is not guaranteed to be stable: when switching to a new Kafka instance, the same message (with identical content) may be stored at a different offset. We therefore need a way to detect inconsistent offsets and correct them. For this purpose, an instance of a new class {{KafkaOffsetProvider}} can be supplied as a parameter to {{KafkaConsumer.MessageCallback}}. For backward compatibility, if no instance is provided, the parameter defaults to null and no inconsistent-offset check is performed.
> The new class {{KafkaOffsetProvider}} requires the timestamp of the message to be processed. It provides a method that verifies whether the message fetched at a given offset is the intended one, by checking whether the fetched message's timestamp equals the expected timestamp. If it does not, the method returns the offset of the first message found with the expected timestamp.
> The method {{void onReceived(Iterator<FetchedMessage> messages)}} in {{KafkaConsumer.MessageCallback}} should be changed to {{Long onReceived(Iterator<FetchedMessage> messages)}} so that it can call the method in {{KafkaOffsetProvider}} to obtain the correct offset from which to start fetching messages when either the original starting offset does not exist or the given message does not match the timestamp stored in {{KafkaOffsetProvider}}. If neither mismatch occurs, or if the {{KafkaOffsetProvider}} instance is null (in which case no check is performed), the method simply returns null.
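
A minimal Java sketch of the proposed behaviour might look like the following. It is an illustration under stated assumptions, not Twill's implementation: every name in it (TimestampedMessage, correctOffset, OffsetAwareCallback, ProcessingCallback) is hypothetical, it uses its own message type instead of Twill's FetchedMessage so that each message carries a timestamp, and it models only the timestamp-mismatch case, not the case where the original starting offset no longer exists. A null return value means "no correction", matching the backward-compatible default described above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical message type carrying an offset and a timestamp (not Twill's FetchedMessage). */
final class TimestampedMessage {
  final long offset;
  final long timestamp;

  TimestampedMessage(long offset, long timestamp) {
    this.offset = offset;
    this.timestamp = timestamp;
  }
}

/** Hypothetical sketch of the proposed KafkaOffsetProvider. */
final class KafkaOffsetProvider {
  private final long expectedTimestamp;

  KafkaOffsetProvider(long expectedTimestamp) {
    this.expectedTimestamp = expectedTimestamp;
  }

  /**
   * Returns null if the first fetched message has the expected timestamp (offset is consistent).
   * Otherwise returns the offset of the first message with the expected timestamp, or null
   * if this batch contains no such message (an assumption; the issue does not specify this case).
   */
  Long correctOffset(List<TimestampedMessage> batch) {
    if (batch.isEmpty() || batch.get(0).timestamp == expectedTimestamp) {
      return null;
    }
    for (TimestampedMessage m : batch) {
      if (m.timestamp == expectedTimestamp) {
        return m.offset;
      }
    }
    return null;
  }
}

/** Hypothetical stand-in for the proposed Long-returning KafkaConsumer.MessageCallback. */
interface OffsetAwareCallback {
  Long onReceived(Iterator<TimestampedMessage> messages);
}

/** Example callback: asks the provider (if any) for a corrected offset before processing. */
final class ProcessingCallback implements OffsetAwareCallback {
  private final KafkaOffsetProvider provider; // null keeps the old, unchecked behaviour
  final List<Long> processedOffsets = new ArrayList<>();

  ProcessingCallback(KafkaOffsetProvider provider) {
    this.provider = provider;
  }

  @Override
  public Long onReceived(Iterator<TimestampedMessage> messages) {
    // Buffer the batch so the provider can scan ahead without losing messages.
    List<TimestampedMessage> batch = new ArrayList<>();
    messages.forEachRemaining(batch::add);
    if (provider != null) {
      Long corrected = provider.correctOffset(batch);
      if (corrected != null) {
        return corrected; // the consumer should refetch starting from this offset
      }
    }
    for (TimestampedMessage m : batch) {
      processedOffsets.add(m.offset); // placeholder for real message processing
    }
    return null; // no correction needed
  }
}
```

With a provider expecting timestamp 200, a batch starting at offset 10 with timestamp 100 yields 11 (the first offset whose timestamp is 200) and nothing is processed; refetching from offset 11 then processes normally and returns null. A callback built with a null provider never checks and always returns null, which mirrors the old void-returning contract.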



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)