Posted to jira@kafka.apache.org by "Boyang Chen (Jira)" <ji...@apache.org> on 2020/04/20 20:00:00 UTC

[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas

    [ https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17088044#comment-17088044 ] 

Boyang Chen commented on KAFKA-9891:
------------------------------------

Thanks for the report, will take a look this week.

> Invalid state store content after task migration with exactly_once and standby replicas
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9891
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9891
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.1, 2.4.1
>            Reporter: Mateusz Jadczyk
>            Assignee: Boyang Chen
>            Priority: Blocker
>
> We have a simple command id deduplication mechanism (very similar to the one from the Kafka Streams examples) based on Kafka Streams state stores. It stores command ids from the past hour in a _persistentWindowStore_. We encounter a problem with the store when an exception is thrown later in the topology.
>  We run 3 nodes in Docker, each configured with multiple threads for this particular Streams application.
> The business flow is as follows (performed within a single subtopology):
>  * A valid command is sent with a command id (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running active task 1_2. The first node in the topology checks the state store (_COMMAND_ID_STORE_) to see whether this is a duplicate; if it is not, it puts the command id in the state store and processes the command properly.
>  * An invalid command is sent with the same key but a new command id (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, the duplicate check is performed; it is not a duplicate, so the command id is put into the state store. The next node in the topology throws an exception, which causes an error on NODE 1 for task 1_2. As a result, the transaction is aborted and offsets are not committed. I double-checked the changelog topic: the relevant messages are not committed. Therefore, the changelog topic contains only the first command id, _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_, and not the one that caused the failure.
>  * In the meantime, standby task 1_2 running on NODE 3 replicated command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ into its local _COMMAND_ID_STORE_.
>  * Standby task 1_2 on NODE 3 Thread-2 takes over as the active task. It checks whether the command id is a duplicate (it isn't), tries to process the faulty command, and throws an exception. Again, the transaction is aborted and all looks fine.
>  * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, *it is a duplicate!* This happens even though the transaction has been aborted and the changelog doesn't contain this command id: _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_.
>  
> After digging into the Streams logs and some discussion on [Stack Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan], we concluded that it has something to do with checkpoint files. Here are the detailed logs relevant to checkpoint files.
>  
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:11:15.912 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
> NODE_1 log1:2020-04-15 21:11:33.942 DEBUG 1 --- [-StreamThread-2] c.g.f.c.s.validation.CommandIdValidator : CommandId: mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc is not a duplicate.
> NODE_3 2020-04-15 21:11:47.195 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:11:47.233 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:49.075 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint 1
> NODE_3 2020-04-15 21:11:49.436 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.StoreChangelogReader : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] Found checkpoint 1 from changelog XXXXProcessor-COMMAND_ID_STORE-changelog-2 for store COMMAND_ID_STORE.
> NODE_3 2020-04-15 21:11:52.023 DEBUG 1 --- [-StreamThread-2] c.g.f.c.s.validation.CommandIdValidator : CommandId: mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc is not a duplicate.
> NODE_3 2020-04-15 21:11:53.683 ERROR 1 --- [-StreamThread-2] o.a.k.s.p.i.AssignedStreamsTasks : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] Failed to process stream task 1_2 due to the following error: java.lang.RuntimeException
> NODE_3 2020-04-15 21:12:05.346 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint 1
> NODE_3 2020-04-15 21:12:05.562 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.StoreChangelogReader : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] Found checkpoint 1 from changelog XXXXProcessor-COMMAND_ID_STORE-changelog-2 for store COMMAND_ID_STORE.
> NODE_3 2020-04-15 21:12:06.424 WARN 1 --- [-StreamThread-1] c.g.f.c.s.validation.CommandIdValidator : Command duplicate detected. Command id mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc
> {code}
> It seems that on NODE_3 a standby task 1_2 is running on T-2; it replicates the first valid command, thus creating a checkpoint file. The invalid command causes an error on NODE_1, and NODE_3 T-2 then takes over the task. It finds the checkpoint file (which is fine) and starts to process the invalid command. It crashes, T-1 on the same node takes over, finds the checkpoint file (!), assumes the state store is clean (apparently it is not, as it contains state modified by T-2), and finds a duplicated command id.
>  
> We use Java 11, kafka clients 2.4.1 and spring-kafka 2.4.5. We rolled back for a moment to kafka clients 2.3.1 and the problem persists.
> *We performed more tests with configuration changes and after changing `num.standby.replicas = 1` to `num.standby.replicas = 0` the problem disappeared. It is also resolved when changing the store to _inMemoryWindowStore._*
> In the SO question you can find the relevant Java code. I don't have a sample project to share at the moment that reproduces the problem, but it is easily repeatable in our project.
> Such behaviour can have serious implications for business logic; in our case, messages were accidentally skipped without being properly processed.
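
For illustration, the sequence described in the report can be modelled with a small stand-alone sketch. This is only a toy model under stated assumptions, not Kafka Streams code: `StaleCheckpointSketch`, `LocalStore`, `runStandby`, and `runActiveAndFail` are hypothetical names, the changelog is a plain list of committed records, and the model assumes that a failed task leaves both its local store writes and the checkpoint file in place, which is what the logs above suggest.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class StaleCheckpointSketch {

    /** Committed changelog records only; writes from aborted transactions never appear here. */
    static final List<String> changelog = new ArrayList<>();

    /** Hypothetical stand-in for the on-disk state directory shared by threads on one node. */
    static final class LocalStore {
        final Set<String> commandIds = new HashSet<>();
        Long checkpoint = null; // null models a missing .checkpoint file
    }

    /** Standby task: copy committed changelog records into the local store, then checkpoint. */
    static void runStandby(LocalStore store) {
        int from = store.checkpoint == null ? 0 : store.checkpoint.intValue();
        for (int i = from; i < changelog.size(); i++) {
            store.commandIds.add(changelog.get(i));
        }
        store.checkpoint = (long) changelog.size();
    }

    /**
     * Active task: restore from the checkpoint, run the duplicate check, then fail downstream.
     * Returns true if the command id was flagged as a duplicate.
     */
    static boolean runActiveAndFail(LocalStore store, String commandId) {
        if (store.checkpoint == null) {
            store.commandIds.clear(); // no checkpoint: local content is not trusted
        }
        int from = store.checkpoint == null ? 0 : store.checkpoint.intValue();
        for (int i = from; i < changelog.size(); i++) {
            store.commandIds.add(changelog.get(i)); // replay only records past the checkpoint
        }
        // The dedup check writes to the local store before the failure happens.
        boolean duplicate = !store.commandIds.add(commandId);
        // A downstream node throws here and the transaction is aborted, so nothing is
        // appended to the changelog. Assumption of this model: the local write is not
        // rolled back and the checkpoint is left untouched.
        return duplicate;
    }

    public static void main(String[] args) {
        changelog.add("cmd-valid");          // committed while NODE 1 was active
        LocalStore node3 = new LocalStore();
        runStandby(node3);                   // standby replicates and writes checkpoint 1

        boolean first = runActiveAndFail(node3, "cmd-invalid");  // first takeover on NODE 3
        boolean second = runActiveAndFail(node3, "cmd-invalid"); // second takeover, same node

        System.out.println("first attempt duplicate?  " + first);
        System.out.println("second attempt duplicate? " + second);
        System.out.println("committed changelog size: " + changelog.size());
    }
}
```

In this model the second attempt reports a duplicate although the changelog still holds a single record, because the checkpoint makes the restore step trust local content that an aborted attempt had already modified.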

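The configuration workaround mentioned in the report can be written down as a short sketch. It uses only `java.util.Properties` with the plain Kafka Streams configuration keys `processing.guarantee` and `num.standby.replicas`; the class and method names are hypothetical, `XXXXProcessor` is the redacted application name taken from the logs, and other settings a real application needs (for example `bootstrap.servers`) are omitted on purpose.

```java
import java.util.Properties;

public class StandbyConfigSketch {

    /** Builds the subset of Streams settings discussed in the report. */
    static Properties streamsProps(int standbyReplicas) {
        Properties props = new Properties();
        props.setProperty("application.id", "XXXXProcessor");      // redacted name from the logs
        props.setProperty("processing.guarantee", "exactly_once"); // as in the issue title
        props.setProperty("num.standby.replicas", Integer.toString(standbyReplicas));
        return props;
    }

    public static void main(String[] args) {
        // 1 is the setting under which the problem was observed; 0 is the reported workaround.
        System.out.println("observed with: " + streamsProps(1));
        System.out.println("workaround:    " + streamsProps(0));
    }
}
```

Setting the value to 0 only avoids the symptom in the reporter's tests; it also gives up the faster failover that standby replicas are meant to provide.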


--
This message was sent by Atlassian Jira
(v8.3.4#803005)