Posted to issues@flink.apache.org by "Hang Ruan (Jira)" <ji...@apache.org> on 2023/03/16 07:05:00 UTC

[jira] [Commented] (FLINK-31483) Implement Split Deletion Support in Flink Kafka Connector

    [ https://issues.apache.org/jira/browse/FLINK-31483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17701005#comment-17701005 ] 

Hang Ruan commented on FLINK-31483:
-----------------------------------

Hi, Ruibin,

Actually, a deletion event (SplitsRemoval) is planned to be added as part of FLIP-208 (https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Add+RecordEvaluator+to+dynamically+stop+source+based+on+de-serialized+records). Here is the PR: https://github.com/apache/flink/pull/21589

However, this PR does not aim to add split deletion to the enumerator. The deletion event is only passed within the reader itself.

Some parts of this issue may already be covered once this FLIP is finished.
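To make the difference concrete, here is a minimal sketch of reader-local removal, where the reader drops a split on its own and the enumerator is never involved. It is only loosely modeled on the FLIP-208 idea of a per-record stop condition; LocalFetcherSketch, LocalFetcherDemo and their methods are hypothetical and are not the code in the PR. Not emitting the record that triggers the stop is also an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical reader-internal fetcher: removals originate inside the reader,
// and the enumerator is never told about them.
final class LocalFetcherSketch {
    private final Map<String, Deque<String>> pending = new LinkedHashMap<>(); // splitId -> unread records
    private final List<String> finished = new ArrayList<>();
    private final Predicate<String> isEndOfStream; // assumed per-record stop condition

    LocalFetcherSketch(Predicate<String> isEndOfStream) {
        this.isEndOfStream = isEndOfStream;
    }

    void addSplit(String splitId, List<String> records) {
        pending.put(splitId, new ArrayDeque<>(records));
    }

    // Reads at most one record per active split. A split whose record matches the
    // stop condition is removed locally; that record is not emitted in this sketch.
    List<String> fetchRound() {
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Deque<String>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Deque<String>> entry = it.next();
            String record = entry.getValue().poll();
            if (record == null) {
                continue; // nothing left to read for this split right now
            }
            if (isEndOfStream.test(record)) {
                finished.add(entry.getKey());
                it.remove(); // local removal only
            } else {
                emitted.add(record);
            }
        }
        return emitted;
    }

    List<String> finishedSplits() {
        return Collections.unmodifiableList(finished);
    }

    Set<String> activeSplits() {
        return Collections.unmodifiableSet(pending.keySet());
    }
}

class LocalFetcherDemo {
    public static void main(String[] args) {
        LocalFetcherSketch fetcher = new LocalFetcherSketch(record -> record.equals("STOP"));
        fetcher.addSplit("a", Arrays.asList("a1", "STOP", "a3"));
        fetcher.addSplit("b", Arrays.asList("b1", "b2"));
        System.out.println(fetcher.fetchRound()); // [a1, b1]
        System.out.println(fetcher.fetchRound()); // [b2]
        System.out.println(fetcher.activeSplits()); // [b]
    }
}
```

An enumerator-driven deletion, as requested in this issue, would instead need a message that travels from the enumerator to the reader, which is the part this PR does not cover.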

Best, Hang

> Implement Split Deletion Support in Flink Kafka Connector
> ---------------------------------------------------------
>
>                 Key: FLINK-31483
>                 URL: https://issues.apache.org/jira/browse/FLINK-31483
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / Kafka, Connectors / Parent
>            Reporter: Ruibin Xing
>            Priority: Major
>
> Currently, the Flink Kafka Connector does not support split deletion; it is left as a [TODO|https://github.com/apache/flink-connector-kafka/blob/9f72be91f8abdfc9b5e8fa46d15dee3f83e71332/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305]. I want to add this feature with the following steps:
> 1. Add SplitsDeletion event to flink-connector-base, which currently only has SplitsAddition.
> 2. Add a `deleteSplits` method to SplitEnumeratorContext so that it can send a SplitsDeletion event to the source operator. To maintain compatibility, a default empty implementation of this method will be added.
> 3. Make SourceOperator handle the SplitsDeletion event, notifying the SourceReader to delete splits.
> 4. Create a deleteSplits method in SourceReader to remove splits, including removing them from the split state and stopping the SourceReader from reading the deleted splits.
> As an alternative that does not modify flink-connector-base, KafkaSplitsEnumerator could send a custom SourceEvent to the SourceOperator for split deletion and handle it in Kafka-connector-specific code. However, I think it's better to have SplitsDeletion in flink-connector-base so that other connectors can use it too.
> Let me know if you have any thoughts or ideas. Thanks!
> Related Issues: FLINK-30490
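The four proposed steps can be sketched end to end as below. All types here (SplitsChangeSketch, SplitsAdditionSketch, SplitsDeletionSketch, EnumeratorContextSketch, ReaderSketch, OperatorSketch, InMemoryContextSketch, SplitDeletionDemo) are hypothetical, simplified stand-ins rather than Flink's real classes. The sketch assumes splits are identified by string ids and that the context hands changes directly to the operator; in Flink they would travel as events between the enumerator and the source operators.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins only; none of these are Flink's real classes.
// Step 1: a deletion change shaped like the existing addition change.
abstract class SplitsChangeSketch<SplitT> {
    private final List<SplitT> splits;
    SplitsChangeSketch(List<SplitT> splits) { this.splits = Collections.unmodifiableList(new ArrayList<>(splits)); }
    List<SplitT> splits() { return splits; }
}

final class SplitsAdditionSketch<SplitT> extends SplitsChangeSketch<SplitT> {
    SplitsAdditionSketch(List<SplitT> splits) { super(splits); }
}

final class SplitsDeletionSketch<SplitT> extends SplitsChangeSketch<SplitT> {
    SplitsDeletionSketch(List<SplitT> splits) { super(splits); }
}

// Step 2: deleteSplits gets a no-op default so existing context implementations keep compiling.
interface EnumeratorContextSketch<SplitT> {
    void assignSplits(int subtaskId, List<SplitT> splits);

    default void deleteSplits(int subtaskId, List<SplitT> splits) {
        // Intentionally empty: contexts written before deletion existed just ignore it.
    }
}

// Step 4: the reader removes deleted splits from the per-split state it tracks.
final class ReaderSketch {
    private final Map<String, Long> offsets = new LinkedHashMap<>(); // splitId -> next offset to read

    void addSplits(List<String> splitIds) {
        for (String id : splitIds) offsets.putIfAbsent(id, 0L);
    }

    void deleteSplits(List<String> splitIds) {
        for (String id : splitIds) offsets.remove(id);
    }

    List<String> activeSplits() { return new ArrayList<>(offsets.keySet()); }
}

// Step 3: the operator dispatches each change type to the matching reader method.
final class OperatorSketch {
    private final ReaderSketch reader;
    OperatorSketch(ReaderSketch reader) { this.reader = reader; }

    void handle(SplitsChangeSketch<String> change) {
        if (change instanceof SplitsAdditionSketch) {
            reader.addSplits(change.splits());
        } else if (change instanceof SplitsDeletionSketch) {
            reader.deleteSplits(change.splits());
        } else {
            throw new IllegalArgumentException("Unsupported change: " + change);
        }
    }
}

// A context that delivers changes straight to per-subtask operators (no RPC in this sketch).
final class InMemoryContextSketch implements EnumeratorContextSketch<String> {
    private final Map<Integer, OperatorSketch> operators;
    InMemoryContextSketch(Map<Integer, OperatorSketch> operators) { this.operators = operators; }

    @Override
    public void assignSplits(int subtaskId, List<String> splits) {
        operators.get(subtaskId).handle(new SplitsAdditionSketch<>(splits));
    }

    @Override
    public void deleteSplits(int subtaskId, List<String> splits) {
        operators.get(subtaskId).handle(new SplitsDeletionSketch<>(splits));
    }
}

class SplitDeletionDemo {
    public static void main(String[] args) {
        ReaderSketch reader = new ReaderSketch();
        Map<Integer, OperatorSketch> operators = new HashMap<>();
        operators.put(0, new OperatorSketch(reader));
        EnumeratorContextSketch<String> context = new InMemoryContextSketch(operators);
        context.assignSplits(0, Arrays.asList("orders-0", "orders-1"));
        context.deleteSplits(0, Arrays.asList("orders-0"));
        System.out.println(reader.activeSplits()); // [orders-1]
    }
}
```

The no-op default on deleteSplits is what step 2 relies on for compatibility: a context that only implements assignSplits still compiles, and a deletion request sent through it is silently dropped.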
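The Kafka-only alternative can be sketched the same way. Flink's Source API already offers SplitEnumeratorContext#sendEventToSourceReader(int, SourceEvent) on the enumerator side and SourceReader#handleSourceEvents(SourceEvent) on the reader side for custom events. The types below (SourceEventSketch, KafkaSplitsRemovedEvent, KafkaReaderSketch, CustomEventDemo) are hypothetical stand-ins so that the example runs without Flink on the classpath.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified stand-in for Flink's SourceEvent marker interface.
interface SourceEventSketch extends Serializable {}

// Hypothetical Kafka-specific event listing the ids of splits the enumerator removed.
final class KafkaSplitsRemovedEvent implements SourceEventSketch {
    private static final long serialVersionUID = 1L;
    private final ArrayList<String> splitIds;

    KafkaSplitsRemovedEvent(Collection<String> splitIds) { this.splitIds = new ArrayList<>(splitIds); }

    List<String> splitIds() { return Collections.unmodifiableList(splitIds); }
}

// Hypothetical Kafka reader; handleSourceEvents plays the role of SourceReader#handleSourceEvents.
final class KafkaReaderSketch {
    private final Set<String> assigned = new LinkedHashSet<>();

    void addSplits(Collection<String> splitIds) { assigned.addAll(splitIds); }

    void handleSourceEvents(SourceEventSketch event) {
        if (event instanceof KafkaSplitsRemovedEvent) {
            assigned.removeAll(((KafkaSplitsRemovedEvent) event).splitIds());
        }
        // Other event types are ignored; the generic operator never needs to understand them.
    }

    Set<String> assignedSplits() { return Collections.unmodifiableSet(assigned); }
}

class CustomEventDemo {
    public static void main(String[] args) {
        KafkaReaderSketch reader = new KafkaReaderSketch();
        reader.addSplits(Arrays.asList("orders-0", "orders-1", "orders-2"));
        // In Flink, the enumerator would send this via sendEventToSourceReader(subtaskId, event).
        reader.handleSourceEvents(new KafkaSplitsRemovedEvent(Arrays.asList("orders-1")));
        System.out.println(reader.assignedSplits()); // [orders-0, orders-2]
    }
}
```

This shows the trade-off raised in the issue: the event type and its handling live entirely in connector code, so every other connector would need its own equivalent.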



--
This message was sent by Atlassian Jira
(v8.20.10#820010)