You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Exidex (Jira)" <ji...@apache.org> on 2023/02/22 12:54:00 UTC

[jira] [Created] (FLINK-31186) Removing topic from kafka source does nothing

Exidex created FLINK-31186:
------------------------------

             Summary: Removing topic from kafka source does nothing
                 Key: FLINK-31186
                 URL: https://issues.apache.org/jira/browse/FLINK-31186
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.15.3
            Reporter: Exidex


As far as I can tell, there is no good way to remove topic from the list of topic that kafka source consumes from. 

We use {{StreamExecutionEnvironment.fromSource}} api with {{KafkaSource.setTopics}} which accepts list of topics. but when we remove the topic from list after some time the flink kafka source still consumes from it. 

My guess is that it relates to this TODO in code:
[GitHub|https://github.com/apache/flink/blob/cc66d4855e6f8ee9986809a18f68a458bcfe3c12/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305]

You can kind of workaroud this by removing whole job state or changing uid of kafka source but that affects either whole job or whole source. The other way is to use state processor api but it doesn't expose source operator state, which in turn can be worked around using reflection and copying code from SourceCoordinator. None of those are satisfactory



--
This message was sent by Atlassian Jira
(v8.20.10#820010)