Posted to user@flink.apache.org by Yegor Roganov <ye...@gmail.com> on 2021/04/28 09:11:09 UTC

Key by Kafka partition / Kinesis shard

Hello

To learn Flink I'm trying to build a simple application where I want to
save events coming from Kinesis to S3.
I want to subscribe to each shard, and within each shard I want to batch
for 30 seconds, or until 1000 events are observed. These batches should
then be uploaded to S3.
What I don't understand is how to key my source on shard id, and do it in a
way that doesn't induce unnecessary shuffling.
Is this possible with Flink?

Re: Key by Kafka partition / Kinesis shard

Posted by Yegor Roganov <ye...@gmail.com>.
Hi Till, thank you for your reply.

> What you can do, though, is to create a custom operator or use a flatMap
> to build your own windowing operator.
Since my stream wouldn't be keyed, does this mean that I would need to use
"Managed Operator State" (aka raw state)?


Re: Key by Kafka partition / Kinesis shard

Posted by Till Rohrmann <tr...@apache.org>.
Yes, you would have to use operator state for this. The limitation is that
rescaling would probably not work properly. Also, if the assignment of
shards to operators changes upon failure recovery, this can produce
incorrect results (for example, some elements from shard 1 might end up on
an operator that then consumes shard 2).

Cheers,
Till
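
The rescaling limitation described above can be illustrated with a small,
Flink-independent model. It assumes buffered events are kept as list-style
operator state, where the entries of all subtasks are pooled on restore and
split across the new subtasks without regard to the shard they came from.
The class name, the round-robin split and the event names are hypothetical
simplifications, not Flink's actual repartitioning code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ListStateRedistributionSketch {

    /** Simplified model: pool all checkpointed entries, then deal them out round-robin. */
    public static List<List<String>> redistribute(List<List<String>> perSubtaskState,
                                                  int newParallelism) {
        List<String> all = new ArrayList<>();
        for (List<String> subtaskState : perSubtaskState) {
            all.addAll(subtaskState);
        }
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < all.size(); i++) {
            result.get(i % newParallelism).add(all.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Before rescaling: subtask 0 buffered shard 1 events, subtask 1 buffered shard 2 events.
        List<List<String>> before = Arrays.asList(
                Arrays.asList("shard1-a", "shard1-b", "shard1-c"),
                Arrays.asList("shard2-a"));
        // After rescaling from 2 to 3 subtasks, the buffers no longer line up with shards.
        System.out.println(redistribute(before, 3));
    }
}
```

In this model, one subtask ends up holding buffered events from both shards
after the rescale, which is the kind of mixing the reply warns about.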


Re: Key by Kafka partition / Kinesis shard

Posted by Yegor Roganov <ye...@gmail.com>.
Hi Raghavendar, thank you for your reply.

> stream.timeWindow(Time.seconds(10)).trigger(CustomTrigger.of(3)).apply(new TestWindow());
What would this stream be keyed on?


Re: Key by Kafka partition / Kinesis shard

Posted by Raghavendar T S <ra...@gmail.com>.
Hi Yegor

The trigger implementations in Flink do not support triggering by event
count and duration together. You can extend the existing CountTrigger
implementation to support this.
You can use the CustomTrigger.java (a minor enhancement of CountTrigger)
that I have attached to this thread as is. TestWindow is the window
function that receives the grouped events. You can check the diff between
CountTrigger and CustomTrigger for a better understanding.

*Usage*
stream.timeWindow(Time.seconds(10)).trigger(CustomTrigger.of(3)).apply(new TestWindow());

Thank you
Raghavendar T S
merasplugins.com
teknosrc.com







Re: Key by Kafka partition / Kinesis shard

Posted by Till Rohrmann <tr...@apache.org>.
Hi Yegor,

If you want to use Flink's keyed windowing logic, then you need to insert a
keyBy/shuffle operation because Flink currently cannot simply use the
partitioning of the Kinesis shards. The reason is that Flink needs to group
the keys into the correct key groups in order to support rescaling of the
state.
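
As a rough illustration of the key group point, the sketch below maps a key
to a key group and a key group to a parallel subtask. The class and method
names are hypothetical, and the plain modulo in keyGroupFor is a stand-in:
to my understanding Flink applies a murmur hash to key.hashCode() before
taking it modulo the maximum parallelism. The range mapping in subtaskFor
is meant to mirror how key groups are spread over subtasks.

```java
public class KeyGroupSketch {

    /** Stand-in for Flink's key-to-key-group hashing (assumption: plain modulo instead of murmur). */
    public static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Key groups are assigned to subtasks in contiguous ranges. */
    public static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        // Hypothetical shard ids used as keys.
        String[] shardIds = {"shardId-000000000000", "shardId-000000000001"};
        for (int parallelism : new int[] {2, 4}) {
            for (String shardId : shardIds) {
                int keyGroup = keyGroupFor(shardId, maxParallelism);
                System.out.printf("parallelism=%d %s -> key group %d -> subtask %d%n",
                        parallelism, shardId, keyGroup,
                        subtaskFor(keyGroup, maxParallelism, parallelism));
            }
        }
    }
}
```

The subtask that owns a key is decided by this hashing, not by which source
subtask happens to read the shard, so keying on the shard id generally still
moves records between subtasks. The key group of a key stays fixed when the
parallelism changes, which is what lets keyed state follow its keys.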

What you can do, though, is to create a custom operator or use a flatMap to
build your own windowing operator. This operator could then use the
partitioning of the Kinesis shards by simply collecting the events until
either 30 seconds or 1000 events are observed.

Cheers,
Till
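
A minimal, Flink-independent sketch of the "collect until 30 seconds or 1000
events" logic suggested above. The class and method names are hypothetical.
In a Flink job this logic would sit inside the flatMap or custom operator,
with add called per element; the onTimer call assumes some timer or periodic
callback is available, which is not shown here, and state checkpointing is
left out entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Buffers events and releases a batch once a size or age threshold is reached. */
public class CountOrTimeBatcher<T> {
    private final int maxEvents;
    private final long maxAgeMillis;
    private List<T> buffer = new ArrayList<>();
    private long firstEventTime = -1L; // arrival time of the oldest buffered event

    public CountOrTimeBatcher(int maxEvents, long maxAgeMillis) {
        if (maxEvents < 1) {
            throw new IllegalArgumentException("maxEvents must be at least 1");
        }
        this.maxEvents = maxEvents;
        this.maxAgeMillis = maxAgeMillis;
    }

    /** Adds an event and returns a batch if either threshold is now reached. */
    public Optional<List<T>> add(T event, long nowMillis) {
        if (buffer.isEmpty()) {
            firstEventTime = nowMillis;
        }
        buffer.add(event);
        return flushIfDue(nowMillis);
    }

    /** Intended to be called periodically so a quiet shard still flushes on time. */
    public Optional<List<T>> onTimer(long nowMillis) {
        return flushIfDue(nowMillis);
    }

    private Optional<List<T>> flushIfDue(long nowMillis) {
        boolean full = buffer.size() >= maxEvents;
        boolean old = !buffer.isEmpty() && nowMillis - firstEventTime >= maxAgeMillis;
        if (!full && !old) {
            return Optional.empty();
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        return Optional.of(batch);
    }
}
```

For the use case in this thread, one would create it as
new CountOrTimeBatcher<>(1000, 30_000L). Since a single subtask may read
several shards, keeping one batcher per shard id (for example in a map)
would be needed to get per-shard batches, assuming each event carries its
shard id.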
