Posted to user@flink.apache.org by Konstantin Knauf <ko...@tngtech.com> on 2016/03/24 11:30:13 UTC

Memory Leak using ProcessingTimeTimers?

Hi everyone,

we were testing a Flink streaming job (1.0.0) that uses a GlobalWindow on a
KeyedStream with a custom Trigger.

On each element, the trigger function registers a new processing-time timer
and deletes the previously registered one. So we are
registering a lot of timers, but also deleting most of them right away.

The desired behavior is that the window is purged (and all state
is set to null) after a timeout (last event for this key + timeout).
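
To make the intended semantics concrete, here is a minimal sketch in plain
Java. It is not Flink code: the class InactivityTimeoutSketch and its methods
onElement and onTimer are hypothetical names, and keys are assumed to be
Strings. It only models "purge at last event for this key + timeout" and says
nothing about how the timers are stored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a per-key inactivity timeout; not Flink API. */
class InactivityTimeoutSketch {
    private final long timeoutMillis;
    // Per key: timestamp of the one timer that currently counts as registered.
    private final Map<String, Long> currentTimer = new HashMap<>();
    // Keys whose window has been purged, in purge order.
    final List<String> purged = new ArrayList<>();

    InactivityTimeoutSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Called for every element: replaces the key's timer with now + timeout. */
    void onElement(String key, long nowMillis) {
        currentTimer.put(key, nowMillis + timeoutMillis);
    }

    /** Called when a timer fires; a timer that has been replaced is ignored. */
    boolean onTimer(String key, long timerMillis) {
        Long expected = currentTimer.get(key);
        if (expected == null || expected != timerMillis) {
            return false; // stale timer, a later element moved the deadline
        }
        currentTimer.remove(key);
        purged.add(key);
        return true;
    }
}
```

With a timeout of 1000 ms and elements for key "a" at 0 and 500, the timer for
1000 is stale and only the timer for 1500 purges the window.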

The performance tests showed that after a short time (5 minutes or so) all
the time went to garbage collection. From the heap dumps, we can tell
that the problem was retained TriggerTasks (with a reference to the
TriggerContext) of all the registered processing-time timers.

The problem seems to be that when the TriggerTasks are deleted, the
corresponding Callables are not removed from the queue: the
deleteProcessingTimeTimer method only removes the timer from the
set/queues of the TriggerContext itself, but not from the RuntimeContext.
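
The effect described above can be reproduced with the JDK alone. The sketch
below is an assumption-laden model, not the Flink code path: the class
TimerLeakSketch and the method queuedAfter are hypothetical, and a
ScheduledThreadPoolExecutor stands in for the timer queue. "Deleting" a timer
only in the caller's own bookkeeping leaves the scheduled task in the
executor's queue, where it keeps everything it references reachable.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of register-then-delete timers; not Flink code. */
class TimerLeakSketch {

    /**
     * Schedules one far-future task per element and forgets the previous one.
     * Returns how many tasks are still queued in the executor afterwards.
     */
    static int queuedAfter(int elements, boolean cancelUnderlyingTask) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        // Cancelled tasks are removed from the queue immediately.
        executor.setRemoveOnCancelPolicy(true);
        Runnable noop = () -> { };
        ScheduledFuture<?> current = null;
        try {
            for (int i = 0; i < elements; i++) {
                ScheduledFuture<?> previous = current;
                current = executor.schedule(noop, 1, TimeUnit.HOURS);
                if (previous != null && cancelUnderlyingTask) {
                    previous.cancel(false);
                }
                // Without the cancel, only our local reference is dropped;
                // the executor still holds the previous task.
            }
            return executor.getQueue().size();
        } finally {
            executor.shutdownNow();
        }
    }
}
```

In this model, queuedAfter(1000, false) leaves 1000 tasks queued, while
queuedAfter(1000, true) leaves only the most recent one.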

Is this a bug? Are we using ProcessingTimeTimers in a fundamentally
wrong way? If so, is there any other way to achieve the desired
functionality?

We have a workaround in place now (basically just a timeout starting
with the first element in the window instead of the last element in the window).

Cheers,

Konstantin

-- 
Konstantin Knauf * konstantin.knauf@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Re: Memory Leak using ProcessingTimeTimers?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
by the way, from looking at your email I gather that you want to do some
kind of session windowing. Is that correct? I have a pull request that
should make it into the next version that adds proper support for session
windows. Right now this is only implemented for event-time, since this is
the hard part. But support for processing-time will be trivial to add.

The PR is here: https://github.com/apache/flink/pull/1802
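
For readers unfamiliar with the term, session windowing groups elements that
are separated by less than a given gap of inactivity. The following plain-Java
sketch only illustrates that idea; it is not the implementation from the pull
request, and the class SessionGapSketch, the method sessions and the parameter
gapMillis are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of gap-based sessions; not Flink code. */
class SessionGapSketch {

    /**
     * Splits timestamps into sessions. A new session starts whenever the
     * distance to the previous element exceeds gapMillis.
     * Each result entry is {firstTimestamp, lastTimestamp}.
     */
    static List<long[]> sessions(long[] timestamps, long gapMillis) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long t : sorted) {
            if (!result.isEmpty()
                    && t - result.get(result.size() - 1)[1] <= gapMillis) {
                // Within the gap: extend the current session.
                result.get(result.size() - 1)[1] = t;
            } else {
                // Gap exceeded (or first element): open a new session.
                result.add(new long[] {t, t});
            }
        }
        return result;
    }
}
```

For timestamps 0, 400, 900, 5000 and 5100 with a gap of 1000 ms, this yields
two sessions, [0, 900] and [5000, 5100].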

Cheers,
Aljoscha

On Wed, 30 Mar 2016 at 09:51 Konstantin Knauf <ko...@tngtech.com>
wrote:

> Hi Aljoscha,
>
> thanks for looking into it. I have moved the discussion to the issue.
>
> Cheers,
>
> Konstantin

Re: Memory Leak using ProcessingTimeTimers?

Posted by Konstantin Knauf <ko...@tngtech.com>.
Hi Aljoscha,

thanks for looking into it. I have moved the discussion to the issue.

Cheers,

Konstantin

On 27.03.2016 09:35, Aljoscha Krettek wrote:
> Hi,
> you are right, this is a problem. In an earlier version we were only
> setting very few actual timers using the RuntimeContext because a firing
> timer will trigger all the timers with a lower timestamp that we have
> stored in the trigger queue. We have to change the lower level trigger
> service (in StreamTask) to only store one timer per very short time
> window, so that if the window operator registers thousands of timers
> for, say, time 15:30:03 it actually only saves one timer.
> 
> I created a Jira Issue: https://issues.apache.org/jira/browse/FLINK-3669
> 
> Cheers,
> Aljoscha


Re: Memory Leak using ProcessingTimeTimers?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
you are right, this is a problem. In an earlier version we were only
setting very few actual timers using the RuntimeContext because a firing
timer will trigger all the timers with a lower timestamp that we have
stored in the trigger queue. We have to change the lower level trigger
service (in StreamTask) to only store one timer per very short time window,
so that if the window operator registers thousands of timers for, say, time
15:30:03 it actually only saves one timer.
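
The coalescing idea can be sketched as follows. This is only an illustration
under assumptions, not the change that was made in StreamTask: the class
CoalescingTimerSketch, its methods and the granularityMillis parameter are
hypothetical, and timestamps are assumed to be non-negative milliseconds.

```java
import java.util.TreeSet;

/** Hypothetical model of one physical timer per time bucket; not Flink code. */
class CoalescingTimerSketch {
    private final long granularityMillis;
    // Distinct bucket timestamps for which a physical timer exists.
    private final TreeSet<Long> physicalTimers = new TreeSet<>();
    private int logicalRegistrations = 0;

    CoalescingTimerSketch(long granularityMillis) {
        this.granularityMillis = granularityMillis;
    }

    /**
     * Registers a logical timer. Returns true only if a new physical timer
     * had to be created for the bucket the timestamp falls into.
     */
    boolean register(long timestampMillis) {
        logicalRegistrations++;
        // Round up to the bucket boundary so a timer never fires early.
        long bucket = ((timestampMillis + granularityMillis - 1)
                / granularityMillis) * granularityMillis;
        return physicalTimers.add(bucket);
    }

    int physicalCount() {
        return physicalTimers.size();
    }

    int logicalCount() {
        return logicalRegistrations;
    }
}
```

Registering the same timestamp a thousand times then creates a single physical
timer; when it fires, the operator can work through all logical timers with a
timestamp at or below the firing time.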

I created a Jira Issue: https://issues.apache.org/jira/browse/FLINK-3669

Cheers,
Aljoscha
