Posted to user@flink.apache.org by Prasanna kumar <pr...@gmail.com> on 2021/08/04 12:25:44 UTC

Re: Topic assignment across Flink Kafka Consumer

Robert,

Flink version: 1.12.2
Flink Kafka connector version: 2.12

The partitions are assigned evenly when we read from a single topic.

Our use case is to read from multiple topics (matched by a regex pattern). For
this job we use 6 topics with 1 partition per topic.

In this case, some of the Kafka consumer tasks are not assigned any partitions.

Thanks,
Prasanna.

On Tue, 20 Jul 2021, 17:44 Robert Metzger, <rm...@apache.org> wrote:

> Hi Prasanna,
> which Flink version and Kafka connector are you using? (the "KafkaSource"
> or "FlinkKafkaConsumer"?)
>
> The partition assignment for the FlinkKafkaConsumer is defined here:
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java#L27-L43
>
>
> I assume all your topics have only one partition. Even so, the "startIndex"
> should be determined by the hash of the topic name. My only explanation is
> that you're unlucky with the distribution of the hashes.
> If this leads to performance issues, consider using topics with multiple
> partitions, renaming the topics, or increasing the parallelism of your
> consumer.
>
>
>
>
> On Tue, Jul 20, 2021 at 7:53 AM Prasanna kumar <
> prasannakumarramani@gmail.com> wrote:
>
>> Hi,
>>
>> We have a Flink job reading from multiple Kafka topics based on a regex
>> pattern.
>>
>> We have found that the topics are not distributed evenly across the
>> Kafka consumers.
>>
>> For example, with 8 topics and 4 Kafka consumer operators, 1 consumer is
>> assigned 6 topics, 2 consumers are assigned 1 topic each, and the last
>> consumer is not assigned any topic at all.
>>
>> This leads to poor resource utilization.
>>
>> I could not find any setting or configuration that would make the
>> distribution as even as possible.
>>
>> Please let me know if there is a way to achieve this.
>>
>> Thanks,
>> Prasanna.
>>
>
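To see why one partition per topic can leave consumer subtasks idle, the assignment logic in the KafkaTopicPartitionAssigner class Robert links to can be sketched in plain Java. This is a minimal standalone re-implementation based on our reading of that class, not the class itself; the class name PartitionAssignmentSketch and the topic names are hypothetical, and the linked source remains authoritative.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Standalone sketch of how FlinkKafkaConsumer maps a topic partition to a
 * consumer subtask, mirroring our reading of KafkaTopicPartitionAssigner.
 * Not the Flink class itself; class and topic names are hypothetical.
 */
public class PartitionAssignmentSketch {

    /** Index of the consumer subtask that reads the given partition of a topic. */
    public static int assign(String topic, int partition, int numParallelSubtasks) {
        // The start index depends only on the hash of the topic name
        // (masked to a non-negative int before taking the modulus).
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        // Partitions of the same topic are then placed round-robin from startIndex.
        return (startIndex + partition) % numParallelSubtasks;
    }

    /** Number of partitions each subtask receives for the given topics. */
    public static int[] countPerSubtask(List<String> topics, int partitionsPerTopic,
                                        int numParallelSubtasks) {
        int[] counts = new int[numParallelSubtasks];
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                counts[assign(topic, p, numParallelSubtasks)]++;
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical topic names matched by a regex subscription.
        List<String> topics = Arrays.asList(
                "orders-eu", "orders-us", "orders-apac",
                "payments-eu", "payments-us", "payments-apac");
        // 1 partition per topic: placement depends only on the name hashes.
        System.out.println("1 partition/topic:  " + Arrays.toString(countPerSubtask(topics, 1, 4)));
        // 4 partitions per topic on 4 subtasks: each subtask gets one partition per topic.
        System.out.println("4 partitions/topic: " + Arrays.toString(countPerSubtask(topics, 4, 4))); // [6, 6, 6, 6]
    }
}
```

With a single partition per topic, each topic lands entirely on its start index, so the per-subtask counts are decided purely by the topic-name hashes and can be as skewed as the 6/1/1/0 split described above. When a topic has as many partitions as there are subtasks, its partitions cover every subtask exactly once, which is why multi-partition topics (or a different parallelism or different topic names) change the outcome.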

Re: Topic assignment across Flink Kafka Consumer

Posted by Prasanna kumar <pr...@gmail.com>.
Robert

We are checking this using the metric
flink_taskmanager_job_task_operator_KafkaConsumer_assigned_partitions{jobname="SPECIFICJOBNAME"}

This metric gives the number of partitions assigned to each task (Kafka
consumer operator).

Prasanna.
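One way to turn the assigned_partitions metric mentioned above into a quick balance check is to parse a Prometheus scrape and list the subtasks that report zero partitions. This is a minimal plain-Java sketch, not part of Flink: it assumes the Prometheus text exposition format and a subtask_index label on each sample (the labels you actually get depend on your reporter and scope configuration), and the class name and sample lines are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Sketch: summarize the KafkaConsumer assigned_partitions gauge per subtask
 * from Prometheus text-format lines. The subtask_index label is an assumption.
 */
public class AssignedPartitionsCheck {

    // Metric name, then the {label} block (group 1), then the sample value (group 2).
    private static final Pattern SAMPLE = Pattern.compile(
            "^flink_taskmanager_job_task_operator_KafkaConsumer_assigned_partitions"
                    + "\\{([^}]*)\\}\\s+(\\S+)$");
    // Assumed label holding the subtask index.
    private static final Pattern SUBTASK = Pattern.compile("subtask_index=\"(\\d+)\"");

    /** Maps subtask index to the number of assigned partitions it reports. */
    public static Map<Integer, Integer> parse(List<String> lines) {
        Map<Integer, Integer> perSubtask = new TreeMap<>();
        for (String line : lines) {
            Matcher sample = SAMPLE.matcher(line.trim());
            if (!sample.matches()) {
                continue; // comments, other metrics, blank lines
            }
            Matcher subtask = SUBTASK.matcher(sample.group(1));
            if (!subtask.find()) {
                continue; // sample without a subtask label
            }
            int index = Integer.parseInt(subtask.group(1));
            int partitions = (int) Double.parseDouble(sample.group(2));
            perSubtask.merge(index, partitions, Integer::sum);
        }
        return perSubtask;
    }

    /** Subtasks that report zero assigned partitions, i.e. idle consumers. */
    public static List<Integer> idleSubtasks(Map<Integer, Integer> perSubtask) {
        List<Integer> idle = new ArrayList<>();
        for (Map.Entry<Integer, Integer> e : perSubtask.entrySet()) {
            if (e.getValue() == 0) {
                idle.add(e.getKey());
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        String metric = "flink_taskmanager_job_task_operator_KafkaConsumer_assigned_partitions";
        // Hypothetical scrape mirroring the 6/1/1/0 split described in the thread.
        List<String> scrape = Arrays.asList(
                "# TYPE " + metric + " gauge",
                metric + "{jobname=\"SPECIFICJOBNAME\",subtask_index=\"0\"} 6.0",
                metric + "{jobname=\"SPECIFICJOBNAME\",subtask_index=\"1\"} 1.0",
                metric + "{jobname=\"SPECIFICJOBNAME\",subtask_index=\"2\"} 1.0",
                metric + "{jobname=\"SPECIFICJOBNAME\",subtask_index=\"3\"} 0.0");
        Map<Integer, Integer> perSubtask = parse(scrape);
        System.out.println("partitions per subtask: " + perSubtask); // {0=6, 1=1, 2=1, 3=0}
        System.out.println("idle subtasks: " + idleSubtasks(perSubtask)); // [3]
    }
}
```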


On Wed, Aug 4, 2021 at 8:59 PM Robert Metzger <rm...@apache.org> wrote:

> Hi Prasanna,
>
> How are you checking the assignment of Kafka partitions to the consumers?
>
> The FlinkKafkaConsumer doesn't have a rebalance() method; rebalance() is a
> generic concept of the DataStream API. Is it possible that you are
> somehow partitioning your data in your Flink job, and this is causing the
> data distribution issues you are observing?

Re: Topic assignment across Flink Kafka Consumer

Posted by Robert Metzger <rm...@apache.org>.
Hi Prasanna,

How are you checking the assignment of Kafka partitions to the consumers?

The FlinkKafkaConsumer doesn't have a rebalance() method; rebalance() is a
generic concept of the DataStream API. Is it possible that you are
somehow partitioning your data in your Flink job, and this is causing the
data distribution issues you are observing?
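The distinction Robert draws can be illustrated without Flink: rebalance() does not change which subtask reads which Kafka partition; it redistributes the records emitted by the sources to the next operator's subtasks, which the Flink documentation describes as round-robin. The plain-Java simulation below is a sketch under stated assumptions: the class and method names are hypothetical, the load is a hypothetical 100 records per topic on the 6/1/1/0 split from the original question, and each sender starts at its own index to keep the output deterministic (as far as we know, Flink starts each sender at a random channel).

```java
import java.util.Arrays;

/**
 * Plain-Java simulation of round-robin redistribution, the strategy Flink's
 * documentation describes for DataStream#rebalance(). Uses no Flink classes.
 */
public class RebalanceSimulation {

    /** Downstream subtask for the r-th record of a sender (sender i starts at channel i). */
    static int target(int sender, long recordIndex, int downstreamParallelism) {
        return (int) ((sender + recordIndex) % downstreamParallelism);
    }

    /** Records each downstream subtask receives after round-robin redistribution. */
    public static long[] rebalance(long[] upstreamCounts, int downstreamParallelism) {
        long[] downstream = new long[downstreamParallelism];
        for (int sender = 0; sender < upstreamCounts.length; sender++) {
            for (long r = 0; r < upstreamCounts[sender]; r++) {
                downstream[target(sender, r, downstreamParallelism)]++;
            }
        }
        return downstream;
    }

    /** Records whose downstream subtask index differs from their source subtask index. */
    public static long movedRecords(long[] upstreamCounts, int downstreamParallelism) {
        long moved = 0;
        for (int sender = 0; sender < upstreamCounts.length; sender++) {
            for (long r = 0; r < upstreamCounts[sender]; r++) {
                if (target(sender, r, downstreamParallelism) != sender) {
                    moved++;
                }
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        // Hypothetical load: 100 records per topic, topics split 6/1/1/0 over 4 sources.
        long[] perSource = {600, 100, 100, 0};
        System.out.println(Arrays.toString(rebalance(perSource, 4))); // [200, 200, 200, 200]
        System.out.println(movedRecords(perSource, 4));               // 600
    }
}
```

The downstream load evens out, but in this hypothetical 600 of the 800 records change subtasks. As we understand it, rebalance() also prevents the source from being chained to the next operator, so every record is serialized and passed through network buffers; whether that overhead matters depends on record size and throughput, and measuring it on the actual job is the safest way to answer the performance concern.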


On Wed, Aug 4, 2021 at 4:00 PM Prasanna kumar <pr...@gmail.com>
wrote:

> Robert
>
> When we apply a rebalance method to the Kafka consumer, it assigns the
> partitions of the various topics evenly.
>
> My only concern is that the rebalance method might have a performance
> impact.
>
> Thanks,
> Prasanna.

Re: Topic assignment across Flink Kafka Consumer

Posted by Prasanna kumar <pr...@gmail.com>.
Robert

When we apply a rebalance method to the Kafka consumer, it assigns the
partitions of the various topics evenly.

My only concern is that the rebalance method might have a performance
impact.

Thanks,
Prasanna.


On Wed, Aug 4, 2021 at 5:55 PM Prasanna kumar <pr...@gmail.com>
wrote:

> Robert,
>
> Flink version: 1.12.2
> Flink Kafka connector version: 2.12
>
> The partitions are assigned evenly when we read from a single topic.
>
> Our use case is to read from multiple topics (matched by a regex pattern).
> For this job we use 6 topics with 1 partition per topic.
>
> In this case, some of the Kafka consumer tasks are not assigned any
> partitions.
>
> Thanks,
> Prasanna.