Posted to user@beam.apache.org by Ruben Vargas <ru...@metova.com> on 2023/08/29 13:29:58 UTC

Issue with growing state/checkpoint size

Hello

I am experiencing an issue with my Beam pipeline.

I have a pipeline in which I split the work into different branches, then I
do a join using CoGroupByKey. Each message has its own unique key.

For the join, I used a session window, discarding the messages after the
trigger fires.

I'm using the Flink runner and deployed it as a Kinesis application. But I'm
experiencing unbounded growth of the checkpoint data size. When I look
in the Flink console, the following task has the largest checkpoint:

join_results/GBK -> ToGBKResult ->
join_results/ConstructCoGbkResultFn/ParMultiDo(ConstructCoGbkResult) -> V


Any advice?

Thank you very much!

Re: Issue with growing state/checkpoint size

Posted by Wiśniowski Piotr <co...@gmail.com>.
Hi,

Sorry for the late response - I was busy with a pretty similar problem.

First, let me clarify the pipeline you use.

You have a pipeline with a step that produces two output streams. They
flow through some computation independently, and then you join them with
CoGroupByKey using a session window, expecting a match on message id.
So you are trying to achieve a streaming inner join (which should emit
output when the same message id arrives on both streams). Is that right?

My findings:

1. Session windows are very tricky to work with in CoGroupByKey because
they are merging windows (so Beam may need to wait arbitrarily long,
accumulating state, if one of the streams continuously outputs the same
message id - this is the first thing to check in your case). This is only
one of the potential corner cases with session windows. I would strongly
advise changing this to a plain sliding window in the processing time
domain, as it would allow you to clearly specify how long the step should
keep waiting for a particular message id and how frequently it should emit.

2. It is also possible that one of the streams does not produce a
message for a particular message id. Make doubly sure that every message
processed by your pipeline always emits exactly one message per message
id. You also need to consider what to do with the state once a pair of
messages is joined and emitted. By default, session windows will keep
waiting for further messages with the same message id, accumulating state
for some time afterwards. This might also be the source of your problem.
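
To make the merging behaviour in point 1 concrete, here is a small plain-Python simulation. It is not Beam code: the function name `merge_sessions` and the gap of 10 time units are assumptions chosen for illustration. Each element opens a window [ts, ts + gap), and overlapping windows for the same key are merged, so a key that keeps arriving never lets its session close.

```python
def merge_sessions(timestamps, gap):
    """Merge one key's event timestamps into session windows [start, end)."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # The element overlaps the open session, so the session is extended.
            sessions[-1][1] = max(sessions[-1][1], ts + gap)
        else:
            # No overlap: a new session starts here.
            sessions.append([ts, ts + gap])
    return [tuple(s) for s in sessions]


# A key that shows up every 5 units never leaves a gap of 10,
# so everything collapses into one ever-growing session.
steady = merge_sessions(range(0, 100, 5), gap=10)

# A key that shows up exactly twice closes its session shortly after.
single_pair = merge_sessions([0, 3], gap=10)

print(steady)
print(single_pair)
```

In the first case the state for that key has to be kept for the whole span of the merged session, which is the kind of growth worth checking for.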

Now the solution that worked for me in a similar problem.

1. Window both streams into FixedWindows(1) in the processing time domain.
Make sure keys are set.

2. CoGroupByKey

3. Window the output of CoGroupByKey into GlobalWindows.

4. Create a stateful ParDo with timers that captures state if a message
arrives on only one stream, emits both if both messages have arrived,
takes the proper action (up to you) when further messages with the same
key arrive, and does the cleanup.

Explanation:

FixedWindows - makes sure that elements are grouped into 1 s batches to
assess the join - this is just to meet CoGroupByKey's requirement to have
windows defined on the input streams.

CoGroupByKey - joins messages that arrive in the same one-second window.

GlobalWindows - makes sure that the stateful ParDo keeps state globally
per key (as it will manage its state by itself).

Stateful ParDo - does the waiting for the matching message and contains
all your logic for the corner cases described above.
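
The logic of step 4 can be sketched in plain Python as follows. This is only a toy model of per-key state plus a cleanup timer, not the Beam state and timers API. The class name `JoinBuffer`, the `ttl` parameter, the "left"/"right" tags, and the policy of ignoring a duplicate on the same stream are all assumptions made for the example.

```python
class JoinBuffer:
    """Toy stand-in for a per-key stateful step with cleanup timers."""

    def __init__(self, ttl):
        self.ttl = ttl        # how long to wait for the other side
        self.state = {}       # key -> (tag, value) of the side seen first
        self.deadline = {}    # key -> time at which pending state is dropped

    def process(self, key, tag, value, now):
        """Handle one message; return a joined tuple or None."""
        pending = self.state.get(key)
        if pending is None:
            # First message for this key: remember it and arm the timer.
            self.state[key] = (tag, value)
            self.deadline[key] = now + self.ttl
            return None
        if pending[0] == tag:
            # Duplicate on the same stream: assumed policy is to keep the first.
            return None
        # The other side arrived: emit the pair and clean up immediately.
        del self.state[key]
        del self.deadline[key]
        if pending[0] == "left":
            return (key, pending[1], value)
        return (key, value, pending[1])

    def fire_timers(self, now):
        """Drop state for keys whose partner never arrived in time."""
        expired = [k for k, d in self.deadline.items() if d <= now]
        for k in expired:
            del self.state[k]
            del self.deadline[k]
        return expired


buf = JoinBuffer(ttl=60)
first = buf.process("id-1", "left", "a", now=0)
joined = buf.process("id-1", "right", "b", now=5)
buf.process("id-2", "left", "c", now=10)
expired = buf.fire_timers(now=100)
print(first, joined, expired)
```

The point of the sketch is that state for a key lives only until the pair is emitted or the timer fires, so the total state is bounded by the arrival rate times `ttl` rather than by the number of distinct keys ever seen.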

The above just worked perfectly for me. Let me know if it is a solution
for your case too. We might want to introduce a common API for such cases
(it is quite easy to code this in a generic form).

Best

Wiśniowski Piotr

Re: Issue with growing state/checkpoint size

Posted by Byron Ellis via user <us...@beam.apache.org>.
Depends on why you're using a fan-out approach in the first place. You
might actually be better off doing all the work at the same time.

Re: Issue with growing state/checkpoint size

Posted by Ruben Vargas <ru...@metova.com>.
Ohh I see

That makes sense. I am wondering if there is a strategy for my use case,
where I have an ID that is unique per pair of messages.

Thanks for all your help!

Re: Issue with growing state/checkpoint size

Posted by Sachin Mittal <sj...@gmail.com>.
Yes, a very high and non-deterministic cardinality can make the stored
state of a join operation unbounded.
In my case we knew the cardinality and it was not very high, so we could go
with a lookup-based approach using Redis to enrich the stream and avoid
joins.
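
As a rough back-of-the-envelope illustration of why this matters (all numbers below are hypothetical and the helper name is made up): when every key is unique, the pending join state is roughly the arrival rate multiplied by how long each entry is retained and by the size of an entry. If nothing ever expires the entries, the retention term grows with the job's uptime.

```python
def pending_state_bytes(keys_per_second, retention_seconds, bytes_per_entry):
    """Rough steady-state size of buffered join state for unique keys."""
    return keys_per_second * retention_seconds * bytes_per_entry


# Hypothetical load: 1,000 unique keys/s, 200 bytes buffered per key.
ten_minutes = pending_state_bytes(1_000, 600, 200)
one_day = pending_state_bytes(1_000, 86_400, 200)

print(ten_minutes / 1e6, "MB if entries are dropped after 10 minutes")
print(one_day / 1e9, "GB if entries linger for a day")
```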



Re: Issue with growing state/checkpoint size

Posted by Ruben Vargas <ru...@metova.com>.
Thanks for the reply and the advice

One more thing: do you know if the key-space cardinality impacts this? I'm
assuming it does, but in my case all the messages from the sources have a
unique ID, which makes my key-space huge, and that is not under my
control.

Re: Issue with growing state/checkpoint size

Posted by Sachin Mittal <sj...@gmail.com>.
So for the smaller collection, which does not grow in size for certain
keys, we stored the data in Redis, and instead of a Beam join we just did
the lookup in our DoFn and got the data we needed.
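
A minimal sketch of that lookup-based enrichment, in plain Python. A dict stands in for Redis here, and the function name `enrich`, the field names, and the choice to pass unmatched events through unchanged are assumptions for illustration, not the pipeline described above.

```python
def enrich(events, lookup):
    """Enrich each event from a key-value store instead of joining streams."""
    for event in events:
        extra = lookup.get(event["key"])
        if extra is not None:
            # Merge the looked-up fields into the event.
            yield {**event, **extra}
        else:
            # Assumed policy: pass the event through when nothing is found.
            yield event


# Stand-in for the small, bounded reference collection kept in Redis.
reference = {"k1": {"region": "eu"}}

out = list(enrich([{"key": "k1", "v": 1}, {"key": "k2", "v": 2}], reference))
print(out)
```

Because the step holds no per-key state of its own, nothing accumulates in the checkpoint; the trade-off is that it only works when one side of the join is small and bounded enough to live in an external store.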


Re: Issue with growing state/checkpoint size

Posted by Ruben Vargas <ru...@metova.com>.
Hello,

Thanks for the reply. Did you follow any particular strategy to avoid
joins when you rewrote your pipeline?



Re: Issue with growing state/checkpoint size

Posted by Sachin Mittal <sj...@gmail.com>.
Yes, we faced the same issue too when trying to run a pipeline involving
a join of two collections. It was deployed using AWS KDA, which uses the
Flink runner. The source was Kinesis streams.

It looks like join operations are not very efficient in terms of size
management when run on Flink.

We had to rewrite our pipeline to avoid these joins.

Thanks
Sachin

