You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Edward Rojas <ed...@gmail.com> on 2018/05/04 10:15:45 UTC

Dynamically deleting kafka topics does not remove partitions from kafkaConsumer

Hello all,

We have a kafka consumer listening to a topic pattern "topic-*" with a
partition discovery interval.
We eventually add new topics and this is working perfectly, the consumer
discover the new topics (and partitions) and listen to them.

But we also remove topics eventually and in this case the consumer is not
updated. The consumer continue listen to the removed partitions *forever*
and we get logs like:


2018-05-04 11:32:11,462 WARN  org.apache.kafka.clients.NetworkClient                       
- Error while fetching metadata with correlation id 1154 :
{topic-123=UNKNOWN_TOPIC_OR_PARTITION}
2018-05-04 11:32:11,965 WARN  org.apache.kafka.clients.NetworkClient                       
- Error while fetching metadata with correlation id 1156 :
{topic-123=UNKNOWN_TOPIC_OR_PARTITION}
2018-05-04 11:32:12,468 WARN  org.apache.kafka.clients.NetworkClient                       
- Error while fetching metadata with correlation id 1158 :
{topic-123=UNKNOWN_TOPIC_OR_PARTITION}
2018-05-04 11:32:12,970 WARN  org.apache.kafka.clients.NetworkClient                       
- Error while fetching metadata with correlation id 1160 :
{topic-123=UNKNOWN_TOPIC_OR_PARTITION}
2018-05-04 11:32:13,473 WARN  org.apache.kafka.clients.NetworkClient                       
- Error while fetching metadata with correlation id 1162 :
{topic-123=UNKNOWN_TOPIC_OR_PARTITION}
...

This requests continue *forever* and the logs are shown several times per
second hiding other possible problems and it's using resources that could be
freed for other processing.

I think the partition discovery mechanism should be modified to take into
account not only new partitions but also removing no longer available
partitions.

What do you think ?

Regards,
Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Dynamically deleting kafka topics does not remove partitions from kafkaConsumer

Posted by Edward Alexander Rojas Clavijo <ed...@gmail.com>.
Hello,
I've being working on a fix for this, I posted more details on the JIRA
ticket.

Regards,
Edward

2018-05-07 5:51 GMT+02:00 Tzu-Li (Gordon) Tai <tz...@apache.org>:

> Ah, correct, sorry for the incorrect link.
> Thanks Ted!
>
>
> On 7 May 2018 at 11:43:12 AM, Ted Yu (yuzhihong@gmail.com) wrote:
>
> It seems the correct JIRA should be FLINK-9303
> <https://issues.apache.org/jira/browse/FLINK-9303>
>
> On Sun, May 6, 2018 at 8:29 PM, Tzu-Li (Gordon) Tai <tz...@apache.org>
> wrote:
>
>> Hi Edward,
>>
>> Thanks for brining this up, and I think your suggestion makes sense.
>> The problem is that the Kafka consumer has no notion of "closed"
>> partitions
>> at the moment, so statically assigned partitions to the Kafka client is
>> never removed and is always continuously requested for records.
>>
>> For example, on the Kinesis consumer, there is a notion of closed shards,
>> and therefore is not an issue there.
>>
>> I've created a JIRA to track this:
>> https://issues.apache.org/jira/browse/FLINK-5720
>>
>> Cheers,
>> Gordon
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>

Re: Dynamically deleting kafka topics does not remove partitions from kafkaConsumer

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Ah, correct, sorry for the incorrect link.
Thanks Ted!


On 7 May 2018 at 11:43:12 AM, Ted Yu (yuzhihong@gmail.com) wrote:

It seems the correct JIRA should beĀ FLINK-9303

On Sun, May 6, 2018 at 8:29 PM, Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:
Hi Edward,

Thanks for brining this up, and I think your suggestion makes sense.
The problem is that the Kafka consumer has no notion of "closed" partitions
at the moment, so statically assigned partitions to the Kafka client is
never removed and is always continuously requested for records.

For example, on the Kinesis consumer, there is a notion of closed shards,
and therefore is not an issue there.

I've created a JIRA to track this:
https://issues.apache.org/jira/browse/FLINK-5720

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Dynamically deleting kafka topics does not remove partitions from kafkaConsumer

Posted by Ted Yu <yu...@gmail.com>.
It seems the correct JIRA should be FLINK-9303
<https://issues.apache.org/jira/browse/FLINK-9303>

On Sun, May 6, 2018 at 8:29 PM, Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi Edward,
>
> Thanks for brining this up, and I think your suggestion makes sense.
> The problem is that the Kafka consumer has no notion of "closed" partitions
> at the moment, so statically assigned partitions to the Kafka client is
> never removed and is always continuously requested for records.
>
> For example, on the Kinesis consumer, there is a notion of closed shards,
> and therefore is not an issue there.
>
> I've created a JIRA to track this:
> https://issues.apache.org/jira/browse/FLINK-5720
>
> Cheers,
> Gordon
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>

Re: Dynamically deleting kafka topics does not remove partitions from kafkaConsumer

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Edward,

Thanks for brining this up, and I think your suggestion makes sense.
The problem is that the Kafka consumer has no notion of "closed" partitions
at the moment, so statically assigned partitions to the Kafka client is
never removed and is always continuously requested for records.

For example, on the Kinesis consumer, there is a notion of closed shards,
and therefore is not an issue there.

I've created a JIRA to track this:
https://issues.apache.org/jira/browse/FLINK-5720

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/