Posted to user@flink.apache.org by Christophe Jolif <cj...@gmail.com> on 2018/02/03 17:53:42 UTC

Kafka and parallelism

Hi,

If I'm sourcing from a KafkaConsumer, do I have to explicitly set the Flink
job parallelism to the number of partitions, or will it adjust automatically?
In other words, if I don't call setParallelism, will I get 1 or the
number of partitions?

The reason I'm asking is that I'm listening to a topic pattern, not a single
topic, and the number of actual topics (and so partitions) behind the pattern
can change, so it is not possible to know ahead of time how many partitions
I will get.

Thanks!
-- 
Christophe

Re: Kafka and parallelism

Posted by Christophe Jolif <cj...@gmail.com>.
Ok thanks! I should have seen this. Sorry.

--
Christophe

On Wed, Feb 7, 2018 at 10:27 AM, Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi Christophe,
>
> Yes, you can achieve writing to different topics per-message using the
> `KeyedSerializationSchema` provided to the Kafka producer.
> The schema interface has a `getTargetTopic` method which allows you to
> override the default target topic for a given record.
> I agree that the method is somewhat odd to be part of the serialization
> schema, so I have also been thinking about moving that elsewhere (maybe as
> part of the partitioner).
>
> If you want to route a record to some topic depending on which topic it
> came from on the consumer side, you’ll have to wrap the source topic
> information within the records so that it is available to the producer.
> You can access that in the `KeyedDeserializationSchema#deserialize`
> method, which exposes information about which topic and partition each
> record came from.
>
> Cheers,
> Gordon

Re: Kafka and parallelism

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Christophe,

Yes, you can achieve writing to different topics per-message using the `KeyedSerializationSchema` provided to the Kafka producer.
The schema interface has a `getTargetTopic` method which allows you to override the default target topic for a given record.
I agree that the method is somewhat odd to be part of the serialization schema, so I have also been thinking about moving that elsewhere (maybe as part of the partitioner).

If you want to route a record to some topic depending on which topic it came from on the consumer side, you’ll have to wrap the source topic information within the records so that it is available to the producer.
You can access that in the `KeyedDeserializationSchema#deserialize` method, which exposes information about which topic and partition each record came from.
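
For illustration, here is a minimal sketch of the routing idea in plain Java, with no Flink dependency so that it runs on its own. `TaggedRecord` and `TopicRouter` are hypothetical names, not Flink classes, and the "origin-" to "destination-" prefix mapping is an assumption borrowed from the example in the question. In a real job, the tagging would presumably happen inside `KeyedDeserializationSchema#deserialize` (which receives the topic name) and the lookup inside `KeyedSerializationSchema#getTargetTopic`.

```java
import java.util.Objects;

/** Hypothetical wrapper: a payload plus the topic it was read from. */
final class TaggedRecord {
    final String sourceTopic;
    final String payload;

    TaggedRecord(String sourceTopic, String payload) {
        this.sourceTopic = Objects.requireNonNull(sourceTopic);
        this.payload = Objects.requireNonNull(payload);
    }
}

/** Hypothetical helper that maps a source topic to a target topic. */
final class TopicRouter {
    private final String sourcePrefix;
    private final String targetPrefix;
    private final String defaultTopic;

    TopicRouter(String sourcePrefix, String targetPrefix, String defaultTopic) {
        this.sourcePrefix = Objects.requireNonNull(sourcePrefix);
        this.targetPrefix = Objects.requireNonNull(targetPrefix);
        this.defaultTopic = Objects.requireNonNull(defaultTopic);
    }

    /** The per-record decision a getTargetTopic implementation could delegate to. */
    String targetTopicFor(TaggedRecord record) {
        if (record.sourceTopic.startsWith(sourcePrefix)) {
            // Swap the prefix and keep the rest of the topic name.
            return targetPrefix + record.sourceTopic.substring(sourcePrefix.length());
        }
        // Unknown origin: fall back to the default topic.
        return defaultTopic;
    }

    public static void main(String[] args) {
        TopicRouter router = new TopicRouter("origin-", "destination-", "fallback-topic");
        TaggedRecord record = new TaggedRecord("origin-topic-1", "some payload");
        System.out.println(router.targetTopicFor(record)); // destination-topic-1
    }
}
```

The point of the wrapper is that the source topic travels with the record through the pipeline, so the sink side can decide the target topic without any shared state.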

Cheers,
Gordon

On 7 February 2018 at 9:40:50 AM, Christophe Jolif (cjolif@gmail.com) wrote:

Hi Gordon, or anyone else reading this,

Still on this idea that I consume a Kafka topic pattern. 

I then want to sink the result of the processing into a set of topics depending on where the original message came from (i.e. if it comes from origin-topic-1 I will serialize the result to destination-topic-1, if from topic-2 to topic-2, etc.). However, the KafkaProducer works on a fixed topic. You can provide a partitioning function (FlinkKafkaPartitioner) but not a "topic" function that would let you decide which topic to send the message to, a bit like how a BucketingSink decides the bucket or ElasticsearchSinkFunction lets you choose the index.

Am I missing something? The reason I'm asking is that some of the sink constructors mention "defaultTopicId" and some "topicId", as if in some cases there were some ability to override the topic. Is there a feature that allows me to do that?

If not, do you think this would be a worthwhile addition?

Thanks again,
--
Christophe


Re: Kafka and parallelism

Posted by Christophe Jolif <cj...@gmail.com>.
Hi Gordon, or anyone else reading this,

Still on this idea that I consume a Kafka topic pattern.

I then want to sink the result of the processing into a set of topics
depending on where the original message came from (i.e. if it comes from
origin-topic-1 I will serialize the result to destination-topic-1, if
from topic-2 to topic-2, etc.). However, the KafkaProducer works on a
fixed topic. You can provide a partitioning function
(FlinkKafkaPartitioner) but not a "topic" function that would let you
decide which topic to send the message to, a bit like how a BucketingSink
decides the bucket or ElasticsearchSinkFunction lets you choose the
index.

Am I missing something? The reason I'm asking is that some of the sink
constructors mention "defaultTopicId" and some "topicId", as if in some
cases there were some ability to override the topic. Is there a feature
that allows me to do that?

If not, do you think this would be a worthwhile addition?

Thanks again,
--
Christophe

On Mon, Feb 5, 2018 at 9:52 AM, Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi Christophe,
>
> You can set the parallelism of the FlinkKafkaConsumer independently of the
> total number of Kafka partitions (across all subscribed streams, including
> newly created streams that match a subscribed pattern).
>
> The consumer deterministically assigns each partition to a single consumer
> subtask, in a round-robin fashion.
> E.g. if the parallelism of your FlinkKafkaConsumer is 2, and there are 6
> partitions, each consumer subtask will be assigned 3 partitions.
>
> As for topic pattern subscription, FlinkKafkaConsumers starting from
> version 1.4.0 support this feature. You can take a look at [1] on how to do
> that.
>
> Hope this helps!
>
> Cheers,
> Gordon
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery

Re: Kafka and parallelism

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Yes, the answer to that would be no.

If you do not explicitly set a parallelism for the consumer, the parallelism by default will be whatever the parallelism of the job is, and is independent of how many Kafka partitions there are.

Cheers,
Gordon

On 5 February 2018 at 11:42:21 AM, Christophe Jolif (cjolif@gmail.com) wrote:

Thanks. It helps indeed.

I guess the last point it does not explicitly answer is "does just creating a Kafka consumer reading from multiple partitions set the parallelism to the number of partitions". But reading between the lines, I think the answer is clearly no. You have to set the parallelism yourself, and the partitions are then distributed round-robin across the subtasks.

Thanks again,
--
Christophe

Re: Kafka and parallelism

Posted by Christophe Jolif <cj...@gmail.com>.
Thanks. It helps indeed.

I guess the last point it does not explicitly answer is "does just creating
a Kafka consumer reading from multiple partitions set the parallelism to the
number of partitions". But reading between the lines, I think the answer is
clearly no. You have to set the parallelism yourself, and the partitions are
then distributed round-robin across the subtasks.

Thanks again,
--
Christophe

Re: Kafka and parallelism

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Christophe,

You can set the parallelism of the FlinkKafkaConsumer independently of the total number of Kafka partitions (across all subscribed streams, including newly created streams that match a subscribed pattern).

The consumer deterministically assigns each partition to a single consumer subtask, in a round-robin fashion.
E.g. if the parallelism of your FlinkKafkaConsumer is 2, and there are 6 partitions, each consumer subtask will be assigned 3 partitions.
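
To make the arithmetic concrete, here is a simplified sketch of a round-robin assignment in plain Java. It is not the connector's code: `RoundRobinAssignment` and `assign` are hypothetical names, the sketch treats all partitions as one flat numbered list, and the actual connector logic may differ in detail (for example in how partitions from several topics are interleaved).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of round-robin partition assignment. */
final class RoundRobinAssignment {

    /** Returns, for each subtask index, the partition indices it would read. */
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        if (numPartitions < 0) {
            throw new IllegalArgumentException("numPartitions must not be negative");
        }
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            bySubtask.add(new ArrayList<>());
        }
        // Each partition goes to exactly one subtask, cycling through the subtasks.
        for (int partition = 0; partition < numPartitions; partition++) {
            bySubtask.get(partition % parallelism).add(partition);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 2)); // [[0, 2, 4], [1, 3, 5]]
        System.out.println(assign(2, 4)); // [[0], [1], [], []]
    }
}
```

The second call shows the other side of the independence: with more subtasks than partitions, some subtasks simply receive no partitions.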

As for topic pattern subscription, FlinkKafkaConsumers starting from version 1.4.0 support this feature. You can take a look at [1] on how to do that.
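
As a rough illustration of what a subscribed pattern selects, the sketch below applies a `java.util.regex.Pattern` to a list of topic names. The topic names and the `matchingTopics` helper are made up for this example, full-match semantics are an assumption, and the real consumer discovers topics from Kafka rather than from a hard-coded list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical illustration of selecting topics by a regular expression. */
final class TopicPatternMatch {

    /** Returns the topics whose whole name matches the pattern, in input order. */
    static List<String> matchingTopics(Pattern pattern, List<String> allTopics) {
        List<String> matched = new ArrayList<>();
        for (String topic : allTopics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("origin-topic-[0-9]+");
        List<String> topics =
                Arrays.asList("origin-topic-1", "origin-topic-22", "other-topic", "origin-topic-");
        System.out.println(matchingTopics(pattern, topics)); // [origin-topic-1, origin-topic-22]
    }
}
```

A topic created later whose name matches the pattern would be picked up the same way, which is why the total number of partitions cannot be known ahead of time.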

Hope this helps!

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery
