Posted to user@flink.apache.org by Dan Hill <qu...@gmail.com> on 2021/09/28 16:32:06 UTC

Re: Questions about keyed streams

Hi!  I'm just getting back to this.

Questions:
1. Across operators, do the same key group IDs get mapped to the same
task managers?  E.g. if an item is in key group 1 of operator A and that
runs on taskmanager-0, will key group 1 of operator B also run on
taskmanager-0?
2. When shuffling, are there any internal optimizations that perform a
forward instead when the key group of the output row maps to the same
machine?
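
For context on question 1, here is a minimal sketch of the key-group
arithmetic as I understand it from Flink's KeyGroupRangeAssignment: a key
group is mapped to a parallel subtask index by
keyGroup * parallelism / maxParallelism. The class and method names below
are hypothetical, the values are made up for illustration, and nothing here
calls Flink itself. Which TaskManager a given subtask lands on is decided by
the scheduler and is not modelled.

```java
// Hypothetical stand-alone sketch. It mirrors the integer arithmetic used by
// Flink's KeyGroupRangeAssignment but does not depend on Flink.
public class KeyGroupSketch {

    // Maps a key group id to the index of the parallel subtask that owns it.
    public static int operatorIndexForKeyGroup(
            int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value, for illustration only
        int keyGroup = 40;        // assumed key group id

        // Operators A and B share parallelism 4; operator C uses parallelism 8.
        int indexA = operatorIndexForKeyGroup(maxParallelism, 4, keyGroup);
        int indexB = operatorIndexForKeyGroup(maxParallelism, 4, keyGroup);
        int indexC = operatorIndexForKeyGroup(maxParallelism, 8, keyGroup);

        System.out.println("A=" + indexA + " B=" + indexB + " C=" + indexC);
    }
}
```

With equal parallelism and max parallelism, operators A and B agree on the
subtask index for a key group. Operator C, with a different parallelism,
does not, so rows would have to be redistributed between B and C.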



On Thu, Jul 29, 2021 at 4:49 AM Arvid Heise <ar...@apache.org> wrote:

> AFAIK you can now express the partition key in the Table API, which will
> be used for co-location and optimization. So I'd probably give that a try
> first and convert the Table to a DataStream where needed.
>
> On Sat, Jul 24, 2021 at 9:22 PM Dan Hill <qu...@gmail.com> wrote:
>
>> Thanks Fabian and Senhong!
>>
>> Here's an example diagram of the join that I want to do.  There are more
>> layers of joins.
>>
>> https://docs.google.com/presentation/d/17vYTBUIgrdxuYyEYXrSHypFhwwS7NdbyhVgioYMxPWc/edit#slide=id.p
>>
>> 1) Thanks!  I'll look into these.
>>
>> 2) I'm using the same key across multiple Kafka topics.  I can change the
>> producers and consumers to write to whichever partitions would help.
>>
>> The job is pretty simple right now.  No optimizations.  We're currently
>> running this on one task manager.  The goal of the email was to start
>> thinking about optimizations.  If the usual practice is to let Flink
>> regroup the Kafka sources on input, how do teams deal with the serde
>> overhead of this?  Just factor it into overhead and allocate more resources?
>>
>>
>>
>>
>> On Fri, Jul 23, 2021 at 3:21 AM Senhong Liu <se...@gmail.com> wrote:
>>
>>> Hi Dan,
>>>
>>> 1) If the key doesn't change in the downstream operators and you want to
>>> avoid shuffling, maybe DataStreamUtils#reinterpretAsKeyedStream would
>>> be helpful.
>>>
>>> 2) I am not sure whether you are saying that the data is already
>>> partitioned in Kafka and you want to avoid the shuffling in Flink
>>> caused by reusing keyBy(). One solution is to partition your data in
>>> Kafka the same way Flink would partition it when using keyBy().
>>> After that, feel free to use
>>> DataStreamUtils#reinterpretAsKeyedStream!
>>>
>>> If your use case is not what I described above, maybe you can provide us
>>> with more information.
>>>
>>> Best,
>>> Senhong
>>>
>>> On Jul 22, 2021, 7:33 AM +0800, Dan Hill <qu...@gmail.com>, wrote:
>>>
>>> Hi.
>>>
>>> 1) If I use the same key in downstream operators (my key is a user id),
>>> will the rows stay on the same TaskManager machine?  I join in more info
>>> based on the user id as the key.  I'd like for these to stay on the same
>>> machine rather than shuffle a bunch of user-specific info to multiple task
>>> manager machines.
>>>
>>> 2) What are best practices to reduce the number of shuffles when having
>>> multiple Kafka topics with similar keys (user id)?  E.g. should I make
>>> sure the same key writes to the same partition number and then manually
>>> assign which Flink tasks get which Kafka partitions?
>>>
>>>