Posted to issues@flink.apache.org by "Shengnan YU (JIRA)" <ji...@apache.org> on 2019/04/24 03:21:00 UTC

[jira] [Commented] (FLINK-11848) Deleting outdated Kafka topics causes UNKNOWN_TOPIC_EXCEPTION

    [ https://issues.apache.org/jira/browse/FLINK-11848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16824781#comment-16824781 ] 

Shengnan YU commented on FLINK-11848:
-------------------------------------

It does not work; I have looked at the source code. Flink's partition discovery gets the topic list from the consumer's metadata. However, the warning occurs when the consumer fetches metadata from the cluster, so it is actually an issue with kafka-clients, not Flink. Thank you very much for the help.
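For reference, this is roughly the lookup the discovery side performs; a minimal sketch against the plain kafka-clients API, not Flink's actual discoverer (the class name, broker address, and pattern are illustrative):

{code:java}
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

public class TopicDiscoverySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Pattern pattern = Pattern.compile("^test.*$");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // listTopics() issues a metadata request to the cluster and returns
            // every topic the brokers report; pattern-based discovery filters a
            // list like this against the subscription pattern.
            Map<String, List<PartitionInfo>> metadata = consumer.listTopics();
            metadata.keySet().stream()
                    .filter(topic -> pattern.matcher(topic).matches())
                    .forEach(System.out::println);
        }
    }
}
{code}

Running this right after deleting a topic shows the topic disappearing from the returned map, while the stuck consumer's own metadata can still reference it and keep logging the warning on refresh.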

> Deleting outdated Kafka topics causes UNKNOWN_TOPIC_EXCEPTION
> --------------------------------------------------------------
>
>                 Key: FLINK-11848
>                 URL: https://issues.apache.org/jira/browse/FLINK-11848
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.6.4
>            Reporter: Shengnan YU
>            Assignee: frank wang
>            Priority: Major
>
> Recently we have been running some streaming jobs with Apache Flink. There are multiple Kafka topics named with the format xxxxxx_yy-mm-dd. We use a topic regex pattern to let one consumer consume those topics. However, if we delete some of the older topics, the metadata in the consumer does not seem to update properly, so it still remembers the outdated topics in its topic list, which leads to *UNKNOWN_TOPIC_EXCEPTION*. We must restart the consumer job to recover. The same seems to occur on the producer side as well. Any idea how to solve this problem? Thank you very much!
>  
> Example to reproduce problem:
> Suppose there are multiple Kafka topics, for instance "test20190310", "test20190311", and "test20190312". I run the job below and everything is fine. Then, if I delete the topic "test20190310", the consumer does not notice that the topic has been deleted and keeps fetching metadata for it, and unknown-topic errors appear in the taskmanager's log.
> {code:java}
> import java.util.Properties;
> import java.util.regex.Pattern;
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
>
> public class TopicPatternJob {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("group.id", "test10");
>         props.put("enable.auto.commit", "true");
>         props.put("auto.commit.interval.ms", "1000");
>         props.put("auto.offset.reset", "earliest");
>         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         // re-discover partitions (and topics matching the pattern) every 20 minutes
>         props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
>                 "1200000");
>
>         // subscribe by pattern so newly created test* topics are picked up
>         Pattern topics = Pattern.compile("^test.*$");
>         FlinkKafkaConsumer011<String> consumer =
>                 new FlinkKafkaConsumer011<>(topics, new SimpleStringSchema(), props);
>
>         DataStream<String> stream = env.addSource(consumer);
>         stream.writeToSocket("localhost", 44444, new SimpleStringSchema());
>         env.execute("test");
>     }
> }
> {code}
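> As a quick diagnostic, one can compare what the brokers still report against what the stuck client keeps requesting; a hedged sketch using the kafka-clients AdminClient (available since 0.11; the class name and broker address are illustrative):
> {code:java}
> import java.util.Properties;
> import java.util.Set;
>
> import org.apache.kafka.clients.admin.AdminClient;
> import org.apache.kafka.clients.admin.AdminClientConfig;
>
> public class LiveTopicsCheck {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         try (AdminClient admin = AdminClient.create(props)) {
>             // listTopics() asks the brokers directly, so a deleted topic such
>             // as "test20190310" should no longer appear here even while the
>             // stuck consumer keeps requesting metadata for it.
>             Set<String> liveTopics = admin.listTopics().names().get();
>             System.out.println("Topics the cluster still knows: " + liveTopics);
>         }
>     }
> }
> {code}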



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)