Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/01/11 19:03:00 UTC

[jira] [Commented] (FLINK-25615) FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state

    [ https://issues.apache.org/jira/browse/FLINK-25615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17473100#comment-17473100 ] 

Martijn Visser commented on FLINK-25615:
----------------------------------------

[~Matthias Schwalbe] Thanks for the report. Since the Flink community no longer supports versions below 1.12 and the FlinkKafkaProducer is indeed deprecated, what do you think should happen with this ticket?

> FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state
> ----------------------------------------------------------------
>
>                 Key: FLINK-25615
>                 URL: https://issues.apache.org/jira/browse/FLINK-25615
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.0
>            Reporter: Matthias Schwalbe
>            Priority: Major
>
> I've found a previously unnoticed error in FlinkKafkaProducer when migrating state from before Flink 1.9 to Flink 1.9 or later:
>  * the operator state for next-transactional-id-hint should be deleted and replaced by operator state next-transactional-id-hint-v2, however
>  * operator state next-transactional-id-hint is never deleted
>  * see here: [1] :
> {quote}        if (context.getOperatorStateStore()
>                 .getRegisteredStateNames()
>                 .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR)) {
>             migrateNextTransactionalIdHindState(context);
>         }
> {quote}
>  * migrateNextTransactionalIdHindState is never called, as the condition can never be true:
>  ** getRegisteredStateNames returns a Set<String>, whereas NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR is a ListStateDescriptor (type mismatch)
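
The type mismatch described above compiles because `Collection.contains` accepts any `Object`. The following is a minimal sketch of that pitfall in plain Java, without Flink on the classpath. The `Descriptor` class and the `buggyCheck`/`fixedCheck` helpers are hypothetical stand-ins for illustration only; they are not Flink's `ListStateDescriptor` or the actual `FlinkKafkaProducer` code.

```java
import java.util.HashSet;
import java.util.Set;

public class ContainsMismatchSketch {

    /** Hypothetical stand-in for a state descriptor; NOT Flink's ListStateDescriptor. */
    static final class Descriptor {
        private final String name;

        Descriptor(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    /** Mirrors the reported condition: asks a set of Strings whether it contains a descriptor. */
    static boolean buggyCheck(Set<String> registeredNames, Descriptor descriptor) {
        // Compiles because contains() takes Object, but a String never equals a Descriptor.
        return registeredNames.contains(descriptor);
    }

    /** Mirrors the proposed fix: compares the descriptor's name against the registered names. */
    static boolean fixedCheck(Set<String> registeredNames, Descriptor descriptor) {
        return registeredNames.contains(descriptor.getName());
    }

    public static void main(String[] args) {
        Set<String> registeredNames = new HashSet<>();
        registeredNames.add("next-transactional-id-hint");
        Descriptor descriptor = new Descriptor("next-transactional-id-hint");

        System.out.println("buggy check: " + buggyCheck(registeredNames, descriptor));
        System.out.println("fixed check: " + fixedCheck(registeredNames, descriptor));
    }
}
```

In this sketch the buggy check is false even though the name is registered, which is why a migration guarded by such a condition would never run.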
> The effect is:
>  * because NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR is for a UnionListState, and
>  * the state is never cleared,
>  * each time the job restarts from a savepoint or checkpoint, the size of this state is multiplied by the parallelism
>  * because each entry leaves an offset in the metadata, akka.framesize eventually becomes too small, even before we run into a memory overflow
>  
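
To give a feel for how quickly the state described above can grow, here is a rough sketch of the arithmetic. It assumes a simplified model: on every restore each subtask receives the full union of entries and writes all of them back unchanged, so the total is multiplied by the parallelism per restart. The class and method names are hypothetical, and the numbers illustrate the model only; they are not measurements from a real job.

```java
public class UnionStateGrowthSketch {

    /**
     * Total number of entries after the given number of restarts, assuming every
     * subtask re-emits the whole restored union state (simplified model).
     */
    static long entriesAfterRestarts(long initialEntries, int parallelism, int restarts) {
        long total = initialEntries;
        for (int i = 0; i < restarts; i++) {
            // multiplyExact throws ArithmeticException instead of silently overflowing.
            total = Math.multiplyExact(total, (long) parallelism);
        }
        return total;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (int restarts = 0; restarts <= 5; restarts++) {
            System.out.println(restarts + " restarts: "
                    + entriesAfterRestarts(parallelism, parallelism, restarts) + " entries");
        }
    }
}
```

Under this model the growth is exponential in the number of restarts, so even a modest parallelism reaches a large entry count after only a few restores.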
> The breaking change was introduced in commit 70fa80e3862b367be22b593db685f9898a2838ef
>  
> A simple fix would be to change the code to:
> {quote}        if (context.getOperatorStateStore()
>                 .getRegisteredStateNames()
>                 .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.getName())) {
>             migrateNextTransactionalIdHindState(context);
>         }
> {quote}
>  
> Although FlinkKafkaProducer is marked as deprecated, it is probably here to stay for a while
>  
> Greetings
> Matthias (Thias) Schwalbe
>  
> [1] https://github.com/apache/flink/blob/d7cf2c10f8d4fba81173854cbd8be27c657c7c7f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1167-L1171
>  


