Posted to jira@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2021/10/06 00:58:00 UTC

[jira] [Updated] (KAFKA-13350) Handle task corrupted exception on a per state store basis

     [ https://issues.apache.org/jira/browse/KAFKA-13350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-13350:
------------------------------------
    Description: 
When we hit an `OffsetOutOfRangeException` during restore, we close the task as dirty and retry the restore process from scratch. For this case, we wipe out all of the task's state stores.
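
For context, a minimal sketch of the current coarse-grained behavior; `CorruptibleTask`, `closeDirty()`, and `wipeAllState()` are hypothetical stand-ins for the Streams internals, not the real method names, while `OffsetOutOfRangeException#partitions()` is the actual consumer API:

{code:java}
import java.util.Set;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;

// Hypothetical stand-in for a stream task with multiple state stores; these are
// not the real Streams-internal method names.
interface CorruptibleTask {
    void closeDirty();   // close without committing or writing a checkpoint
    void wipeAllState(); // delete the local state of *all* stores of this task
}

class RestoreErrorHandler {
    // Current behavior: any out-of-range changelog partition corrupts the whole task.
    void onRestoreError(final CorruptibleTask task, final OffsetOutOfRangeException e) {
        final Set<TopicPartition> affected = e.partitions(); // changelog partition(s) that hit the error
        System.out.println("out-of-range changelogs: " + affected);
        task.closeDirty();
        task.wipeAllState(); // also wipes stores whose changelogs restored just fine
        // afterwards, restoration restarts from scratch for every store of the task
    }
}
{code}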

If a task has multiple state stores, we also wipe out state that is actually clean and thus redo work for no reason. Instead of wiping out all state stores, we should only wipe out the single state store that corresponds to the changelog topic partition that hit the `OffsetOutOfRangeException`, and preserve the restore progress for all other state stores.
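
A sketch of the proposed per-store handling; `PerStoreTask`, `storeFor()`, and `wipeStore()` are made-up names that only illustrate resolving the offending changelog partition to a single store:

{code:java}
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;

// Hypothetical per-store view of a task; storeFor() and wipeStore() do not exist
// in the current code base and only illustrate the proposed idea.
interface PerStoreTask {
    String storeFor(TopicPartition changelogPartition);
    void wipeStore(String storeName);
}

class PerStoreRestoreErrorHandler {
    // Proposed behavior: wipe only the store(s) behind the offending changelog
    // partition(s) and keep the restore progress of all other stores.
    void onRestoreError(final PerStoreTask task, final OffsetOutOfRangeException e) {
        for (final TopicPartition changelogPartition : e.partitions()) {
            task.wipeStore(task.storeFor(changelogPartition));
        }
        // unaffected stores keep their state and do not need to be restored again
    }
}
{code}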

We need to consider persistent and in-memory stores separately: for persistent stores, it would be fine to close the unaffected stores cleanly and also write the checkpoint file. For in-memory stores, however, we should not close the store, to avoid dropping the in-memory data.
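
A sketch of how the unaffected stores might be handled once the corrupted one has been wiped; `StateStore#persistent()`, `flush()`, `close()`, and `name()` are the real interface methods, while `handleUnaffectedStore()` and `writeCheckpoint()` are hypothetical placeholders:

{code:java}
import org.apache.kafka.streams.processor.StateStore;

class UnaffectedStoreHandler {
    // For a store whose changelog did NOT hit OffsetOutOfRangeException:
    //  - persistent stores can be flushed, closed cleanly, and checkpointed
    //  - in-memory stores must stay open, otherwise their restored data is lost
    void handleUnaffectedStore(final StateStore store) {
        if (store.persistent()) {
            store.flush();
            store.close();
            writeCheckpoint(store.name()); // hypothetical: record the restored offset in the .checkpoint file
        }
        // else: keep the in-memory store open and simply continue restoring it later
    }

    private void writeCheckpoint(final String storeName) {
        // hypothetical placeholder for updating the task's checkpoint file
    }
}
{code}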

*TODO:* verify consumer behavior: if a consumer subscribes to multiple partitions, and two or more of those partitions are on the same broker, a single fetch request could trigger an `OffsetOutOfRangeException` for several partitions at the same time. However, it seems that the consumer only reports a single `TopicPartition` when it raises an `OffsetOutOfRangeException`. Thus, we need to ensure that we do not lose information, and we may need to update the consumer to report all affected partitions at once. Or maybe it won't be an issue, because the next fetch request would send the same offset for the "missed" partitions, so we would get a new `OffsetOutOfRangeException` for them anyway (it might still be more efficient to get all affected partitions at once).
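
One way to verify this is to inspect the exception itself: the consumer's `OffsetOutOfRangeException` exposes the affected partitions via `partitions()` / `offsetOutOfRangePartitions()`, so a small probe along these lines could show whether a single fetch reports one or all out-of-range partitions (the consumer setup described in the comment is only illustrative):

{code:java}
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;

class OffsetOutOfRangeProbe {
    // Assumes a consumer configured with auto.offset.reset=none that was assigned
    // several partitions on the same broker and seeked to invalid offsets beforehand.
    void probe(final Consumer<byte[], byte[]> consumer) {
        try {
            consumer.poll(Duration.ofMillis(500));
        } catch (final OffsetOutOfRangeException e) {
            // Does a single fetch report one partition or all affected ones?
            final Map<TopicPartition, Long> outOfRange = e.offsetOutOfRangePartitions();
            System.out.println("out-of-range partitions reported: " + outOfRange);
        }
    }
}
{code}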

  was:
When we hit an `OffsetOutOfRangeException` during restore, we close the task as dirty and retry the restore process from scratch. For this case, we wipe out all of the task's state stores.

If a task has multiple state stores, we also wipe out state that is actually clean and thus redo work for no reason. Instead of wiping out all state stores, we should only wipe out the single state store that corresponds to the changelog topic partition that hit the `OffsetOutOfRangeException`, and preserve the restore progress for all other state stores.

We need to consider persistent and in-memory stores separately: for persistent stores, it would be fine to close the unaffected stores cleanly and also write the checkpoint file. For in-memory stores, however, we should not close the store, to avoid dropping the in-memory data.


> Handle task corrupted exception on a per state store basis
> ----------------------------------------------------------
>
>                 Key: KAFKA-13350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13350
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Major
>
> When we hit an `OffsetOutOfRangeException` during restore, we close the task as dirty and retry the restore process from scratch. For this case, we wipe out all of the task's state stores.
> If a task has multiple state stores, we also wipe out state that is actually clean and thus redo work for no reason. Instead of wiping out all state stores, we should only wipe out the single state store that corresponds to the changelog topic partition that hit the `OffsetOutOfRangeException`, and preserve the restore progress for all other state stores.
> We need to consider persistent and in-memory stores separately: for persistent stores, it would be fine to close the unaffected stores cleanly and also write the checkpoint file. For in-memory stores, however, we should not close the store, to avoid dropping the in-memory data.
> *TODO:* verify consumer behavior: if a consumer subscribes to multiple partitions, and two or more of those partitions are on the same broker, a single fetch request could trigger an `OffsetOutOfRangeException` for several partitions at the same time. However, it seems that the consumer only reports a single `TopicPartition` when it raises an `OffsetOutOfRangeException`. Thus, we need to ensure that we do not lose information, and we may need to update the consumer to report all affected partitions at once. Or maybe it won't be an issue, because the next fetch request would send the same offset for the "missed" partitions, so we would get a new `OffsetOutOfRangeException` for them anyway (it might still be more efficient to get all affected partitions at once).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)