Posted to user@flink.apache.org by gerardg <ge...@talaia.io> on 2017/09/15 09:39:24 UTC

Clean GlobalWindow state

Hi,

I have the following operator:

mainStream
      .coGroup(coStream)
      .where(_.uuid).equalTo(_.uuid)
      .window(GlobalWindows.create())
      .trigger(triggerWhenAllReceived)
      .apply(mergeElements)

TL;DR: The checkpointed state of the operator seems to keep growing
forever, even though I clear the state and purge the buffered elements using a
processing time timer.

Details:

Basically, I have a main stream that receives elements from another stream.
When it has received all the elements it has been waiting for, it outputs
a new element created from the information in all the
received elements.

To do so I use a GlobalWindow and a custom trigger. The custom trigger keeps
two counters as state: the number of elements that it has to receive (extracted
from the element received from the main stream) and the number of elements that
it has received so far from the other stream. When the two counters have the
same value, the trigger returns FIRE_AND_PURGE to output all the elements in the
pane (I understand that each set of elements is stored in a pane defined by
the global window and the UUID key).

To clean up the state (and to not keep elements waiting forever) I set up a
processing time timer which basically clears the state and returns
FIRE_AND_PURGE to remove the buffered elements.

I must be missing something because the checkpointed state keeps growing
forever so I suspect that the pane is not completely removed.

Gerard




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Clean GlobalWindow state

Posted by Fabian Hueske <fh...@gmail.com>.
Thanks for creating the JIRA issue!

Best, Fabian

2017-09-20 12:26 GMT+02:00 gerardg <ge...@talaia.io>:

> I have prepared a repo that reproduces the issue:
> https://github.com/GerardGarcia/flink-global-window-growing-state
>
> Maybe this way it is easier to spot the error or we can determine if it is a
> bug.
>
> Gerard

Re: Clean GlobalWindow state

Posted by gerardg <ge...@talaia.io>.
I have prepared a repo that reproduces the issue: 
https://github.com/GerardGarcia/flink-global-window-growing-state

Maybe this way it is easier to spot the error or we can determine if it is a
bug.

Gerard




Re: Clean GlobalWindow state

Posted by gerardg <ge...@talaia.io>.
The UUIDs are assigned. 

As far as I can see (inspecting the metrics and how the task behaves) the
mergeElements apply function receives all the elements (the main element and
the other elements that it expects) so it seems that the correlation is
correct. Also, nothing indicates that there are elements lost inside the
window (everything that enters goes out).

Thanks,

Gerard




Re: Clean GlobalWindow state

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

Are the UUIDs randomly generated on each call to .uuid, or are they assigned once so that .uuid returns the same UUID when called multiple times? The former would be problematic because we would not correctly assign state.
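
To illustrate the difference between the two cases, here is a minimal sketch in plain Scala. The class names RandomKeyElement and AssignedKeyElement are hypothetical and are not taken from the job discussed in this thread; the point is only that a key accessor used in where()/equalTo() should return the same value every time it is called for a given element.

```scala
import java.util.UUID

// Hypothetical element types, for illustration only.

// Problematic as a key: a new UUID is generated on every access,
// so two calls on the same element return different keys.
class RandomKeyElement {
  def uuid: String = UUID.randomUUID().toString
}

// Stable as a key: the UUID is assigned once at construction
// and returned unchanged on every access.
case class AssignedKeyElement(uuid: String = UUID.randomUUID().toString)

object KeyStabilityCheck {
  def main(args: Array[String]): Unit = {
    val random = new RandomKeyElement
    val assigned = AssignedKeyElement()
    // Compare two consecutive accesses on the same element.
    println(s"random key stable:   ${random.uuid == random.uuid}")
    println(s"assigned key stable: ${assigned.uuid == assigned.uuid}")
  }
}
```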

Best,
Aljoscha
> On 19. Sep 2017, at 11:41, Fabian Hueske <fh...@gmail.com> wrote:
> 
> If that were the case, it would be a bug in Flink.
> As I said before, your implementation looked good to me.
> All state of window and trigger should be wiped if the trigger returns FIRE_AND_PURGE (or PURGE) and its clear() method is correctly implemented.
> 
> I'll CC Aljoscha again for his opinion. 
> We might need to file a JIRA for the issue.
> 
> Thanks, 
> Fabian
> 
> 2017-09-19 11:32 GMT+02:00 gerardg <ge...@talaia.io>:
> Thanks Fabian, I'll take a look at these improvements.
> 
> I was wondering if the increasing state size could be because the UUIDs
> used in the keyBy are randomly generated. Maybe, even if I correctly delete
> all the state related to a given key, there is still some metadata related to
> the key lingering around.
> 
> Gerard


Re: Clean GlobalWindow state

Posted by Fabian Hueske <fh...@gmail.com>.
If that were the case, it would be a bug in Flink.
As I said before, your implementation looked good to me.
All state of window and trigger should be wiped if the trigger returns
FIRE_AND_PURGE (or PURGE) and its clear() method is correctly implemented.

I'll CC Aljoscha again for his opinion.
We might need to file a JIRA for the issue.

Thanks,
Fabian

2017-09-19 11:32 GMT+02:00 gerardg <ge...@talaia.io>:

> Thanks Fabian, I'll take a look at these improvements.
>
> I was wondering if the increasing state size could be because the UUIDs
> used in the keyBy are randomly generated. Maybe, even if I correctly delete
> all the state related to a given key, there is still some metadata related to
> the key lingering around.
>
> Gerard

Re: Clean GlobalWindow state

Posted by gerardg <ge...@talaia.io>.
Thanks Fabian, I'll take a look at these improvements.

I was wondering if the increasing state size could be because the UUIDs
used in the keyBy are randomly generated. Maybe, even if I correctly delete
all the state related to a given key, there is still some metadata related to
the key lingering around.

Gerard




Re: Clean GlobalWindow state

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Gerard,

I had a look at your Trigger implementation but did not spot anything
suspicious that would cause the state size to grow.
However, I noticed a few things that can be improved:

- Use ctx.getCurrentProcessingTime instead of System.currentTimeMillis to
make the Trigger easier to test (there are some test harnesses that can set the
processing time manually).
- Timers are not overwritten, so each timeout timer will yield a callback
to onProcessingTime(). It is not possible to delete timers (so you cannot
prevent the onProcessingTime() method from being called multiple times), but you
can save the most recent timer timestamp as ValueState[Long] and compare
against the state to only act on the last timer call.
- You can get the state objects just once and apply multiple operations on
the state object, i.e.,

val elementsToReceive = ctx.getPartitionedState(elementsToReceiveDesc)
val elementsReceived = ctx.getPartitionedState(elementsReceivedDesc)

elementsToReceive.update(x)
val cnt: Int = elementsToReceive.value()
...
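
The three suggestions above could be combined roughly as follows. This is only a sketch under stated assumptions, not the trigger from this thread: the class name LatestTimerTrigger and the state name "last-timer" are hypothetical, the element counters are left out, and the code assumes the Flink streaming API is on the classpath. It takes the processing time from the context, stores the most recent deadline in a ValueState, and ignores callbacks from older timers.

```scala
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.windows.Window

// Hypothetical trigger: only the timeout handling is shown; the element
// counters of the original trigger are omitted.
class LatestTimerTrigger[T](val timeout: Long) extends Trigger[T, Window] {

  // Assumed state name. java.lang.Long is used so that "no timer yet" is null.
  private val lastTimerDesc =
    new ValueStateDescriptor[java.lang.Long]("last-timer", classOf[java.lang.Long])

  override def onElement(element: T, timestamp: Long, window: Window,
                         ctx: TriggerContext): TriggerResult = {
    // Take the time from the context so that a test harness can control it.
    val deadline = ctx.getCurrentProcessingTime + timeout
    ctx.registerProcessingTimeTimer(deadline)
    // Remember only the most recent deadline; earlier timers will still fire.
    ctx.getPartitionedState(lastTimerDesc).update(deadline)
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: Window,
                                ctx: TriggerContext): TriggerResult = {
    // Fetch the state object once and reuse it.
    val lastTimer = ctx.getPartitionedState(lastTimerDesc)
    val latest = lastTimer.value()
    if (latest != null && latest.longValue() == time) {
      // This callback belongs to the most recent timer: time out the pane.
      lastTimer.clear()
      TriggerResult.FIRE_AND_PURGE
    } else {
      // Callback from a timer registered by an earlier element: ignore it.
      TriggerResult.CONTINUE
    }
  }

  override def onEventTime(time: Long, window: Window,
                           ctx: TriggerContext): TriggerResult = TriggerResult.CONTINUE

  override def clear(window: Window, ctx: TriggerContext): Unit =
    ctx.getPartitionedState(lastTimerDesc).clear()
}
```

In a real trigger, the counter state would be cleared in the same branch that returns FIRE_AND_PURGE.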

Maybe Aljoscha can check the code as well and see if he finds the reason
why the state grows.

Best, Fabian

2017-09-18 15:27 GMT+02:00 gerardg <ge...@talaia.io>:

> I might better understand what is happening if I could see what is being
> stored in the state. Is there any way to read the RocksDB state?
>
> Gerard

Re: Clean GlobalWindow state

Posted by gerardg <ge...@talaia.io>.
I might better understand what is happening if I could see what is being
stored in the state. Is there any way to read the RocksDB state?

Gerard




Re: Clean GlobalWindow state

Posted by gerardg <ge...@talaia.io>.
I'm using Nabble and it seems that it removed the code between raw tags.
Here it is again:

import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.{ReducingStateDescriptor, ValueStateDescriptor}
import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.Window

object TriggerMerge {
  @SerialVersionUID(1L)
  private class Sum extends ReduceFunction[Long] {
    @throws[Exception]
    override def reduce(value1: Long, value2: Long): Long = value1 + value2
  }
}

class TriggerMerge(val timeout: Long)
  extends Trigger[TaggedUnion[MainElement, OtherElement], Window]
  with LazyLogging {

  val elementsToReceiveDesc =
    new ValueStateDescriptor[Int]("elements-to-receive", classOf[Int])
  val elementsReceivedDesc =
    new ReducingStateDescriptor[Long]("elements-received", new TriggerMerge.Sum, classOf[Long])

  override def onElement(element: TaggedUnion[MainElement, OtherElement],
                         timestamp: Long, window: Window,
                         ctx: TriggerContext): TriggerResult = {
    var elementsToReceive = Option(ctx.getPartitionedState(elementsToReceiveDesc).value())
    var elementsReceived = Option(ctx.getPartitionedState(elementsReceivedDesc).get())

    // Update counters
    if (element.getOne != null) {
      elementsToReceive match {
        case Some(_) => logger.error("Received two main elements with the same UUID.")
        case _ =>
          ctx.getPartitionedState(elementsToReceiveDesc).update(element.getOne.elementsToReceive)
      }
    }
    if (element.getTwo != null) {
      ctx.getPartitionedState(elementsReceivedDesc).add(1)
    }

    // Update deadline timeout
    val newDeadline = System.currentTimeMillis + timeout
    ctx.registerProcessingTimeTimer(newDeadline)

    // Get updated values
    elementsToReceive = Option(ctx.getPartitionedState(elementsToReceiveDesc).value())
    elementsReceived = Option(ctx.getPartitionedState(elementsReceivedDesc).get())

    // Check if everything is going as it should
    if (elementsToReceive.nonEmpty && elementsReceived.nonEmpty &&
      elementsToReceive.get == elementsReceived.get) {
      TriggerResult.FIRE_AND_PURGE
    } else {
      TriggerResult.CONTINUE
    }
  }

  override def clear(window: Window, ctx: TriggerContext): Unit = {
    // Cleanup state
    ctx.getPartitionedState(elementsToReceiveDesc).clear()
    ctx.getPartitionedState(elementsReceivedDesc).clear()
  }

  override def onProcessingTime(time: Long, window: Window,
                                ctx: TriggerContext): TriggerResult = {
    this.clear(window, ctx)
    TriggerResult.FIRE_AND_PURGE
  }

  override def onEventTime(time: Long, window: Window,
                           ctx: TriggerContext): TriggerResult = TriggerResult.CONTINUE
}




Re: Clean GlobalWindow state

Posted by Aljoscha Krettek <al...@apache.org>.
Sure, but how does the Trigger actually work?

> On 15. Sep 2017, at 12:20, gerardg <ge...@talaia.io> wrote:
> 
> Sure:
> 
> 
> 
> The application is configured to use processing time.
> 
> Thanks,
> 
> Gerard


Re: Clean GlobalWindow state

Posted by gerardg <ge...@talaia.io>.
Sure:



The application is configured to use processing time.

Thanks,

Gerard




Re: Clean GlobalWindow state

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

Could you maybe show the code of your trigger?

Best,
Aljoscha
