Posted to user@flink.apache.org by Padarn Wilson <pa...@gmail.com> on 2019/02/26 01:53:22 UTC

Collapsing watermarks after keyby

Hi Flink Mailing List,

Long story short - I want to somehow collapse watermarks across keys at an
operator, so that keys whose watermarks lag behind do not hold back the rest.
Details below:

---

I have an application in which I want to perform the following sequence of
steps. Assume each record has the fields (time, user, location, action):

-> Read source
-> KeyBy (UserId, Location)
-> EventTimeSessionWindow (5 min gap) - results in (User Location Session)
-> TriggerOnFirst event
-> KeyBy (Location)
-> SlidingEventTimeWindow (5 min length, 5 second slide)
-> Count

The end intention is to count the number of unique users in a given
location - the EventTimeSessionWindow is used to make sure users are only
counted once.

So I created a custom Trigger, which is the same as CountTrigger, but has
the following `onElement` function:

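@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
  // Element counter kept in the trigger's partitioned state (scoped to key and window).
  ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
  count.add(1L);
  if (count.get() == maxCount) {
    // Emit the window contents once, when the count first reaches maxCount.
    return TriggerResult.FIRE_AND_PURGE;
  } else if (count.get() > maxCount) {
    // Discard any later elements without emitting again.
    return TriggerResult.PURGE;
  }
  return TriggerResult.CONTINUE;
}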

But my final SlidingEventTimeWindow does not fire properly. I assume this is
because some users have session windows that are not yet closed, so the
watermark for those keys is running behind and the SlidingEventTimeWindow
watermark is held back too.

What I think I want is to set the watermark of the SlidingEventTimeWindow
operator to the maximum (with lateness) over the input keys, rather than the
minimum. I cannot tell whether this is possible and, if not, what another
approach could be.

Thanks,
Padarn
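
For the final SlidingEventTimeWindow step in the pipeline above, it may help to see how many overlapping windows each session result lands in. The following is a minimal sketch in plain Java, not Flink code: the class and method names are hypothetical, and it assumes windows aligned to the epoch with no offset. With a 5 minute length and a 5 second slide, each timestamp should fall into 60 windows.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code) of sliding event-time window assignment. */
final class SlidingWindowSketch {

    /** Start times of all windows [start, start + size) that contain the timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp (assumes zero offset).
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 5 minute windows sliding every 5 seconds, element at t = 12 s.
        List<Long> starts = windowStarts(12_000L, 300_000L, 5_000L);
        System.out.println(starts.size());  // 60
        System.out.println(starts.get(0));  // 10000
    }
}
```

Each of those windows can only fire once the operator's watermark passes the window's end, which is why a stalled watermark would stop every count at once.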

Re: Collapsing watermarks after keyby

Posted by Padarn Wilson <pa...@gmail.com>.
I created a small test to see if I could replicate this... but I couldn't
:-) Below is my code, which provides a counterexample. It is not very clean,
but perhaps it will be useful to someone else in the future:


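import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.scalatest.{FunSuite, Matchers}

// Assumed shape of the record type; its definition was not included in the original message.
case class Tester(id1: String, id2: String, time: Long)

class SessionWindowTest extends FunSuite with Matchers {

  test("Should advance watermark correctly") {

    val startTime = 0L

    val elements1 = List[Tester](
      Tester("id1:a", "id2:a", startTime),
      Tester("id1:b", "id2:a", startTime + 1),
      Tester("id1:b", "id2:a", startTime + 100),
      Tester("id1:a", "id2:a", startTime + 1)
    )

    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamEnv.getConfig.disableSysoutLogging()

    // Proposes a watermark equal to the timestamp of every element.
    class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[Tester] {
      override def extractTimestamp(element: Tester, previousElementTimestamp: Long): Long =
        element.time

      override def checkAndGetNextWatermark(lastElement: Tester, extractedTimestamp: Long): Watermark =
        new Watermark(extractedTimestamp)
    }

    val stream = streamEnv.addSource(new SourceFunction[Tester]() {
      override def run(ctx: SourceFunction.SourceContext[Tester]): Unit =
        elements1.foreach(ctx.collect)

      override def cancel(): Unit = {}
    }).assignTimestampsAndWatermarks(new PunctuatedAssigner)

    // Stage 1: session windows keyed by id1; forward the last element of each session.
    val sessionsStream = stream
      .keyBy(_.id1)
      .window(EventTimeSessionWindows.withGap(Time.milliseconds(2)))
      .apply(
        (key: String, windowInfo: TimeWindow, iter: Iterable[Tester], collector: Collector[Tester]) => {
          val elements = iter.toList
          println(s"Session window. Elements: $elements")
          println(windowInfo)
          collector.collect(elements.last)
        })

    // Stage 2: tumbling windows keyed by id2 over the session results.
    val countStream = sessionsStream
      .keyBy(_.id2)
      .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
      .apply(
        (key: String, windowInfo: TimeWindow, iter: Iterable[Tester], collector: Collector[Tester]) => {
          val elements = iter.toList
          println(s"Tumbling window. Elements: $elements ${windowInfo.getStart} ${windowInfo.getEnd}")
          collector.collect(elements.last)
        })

    sessionsStream.print()
    countStream.print()

    streamEnv.execute()
  }
}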
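
As a reading aid for the session stage of the test above, here is a minimal sketch in plain Java, not Flink code and with hypothetical names, of how gap-based sessions form for a single key. It assumes that each element opens a window [t, t + gap) and that windows which overlap or touch are merged. It deliberately ignores arrival order, watermarks and late data, so it only shows which sessions would form if every element arrived in time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model (not Flink code) of event-time session windows for one key. */
final class SessionSketch {

    /** Returns merged sessions as {start, end} pairs, with end exclusive. */
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long t : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && t <= last[1]) {
                // The element's window [t, t + gap) overlaps or touches the open session: extend it.
                last[1] = Math.max(last[1], t + gap);
            } else {
                // Otherwise the element starts a new session.
                result.add(new long[] {t, t + gap});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Illustrative timestamps with a 2 ms gap, as in the test's session stage.
        for (long[] s : sessions(new long[] {0L, 1L, 100L}, 2L)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
        // prints "[0, 3)" then "[100, 102)"
    }
}
```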



Re: Collapsing watermarks after keyby

Posted by Padarn Wilson <pa...@gmail.com>.
Okay. I think I must still be misunderstanding something here. I will work on
building a unit test around this; hopefully that clears up my confusion.

Thank you,
Padarn


Re: Collapsing watermarks after keyby

Posted by Till Rohrmann <tr...@apache.org>.
Operators with multiple inputs emit the minimum of their inputs' watermarks
downstream. In the case of a keyBy, this means that the watermark is sent to
all downstream consumers.

Cheers,
Till
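
The rule described in this reply can be illustrated with a toy model in plain Java. This is not Flink code and all names are hypothetical. It assumes an operator instance remembers the latest watermark seen on each input channel and advances its own event time to the minimum of those values. The channels here stand for upstream parallel subtasks, not keys, which is consistent with the point that a slow key alone does not hold the watermark back.

```java
import java.util.Arrays;

/** Toy model (not Flink code): one operator instance tracking watermarks per input channel. */
final class MinWatermarkTracker {
    private final long[] channelWatermarks;
    private long current = Long.MIN_VALUE;

    MinWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel and returns the operator's event time afterwards. */
    long onWatermark(int channel, long watermark) {
        // A channel's watermark never moves backwards.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        // The operator's own event time is the minimum over all channels, and is monotonic.
        current = Math.max(current, min);
        return current;
    }

    public static void main(String[] args) {
        MinWatermarkTracker op = new MinWatermarkTracker(2);
        System.out.println(op.onWatermark(0, 100L)); // still Long.MIN_VALUE: channel 1 is silent
        System.out.println(op.onWatermark(1, 40L));  // 40
        System.out.println(op.onWatermark(1, 120L)); // 100
    }
}
```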


Re: Collapsing watermarks after keyby

Posted by Padarn Wilson <pa...@gmail.com>.
Just to add: by printing intermediate results I see that I definitely have
more than five minutes of data, and by windowing without the session
windows I see that event time watermarks do seem to be generated as
expected.

Thanks for your help and time.

Padarn


Re: Collapsing watermarks after keyby

Posted by Padarn Wilson <pa...@gmail.com>.
Hi Till,

I will work on an example, but I’m a little confused by how keyBy and
watermarks work in this case. The documentation
(https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams)
says:


Some operators consume multiple input streams; a union, for example, or
operators following a *keyBy(…)* or *partition(…)* function. Such an
operator’s current event time is the minimum of its input streams’ event
times. As its input streams update their event times, so does the operator.


This implies to me that the keyBy splits the watermark?


Re: Collapsing watermarks after keyby

Posted by Till Rohrmann <tr...@apache.org>.
Hi Padarn,

Flink does not generate watermarks per key. At the moment, watermarks are
always global. Therefore, I would suspect that the problem lies in generating
watermarks at all. Could it be that your input data does not span a period
longer than 5 minutes and also does not terminate? Another problem could be
the CountTrigger, which as far as I know does not react to the window's end
time: its onEventTime method simply returns TriggerResult.CONTINUE, and I
think this will cause the window not to fire. Maybe a working example program
with example input could be helpful for further debugging.

Cheers,
Till
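
The concern about a count-based trigger ignoring the window's end time can be pictured with a toy model in plain Java. This is not Flink's Trigger API: the interface, the enum and every name below are hypothetical and heavily simplified. It contrasts a trigger that fires only on element count with one that fires when event time reaches the window's last timestamp.

```java
/** Toy model (not Flink's Trigger API) contrasting count-only and event-time triggers. */
final class TriggerSketch {
    enum Result { CONTINUE, FIRE }

    interface ToyTrigger {
        Result onElement(long countSoFar);
        Result onEventTime(long time, long windowMaxTimestamp);
    }

    /** Fires once maxCount elements have arrived; never reacts to the window's end. */
    static ToyTrigger countOnly(final long maxCount) {
        return new ToyTrigger() {
            public Result onElement(long countSoFar) {
                return countSoFar >= maxCount ? Result.FIRE : Result.CONTINUE;
            }
            public Result onEventTime(long time, long windowMaxTimestamp) {
                return Result.CONTINUE;
            }
        };
    }

    /** Fires when event time reaches the window's last timestamp, regardless of count. */
    static ToyTrigger eventTime() {
        return new ToyTrigger() {
            public Result onElement(long countSoFar) {
                return Result.CONTINUE;
            }
            public Result onEventTime(long time, long windowMaxTimestamp) {
                return time == windowMaxTimestamp ? Result.FIRE : Result.CONTINUE;
            }
        };
    }

    /** Feeds the given number of elements, then the end-of-window timer; true if the window fired. */
    static boolean fires(ToyTrigger trigger, int elements, long windowMaxTimestamp) {
        for (int i = 1; i <= elements; i++) {
            if (trigger.onElement(i) == Result.FIRE) {
                return true;
            }
        }
        return trigger.onEventTime(windowMaxTimestamp, windowMaxTimestamp) == Result.FIRE;
    }

    public static void main(String[] args) {
        System.out.println(fires(countOnly(3), 2, 299_999L)); // false: too few elements, end ignored
        System.out.println(fires(eventTime(), 2, 299_999L));  // true: fires at the window's end
    }
}
```

In this model a window that never reaches the count simply never fires, which is the behaviour the reply warns about.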
