Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/01/11 19:03:00 UTC
[jira] [Commented] (FLINK-25615) FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state
[ https://issues.apache.org/jira/browse/FLINK-25615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17473100#comment-17473100 ]
Martijn Visser commented on FLINK-25615:
----------------------------------------
[~Matthias Schwalbe] Thanks for the report. Since the Flink community no longer supports versions below 1.12 and FlinkKafkaProducer is indeed deprecated, what do you think should happen with this ticket?
> FlinkKafkaProducer fail to correctly migrate pre Flink 1.9 state
> ----------------------------------------------------------------
>
> Key: FLINK-25615
> URL: https://issues.apache.org/jira/browse/FLINK-25615
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.9.0
> Reporter: Matthias Schwalbe
> Priority: Major
>
> I've found a previously unnoticed bug in FlinkKafkaProducer when migrating state from pre-1.9 Flink versions to Flink 1.9 and later:
> * the operator state next-transactional-id-hint should be deleted and replaced by the operator state next-transactional-id-hint-v2; however,
> * the operator state next-transactional-id-hint is never deleted
> * see [1]:
> {quote} if (context.getOperatorStateStore()
> .getRegisteredStateNames()
> .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR)) {
> migrateNextTransactionalIdHindState(context);
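> }{quote}
> * migrateNextTransactionalIdHindState is never called, because the condition can never become true:
> ** getRegisteredStateNames returns a Set<String>, whereas NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR is a ListStateDescriptor (type mismatch)
> ** since Set.contains accepts any Object, the compiler does not flag the mismatch; the check simply always evaluates to false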
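To illustrate the type mismatch outside of Flink, here is a minimal, self-contained Java sketch. `FakeDescriptor` is a hypothetical stand-in for `ListStateDescriptor` (only its `getName()` matters here), and the set of restored state names is made up for illustration; none of this is Flink's API.

```java
import java.util.Set;

public class StateNameCheckDemo {

    // Hypothetical stand-in for Flink's ListStateDescriptor: only the name matters here.
    static final class FakeDescriptor {
        private final String name;

        FakeDescriptor(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    static final FakeDescriptor NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR =
            new FakeDescriptor("next-transactional-id-hint");

    // Mirrors the buggy check: Set.contains(Object) accepts the descriptor itself,
    // so this compiles, but a descriptor never equals a String and the result is always false.
    static boolean buggyCheck(Set<String> registeredStateNames) {
        return registeredStateNames.contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
    }

    // Mirrors the proposed fix: compare against the descriptor's name instead.
    static boolean fixedCheck(Set<String> registeredStateNames) {
        return registeredStateNames.contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.getName());
    }

    public static void main(String[] args) {
        // Illustrative state names as they might look after restoring a pre-1.9 savepoint.
        Set<String> restored = Set.of("next-transactional-id-hint", "other-state");
        System.out.println("buggy check: " + buggyCheck(restored)); // buggy check: false
        System.out.println("fixed check: " + fixedCheck(restored)); // fixed check: true
    }
}
```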
> The effect is:
> * because NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR describes a union list state, and
> * the old state is never cleared,
> * each time the job restarts from a savepoint or checkpoint, the size of this state is multiplied by the parallelism
> * then, because each entry leaves an offset in the checkpoint metadata, the metadata eventually exceeds akka.framesize, even before we run out of memory
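The growth described above can be modelled in a few lines of Java. This is a simplified sketch that assumes, as the report describes, that on every restore each subtask receives the full union of the uncleared legacy entries and snapshots all of them again. The initial entry count is a parameter because it depends on how many subtasks originally wrote hints; `entriesAfterRestarts` is a hypothetical helper, not Flink code.

```java
public class UnionStateGrowthDemo {

    // Simplified model: each restart redistributes the full union of legacy entries
    // to every subtask, and every subtask snapshots them again, so the total entry
    // count is multiplied by the parallelism once per restart.
    static long entriesAfterRestarts(long initialEntries, int parallelism, int restarts) {
        long entries = initialEntries;
        for (int i = 0; i < restarts; i++) {
            // multiplyExact throws instead of silently overflowing for large inputs
            entries = Math.multiplyExact(entries, (long) parallelism);
        }
        return entries;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (int restarts = 0; restarts <= 5; restarts++) {
            System.out.println(restarts + " restarts -> "
                    + entriesAfterRestarts(1, parallelism, restarts) + " legacy entries");
        }
    }
}
```

With a parallelism of 4 and a single initial entry, three restarts already yield 64 entries in this model, which shows why the metadata outgrows the frame size so quickly.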
>
> The breaking change was introduced in commit 70fa80e3862b367be22b593db685f9898a2838ef
>
> A simple fix would be to change the code to:
> {quote} if (context.getOperatorStateStore()
> .getRegisteredStateNames()
> .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.getName())) {
> migrateNextTransactionalIdHindState(context);
> }
> {quote}
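A rough sketch of the intended migration with the proposed name-based check. It uses a hypothetical in-memory `FakeStateStore` instead of Flink's `OperatorStateStore`, and `Long` values in place of the real hint type. It only models the behaviour described in this report (copy the legacy hints into next-transactional-id-hint-v2, then clear next-transactional-id-hint); it is not the actual body of migrateNextTransactionalIdHindState.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class HintMigrationSketch {

    static final String OLD_NAME = "next-transactional-id-hint";
    static final String NEW_NAME = "next-transactional-id-hint-v2";

    // Hypothetical in-memory stand-in for an operator state store:
    // state name -> list of entries. This is NOT Flink's OperatorStateStore API.
    static final class FakeStateStore {
        final Map<String, List<Long>> states = new HashMap<>();

        Set<String> getRegisteredStateNames() {
            return states.keySet();
        }

        // Returns the list registered under this name, creating an empty one if needed.
        List<Long> getListState(String name) {
            return states.computeIfAbsent(name, k -> new ArrayList<>());
        }
    }

    // Models the intended behaviour: copy legacy hints to the v2 state, then clear the old state.
    static void initialize(FakeStateStore store) {
        // Compare by name, as in the proposed fix (not by descriptor object).
        if (store.getRegisteredStateNames().contains(OLD_NAME)) {
            List<Long> oldHints = store.getListState(OLD_NAME);
            store.getListState(NEW_NAME).addAll(oldHints);
            // Without this clear, the legacy entries would be carried into every snapshot.
            oldHints.clear();
        }
    }

    public static void main(String[] args) {
        FakeStateStore store = new FakeStateStore();
        store.getListState(OLD_NAME).add(42L); // pretend this entry came from a pre-1.9 savepoint
        initialize(store);
        System.out.println("old: " + store.getListState(OLD_NAME)); // old: []
        System.out.println("v2:  " + store.getListState(NEW_NAME)); // v2:  [42]
    }
}
```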
>
> Although FlinkKafkaProducer is marked as deprecated, it will probably be around for a while.
>
> Greetings
> Matthias (Thias) Schwalbe
>
> [1] https://github.com/apache/flink/blob/d7cf2c10f8d4fba81173854cbd8be27c657c7c7f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1167-L1171
>