Posted to user@beam.apache.org by Marcin Kuthan <ma...@gmail.com> on 2022/01/28 06:57:06 UTC

Late event is not dropped due to lateness for session window in direct runner - bug or feature?

Hi

I observed that on the Direct Runner, when the watermark is advanced
programmatically (in tests), a late event is not discarded for a session window.

The aggregation:

type User = String
type Action = String

def activitiesInSessionWindow(
      userActions: SCollection[(User, Action)],
      gapDuration: Duration,
      allowedLateness: Duration = Duration.ZERO,
      accumulationMode: AccumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
): SCollection[(User, Iterable[Action])] = {
    val windowOptions = WindowOptions(
      allowedLateness = allowedLateness,
      accumulationMode = accumulationMode
    )

    userActions
      .withSessionWindows(gapDuration, windowOptions)
      .groupByKey
}

Test scenario (Scio with some additional syntactic sugar):

"Late event" should "not close the gap and two sessions are materialized" in runWithContext { sc =>
  val userActions = testStreamOf[(User, Action)]
    .addElementsAt("00:00:00", ("jack", "open app"))
    .addElementsAt("00:01:00", ("jack", "search product"))
    .addElementsAt("00:01:30", ("jack", "open product"))
    .addElementsAt("00:03:00", ("jack", "add to cart"))
    .advanceWatermarkTo("00:13:00")
    .addElementsAt("00:09:30", ("jack", "checkout"))
    .addElementsAt("00:13:10", ("jack", "close app"))
    .advanceWatermarkToInfinity()

  val results = activitiesInSessionWindow(sc.testStream(userActions), DefaultGapDuration)

  results.withTimestamp should inOnTimePane("00:00:00", "00:13:00") {
    containSingleValueAtWindowTime(
      "00:13:00",
      ("jack", Iterable("open app", "search product", "open product", "add to cart"))
    )
  }

  // Why does the session window start at 00:09:30 if the watermark has already
  // been advanced to 00:13:00? I would expect "checkout" to be dropped due to lateness.
  results.withTimestamp should inOnTimePane("00:09:30", "00:23:10") {
    containSingleValueAtWindowTime(
      "00:23:10",
      ("jack", Iterable("checkout", "close app"))
    )
  }
}


Look at the "checkout" event with event time 00:09:30, emitted after the
watermark has already been advanced to 00:13:00.
I would expect that event to be dropped due to lateness (exactly as is
done for a fixed window), but for some reason, with a session window, the event
is counted towards the second session window.

You can find the full source code at:
https://github.com/mkuthan/example-streaming/blob/beamsessions/src/test/scala/org/mkuthan/examples/streaming/usersessions/BeamUserSessionsTest.scala#L98

What do you think, bug or feature?

Thanks in advance,
Marcin

Re: Late event is not dropped due to lateness for session window in direct runner - bug or feature?

Posted by Marcin Kuthan <ma...@gmail.com>.
Haha, one of my colleagues and I managed to answer the question ourselves.
The watermark trigger fires at the end of the window, and for a session window
the end is event time + gap duration.

In the test scenario below, when the watermark is advanced to 00:13:00, the late
event at 00:03:00 is dropped due to lateness, but the event at 00:09:30 is
counted towards the second session window.
For watermark X and session gap T (with zero allowed lateness), the event is
considered late only if "event time" <= X - T.
For a fixed window it is much more straightforward: all events with event
time behind the watermark are late.
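
The rule can be written down as a small predicate. This is only a sketch to illustrate the reasoning, not code from Beam or Scio: `isDroppable` and `at` are hypothetical helpers, the 10-minute gap is an assumption inferred from the window bounds in the tests (DefaultGapDuration is not shown in this thread), and the "end of window minus one millisecond" detail reflects my understanding of how Beam defines a window's maximum timestamp.

```scala
import java.time.Duration

object SessionLateness {
  // Parse "HH:mm:ss" into an offset from the start of the test stream.
  def at(hms: String): Duration = hms.split(":").map(_.toLong) match {
    case Array(h, m, s) => Duration.ofHours(h).plusMinutes(m).plusSeconds(s)
  }

  // Hypothetical helper, not a Beam or Scio function.
  // Before merging, a single element occupies the window [eventTime, eventTime + gap).
  // The element is droppable when that window's last timestamp, plus the allowed
  // lateness, is already behind the watermark.
  def isDroppable(
      eventTime: Duration,
      gap: Duration,
      watermark: Duration,
      allowedLateness: Duration = Duration.ZERO
  ): Boolean = {
    val maxTimestamp = eventTime.plus(gap).minusMillis(1)
    maxTimestamp.plus(allowedLateness).compareTo(watermark) < 0
  }

  def main(args: Array[String]): Unit = {
    val gap = Duration.ofMinutes(10) // assumed value of DefaultGapDuration
    val watermark = at("00:13:00")
    println(isDroppable(at("00:03:00"), gap, watermark)) // true
    println(isDroppable(at("00:09:30"), gap, watermark)) // false
  }
}
```

The event at 00:03:00 sits exactly at X - T and is still dropped, because its window ends at 00:13:00 and the watermark has already reached that point. The event at 00:09:30 belongs to a window ending at 00:19:30, so it is kept.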

"Late event" should "not close the gap and two sessions are materialized" in runWithContext { sc =>
  val userActions = testStreamOf[(User, Action)]
    .addElementsAt("00:00:00", ("jack", "open app"))
    .addElementsAt("00:01:00", ("jack", "search product"))
    .addElementsAt("00:01:30", ("jack", "open product"))
    .advanceWatermarkTo("00:13:00")
    .addElementsAt("00:03:00", ("jack", "add to cart")) // dropped due to lateness
    .addElementsAt("00:09:30", ("jack", "checkout"))
    .addElementsAt("00:13:10", ("jack", "close app"))
    .advanceWatermarkToInfinity()

  val results = activitiesInSessionWindow(sc.testStream(userActions), DefaultGapDuration)

  results.withTimestamp should inOnTimePane("00:00:00", "00:11:30") {
    containSingleValueAtWindowTime(
      "00:11:30",
      ("jack", Iterable("open app", "search product", "open product"))
    )
  }

  results.withTimestamp should inOnTimePane("00:09:30", "00:23:10") {
    containSingleValueAtWindowTime(
      "00:23:10",
      ("jack", Iterable("checkout", "close app"))
    )
  }
}
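
The scenario can also be replayed with a small, self-contained model, which may help when reasoning about other event orders. This is a rough sketch and not how the Direct Runner is implemented: `Step`, `Session` and `run` are hypothetical names, the gap is assumed to be 10 minutes, allowed lateness is assumed to be zero, and a session is simply treated as closed once the watermark reaches its end.

```scala
import java.time.Duration

object SessionReplay {
  sealed trait Step
  final case class Element(time: Duration, action: String) extends Step
  final case class AdvanceWatermark(to: Duration) extends Step

  // A session covers [start, end) in event time.
  final case class Session(start: Duration, end: Duration, actions: Vector[String])

  // Parse "HH:mm:ss" into an offset from the start of the test stream.
  def at(hms: String): Duration = hms.split(":").map(_.toLong) match {
    case Array(h, m, s) => Duration.ofHours(h).plusMinutes(m).plusSeconds(s)
  }

  private def earlier(a: Duration, b: Duration) = if (a.compareTo(b) <= 0) a else b
  private def later(a: Duration, b: Duration) = if (a.compareTo(b) >= 0) a else b

  // Simplified model: zero allowed lateness, one key, sessions closed by the watermark.
  def run(steps: Seq[Step], gap: Duration): Vector[Session] = {
    // State: (current watermark, open sessions, closed sessions).
    val initial = (Duration.ZERO, Vector.empty[Session], Vector.empty[Session])
    val (_, stillOpen, emitted) = steps.foldLeft(initial) {
      case ((_, open, closed), AdvanceWatermark(to)) =>
        // Sessions whose end is not after the new watermark are closed for good.
        val (done, rest) = open.partition(_.end.compareTo(to) <= 0)
        (to, rest, closed ++ done)
      case ((watermark, open, closed), Element(time, action)) =>
        val end = time.plus(gap)
        if (end.compareTo(watermark) <= 0) {
          (watermark, open, closed) // dropped due to lateness
        } else {
          // Merge the new window with every open session it overlaps.
          val (overlapping, others) =
            open.partition(s => s.start.compareTo(end) < 0 && time.compareTo(s.end) < 0)
          val merged = overlapping.foldLeft(Session(time, end, Vector(action))) { (acc, s) =>
            Session(earlier(acc.start, s.start), later(acc.end, s.end), s.actions ++ acc.actions)
          }
          (watermark, others :+ merged, closed)
        }
    }
    emitted ++ stillOpen
  }

  def main(args: Array[String]): Unit = {
    val gap = Duration.ofMinutes(10) // assumed value of DefaultGapDuration
    val steps: Seq[Step] = Seq(
      Element(at("00:00:00"), "open app"),
      Element(at("00:01:00"), "search product"),
      Element(at("00:01:30"), "open product"),
      AdvanceWatermark(at("00:13:00")),
      Element(at("00:03:00"), "add to cart"), // dropped: 00:03:00 + gap is not after the watermark
      Element(at("00:09:30"), "checkout"),
      Element(at("00:13:10"), "close app"),
      AdvanceWatermark(Duration.ofDays(1)) // stands in for advanceWatermarkToInfinity()
    )
    run(steps, gap).foreach(println)
  }
}
```

Under these assumptions the model produces one session from 00:00:00 to 00:11:30 with three actions and one from 00:09:30 to 00:23:10 with "checkout" and "close app", the same windows the test asserts. The two sessions overlap in event time because the first one was already closed by the watermark when "checkout" arrived, so there was nothing left to merge with.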

Best regards,
Marcin

