Posted to jira@kafka.apache.org by "Hector Geraldino (Jira)" <ji...@apache.org> on 2021/12/08 15:27:00 UTC

[jira] [Updated] (KAFKA-13521) Suppress changelog schema version breaks migration

     [ https://issues.apache.org/jira/browse/KAFKA-13521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hector Geraldino updated KAFKA-13521:
-------------------------------------
    Issue Type: Bug  (was: Improvement)

> Suppress changelog schema version breaks migration
> --------------------------------------------------
>
>                 Key: KAFKA-13521
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13521
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0, 2.5.0, 2.5.1
>            Reporter: Hector Geraldino
>            Priority: Major
>
> Hi,
> We recently updated the kafka-streams library in one of our apps from v2.5.0 to v2.5.1. This upgrade changes the header format of the state store records for suppress changelog topics (see https://issues.apache.org/jira/browse/KAFKA-10173 and https://github.com/apache/kafka/pull/8905).
> What we noticed is that introducing a new version of the binary schema header breaks older clients: applications running on v2.5.1 can parse the v3, v2, v1 and v0 headers, while applications running on 2.5.0 (and, I assume, earlier versions) cannot read headers in the v3 format.
> The logged exception is:
>  
> {code:java}
> java.lang.IllegalArgumentException: Restoring apparently invalid changelog record: ConsumerRecord(topic = msgequator-kfns-msgequator-msgequator-suppress-buffer-store-changelog, partition = 8, leaderEpoch = 405, offset = 711400430, CreateTime = 1638828473341, serialized key size = 32, serialized value size = 90, headers = RecordHeaders(headers = [RecordHeader(key = v, value = [3])], isReadOnly = false), key = [B@5cf0e540, value = [B@40abc004)
> 	at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:372) ~[msgequator-1.59.3.jar:1.59.3]
> 	at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89) ~[msgequator-1.59.3.jar:1.59.3]
> 	at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92) ~[msgequator-1.59.3.jar:1.59.3]
> 	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:350) ~[msgequator-1.59.3.jar:1.59.3]
> 	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) ~[msgequator-1.59.3.jar:1.59.3]
> 	at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:401) ~[msgequator-1.59.3.jar:1.59.3]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779) ~[msgequator-1.59.3.jar:1.59.3]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[msgequator-1.59.3.jar:1.59.3]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[msgequator-1.59.3.jar:1.59.3] {code}
>  
>  
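> For context, here is roughly what the restore path does (a simplified sketch of the logic behind InMemoryTimeOrderedKeyValueBuffer#restoreBatch; the header byte values below are assumptions for illustration, not the exact constants from the Kafka source):
>
> {code:java}
> import java.util.Arrays;
>
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.common.header.Header;
>
> public class ChangelogVersionCheck {
>     // Illustrative header values; the real constants live in the suppress buffer implementation
>     private static final byte[] V1_HEADER = {(byte) 1};
>     private static final byte[] V2_HEADER = {(byte) 2};
>
>     static int resolveSchemaVersion(final ConsumerRecord<byte[], byte[]> record) {
>         final Header versionHeader = record.headers().lastHeader("v");
>         if (versionHeader == null) {
>             return 0; // records written before the version header existed
>         } else if (Arrays.equals(versionHeader.value(), V1_HEADER)) {
>             return 1;
>         } else if (Arrays.equals(versionHeader.value(), V2_HEADER)) {
>             return 2;
>         } else {
>             // A 2.5.0 client restoring a record written by 2.5.1 (header value [3])
>             // lands here and aborts the restoration
>             throw new IllegalArgumentException(
>                 "Restoring apparently invalid changelog record: " + record);
>         }
>     }
> }
> {code}
> The point is that older readers treat any unknown version as corruption rather than skipping or downgrading it, which is why restoration fails outright.
>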
> There's no clear solution for this other than stopping and starting all instances at once. A rolling bounce that takes some time to complete (in our case, days) will break instances that haven't been upgraded yet: after a rebalance, an older client can pick up changelog partition(s) that already contain records encoded in the new format.
>  
> I don't know whether the answer is adding a flag on the client side that lists the supported schema versions (so it behaves like Kafka Consumers picking the rebalance protocol, cooperative or eager), or whether it just needs to be explicitly stated in the migration guide that a full stop/start migration is required whenever the schema version changes. A rough sketch of the flag idea is below.
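>
> To make the first option concrete, here is what such an opt-in could look like on the application side. The property name is hypothetical and does not exist in Kafka Streams today; the existing upgrade.from config plays a similar role for the rebalance protocol during rolling upgrades:
>
> {code:java}
> import java.util.Properties;
>
> import org.apache.kafka.streams.StreamsConfig;
>
> public class SuppressUpgradeConfigSketch {
>     public static void main(final String[] args) {
>         final Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");          // placeholder id
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder brokers
>
>         // Hypothetical flag (not a real Kafka Streams config): pin the suppress-buffer
>         // changelog schema to the highest version every running instance understands,
>         // then drop the override once the rolling bounce has finished, the same way
>         // upgrade.from is used for the rebalance protocol today.
>         props.put("suppress.changelog.max.schema.version", "2");
>     }
> }
> {code}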



--
This message was sent by Atlassian Jira
(v8.20.1#820001)