Posted to user@beam.apache.org by Gökhan Imral <gi...@icloud.com> on 2020/09/02 08:24:37 UTC

Clearing states and timers in a Stateful Fn with Global Windows

Hi,

We have implemented an unbounded stream join using a stateful DoFn and a global window: we store elements in state when necessary and use timers to clear the state when it should expire. After deploying our job, however, we discovered that the state size grows all the time. While experimenting to find the cause, I found that the timers are not getting garbage collected.

This is the sample application I used for testing.

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
        Pipeline p = Pipeline.create(options);
        p.apply(GenerateSequence.from(0).withRate(10000, Duration.standardSeconds(1)))
                // Pass every value below 1,000,000, then only multiples of 500.
                .apply(Filter.by((val) -> val < 1000000 || val % 500 == 0))
                // Each value is its own key, so every element creates state for a new key.
                .apply(WithKeys.of((val) -> val)).setCoder(KvCoder.of(VarLongCoder.of(), VarLongCoder.of()))
                .apply("Window", Window.<KV<Long, Long>>into(new GlobalWindows())
                        .triggering(AfterPane.elementCountAtLeast(1))
                        .discardingFiredPanes()
                        .withAllowedLateness(Duration.ZERO))
                .apply("State", ParDo.of(new DoFn<KV<Long, Long>, KV<Long, Long>>() {
                    @StateId("state")
                    private final StateSpec<ValueState<Long>> stateSpec = StateSpecs.value();

                    @TimerId("gcTimer")
                    private final TimerSpec gcTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

                    @ProcessElement
                    public void process(
                            ProcessContext c,
                            @Timestamp Instant ts,
                            @StateId("state") ValueState<Long> state,
                            @TimerId("gcTimer") Timer gcTimer) {
                        state.write(c.element().getValue());
                        // Expire the state 5 seconds (event time) after the element's timestamp.
                        Instant expirationTime = ts.plus(Duration.standardSeconds(5));
                        gcTimer.set(expirationTime);
                    }

                    @OnTimer("gcTimer")
                    public void onGC(@StateId("state") ValueState<Long> state) {
                        state.clear();
                    }
                }));

        p.run().waitUntilFinish();
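
As a rough aid for reading the sample, the load it generates can be estimated with plain arithmetic. The sketch below is not Beam code; the class and method names are made up for illustration, and it assumes the source really emits at the configured 10,000 elements per second. Because the sample uses each value as its own key, every element that passes the filter corresponds to a new key with its own state cell and timer.

```java
public class SampleLoadEstimate {
    static final long RATE_PER_SECOND = 10_000L;   // GenerateSequence rate in the sample
    static final long PASS_ALL_BELOW = 1_000_000L; // the filter passes every value below this
    static final long PASS_EVERY = 500L;           // from there on, only multiples of 500 pass

    /** Number of values in [0, n) accepted by the sample's filter (one new key each). */
    static long distinctKeysAmongFirst(long n) {
        if (n <= PASS_ALL_BELOW) {
            return n;
        }
        // Count multiples of 500 in [PASS_ALL_BELOW, n).
        long multiples = (n - 1) / PASS_EVERY - (PASS_ALL_BELOW - 1) / PASS_EVERY;
        return PASS_ALL_BELOW + multiples;
    }

    public static void main(String[] args) {
        // Assumption: the source keeps up with the configured rate.
        long denseSeconds = PASS_ALL_BELOW / RATE_PER_SECOND;
        long keysPerSecondAfterwards = RATE_PER_SECOND / PASS_EVERY;
        System.out.println("dense phase lasts (s): " + denseSeconds);
        System.out.println("new keys per second afterwards: " + keysPerSecondAfterwards);
        System.out.println("distinct keys after 5 minutes: "
                + distinctKeysAmongFirst(300 * RATE_PER_SECOND));
    }
}
```

Under these assumptions the sample creates about a million keys in the first 100 seconds and roughly 20 new keys per second after that, so anything retained per key or per timer accumulates quickly.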

After running the application for a couple of minutes with the FlinkRunner, heap memory usage increases periodically. When I checked the heap dump, I could see many timers that are not being collected by the GC.
When I test with fixed or sliding windows, I can see the timers being cleared at the end of the window. Is this the expected behavior because the global window never closes? Is there a way to clear the timers in the global window, or should we try sliding windows with deduplication? Session windows would also work for our scenario, but stateful DoFns do not support them.

This is my first message here, so sorry for any mistakes.

Thank you all very much
Gokhan Imral
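
The expiry pattern described in this message (write per-key state, set an event-time timer, clear the state when the timer fires) can be modelled in plain Java to show the bookkeeping one would expect. This is only a sketch under stated assumptions: it is not Beam code and not how any runner is implemented, the class and method names are hypothetical, and it assumes that setting a timer again for a key replaces the earlier one and that a timer fires once the watermark reaches its timestamp.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class ExpiringStateModel {
    private final Map<Long, Long> state = new HashMap<>();       // key -> stored value
    private final Map<Long, Long> timerByKey = new HashMap<>();  // key -> firing time (ms)
    private final TreeMap<Long, Set<Long>> keysByFiringTime = new TreeMap<>();
    private final long ttlMillis;

    ExpiringStateModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Stores the value and (re)sets the key's expiry timer to timestamp + TTL. */
    void process(Long key, Long value, long timestampMillis) {
        state.put(key, value);
        Long firingTime = timestampMillis + ttlMillis;
        Long previous = timerByKey.put(key, firingTime);
        if (previous != null) {
            // Replace the old timer instead of keeping both.
            Set<Long> keys = keysByFiringTime.get(previous);
            if (keys != null) {
                keys.remove(key);
                if (keys.isEmpty()) {
                    keysByFiringTime.remove(previous);
                }
            }
        }
        keysByFiringTime.computeIfAbsent(firingTime, t -> new HashSet<>()).add(key);
    }

    /** Fires all timers at or before the watermark; returns how many fired. */
    int advanceWatermark(long watermarkMillis) {
        int fired = 0;
        while (!keysByFiringTime.isEmpty() && keysByFiringTime.firstKey() <= watermarkMillis) {
            Map.Entry<Long, Set<Long>> due = keysByFiringTime.pollFirstEntry();
            for (Long key : due.getValue()) {
                state.remove(key);      // what the timer callback does: clear the state
                timerByKey.remove(key); // the fired timer itself is released as well
                fired++;
            }
        }
        return fired;
    }

    int stateSize() {
        return state.size();
    }

    int pendingTimers() {
        return timerByKey.size();
    }

    public static void main(String[] args) {
        ExpiringStateModel model = new ExpiringStateModel(5_000L);
        for (long key = 0; key < 1_000; key++) {
            model.process(key, key, key); // element timestamp equals the key, in ms
        }
        model.process(0L, 42L, 2_000L);   // a later element for key 0 moves its timer
        System.out.println("fired: " + model.advanceWatermark(5_999L));
        System.out.println("state left: " + model.stateSize()
                + ", timers left: " + model.pendingTimers());
    }
}
```

In this model both the state and the timer entry disappear once a timer fires, so memory stays bounded by the number of live keys. The behavior reported above is that, with the global window on the FlinkRunner, the timers stayed on the heap instead.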


Re: Clearing states and timers in a Stateful Fn with Global Windows

Posted by Maximilian Michels <mx...@apache.org>.
Thanks for reporting Gökhan! Please keep us updated. We'll likely merge 
the patch by the end of the week.

-Max

On 03.09.20 08:40, Gökhan Imral wrote:
> Thanks for the quick response. I tried a build with the fix applied and 
> can see that memory usage is much more stable.
> 
> Gokhan

Re: Clearing states and timers in a Stateful Fn with Global Windows

Posted by Gökhan Imral <gi...@icloud.com>.
Thanks for the quick response. I tried a build with the fix applied and can see that memory usage is much more stable.

Gokhan

> On 2 Sep 2020, at 12:51 PM, Jan Lukavský <je...@seznam.cz> wrote:
> 
> Hi Gokhan,
> 
> this is related to [1], which is about to be fixed.
> 
> Jan
> 
> [1] https://github.com/apache/beam/pull/12733


Re: Clearing states and timers in a Stateful Fn with Global Windows

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Gokhan,

this is related to [1], which is about to be fixed.

Jan

[1] https://github.com/apache/beam/pull/12733
