Posted to user@flink.apache.org by Niels Basjes <Ni...@basjes.nl> on 2017/01/05 17:37:57 UTC

Kafka KeyedStream source

Hi,

In my scenario I have click stream data that I persist in Kafka.
I use the sessionId as the key to instruct Kafka to put everything with the
same sessionId into the same Kafka partition. That way I already have all
events of a visitor in a single Kafka partition in a fixed order.

When I read this data into Flink I get a generic data stream, on top of which
I have to do a keyBy before my processing can continue. Such a keyBy will
redistribute the data again across the later tasks that do the actual work.

Is it possible to create an adapted version of the Kafka source that
immediately produces a keyed data stream?
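For reference, a minimal sketch of the producing side described above, assuming a
hypothetical "clicks" topic, String-serialized events, and a local broker; with the
sessionId as the record key, Kafka's default partitioner sends all events of a
session to the same partition:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ClickProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumption
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      String sessionId = "session-42"; // hypothetical session
      String clickEvent = "{\"sessionId\":\"session-42\",\"url\":\"/home\"}";
      // The sessionId is the record key, so all events of this session
      // land in the same partition, in order.
      producer.send(new ProducerRecord<>("clicks", sessionId, clickEvent));
    }
  }
}

On the Flink side, the stream read from this topic is still a generic DataStream,
so a keyBy on the sessionId (and the shuffle that comes with it) is needed before
keyed processing, which is exactly the question above.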


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Kafka KeyedStream source

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Niels,

I was speaking more from a theoretical point of view.
Flink does not have a hook to inject a custom hash function (yet). I'm not
familiar enough with the details of the implementation to assess whether
this would be possible or how much work it would be. However, several
users have asked about this feature, so there is definitely interest in it.

Best, Fabian




Re: Kafka KeyedStream source

Posted by Niels Basjes <Ni...@basjes.nl>.
Hi,

> However, if you would like to keyBy the original key attribute, Flink
would need to have access to the hash function that was used to assign
events to partitions.

So if my producing application and my consuming application use the same
source attributes AND the same hashing function to determine the partition,
it should be possible to make this work.
Right now I simply provide the 'sessionId' to Kafka, and then it is probably
a hashing function IN Kafka that does the magic.
I'm not sure if we can control that enough with Kafka right now.
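As a hedged aside: for keyed records, Kafka's default partitioner picks the
partition from a murmur2 hash of the serialized key, modulo the number of
partitions. The producer-side hash can be pinned explicitly with a custom
Partitioner, roughly as sketched below (the class name and the simple hash are
illustrative, not what Kafka uses internally):

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class SessionIdPartitioner implements Partitioner {

  @Override
  public int partition(String topic, Object key, byte[] keyBytes,
                       Object value, byte[] valueBytes, Cluster cluster) {
    int numPartitions = cluster.partitionsForTopic(topic).size();
    // Assumes every record carries a sessionId key. Any deterministic function
    // works, as long as the producing and consuming sides agree on it.
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }

  @Override
  public void close() {}

  @Override
  public void configure(Map<String, ?> configs) {}
}

It would be registered on the producer via the standard partitioner.class
property, e.g. props.put("partitioner.class", SessionIdPartitioner.class.getName()).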


Niels



-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Kafka KeyedStream source

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Niels,

I think the biggest problem for keyed sources is that Flink must be able to
co-locate key-partitioned state with the pre-partitioned data.

This might work if the key is the partition ID, i.e., not the original key
attribute that was hashed to assign events to partitions.
Flink would then need to distribute topic partitions to source functions based
on its own hash function.
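To illustrate the partition-ID variant, a sketch (assuming a String payload, a
hypothetical "clicks" topic, and the Kafka 0.10 connector of that era) that tags
each record with the partition it was read from and keys by that field; note that
Flink still applies its own hash to this key, so co-location with the source is
not guaranteed:

import java.io.IOException;
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class PartitionKeyedSketch {

  // Emits (kafkaPartition, payload) so the partition ID can be used as the key.
  public static class WithPartition
      implements KeyedDeserializationSchema<Tuple2<Integer, String>> {

    @Override
    public Tuple2<Integer, String> deserialize(byte[] messageKey, byte[] message,
        String topic, int partition, long offset) throws IOException {
      return Tuple2.of(partition, new String(message, "UTF-8"));
    }

    @Override
    public boolean isEndOfStream(Tuple2<Integer, String> next) {
      return false;
    }

    @Override
    public TypeInformation<Tuple2<Integer, String>> getProducedType() {
      return TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {});
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // assumption
    props.setProperty("group.id", "partition-keyed");         // assumption

    env.addSource(new FlinkKafkaConsumer010<>("clicks", new WithPartition(), props))
       // Keyed by field 0 (the partition ID); Flink still re-hashes this key with
       // its own partitioner, so a shuffle may still happen.
       .keyBy(0)
       .print();

    env.execute("partition-keyed sketch");
  }
}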

However, if you would like to keyBy the original key attribute, Flink would
need to have access to the hash function that was used to assign events to
partitions.

Best,
Fabian


Re: Kafka KeyedStream source

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Niels,

If it’s only for simple data filtering that does not depend on the key, a simple “flatMap” or “filter” directly after the source can be chained to the source instances.
This means the filtering is done within the same thread as the one fetching data from a Kafka partition, so there are no unnecessary network transfers for this simple filtering.
You can read more about operator chaining here: https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/runtime.html#tasks-and-operator-chains

So, what that sums up to is that you have a FlinkKafkaConsumer as the source, do a filter transformation right after, and then a keyBy followed by your heavy, key-wise computations.
Does that make sense for what you have in mind?
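A compact sketch of that pipeline shape, assuming a String payload and hypothetical
topic, filter, and key-extraction logic; the filter is chained to the Kafka source
instances by default, so only the keyBy introduces a network shuffle:

import java.util.Properties;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ChainedFilterSketch {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // assumption
    props.setProperty("group.id", "clickstream");             // assumption

    DataStream<String> clicks =
        env.addSource(new FlinkKafkaConsumer010<>("clicks", new SimpleStringSchema(), props))
           // Chained to the source: runs in the same thread that reads the partition,
           // so this cheap filtering causes no network transfer.
           .filter(event -> !event.isEmpty()); // hypothetical filter condition

    clicks
        // Only here is the data shuffled, to distribute the key space for the
        // heavy, key-wise computation that follows.
        .keyBy(new KeySelector<String, String>() {
          @Override
          public String getKey(String event) {
            return event; // placeholder for extracting the sessionId
          }
        })
        .print();

    env.execute("chained filter sketch");
  }
}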

Cheers,
Gordon


Re: Kafka KeyedStream source

Posted by Niels Basjes <Ni...@basjes.nl>.
Hi,

Ok. I think I get it.

WHAT IF:
Assume we create an addKeyedSource(...) which allows us to add a source
that makes some guarantees about the data.
And assume this source simply returns the Kafka partition id as the result
of this 'hash' function.
Then, if I have 10 Kafka partitions, I would read these records in and
could filter the data more efficiently because the data would not need to
go over the network before this filter.
Afterwards I can scale it up to 'many' tasks for the heavier processing
that follows.

As a concept: Could that be made to work?
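Purely to illustrate the idea (nothing like this exists in Flink; the method and
interface names are hypothetical), the requested hook might look roughly like this:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Hypothetical API sketch only. The contract would be that the source guarantees
// all records for a given key are emitted by a single source instance
// (e.g. key == Kafka partition id), so no shuffle is needed before keyed processing.
public interface KeyedSourceSketch {
  <T, K> KeyedStream<T, K> addKeyedSource(SourceFunction<T> source,
                                          KeySelector<T, K> keySelector);
}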

Niels



-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Kafka KeyedStream source

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Niels,

Thank you for bringing this up. I recall there was some previous discussion related to this: [1].

I don’t think this is possible at the moment, mainly because of how the API is designed.

On the other hand, a KeyedStream in Flink is basically just a DataStream with a hash partitioner that is used when deciding which instance of the following downstream operator an emitted record of the stream is sent to.
So, even if we have a Kafka source that directly produces a KeyedStream on “addSource”, redistribution of data can still happen. I.e., if the parallelism of the compute operators right after is different than the number of Kafka partitions, redistribution will happen to let the key space and state be evenly distributed in Flink.

This leads to the argument that we probably need to think about whether retaining the original partitioning of records in Kafka when consumed by Flink is actually only a special case.
Flink, as a parallel compute engine, can freely adjust the parallelism of its operators regardless of the parallelism of Kafka topics (rescaling isn’t actually in yet, but is on the near-future roadmap).

So, in the general case, the parallelism of a Flink operator may be different from the number of Kafka partitions, and therefore redistribution must occur.
For redistribution not to be needed right after an already partitioned Kafka topic, the number of 1) Kafka partitions, 2) Flink source instances consuming the partitions, and 3) parallel instances of the keyed computation afterwards would all have to be identical. This seems like a very specific situation, considering that you’ll be able to rescale Flink operators as the data’s key space / volume grows.

The main observation, I think, is that Flink itself maintains how the key space is partitioned within the system, which plays a crucial part in rescaling. That’s why by default it doesn’t respect existing partitioning of the key space in Kafka (or other external sources). Even if it initially does at the beginning of a job, partitioning will most likely change as you rescale your job / operators (which is a good thing, to be able to adapt).
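To make "Flink itself maintains how the key space is partitioned" concrete:
internally, each key is mapped to one of maxParallelism key groups, and contiguous
ranges of key groups are assigned to operator subtasks, which is what makes
rescaling possible. A rough, self-contained approximation of that assignment (the
real logic lives in Flink's KeyGroupRangeAssignment and uses a murmur hash of the
key's hashCode):

// Rough approximation only; not Flink's actual implementation.
public final class KeyGroupSketch {

  static int keyGroupFor(Object key, int maxParallelism) {
    // Flink applies a murmur hash to key.hashCode(); a plain hash is used here for brevity.
    return (key.hashCode() & Integer.MAX_VALUE) % maxParallelism;
  }

  static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
    // Contiguous ranges of key groups map to subtasks; rescaling re-splits the
    // ranges, which is why Flink cannot simply reuse Kafka's partitioning.
    return keyGroup * parallelism / maxParallelism;
  }

  public static void main(String[] args) {
    int maxParallelism = 128;
    int keyGroup = keyGroupFor("session-42", maxParallelism);
    System.out.println("subtask at parallelism 4:  " + subtaskFor(keyGroup, maxParallelism, 4));
    System.out.println("subtask at parallelism 10: " + subtaskFor(keyGroup, maxParallelism, 10));
  }
}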

Cheers,
Gordon

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/kafka-partition-assignment-td12123.html
