Posted to dev@beam.apache.org by Chandan Bhattad <ch...@gmail.com> on 2021/07/07 06:39:28 UTC

Help: Apache Beam Session Window with limit on number of events and time elapsed from window start

Hi Team,

Hope you are doing well.

I have a use case around session windowing with some customizations.

We need to create user sessions based on *any* of the 3 conditions
below:

1. Session Window of 30 minutes (meaning, 30 minutes of inactivity i.e. no
event for 30 minutes for a user)
2. Number of events in the session window reaches 20,000
3. 4 hours have elapsed since window start
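
The three conditions above can be pinned down outside Beam as a plain Python function. This is only a sketch of the intended semantics, not Beam code: `split_sessions` and its parameter names are hypothetical, the input is assumed to be one user's event timestamps in seconds, already sorted, and a session is assumed to close as soon as any one of the three limits is reached.

```python
from typing import List


def split_sessions(timestamps: List[float],
                   gap_seconds: float = 30 * 60,
                   max_events: int = 20000,
                   max_duration_seconds: float = 4 * 60 * 60) -> List[List[float]]:
    """Split one user's sorted event timestamps into sessions (hypothetical helper)."""
    sessions: List[List[float]] = []
    current: List[float] = []
    for ts in timestamps:
        if current:
            # Condition 1: inactivity gap since the previous event.
            inactive = ts - current[-1] >= gap_seconds
            # Condition 2: the open session already holds the maximum number of events.
            full = len(current) >= max_events
            # Condition 3: too much time has elapsed since the session started.
            too_long = ts - current[0] >= max_duration_seconds
            if inactive or full or too_long:
                sessions.append(current)
                current = []
        current.append(ts)
    if current:
        sessions.append(current)
    return sessions
```

Note that each closed session is followed by a fresh one with a new start time, which is the behaviour the three conditions ask for.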

Below is what I have tried.

beam.WindowInto(window.Sessions(session_timeout_seconds),
                trigger=trigger.Repeatedly(
                    trigger.AfterAny(
                        trigger.AfterCount(20000),
                        trigger.DefaultTrigger(),
                        TriggerWhenWindowStartPassesXHours(hours=0.2)
                    )
                ),
                timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW,
                accumulation_mode=trigger.AccumulationMode.DISCARDING
)


# Custom Trigger Implementation
from apache_beam.transforms.trigger import DefaultTrigger
from apache_beam.utils.timestamp import Timestamp


class TriggerWhenWindowStartPassesXHours(DefaultTrigger):

    def __init__(self, hours=4):
        super(TriggerWhenWindowStartPassesXHours, self).__init__()
        self.hours = hours

    def __repr__(self):
        return 'TriggerWhenWindowStartPassesXHours()'

    def should_fire(self, time_domain, watermark, window, context):
        should_fire = (Timestamp.now() - window.start).micros >= 3600000000 * self.hours
        return should_fire

    @staticmethod
    def from_runner_api(proto, context):
        return TriggerWhenWindowStartPassesXHours()

The above works, but there is an issue: when trigger No. 3 fires, it does not
start a new session window; the same window continues.
As a result, trigger No. 3 keeps firing on every subsequent event once 4 hours
have passed since the window start, because the should_fire condition is:

should_fire = (Timestamp.now() - window.start).micros >= 3600000000 * self.hours

Since window.start is never updated after the trigger first fires, the
condition stays true for every event that follows.
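
The repeated firing described above can be reproduced without Beam. The sketch below is a simplification under assumptions: `should_fire` here is a hypothetical stand-in that mirrors only the comparison in the custom trigger, timestamps are plain microsecond integers, and the current time is passed in explicitly instead of calling `Timestamp.now()`.

```python
MICROS_PER_HOUR = 3600000000


def should_fire(now_micros: int, window_start_micros: int, hours: float) -> bool:
    """Same comparison as the custom trigger, with the clock made explicit."""
    return (now_micros - window_start_micros) >= MICROS_PER_HOUR * hours


# The window start stays fixed because the session window is never replaced.
window_start = 0

# Hypothetical event arrival times, in hours after the window start.
event_times_hours = [1, 3, 4, 4.5, 5, 6]

decisions = [
    should_fire(int(h * MICROS_PER_HOUR), window_start, hours=4)
    for h in event_times_hours
]
print(decisions)
```

Once the 4-hour mark is crossed, every later evaluation returns True, because nothing in the predicate depends on whether the trigger has already fired.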

I have also posted this on Stack Overflow:
https://stackoverflow.com/questions/68250618/apache-beam-session-window-with-limit-on-number-of-events

I would be very grateful for any help as to how to achieve this.
Thanks a lot in advance.

Regards,
Chandan

Re: Help: Apache Beam Session Window with limit on number of events and time elapsed from window start

Posted by Kenneth Knowles <ke...@apache.org>.
Hi Chandan,

I am moving this thread to user@beam.apache.org. I think that is the best
place to discuss.

Kenn

