Posted to user@beam.apache.org by Carlos Alonso <ca...@mrcalonso.com> on 2018/01/09 19:43:12 UTC

Triggers based on size

Hi everyone!!

I was wondering if there is an option to trigger window panes based on the
size of the pane itself (rather than the number of elements).

To provide a bit more context: we're backing up a PubSub topic into GCS,
with the "special" feature that the GCS destination depends on the "type"
of the message.

The 'shape' of the messages published there is quite random: some of them
are very frequent and small, others are very big but sparse... We have
around 150 messages per second (in total), we're firing every 15 minutes,
and we're experiencing OOM errors. We've considered firing based on the
number of items as well, but given the randomness of the input, I don't
think that would be a final solution either.

Having a trigger based on size would be great; another option would be
to have a dynamic number of shards for the PTransform that actually writes
the files.

What is your recommendation for this use case?

Thanks!!

Re: Triggers based on size

Posted by Carlos Alonso <ca...@mrcalonso.com>.
I've finally managed to understand, write and run my job using stateful and
timely processing. You can see the code here, should anyone need inspiration:
https://gist.github.com/calonso/14b06f4f58faf286cc79181cb46a9b85

Thanks a lot for your help, for encouraging me to go that way, for such a
great product and for the amazing community you're building around it.

On Wed, Jan 10, 2018 at 6:11 PM Robert Bradshaw <ro...@google.com> wrote:

> Sounds like you have enough to get started. Feel free to come back
> here with more specifics if you can't get it working.

Re: Triggers based on size

Posted by Robert Bradshaw <ro...@google.com>.
Sounds like you have enough to get started. Feel free to come back
here with more specifics if you can't get it working.

On Wed, Jan 10, 2018 at 9:09 AM, Carlos Alonso <ca...@mrcalonso.com> wrote:
> Thanks Robert!!
>
> After reading this and the earlier post about stateful processing, Kenneth's
> suggestions sound sensible. I'll probably give them a try!! Is there
> anything you would advise me on before starting?
>
> Thanks!

Re: Triggers based on size

Posted by Carlos Alonso <ca...@mrcalonso.com>.
Thanks Robert!!

After reading this and the earlier post about stateful processing, Kenneth's
suggestions sound sensible. I'll probably give them a try!! Is there
anything you would advise me on before starting?

Thanks!

On Wed, Jan 10, 2018 at 10:13 AM Robert Bradshaw <ro...@google.com>
wrote:

> Unfortunately, the metadata driven trigger is still just an idea, not
> yet implemented.
>
> A good introduction to state and timers can be found at
> https://beam.apache.org/blog/2017/08/28/timely-processing.html

Re: Triggers based on size

Posted by Robert Bradshaw <ro...@google.com>.
Unfortunately, the metadata driven trigger is still just an idea, not
yet implemented.

A good introduction to state and timers can be found at
https://beam.apache.org/blog/2017/08/28/timely-processing.html

On Wed, Jan 10, 2018 at 1:08 AM, Carlos Alonso <ca...@mrcalonso.com> wrote:
> Hi Robert, Kenneth.
>
> Thanks a lot to both of you for your responses!!
>
> Kenneth, unfortunately I'm not sure we're experienced enough with Apache
> Beam to get anywhere close to your suggestion, but thanks anyway!!
>
> Robert, your suggestion sounds great to me. Could you please provide an
> example of how to use that 'metadata driven' trigger?
>
> Thanks!

Re: Triggers based on size

Posted by Carlos Alonso <ca...@mrcalonso.com>.
Hi Robert, Kenneth.

Thanks a lot to both of you for your responses!!

Kenneth, unfortunately I'm not sure we're experienced enough with Apache
Beam to get anywhere close to your suggestion, but thanks anyway!!

Robert, your suggestion sounds great to me. Could you please provide an
example of how to use that 'metadata driven' trigger?

Thanks!

On Tue, Jan 9, 2018 at 9:11 PM Kenneth Knowles <kl...@google.com> wrote:

> Often, when you need or want more control than triggers provide, such as
> input-type-specific logic like yours, you can use state and timers in ParDo
> to control when to output. You lose any potential optimizations of Combine
> based on associativity/commutativity and assume the burden of making sure
> your output is sensible, but dropping to low-level stateful computation may
> be your best bet.
>
> Kenn

Re: Triggers based on size

Posted by Kenneth Knowles <kl...@google.com>.
Often, when you need or want more control than triggers provide, such as
input-type-specific logic like yours, you can use state and timers in ParDo
to control when to output. You lose any potential optimizations of Combine
based on associativity/commutativity and assume the burden of making sure
your output is sensible, but dropping to low-level stateful computation may
be your best bet.

Kenn
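
A minimal plain-Python model of that approach for this use case: one buffer per key (for example, the message "type" that selects the GCS destination), flushed when its buffered bytes reach a threshold or when a per-key timer expires, so small, sparse keys still get written. This is not Beam code; the class and parameter names are hypothetical, message size is approximated with `len()`, and the model ignores windows. In a real pipeline the buffer, byte counter and timer would be user state and a processing-time timer declared on a stateful DoFn (in the Python SDK, via specs such as `BagStateSpec`, `CombiningValueStateSpec` and `TimerSpec`).

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

Flush = Tuple[str, List[bytes]]  # (key, buffered values) emitted downstream


@dataclass
class KeyState:
    buffer: List[bytes] = field(default_factory=list)  # plays the role of bag state
    buffered_bytes: int = 0                            # plays the role of a combining (sum) state
    timer_at: Optional[float] = None                   # plays the role of a processing-time timer


class SizeOrTimeBuffer:
    """Hypothetical model of a stateful, timely ParDo (not a Beam API)."""

    def __init__(self, max_bytes: int, max_delay_s: float) -> None:
        self.max_bytes = max_bytes
        self.max_delay_s = max_delay_s
        self.state: Dict[str, KeyState] = {}

    def process(self, key: str, value: bytes, now: float) -> List[Flush]:
        st = self.state.setdefault(key, KeyState())
        if st.timer_at is None:
            # First element since the last flush: arm the flush timer.
            st.timer_at = now + self.max_delay_s
        st.buffer.append(value)
        st.buffered_bytes += len(value)
        # Size-based flush: output as soon as the buffer reaches the threshold.
        if st.buffered_bytes >= self.max_bytes:
            return [self._flush(key)]
        return []

    def on_timers(self, now: float) -> List[Flush]:
        # Time-based flush for every key whose timer has expired.
        expired = [k for k, st in self.state.items()
                   if st.timer_at is not None and st.timer_at <= now]
        return [self._flush(k) for k in expired]

    def _flush(self, key: str) -> Flush:
        # Emit the buffer and clear all state (including the timer) for the key.
        st = self.state.pop(key)
        return key, st.buffer
```

As noted above, the burden of keeping the output sensible moves to you: for example, values still buffered when a window ends need a flush of their own (one common choice is an event-time timer at the end of the window), which this model does not cover.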

On Tue, Jan 9, 2018 at 11:59 AM, Robert Bradshaw <ro...@google.com>
wrote:

> We've tossed around the idea of "metadata-driven" triggers, which would
> essentially let you provide a mapping element -> metadata and a
> monotonic CombineFn metadata* -> bool that would allow for this.
> AfterCount is a special case of this, with the mapping fn being _ -> 1
> and the CombineFn being sum(...) >= N; for size, one would provide a
> (perhaps approximate) sizing mapping fn.
>
> Note, however, that there's no guarantee that the trigger will fire as
> soon as possible; due to runtime characteristics, a significant amount of
> data may be buffered (or come in at once) before the trigger is
> queried. One possibility would be to follow your triggering with a
> DoFn that breaks up large value streams into multiple manageably sized
> ones as needed.

Re: Triggers based on size

Posted by Robert Bradshaw <ro...@google.com>.
We've tossed around the idea of "metadata-driven" triggers, which would
essentially let you provide a mapping element -> metadata and a
monotonic CombineFn metadata* -> bool that would allow for this.
AfterCount is a special case of this, with the mapping fn being _ -> 1
and the CombineFn being sum(...) >= N; for size, one would provide a
(perhaps approximate) sizing mapping fn.
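
To make the metadata-driven trigger idea concrete, here is a plain-Python model of it. It is not Beam code: the trigger was never implemented, and the names `MetadataTrigger`, `after_count`, `after_bytes` and `panes` are hypothetical. The model hard-codes sum as the combining step and approximates each message's size by its byte length.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, Iterator, List


@dataclass
class MetadataTrigger:
    """Hypothetical model of a metadata-driven trigger (not a Beam API)."""
    to_metadata: Callable[[Any], int]   # element -> metadata
    should_fire: Callable[[int], bool]  # monotonic predicate on the combined metadata
    accumulated: int = 0                # running sum for the current pane

    def on_element(self, element: Any) -> bool:
        # Combine (here: sum) the new element's metadata and query the predicate.
        self.accumulated += self.to_metadata(element)
        return self.should_fire(self.accumulated)

    def reset(self) -> None:
        # Start a fresh pane after firing.
        self.accumulated = 0


def after_count(n: int) -> MetadataTrigger:
    # AfterCount as a special case: every element contributes 1.
    return MetadataTrigger(lambda _: 1, lambda total: total >= n)


def after_bytes(max_bytes: int) -> MetadataTrigger:
    # Size-based variant: approximate each message's size by its byte length.
    return MetadataTrigger(lambda msg: len(msg), lambda total: total >= max_bytes)


def panes(elements: Iterable[Any], trigger: MetadataTrigger) -> Iterator[List[Any]]:
    """Cut a stream into panes, firing whenever the trigger says so."""
    pane: List[Any] = []
    for element in elements:
        pane.append(element)
        if trigger.on_element(element):
            yield pane
            pane = []
            trigger.reset()
    if pane:
        # Whatever is left would be emitted when the window closes.
        yield pane
```

For example, `panes([b"a" * 10, b"b" * 10, b"c" * 30, b"d"], after_bytes(25))` yields a pane of three messages followed by a final pane holding `b"d"`. The first pane overshoots the 25-byte threshold, because the predicate is only checked after an element has been added.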

Note, however, that there's no guarantee that the trigger will fire as
soon as possible; due to runtime characteristics, a significant amount of
data may be buffered (or come in at once) before the trigger is
queried. One possibility would be to follow your triggering with a
DoFn that breaks up large value streams into multiple manageably sized
ones as needed.
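
A follow-up DoFn like the one suggested here could split each grouped value stream into size-bounded batches along these lines. This is a plain-Python sketch of the logic such a DoFn's `process` method might run for one key, not Beam code; `split_by_size` and `max_batch_bytes` are hypothetical names, and message size is approximated with `len()`.

```python
from typing import Iterable, Iterator, List


def split_by_size(values: Iterable[bytes], max_batch_bytes: int) -> Iterator[List[bytes]]:
    """Yield consecutive batches whose total size stays within max_batch_bytes.

    A single value larger than max_batch_bytes is emitted in a batch of its
    own, since this function does not split individual values.
    """
    batch: List[bytes] = []
    batch_bytes = 0
    for value in values:
        size = len(value)
        # Close the current batch if adding this value would overflow it.
        if batch and batch_bytes + size > max_batch_bytes:
            yield batch
            batch, batch_bytes = [], 0
        batch.append(value)
        batch_bytes += size
    if batch:
        # Emit the last, possibly partial, batch.
        yield batch
```

Because it is a generator, the function itself only keeps one batch in memory at a time, although whether the grouped values are fully materialized before it runs depends on the runner.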

On Tue, Jan 9, 2018 at 11:43 AM, Carlos Alonso <ca...@mrcalonso.com> wrote:
> Hi everyone!!
>
> I was wondering if there is an option to trigger window panes based on the
> size of the pane itself (rather than the number of elements).
>
> To provide a bit more context: we're backing up a PubSub topic into GCS,
> with the "special" feature that the GCS destination depends on the "type"
> of the message.
>
> The 'shape' of the messages published there is quite random: some of them
> are very frequent and small, others are very big but sparse... We have
> around 150 messages per second (in total), we're firing every 15 minutes,
> and we're experiencing OOM errors. We've considered firing based on the
> number of items as well, but given the randomness of the input, I don't
> think that would be a final solution either.
>
> Having a trigger based on size would be great; another option would be
> to have a dynamic number of shards for the PTransform that actually writes
> the files.
>
> What is your recommendation for this use case?
>
> Thanks!!