Posted to dev@kafka.apache.org by "Lucas Ariel Martinez (JIRA)" <ji...@apache.org> on 2016/07/19 09:36:20 UTC

[jira] [Commented] (KAFKA-2894) WorkerSinkTask doesn't handle rewinding offsets on rebalance

    [ https://issues.apache.org/jira/browse/KAFKA-2894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15383859#comment-15383859 ] 

Lucas Ariel Martinez commented on KAFKA-2894:
---------------------------------------------

I apologize if I am asking in the wrong place, but I believe this issue could solve the problem I am having, which I explain below.
First, how do connectors actually manage their own offsets? I tried creating a sink connector whose task extends SinkTask, and:

1) I tried passing a Map<TopicPartition, Long> of offsets to the context obtained in initialize() by calling context.offset(offsetMap). But this results in the exception “No current assignment for partition…” because the partitions have not been assigned yet.

2) I tried doing the same in the open() method: I saved the context from initialize() in a field and passed it a map built from the Collection<TopicPartition> parameter. The result was that the task received all the messages from the last Kafka-committed offset and then, again, all the messages from the offsets I had assigned manually to the partitions!
I also tried setting the worker property “auto.offset.reset=latest” as a workaround, but apparently it is not supported, and the default value, earliest, was used anyway.
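
To see why both attempts misbehave, the ordering described in the issue below (offset requests applied by rewind() only at the start of each poll(), while open() runs during the rebalance inside poll()) can be modeled in plain Java. This is a simplified, hypothetical sketch: RewindOrderingModel, its fields, and the string partition names are invented for illustration. It assumes that requests made in initialize() are applied with a seek on the first poll, before any partition is assigned. The real WorkerSinkTask does considerably more.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the ordering described in KAFKA-2894.
// These are NOT Kafka classes; partitions are plain strings and records are offsets.
public class RewindOrderingModel {
    // Offsets the task asked for via context.offset(...); applied lazily by rewind().
    final Map<String, Long> requestedOffsets = new HashMap<>();
    // Partitions the modeled consumer currently owns.
    final Set<String> assigned = new HashSet<>();
    // Next offset the modeled consumer will fetch, per partition.
    final Map<String, Long> position = new HashMap<>();
    // Offsets handed to the task's put(), in delivery order.
    final List<Long> delivered = new ArrayList<>();

    // Like KafkaConsumer.seek(): seeking a partition that is not assigned fails.
    void seek(String tp, long offset) {
        if (!assigned.contains(tp)) {
            throw new IllegalStateException("No current assignment for partition " + tp);
        }
        position.put(tp, offset);
    }

    // Modeled rewind(): apply every pending offset request, then forget them.
    void rewind() {
        for (Map.Entry<String, Long> e : requestedOffsets.entrySet()) {
            seek(e.getKey(), e.getValue());
        }
        requestedOffsets.clear();
    }

    // One loop iteration: rewind() first, then poll(). The first poll triggers
    // the rebalance, which assigns the partition at its committed offset and
    // calls the task's open() callback before any records are returned.
    void pollOnce(String tp, long committed, Runnable open, int batchSize) {
        rewind();
        if (!assigned.contains(tp)) {
            assigned.add(tp);
            position.put(tp, committed);
            open.run();
        }
        long start = position.get(tp);
        for (long o = start; o < start + batchSize; o++) {
            delivered.add(o);
        }
        position.put(tp, start + batchSize);
    }

    public static void main(String[] args) {
        // Attempt 1: offsets requested in initialize(), before any assignment.
        RewindOrderingModel early = new RewindOrderingModel();
        early.requestedOffsets.put("topic-0", 5L);
        try {
            early.pollOnce("topic-0", 0L, () -> { }, 3);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // No current assignment for partition topic-0
        }

        // Attempt 2: offsets requested in open(), i.e. during the rebalance inside poll().
        RewindOrderingModel late = new RewindOrderingModel();
        Runnable open = () -> late.requestedOffsets.put("topic-0", 5L);
        late.pollOnce("topic-0", 0L, open, 3);
        late.pollOnce("topic-0", 0L, open, 3);
        System.out.println(late.delivered); // [0, 1, 2, 5, 6, 7]
    }
}
```

In the modeled second attempt, the records fetched from the committed offset are delivered before the seek to offset 5 takes effect on the next poll. This mirrors the behavior described in item 2.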

Am I wrong to think that rewinding offsets on rebalance would solve this issue, so that I could manually assign offsets in the open() method?


> WorkerSinkTask doesn't handle rewinding offsets on rebalance
> ------------------------------------------------------------
>
>                 Key: KAFKA-2894
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2894
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.9.0.0
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Liquan Pei
>            Priority: Blocker
>             Fix For: 0.10.1.0
>
>
> rewind() is only invoked at the beginning of each poll(). This means that if a rebalance occurs in the poll, it's feasible to get data that doesn't match a request to change offsets during the rebalance. I think the consumer will hold on to consumer data across the rebalance if it is reassigned the same offset, so there may already be data ready to be delivered. Additionally we may already have data in an incomplete messageBatch that should be discarded when the rewind is requested.
> While connectors that care about this (i.e. ones that manage their own offsets) can handle this correctly by tracking the offsets they're expecting to see, it's a hassle, error-prone, and pretty unintuitive.
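
The description above says connectors that manage their own offsets can cope by tracking the offsets they expect to see. What follows is a minimal sketch of that workaround, assuming a hypothetical ExpectedOffsetTracker helper and a stand-in Rec type in place of SinkRecord (neither is a Kafka Connect API). After requesting a rewind, the task discards records for that partition until it sees the record at the requested offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not a Kafka Connect API): after requesting a rewind, the task
// drops records for that partition until the first record at the requested offset arrives.
public class ExpectedOffsetTracker {
    // Stand-in for SinkRecord: just the partition and the record's offset.
    static final class Rec {
        final String partition;
        final long offset;

        Rec(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Partition -> offset the task asked to rewind to but has not seen yet.
    private final Map<String, Long> pendingRewind = new HashMap<>();

    // Call alongside context.offset(...) so stale records can be recognized.
    void requestRewind(String partition, long offset) {
        pendingRewind.put(partition, offset);
    }

    // Returns only the records the task should actually write.
    List<Rec> filter(List<Rec> batch) {
        List<Rec> accepted = new ArrayList<>();
        for (Rec r : batch) {
            Long target = pendingRewind.get(r.partition);
            if (target != null) {
                if (r.offset != target) {
                    continue; // fetched before the rewind took effect: discard
                }
                pendingRewind.remove(r.partition); // rewind is now in effect
            }
            accepted.add(r);
        }
        return accepted;
    }

    public static void main(String[] args) {
        ExpectedOffsetTracker tracker = new ExpectedOffsetTracker();
        tracker.requestRewind("topic-0", 5L);
        // Delivered from the committed offset before the seek was applied.
        List<Rec> stale = Arrays.asList(new Rec("topic-0", 0), new Rec("topic-0", 1));
        // Delivered after the consumer was positioned at offset 5.
        List<Rec> fresh = Arrays.asList(new Rec("topic-0", 5), new Rec("topic-0", 6));
        System.out.println(tracker.filter(stale).size()); // 0
        System.out.println(tracker.filter(fresh).size()); // 2
    }
}
```

The exact-match rule handles rewinds in either direction, but it is only a sketch. If the requested offset no longer exists (for example after compaction or retention), it would discard that partition's records indefinitely, so a real task would need a looser rule.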


