Posted to user@flink.apache.org by Isuru Suriarachchi <is...@gmail.com> on 2017/08/29 04:26:45 UTC

Consuming a Kafka topic with multiple partitions from Flink

Hi all,

I'm trying to implement a Flink consumer which consumes a Kafka topic with
3 partitions. I've set the parallelism of the execution environment to 3 as
I want to make sure that each Kafka partition is consumed by a separate
parallel task in Flink. My first question is whether it's always guaranteed
to have a one-to-one mapping between Kafka partitions and Flink tasks in
this setup?

So far, I've just set up a single Kafka broker and created a topic with 3
partitions, and I've tried to consume it from my Flink application with
parallelism set to 3 (all on the same machine). I see 3 parallel instances of
each operator being created in the Flink log. However, when I execute the
Flink job, messages from all 3 Kafka partitions are consumed by a single
task (Process (3/3)). The other two parallel tasks are idling. Am I missing
something here? In addition to setting the parallelism, is there any other
configuration that I have to do here?

Here are the details about my setup.

Kafka version: 0.10.2.1
Flink version: 1.3.1
Connector: FlinkKafkaConsumer010
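
For reference, here is roughly what my job looks like (a minimal sketch; the
class name, the topic name "my-topic", and the broker address are
placeholders, not my real values):

    import java.util.Properties;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class ThreePartitionConsumerJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // 3 parallel subtasks, matching the 3 Kafka partitions
            env.setParallelism(3);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "flink-consumer");

            env.addSource(new FlinkKafkaConsumer010<>(
                            "my-topic", new SimpleStringSchema(), props))
               .print();

            env.execute("3-partition Kafka consumer");
        }
    }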

Thanks,
Isuru

Re: Consuming a Kafka topic with multiple partitions from Flink

Posted by Isuru Suriarachchi <is...@gmail.com>.
Hi Tony,

Thanks for your thoughts. I found the issue in my Flink processing chain. My
Kafka partition ids were 0, 1, and 2, so it was a different issue. I had a
keyBy operation before my process operation (which contains my main stream
processing logic), and only one key was ever being assigned. Therefore, after
the keyBy, the stream had only one partition and only one process operation
was receiving messages.
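
To illustrate, this is effectively what went wrong and how I fixed it (a
simplified sketch: stream is the DataStream<String> from the Kafka source,
MyProcessFunction and extractDeviceId stand in for my real logic, and
KeySelector is org.apache.flink.api.java.functions.KeySelector):

    // What I had, in effect: every record got the same key, so every
    // record was routed to the same parallel instance of the process
    // operator while the other two subtasks sat idle.
    stream.keyBy(new KeySelector<String, String>() {
              @Override
              public String getKey(String value) {
                  return "constant"; // one key -> one busy subtask
              }
          })
          .process(new MyProcessFunction());

    // The fix: key by something that actually varies from record to
    // record, so the load spreads across all 3 subtasks.
    stream.keyBy(new KeySelector<String, String>() {
              @Override
              public String getKey(String value) {
                  return extractDeviceId(value); // placeholder key extraction
              }
          })
          .process(new MyProcessFunction());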

Regards,
Isuru

Re: Consuming a Kafka topic with multiple partitions from Flink

Posted by Tony Wei <to...@gmail.com>.
Hi Isuru,

The way FlinkKafkaConsumer assigns partitions to subtasks is described in
this Javadoc:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.html
It means your partition ids should be consecutive, increasing integers; then
the partitions are distributed evenly across the subtasks.
If your partition ids were, say, 0, 3, and 6, they would all be assigned to
a single subtask when the parallelism is set to 3.
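
As a sketch of what that contract implies (a simplified model based on the
Javadoc, not the exact Flink source; the method name is mine):

    // Simplified model: subtasks take partitions in increasing-id order,
    // starting from an offset derived from the topic name.
    static int assign(String topic, int partitionId, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partitionId) % numParallelSubtasks;
    }

    // With numParallelSubtasks = 3:
    //   partition ids 0, 1, 2 -> three different subtasks (one-to-one)
    //   partition ids 0, 3, 6 -> (startIndex + 0) % 3 == (startIndex + 3) % 3
    //                            == (startIndex + 6) % 3, so a single subtask
    //                            gets all three partitions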

In my case, I created a new topic with two partitions (ids 0 and 1), and the
two subtasks of my consumer were each assigned exactly one partition, a fair
one-to-one mapping.
You can check your topic's setup or create another topic to try this out.

Hope this helps.

Best Regards,
Tony Wei
