Posted to issues@flink.apache.org by "Matthias Schwalbe (Jira)" <ji...@apache.org> on 2022/01/11 16:40:00 UTC

[jira] [Updated] (FLINK-25615) FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state

     [ https://issues.apache.org/jira/browse/FLINK-25615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias Schwalbe updated FLINK-25615:
--------------------------------------
    Description: 
I've found an unnoticed error in FlinkKafkaProducer when migrating from pre-Flink 1.9 state to Flink 1.9 or later:
 * the operator state for next-transactional-id-hint should be deleted and replaced by operator state next-transactional-id-hint-v2, however
 * operator state next-transactional-id-hint is never deleted
 * see here: [1] :

{quote}        if (context.getOperatorStateStore()
                .getRegisteredStateNames()
                .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR)) {
            migrateNextTransactionalIdHindState(context);
        }
{quote}
 * migrateNextTransactionalIdHindState is never called, as the condition can never become true:
 ** getRegisteredStateNames returns the state names as Strings, whereas NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR is a ListStateDescriptor (type mismatch), so contains() always returns false
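
To illustrate why the check compiles but never matches, here is a minimal sketch in plain Java without any Flink dependency. The Descriptor class and the OLD_HINT constant are hypothetical stand-ins for ListStateDescriptor and NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR; only the collection behaviour is the point. Collection.contains accepts any Object, so passing a non-String to a set of Strings is legal and simply returns false.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ContainsTypeMismatchDemo {

    /** Hypothetical stand-in for a state descriptor; only its name matters here. */
    static final class Descriptor {
        private final String name;

        Descriptor(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    // Assumed state name, taken from the issue text.
    static final Descriptor OLD_HINT = new Descriptor("next-transactional-id-hint");

    /** Mirrors the reported check: a Descriptor is never equal to a String. */
    public static boolean buggyCheck(Set<String> registeredNames) {
        return registeredNames.contains(OLD_HINT);
    }

    /** Mirrors the proposed fix: compare the descriptor's name instead. */
    public static boolean fixedCheck(Set<String> registeredNames) {
        return registeredNames.contains(OLD_HINT.getName());
    }

    public static void main(String[] args) {
        Set<String> names = new HashSet<>(Arrays.asList(
                "next-transactional-id-hint",
                "next-transactional-id-hint-v2"));
        System.out.println("buggy check: " + buggyCheck(names)); // false
        System.out.println("fixed check: " + fixedCheck(names)); // true
    }
}
```

The compiler does not flag the buggy variant because contains is declared with an Object parameter, which is probably why the error went unnoticed.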

The effect is:
 * because NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR describes a UnionListState, and
 * the state is never cleared,
 * each time the job restarts from a savepoint or checkpoint, the state size is multiplied by the parallelism
 * because each entry leaves an offset in the metadata, the metadata eventually exceeds akka.framesize, even before we run out of memory
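
The growth described above can be sketched with a simplified model. It assumes that on every restore each subtask receives the full union list and snapshots all of it again because nothing is cleared; the class and method names are hypothetical and this is not Flink code.

```java
public class UnionStateGrowthDemo {

    /**
     * Simplified model: with union redistribution every subtask gets all
     * entries on restore. If none are cleared, each of the `parallelism`
     * subtasks writes the full list back, so the total is multiplied by
     * `parallelism` on each restart.
     */
    public static long entriesAfterRestarts(long initialEntries, int parallelism, int restarts) {
        long total = initialEntries;
        for (int i = 0; i < restarts; i++) {
            // multiplyExact throws ArithmeticException instead of silently overflowing
            total = Math.multiplyExact(total, (long) parallelism);
        }
        return total;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (int restarts = 0; restarts <= 5; restarts++) {
            System.out.println(restarts + " restarts -> "
                    + entriesAfterRestarts(parallelism, parallelism, restarts) + " entries");
        }
    }
}
```

Under this model, a job with parallelism 4 that starts with 4 entries holds 4096 entries after 5 restarts, which shows how quickly the state can outgrow a fixed frame size.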

 

The breaking change was introduced in commit 70fa80e3862b367be22b593db685f9898a2838ef

 

A simple fix would be to change the code to:
{quote}        if (context.getOperatorStateStore()
                .getRegisteredStateNames()
                .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.getName())) {
            migrateNextTransactionalIdHindState(context);
        }
{quote}
 

Although FlinkKafkaProducer is marked as deprecated, it is probably here to stay for a while.

 

Greetings

Matthias (Thias) Schwalbe

 

[1] https://github.com/apache/flink/blob/d7cf2c10f8d4fba81173854cbd8be27c657c7c7f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1167-L1171

 

  was:TBD


> FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state
> ----------------------------------------------------------------
>
>                 Key: FLINK-25615
>                 URL: https://issues.apache.org/jira/browse/FLINK-25615
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.0
>            Reporter: Matthias Schwalbe
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)