Posted to jira@kafka.apache.org by "Guozhang Wang (Jira)" <ji...@apache.org> on 2021/09/14 23:13:00 UTC

[jira] [Assigned] (KAFKA-13249) Checkpoints do not contain latest offsets on shutdown when using EOS

     [ https://issues.apache.org/jira/browse/KAFKA-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang reassigned KAFKA-13249:
-------------------------------------

    Assignee: Oliver Hutchison

> Checkpoints do not contain latest offsets on shutdown when using EOS
> --------------------------------------------------------------------
>
>                 Key: KAFKA-13249
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13249
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.0.0, 2.7.0, 2.8.0
>            Reporter: Oliver Hutchison
>            Assignee: Oliver Hutchison
>            Priority: Major
>
> When using EOS, the {{.checkpoint}} file created when a stateful streams app is shut down does not always contain changelog offsets that represent the latest state of the state store. The offsets can often be behind the end of the changelog, sometimes quite significantly.
> This leads to a state restore being required when the streams app restarts after shutting down cleanly, because streams thinks (based on the incorrect offsets in the checkpoint) that the state store is not up to date with the changelog.
> In our case, this increases the time it takes to do a clean restart of a single-instance streams app from around 10 seconds to sometimes over 2 minutes.
> I suspect the bug appears because of an assumption about the {{commitNeeded}} field in the following method in {{StreamTask}}:
> {code:java}
> protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
>   // commitNeeded indicates we may have processed some records since last commit
>   // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not
>   if (commitNeeded) {
>     stateMgr.updateChangelogOffsets(checkpointableOffsets());
>   }
>   super.maybeWriteCheckpoint(enforceCheckpoint);
> }
> {code}
> In the steady-state case for a simple single-instance, single-thread streams app that simply starts, runs and then shuts down, the {{if (commitNeeded)}} test always fails when running with EOS, which results in the latest checkpoint offsets never being updated in the {{stateMgr}}.
> Tracing back through the callers of {{maybeWriteCheckpoint}}, it's easy to see this is the case, as there is only one place in the code that calls {{maybeWriteCheckpoint}} during this steady state: the {{postCommit(final boolean enforceCheckpoint)}} method, specifically the call in the {{RUNNING}} state.
> {code:java}
> case RUNNING:
>   if (enforceCheckpoint || !eosEnabled) {
>     maybeWriteCheckpoint(enforceCheckpoint);
>   }
>   log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}", state(), eosEnabled, enforceCheckpoint);
>   break;
> {code}
> We can see from this code that {{maybeWriteCheckpoint}} will only ever be called if {{enforceCheckpoint=true}}, because we know {{eosEnabled=true}} as we're running with EOS.
> So then where does {{postCommit}} get called with {{enforceCheckpoint=true}}? Again looking only at the steady-state case, we find that it's only called from {{TaskManager.tryCloseCleanAllActiveTasks}}, which is only called from {{TaskManager.shutdown}}.
> The thing about the call in {{tryCloseCleanAllActiveTasks}} is that it happens *after* all active tasks have committed. This means that {{StreamTask.commitNeeded=false}} for all tasks, so it follows that the test back in {{maybeWriteCheckpoint}} always fails and we don't end up getting the latest offsets stored into the state manager.
> I think the fix is to simply change the test in {{maybeWriteCheckpoint}} to be {{if (commitNeeded || enforceCheckpoint) { ...}} as we know we must always update the changelog offsets before we write the checkpoint.
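
The ordering problem described above can be reproduced in a small, self-contained toy model. This is only a sketch under stated assumptions: {{ToyTask}}, {{ToyStateManager}}, the {{changelog-0}} key, the offsets 50 and 120, and the {{applyProposedFix}} flag are all hypothetical names and values invented for illustration, not the real Kafka Streams classes. The toy also writes the checkpoint unconditionally inside {{maybeWriteCheckpoint}}, which simplifies what {{super.maybeWriteCheckpoint(enforceCheckpoint)}} may actually do.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Toy model of the control flow described in the report; NOT the real Kafka Streams code.
final class CheckpointSketch {

    // Hypothetical stand-in for the state manager: tracks offsets and a "checkpoint file".
    static final class ToyStateManager {
        private final Map<String, Long> changelogOffsets = new HashMap<>();
        private final Map<String, Long> checkpointFile = new HashMap<>();

        void updateChangelogOffsets(final Map<String, Long> offsets) {
            changelogOffsets.putAll(offsets);
        }

        // Simplification: always writes whatever offsets the manager currently holds.
        void writeCheckpoint() {
            checkpointFile.clear();
            checkpointFile.putAll(changelogOffsets);
        }

        long checkpointed(final String partition) {
            return checkpointFile.get(partition);
        }
    }

    // Hypothetical stand-in for a stream task running with EOS enabled.
    static final class ToyTask {
        final ToyStateManager stateMgr = new ToyStateManager();
        private final boolean eosEnabled = true;
        private final boolean applyProposedFix;
        private boolean commitNeeded = false;
        private long latestOffset;

        ToyTask(final boolean applyProposedFix, final long restoredOffset) {
            this.applyProposedFix = applyProposedFix;
            this.latestOffset = restoredOffset;
            // Assumption: the manager starts with the offset known at startup.
            stateMgr.updateChangelogOffsets(Collections.singletonMap("changelog-0", restoredOffset));
        }

        void process(final long newOffset) {
            latestOffset = newOffset;
            commitNeeded = true;
        }

        void commit() {
            commitNeeded = false; // the commit clears the flag before postCommit runs
        }

        // Mirrors the RUNNING branch quoted in the report.
        void postCommit(final boolean enforceCheckpoint) {
            if (enforceCheckpoint || !eosEnabled) {
                maybeWriteCheckpoint(enforceCheckpoint);
            }
        }

        void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
            final boolean refresh = applyProposedFix
                ? (commitNeeded || enforceCheckpoint) // condition proposed in the report
                : commitNeeded;                       // condition quoted from StreamTask
            if (refresh) {
                stateMgr.updateChangelogOffsets(Collections.singletonMap("changelog-0", latestOffset));
            }
            stateMgr.writeCheckpoint();
        }
    }

    // Runs "start, process, commit, shut down cleanly" and returns the checkpointed offset.
    static long checkpointedOffsetAfterCleanShutdown(final boolean applyProposedFix) {
        final ToyTask task = new ToyTask(applyProposedFix, 50L);
        task.process(120L);
        task.commit();
        task.postCommit(false); // steady state with EOS: no checkpoint is written
        task.postCommit(true);  // shutdown path: checkpoint enforced after the commit
        return task.stateMgr.checkpointed("changelog-0");
    }

    public static void main(final String[] args) {
        System.out.println("without fix: " + checkpointedOffsetAfterCleanShutdown(false)); // prints "without fix: 50"
        System.out.println("with fix: " + checkpointedOffsetAfterCleanShutdown(true));     // prints "with fix: 120"
    }
}
```

In this model the checkpoint written on shutdown is stale (50) under the original condition and current (120) under the proposed one, which matches the behaviour the report describes. Whether the same change is sufficient in the real {{StreamTask}} depends on code paths the toy does not model.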



--
This message was sent by Atlassian Jira
(v8.3.4#803005)