Posted to user@flink.apache.org by Yan Wang <y....@oracle.com> on 2021/09/02 02:51:47 UTC

Use FlinkKafkaConsumer to synchronize multiple Kafka topics

Hi,

We are currently using a single FlinkKafkaConsumer to consume multiple Kafka topics. However, we find that if one of the Kafka topics goes down at run time (like rebooting one of the topics), the FlinkKafkaConsumer keeps logging warning messages about the dead Kafka topic and also continues consuming the other live Kafka topics.
What we want instead is that, if one of the topics goes down, the FlinkKafkaConsumer waits and stops consuming the other live topics until the dead topic is live again.

Code example:

// properties is a java.util.Properties with the Kafka settings (bootstrap.servers, group.id, ...)
List<String> kafkaTopicsList = new ArrayList<>(Arrays.asList("KafkaTopic1", "KafkaTopic2"));
FlinkKafkaConsumer<String> flinkKafkaConsumer =
        new FlinkKafkaConsumer<>(kafkaTopicsList, new SimpleStringSchema(), properties);

As shown in the code example, kafkaTopicsList contains two Kafka topics, and flinkKafkaConsumer consumes both of them. If KafkaTopic1 goes down at run time (we may reboot KafkaTopic1 at run time), we want flinkKafkaConsumer to wait and stop consuming KafkaTopic2 until KafkaTopic1 is live again.
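The requested behaviour amounts to an all-or-nothing rule: a topic is consumed only while every subscribed topic is available. The stdlib-only sketch below contrasts that rule with the current behaviour described above. AllOrNothingGate and the topic-availability map are hypothetical illustrations, not part of the Flink or Kafka API; in a real job the availability information would have to come from something like the consumer's topic metadata.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical helper (not a Flink or Kafka API) that models which topics
 * may be consumed, given which topics are currently available.
 */
public class AllOrNothingGate {

    /** Current behaviour: keep consuming every topic that is still available. */
    public static Set<String> consumableByDefault(Map<String, Boolean> availability) {
        Set<String> result = new LinkedHashSet<>();
        for (Map.Entry<String, Boolean> e : availability.entrySet()) {
            if (e.getValue()) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    /** Requested behaviour: consume all topics, or none while any topic is down. */
    public static Set<String> consumableAllOrNothing(Map<String, Boolean> availability) {
        if (availability.containsValue(false)) {
            // Pause everything until the dead topic is back.
            return Collections.emptySet();
        }
        return new LinkedHashSet<>(availability.keySet());
    }

    public static void main(String[] args) {
        Map<String, Boolean> availability = new LinkedHashMap<>();
        availability.put("KafkaTopic1", false); // e.g. KafkaTopic1 is being "rebooted"
        availability.put("KafkaTopic2", true);

        System.out.println(consumableByDefault(availability));    // [KafkaTopic2]
        System.out.println(consumableAllOrNothing(availability)); // []
    }
}
```

This only models the decision. Enforcing it inside FlinkKafkaConsumer is the part that, according to the replies in this thread, is not directly supported.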

Is it possible to achieve this with the current Flink API? Do we need to change some configuration, or do we have to override the FlinkKafkaConsumer class to achieve this? Thank you very much!

Thanks, Yan

Re: [External] : Re: Use FlinkKafkaConsumer to synchronize multiple Kafka topics

Posted by Yan Wang <y....@oracle.com>.
Hi Arvid,

Thanks for your reply.
Yes, the warning is logged by kafka-clients. Here is the warning log after I deleted the topic that the Kafka consumer is listening to:
18:46:27,297 WARN  org.apache.kafka.clients.NetworkClient  - [Consumer clientId=consumer-2, groupId=osstest] Error while fetching metadata with correlation id 30288 : {oss-consumer-test-1=UNKNOWN_TOPIC_OR_PARTITION, oss-consumer-test-2=UNKNOWN_TOPIC_OR_PARTITION}

I have a follow-up question.
“What you could do is try to config Kafka consumer to fail hard when topic metadata cannot be retrieved with a small timeout.”
How can I configure the Flink Kafka consumer to fail when the metadata of one of the topics cannot be retrieved?
For example, a FlinkKafkaConsumer is listening to 3 different Kafka topics. If one of the Kafka topics is deleted and the FlinkKafkaConsumer cannot retrieve the corresponding topic metadata, we want the FlinkKafkaConsumer to fail hard and stop consuming the other two live topics.
Thank you very much!

Thanks, Yan

From: Arvid Heise <ar...@apache.org>
Date: Thursday, September 2, 2021 at 1:27 PM
To: Yan Wang <y....@oracle.com>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: [External] : Re: Use FlinkKafkaConsumer to synchronize multiple Kafka topics


Re: Use FlinkKafkaConsumer to synchronize multiple Kafka topics

Posted by Arvid Heise <ar...@apache.org>.
Hi Yan,

AFAIK this is not directly supported, and it would be surprising to other
users since it's a rather specific requirement.
In fact, Flink delegates reading the topics to the Kafka consumer API, and I
suspect that the warning you received also comes from the Kafka consumer (I
have not found a corresponding warning in Flink's code base, but you could
show the exact log statement so I can recheck).

What you could do is try to configure the Kafka consumer to fail hard, with
a small timeout, when topic metadata cannot be retrieved.
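One way to picture this suggestion is a fail-fast check: look up the metadata of every subscribed topic with a short timeout, and throw if any lookup times out, fails, or finds nothing, so the job fails instead of continuing on the remaining topics. The sketch below is stdlib-only and hypothetical: MetadataLookup stands in for a real metadata call (for example kafka-clients' KafkaConsumer#partitionsFor(String, Duration)), it assumes an unknown topic yields an empty or null result, and the topic oss-consumer-test-3 is illustrative. Whether FlinkKafkaConsumer offers a place to hook such a check in is not established in this thread.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FailFastTopicCheck {

    /** Hypothetical stand-in for a metadata call; returns partition ids (empty or null if unknown). */
    @FunctionalInterface
    public interface MetadataLookup {
        List<Integer> partitionsFor(String topic) throws Exception;
    }

    /** Thrown when any subscribed topic has no retrievable metadata. */
    public static class MissingTopicException extends RuntimeException {
        public MissingTopicException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Fails hard on the first topic whose metadata is missing or not fetched within timeoutMillis. */
    public static void requireAllTopics(List<String> topics, MetadataLookup lookup, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            for (String topic : topics) {
                Future<List<Integer>> future = executor.submit(() -> lookup.partitionsFor(topic));
                List<Integer> partitions;
                try {
                    partitions = future.get(timeoutMillis, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    future.cancel(true); // interrupt the slow lookup
                    throw new MissingTopicException("Timed out fetching metadata for " + topic, e);
                } catch (ExecutionException e) {
                    throw new MissingTopicException("Metadata lookup failed for " + topic, e.getCause());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new MissingTopicException("Interrupted while checking " + topic, e);
                }
                if (partitions == null || partitions.isEmpty()) {
                    throw new MissingTopicException("No metadata for topic " + topic, null);
                }
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Simulated metadata: oss-consumer-test-3 has been deleted, the others exist.
        MetadataLookup lookup = topic -> topic.equals("oss-consumer-test-3")
                ? Collections.<Integer>emptyList()
                : Arrays.asList(0, 1, 2);
        try {
            requireAllTopics(
                    Arrays.asList("oss-consumer-test-1", "oss-consumer-test-2", "oss-consumer-test-3"),
                    lookup, 500);
            System.out.println("all topics present");
        } catch (MissingTopicException e) {
            System.out.println("failing hard: " + e.getMessage());
        }
    }
}
```

If a check like this ran on every (re)start, the waiting would be handed to Flink's restart strategy: the job keeps failing and restarting, within that strategy's limits, until the topic is back, rather than reading the other topics in the meantime.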

Note that I'm a bit confused by the terms "dead" topic and "rebooted"
topic. AFAIK you can only have dead brokers, rebooted brokers, and maybe
deleted topics. But I have yet to understand a use case where you would
delete a topic while the consumer is running.
