Posted to jira@kafka.apache.org by "Guozhang Wang (Jira)" <ji...@apache.org> on 2020/06/16 00:41:00 UTC

[jira] [Commented] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed

    [ https://issues.apache.org/jira/browse/KAFKA-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136215#comment-17136215 ] 

Guozhang Wang commented on KAFKA-10167:
---------------------------------------

The proposed solution is that, even under EOS, we do not use consumer.endOffset, which would set the `read-committed` flag, but instead just use list-offset with `read-uncommitted` to get the end-offset.

The rationale is that, since we know this changelog topic is single-writer, single-reader, and we control every writer and reader of it, we can safely assume that any on-going txn comes only from our previous writer.

If the task migration is due to a graceful rebalance (i.e. the task is indeed being revoked from the other host), then the old host would always commit, during which it blocks on `producer.flush` to make sure all data are written. (By default we do not override the replication factor on changelog topics or the producer's ack.mode, so if users change one without the other they may run into other issues where data are not completely replicated, and hence the high-watermark returned from list-offset can be smaller.) Therefore the end-offset returned would be the actual log-end-offset, with or without the txn-marker, either of which is fine.

If the task migration is unexpected (i.e. the task was not proactively revoked; the old host may not know it is out of the group, or may have crashed), then not all records sent from the old host are guaranteed to be on the broker and covered by the end-offset. That is fine, since these records will eventually be aborted anyway.
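
To make the difference between the two isolation levels concrete, here is a minimal, self-contained Java sketch. It is a toy model and not Kafka code: `EndOffsetSketch`, `PartitionLog`, `Isolation` and all of their methods are hypothetical names invented for illustration. It assumes every append is fully replicated (so high-watermark == log-end-offset) and that a txn-marker occupies one offset. Under those assumptions, a `read-committed` end-offset is the LSO, which stays pinned at the start of an open txn until the marker lands on that partition, while a `read-uncommitted` end-offset already reflects everything written.

```java
import java.util.OptionalLong;

public class EndOffsetSketch {
    enum Isolation { READ_UNCOMMITTED, READ_COMMITTED }

    /** Toy single-partition log (hypothetical); assumes every append is fully replicated. */
    static final class PartitionLog {
        private long logEndOffset = 0;
        private OptionalLong openTxnStart = OptionalLong.empty();

        void beginTxn() {
            if (openTxnStart.isPresent()) throw new IllegalStateException("txn already open");
            openTxnStart = OptionalLong.of(logEndOffset);
        }

        void append(int records) {
            logEndOffset += records;
        }

        /** Appends the commit/abort marker (one offset) and closes the open txn. */
        void writeTxnMarker() {
            if (!openTxnStart.isPresent()) throw new IllegalStateException("no open txn");
            logEndOffset += 1;
            openTxnStart = OptionalLong.empty();
        }

        long highWatermark() {
            return logEndOffset; // assumption: fully replicated
        }

        /** LSO: the first offset of the open txn, or the high-watermark if none is open. */
        long lastStableOffset() {
            return openTxnStart.orElse(highWatermark());
        }

        long endOffset(Isolation isolation) {
            return isolation == Isolation.READ_COMMITTED ? lastStableOffset() : highWatermark();
        }
    }

    public static void main(String[] args) {
        PartitionLog changelog = new PartitionLog();

        // An earlier txn whose marker has already been appended.
        changelog.beginTxn();
        changelog.append(5);
        changelog.writeTxnMarker();

        // The previous writer's last txn: data is flushed, but the marker
        // has not yet been appended on this partition.
        changelog.beginTxn();
        changelog.append(4);

        System.out.println("read-committed   end-offset: " + changelog.endOffset(Isolation.READ_COMMITTED));
        System.out.println("read-uncommitted end-offset: " + changelog.endOffset(Isolation.READ_UNCOMMITTED));

        // Once the marker lands, both isolation levels agree again.
        changelog.writeTxnMarker();
        System.out.println("after marker, both: " + changelog.endOffset(Isolation.READ_COMMITTED));
    }
}
```

In this model, a restore that stops at the `read-committed` end-offset while the marker is still in flight would miss the last four records, whereas the `read-uncommitted` end-offset covers them whether or not the marker has arrived.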

> Streams EOS-Beta should not try to get end-offsets as read-committed
> --------------------------------------------------------------------
>
>                 Key: KAFKA-10167
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10167
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Guozhang Wang
>            Priority: Major
>
> This is a bug discovered with the new EOS protocol (KIP-447). Here's the context:
> In Streams, when we are assigned new active tasks, we first try to restore the state from the changelog topic all the way to the log end offset, and only then can we transition from the `restoring` to the `running` state to start processing the task.
> Before KIP-447, the end-offset call was only triggered after we had passed the synchronization barrier at the txn-coordinator, which guarantees that the txn-marker has been sent and received (otherwise we would error with CONCURRENT_TRANSACTIONS and let the producer retry). When the txn-marker is received, it also means that the marker has been fully replicated, which in turn guarantees that the data written before that marker has been fully replicated. As a result, when we send the list-offset with the `read-committed` flag we are guaranteed that the returned offset == LSO == high-watermark.
> After KIP-447, however, we do not fence on the txn-coordinator but on the group-coordinator upon offset-fetch, and the group-coordinator returns the fetched offset right after it has received and replicated the txn-marker sent to it. But txn-markers are sent to different brokers in parallel, and even within the same broker the markers of different partitions are appended / replicated independently, so when the fetch-offset request returns it is NOT guaranteed that the LSO on the other data partitions has advanced as well. Hence in that case the `endOffset` call may return a smaller offset, causing data loss.


