Posted to user@flink.apache.org by Anchit Jatana <de...@gmail.com> on 2016/10/21 07:08:08 UTC

Session windows - Evaluation of addition of element + window expires/gets discarded after some set time of inactivity

Hi All,

I have a use case in which I need to work with Session Windows to
maintain some values for some sessionIDs/keys.

The use case is as follows:

I need to maintain a session window for the incoming data and discard the
window after some set gap/period of inactivity. However, as soon as a new
element gets added to the window, I want all the records that are currently
in the window to be processed using the window transformation/function,
without the window being discarded.

With the existing session windows implementation, the elements get
processed only after the window is considered complete (based on the gap
time setting). I want to process all the elements contained in the window
as soon as a new element gets added to it (i.e. the addition of a new
element triggers the processing of all elements of the window), while the
window is discarded only after a gap/inactivity of some set time. And when
the window gets discarded/expires, I don't want it to be re-evaluated,
since its contents were already processed when the last element was added
to the window.

Is this implementation possible? If yes, can someone please share some
sample code to explain the implementation.

Thank you!

Regards,
Anchit

Re: Session windows - Evaluation of addition of element + window expires/gets discarded after some set time of inactivity

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Anchit,
the timers don't necessarily have to be cleaned up. So you should be good
to go.

Cheers,
Aljoscha

Re: Session windows - Evaluation of addition of element + window expires/gets discarded after some set time of inactivity

Posted by Anchit Jatana <de...@gmail.com>.
Hi Aljoscha,

I am using the custom trigger with the GlobalWindows window assigner. Do I
still need to override the clear method and delete the ProcessingTimeTimer
using triggerContext.deleteProcessingTimeTimer(prevTime)?

Regards,
Anchit

Re: Session windows - Evaluation of addition of element + window expires/gets discarded after some set time of inactivity

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Bart,
are you using your custom Trigger together with a merging session window
assigner?

You might want to consider overriding the clear() method in your trigger to
clean up the state that you use. If you don't, you might run into memory
leaks because the state is never cleaned up.
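
A minimal sketch of what such a clear() override could look like for the
trigger posted in this thread, assuming the Flink 1.x Scala DataStream API.
The class name CleaningSessionTrigger, the sessionPauseMillis parameter, the
sample input and the job name are made up for illustration; the state
descriptor is the one from the thread. The sketch also resets the state when
the inactivity timer fires, so that cleanup does not depend on when clear()
gets called for a GlobalWindow.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.Window

// Hypothetical variant of the thread's sessionTrigger with clear() added.
class CleaningSessionTrigger[E](val sessionPauseMillis: Long) extends Trigger[E, Window] {

  // timestamp of the currently registered inactivity timer, if any
  val timeState = new ValueStateDescriptor[Option[Long]]("sessionTimer", classOf[Option[Long]], None)

  override def onElement(t: E, l: Long, w: Window, ctx: TriggerContext): TriggerResult = {
    val timer: ValueState[Option[Long]] = ctx.getPartitionedState(timeState)
    // drop the timer that was registered for the previous element
    timer.value().foreach(ts => ctx.deleteProcessingTimeTimer(ts))
    // register a fresh inactivity timer and evaluate the window right away
    val newTime = ctx.getCurrentProcessingTime + sessionPauseMillis
    timer.update(Some(newTime))
    ctx.registerProcessingTimeTimer(newTime)
    TriggerResult.FIRE
  }

  override def onProcessingTime(l: Long, w: Window, ctx: TriggerContext): TriggerResult = {
    // inactivity gap elapsed: forget the timer and discard the window contents
    val timer: ValueState[Option[Long]] = ctx.getPartitionedState(timeState)
    timer.clear()
    TriggerResult.PURGE
  }

  override def onEventTime(l: Long, w: Window, ctx: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  // release everything this trigger holds for the window
  override def clear(w: Window, ctx: TriggerContext): Unit = {
    val timer: ValueState[Option[Long]] = ctx.getPartitionedState(timeState)
    timer.value().foreach(ts => ctx.deleteProcessingTimeTimer(ts))
    timer.clear()
  }
}

object CleaningSessionTriggerExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // hypothetical input: (sessionId, value) pairs
    val events: DataStream[(String, Long)] =
      env.fromElements(("s1", 1L), ("s1", 2L), ("s2", 10L))

    events
      .keyBy(_._1)
      .window(GlobalWindows.create())
      .trigger(new CleaningSessionTrigger[(String, Long)](30 * 60 * 1000L))
      .reduce((a, b) => (a._1, a._2 + b._2))
      .print()

    env.execute("session trigger sketch")
  }
}
```

The reduce here is only a stand-in for whatever window function is actually
needed; every incoming element causes the current window contents to be
evaluated again.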

Cheers,
Aljoscha

RE: Session windows - Evaluation of addition of element + window expires/gets discarded after some set time of inactivity

Posted by Anchit Jatana <de...@gmail.com>.
Hi Bart,

Thank you so much for sharing the approach. Looks like this solved my
problem. Here's what I have as an implementation for my use-case:

package org.apache.flink.quickstart

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.Window

// Note: despite its name, sessionPauseHours is interpreted as seconds below.
class sessionTrigger[E](val sessionPauseHours: Long) extends Trigger[E, Window] {

  // holds the timestamp of the currently registered inactivity timer, if any
  val timeState = new ValueStateDescriptor[Option[Long]]("sessionTimer", classOf[Option[Long]], None)

  override def onElement(t: E, l: Long, w: Window, triggerContext: TriggerContext): TriggerResult = {

    // remove old timer
    val time_state: ValueState[Option[Long]] = triggerContext.getPartitionedState(timeState)
    val time_set = time_state.value()
    if (time_set.isDefined) {
      triggerContext.deleteProcessingTimeTimer(time_set.get)
    }
    // set new timer, then evaluate the window without discarding its contents
    val new_time = triggerContext.getCurrentProcessingTime + Time.seconds(sessionPauseHours).toMilliseconds()
    time_state.update(Some(new_time))
    triggerContext.registerProcessingTimeTimer(new_time)
    TriggerResult.FIRE
  }

  // the inactivity timer fired: discard the window contents without re-evaluating them
  override def onProcessingTime(l: Long, w: Window, triggerContext: TriggerContext): TriggerResult = {
    TriggerResult.PURGE
  }

  // event time is not used by this trigger
  override def onEventTime(l: Long, w: Window, triggerContext: TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
}

Regards,
Anchit

RE: Session windows - Evaluation of addition of element + window expires/gets discarded after some set time of inactivity

Posted by ba...@kpn.com.
Here is a session trigger that I wrote (not quite the same rules around what a
session is, but should hopefully be a good start to work from).  I'd love to get
any feedback on how it could be improved.

- bart


import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.Window

import org.slf4j.LoggerFactory

/**
  *
  * @param maxSessionLength maximum number of pages that can be in a single session
  * @param sessionPauseMillis time duration in milliseconds indicating length of quiet after which a session has ended
  */
class sessionTrigger[E](val maxSessionLength: Int, val sessionPauseMillis: Long) extends Trigger[E, Window] {

  val logger = LoggerFactory.getLogger(classOf[sessionTrigger[E]])
  val countState = new ReducingStateDescriptor[Long]("sessionCount", new ScalaReduceFunction[Long](_ + _), classOf[Long])
  val timeState = new ValueStateDescriptor[Option[Long]]("sessionTimer", classOf[Option[Long]], None)

  override def onElement(t: E, l: Long, w: Window, triggerContext: TriggerContext): TriggerResult = {
    logger.trace("received: " + t.toString)

    // remove old timer
    val time_state: ValueState[Option[Long]] = triggerContext.getPartitionedState(timeState)
    val time_set = time_state.value()
    if (time_set.isDefined) {
      triggerContext.deleteProcessingTimeTimer(time_set.get)
    }


    // update count and check if the session has reached its maximum length
    val state: ReducingState[Long] = triggerContext.getPartitionedState(countState)
    state.add(1)
    val ct = state.get()
    logger.trace("count: " + ct + " : " + t.toString)
    if (ct >= maxSessionLength) {
      logger.trace("triggered by count")
      // reset the per-session state so the next session starts from zero
      state.clear()
      time_state.update(None)
      TriggerResult.FIRE_AND_PURGE
    } else {
      // set new timer and continue
      val new_time = triggerContext.getCurrentProcessingTime + sessionPauseMillis
      time_state.update(Some(new_time))
      triggerContext.registerProcessingTimeTimer(new_time)
      TriggerResult.CONTINUE
    }
  }

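  override def onProcessingTime(l: Long, w: Window, triggerContext: TriggerContext): TriggerResult = {
    println("proc time trigger")
    // the session timed out: reset the per-session state so the next session starts fresh
    val state: ReducingState[Long] = triggerContext.getPartitionedState(countState)
    state.clear()
    val time_state: ValueState[Option[Long]] = triggerContext.getPartitionedState(timeState)
    time_state.update(None)
    TriggerResult.FIRE_AND_PURGE
  }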

  override def onEventTime(l: Long, w: Window, triggerContext: TriggerContext): TriggerResult = {
    println("event time trigger")
    TriggerResult.CONTINUE
  }
}



Re: Session windows - Evaluation of addition of element + window expires/gets discarded after some set time of inactivity

Posted by Manu Zhang <ow...@gmail.com>.
Hi Anchit,

I think you need a customized EventTimeTrigger
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java#L43>
which returns "TriggerResult.FIRE" both on a new element and on the watermark.
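
Such a trigger might look roughly like the sketch below. It is modelled on
Flink's EventTimeTrigger but is not a copy of it. The class name
EagerEventTimeTrigger is hypothetical, and the onMerge signature assumes a
Flink 1.x version in which onMerge returns no value (older releases
returned a TriggerResult there).

```scala
import org.apache.flink.streaming.api.windowing.triggers.Trigger.{OnMergeContext, TriggerContext}
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Hypothetical trigger: fires on every element and again when the
// watermark passes the end of the window.
class EagerEventTimeTrigger[E] extends Trigger[E, TimeWindow] {

  override def onElement(element: E, timestamp: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
    // make sure a timer exists for the end of the (possibly merged) window
    ctx.registerEventTimeTimer(window.maxTimestamp())
    // evaluate the window for every incoming element
    TriggerResult.FIRE
  }

  override def onEventTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult =
    // fire once more when the watermark reaches the end of the window
    if (time == window.maxTimestamp()) TriggerResult.FIRE else TriggerResult.CONTINUE

  override def onProcessingTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  // needed so the trigger can be used with merging (session) window assigners
  override def canMerge(): Boolean = true

  override def onMerge(window: TimeWindow, ctx: OnMergeContext): Unit =
    // re-register the timer for the end of the merged window
    ctx.registerEventTimeTimer(window.maxTimestamp())

  override def clear(window: TimeWindow, ctx: TriggerContext): Unit =
    ctx.deleteEventTimeTimer(window.maxTimestamp())
}
```

It could be attached with something like
.window(EventTimeSessionWindows.withGap(Time.minutes(30))).trigger(new EagerEventTimeTrigger[MyEvent]),
where MyEvent and the 30-minute gap are placeholders. Returning
TriggerResult.PURGE from onEventTime instead of FIRE should avoid the final
re-evaluation that the original question wanted to skip.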

Thanks,
Manu Zhang
