Posted to user@beam.apache.org by Csaba Kassai <cs...@doctusoft.com> on 2017/04/02 21:34:31 UTC

Re: exclusive window per event

Hi Antony,

There is a small custom windowing example in this GitHub repo that may be
useful for you: https://github.com/Doctusoft/ds-dataflow-examples
The code is not documented yet, so let me know if you have any questions
about it.

Regards,
Csabi



On Fri, 31 Mar 2017 at 18:04 Robert Bradshaw <ro...@google.com> wrote:

Yes, you can extend BoundedWindow to be your own Window type that has
additional members and different equality semantics (rather than
re-using IntervalWindow). The only requirement is that it have an
end point, i.e. that it implement maxTimestamp(). (You'll also have to
write a Coder for your new Window subclass and return it from your
WindowFn's windowCoder().)

https://beam.apache.org/documentation/sdks/javadoc/0.4.0/org/apache/beam/sdk/transforms/windowing/WindowFn.html

On Thu, Mar 30, 2017 at 11:19 PM, Antony Mayi <an...@yahoo.com> wrote:
> Hi,
>
> is there a way to implement windowing so that each input event gets into its
> own exclusive window?
>
> I can see that PartitioningWindowFn can be extended. If I implement
> assignWindow() to return a new IntervalWindow with both start and end time
> set to the event time, and two distinct events arrive at the same time
> (indistinguishable at Instant granularity), would they be processed as two
> separate windows, without the event data interfering with each other during
> any transformations?
>
> My motivation is to be able to flatmap individual input events into a
> PCollection of multiple elements that, being in a single exclusive window,
> can be grouped/... independently of other events (even if another event
> has the same timestamp).
>
> thanks,
> Antony.
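To make Antony's question and Robert's answer concrete: whether two events with the same timestamp share one window or get two depends only on how the window type implements equality. The sketch below is plain Java with no Beam dependency. IntervalKey stands in for IntervalWindow (equal when start and end match), EventWindow is a hypothetical window type that also carries an event id, and its encode/decode methods illustrate what the required Coder would have to serialize. It assumes the event id is available when windows are assigned; all class names are illustrative, not Beam API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Stand-in for IntervalWindow: two windows are equal when start and end match.
final class IntervalKey {
    final long startMillis;
    final long endMillis;

    IntervalKey(long startMillis, long endMillis) {
        this.startMillis = startMillis;
        this.endMillis = endMillis;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof IntervalKey)) return false;
        IntervalKey other = (IntervalKey) o;
        return startMillis == other.startMillis && endMillis == other.endMillis;
    }

    @Override
    public int hashCode() { return Objects.hash(startMillis, endMillis); }
}

// Hypothetical per-event window: an end point plus an event id that takes part in equality.
final class EventWindow {
    final long maxTimestampMillis; // plays the role of BoundedWindow.maxTimestamp()
    final String eventId;          // hypothetical extra member

    EventWindow(long maxTimestampMillis, String eventId) {
        this.maxTimestampMillis = maxTimestampMillis;
        this.eventId = Objects.requireNonNull(eventId);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof EventWindow)) return false;
        EventWindow other = (EventWindow) o;
        return maxTimestampMillis == other.maxTimestampMillis && eventId.equals(other.eventId);
    }

    @Override
    public int hashCode() { return Objects.hash(maxTimestampMillis, eventId); }

    // What a coder for this window must do: write every field that takes part in equality.
    byte[] encode() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(maxTimestampMillis);
            out.writeUTF(eventId);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static EventWindow decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            long maxTimestampMillis = in.readLong();
            return new EventWindow(maxTimestampMillis, in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

final class WindowEqualityDemo {
    // Events "a" and "b" share a timestamp and get IntervalWindow-style windows.
    static int distinctIntervalWindows(long timestampMillis) {
        Set<IntervalKey> windows = new HashSet<>();
        windows.add(new IntervalKey(timestampMillis, timestampMillis)); // event "a"
        windows.add(new IntervalKey(timestampMillis, timestampMillis)); // event "b"
        return windows.size();
    }

    // The same two events, given id-carrying windows instead.
    static int distinctEventWindows(long timestampMillis) {
        Set<EventWindow> windows = new HashSet<>();
        windows.add(new EventWindow(timestampMillis, "a"));
        windows.add(new EventWindow(timestampMillis, "b"));
        return windows.size();
    }
}
```

With IntervalWindow-style equality the two events collapse into one window (a set of size 1). With the id included they stay separate (size 2). Any field that takes part in equals presumably also has to round-trip through the coder. Otherwise windows that were distinct before serialization could compare equal afterwards.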

Re: exclusive window per event

Posted by Robert Bradshaw <ro...@google.com>.
So if I understand correctly, you break a single event up into several
elements, do some processing on them, and then want to re-group
(aggregate) all the elements corresponding to a single original event,
right?

The most natural way to do this would probably be to key your elements
by some kind of event id, and then do a Combine.perKey. This will also
yield better parallelism than doing a global combine with distinct
windows. To do this in a streaming manner you would still need
windowing, but you could use any WindowFn that doesn't introduce too
much latency (say, FixedWindows of one millisecond), just so the
gathering end can be confident it has received *all* elements for a
given event; your key prevents any cross-event combining.
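Robert's suggestion can be modeled in plain Java (no Beam dependency) to show that the key, not the window, keeps events apart. Each exploded element is grouped on the pair (fixed-window start, event id), which is roughly what a keyed combine groups on in a windowed pipeline. RawEvent, WindowedKey and the summing aggregate are hypothetical stand-ins. In an actual pipeline the corresponding steps would presumably be Window.into(FixedWindows.of(...)) followed by Combine.perKey(...), with whatever CombineFn the use case needs.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical input event: an id, an event-time timestamp and the values it carries.
final class RawEvent {
    final String id;
    final long timestampMillis;
    final List<Integer> values;

    RawEvent(String id, long timestampMillis, List<Integer> values) {
        this.id = id;
        this.timestampMillis = timestampMillis;
        this.values = values;
    }
}

// Grouping key: the fixed window an element falls into, plus the event it came from.
final class WindowedKey {
    final long windowStartMillis;
    final String eventId;

    WindowedKey(long windowStartMillis, String eventId) {
        this.windowStartMillis = windowStartMillis;
        this.eventId = eventId;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof WindowedKey)) return false;
        WindowedKey other = (WindowedKey) o;
        return windowStartMillis == other.windowStartMillis && eventId.equals(other.eventId);
    }

    @Override
    public int hashCode() { return Objects.hash(windowStartMillis, eventId); }
}

final class KeyedCombineModel {
    // Mirrors "FixedWindows of one millisecond".
    static final long WINDOW_SIZE_MILLIS = 1L;

    // Start of the fixed window that contains the timestamp.
    static long windowStart(long timestampMillis) {
        return Math.floorDiv(timestampMillis, WINDOW_SIZE_MILLIS) * WINDOW_SIZE_MILLIS;
    }

    // Explode each event into keyed elements, then sum the values per (window, event id).
    static Map<WindowedKey, Integer> sumPerEvent(List<RawEvent> events) {
        return events.stream()
                .flatMap(event -> event.values.stream()
                        .map(value -> Map.entry(
                                new WindowedKey(windowStart(event.timestampMillis), event.id), value)))
                .collect(Collectors.groupingBy(
                        entry -> entry.getKey(),
                        Collectors.summingInt(entry -> entry.getValue())));
    }
}
```

Two events in the same millisecond still produce two separate sums because their ids differ. The window only bounds how long the combine waits before emitting.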

If all you're doing is simple Map(s) over the elements corresponding to
a single event, it may even be cheaper not to explode and recombine at all.

- Robert




Re: exclusive window per event

Posted by Antony Mayi <an...@yahoo.com>.
In my case the GroupByKey happens to be used internally when calling Combine.globally() on the flattened event elements (so I don't call GroupByKey explicitly, but it gets used unavoidably).
I already tried using triggers instead of windowing (trying to trigger on every event), with something like this:
.apply(Window.<Event>triggering(AfterPane.elementCountAtLeast(1)).withAllowedLateness(Duration.ZERO).discardingFiredPanes())
but that's not advancing: it seems to be waiting for more events instead of passing the event through the pipeline.
thanks,
a.
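One possible factor behind the stall, not confirmed in this thread: as I understand Beam's trigger semantics, AfterPane.elementCountAtLeast(1) on its own is a one-shot trigger. In the single GlobalWindow it can fire at most once, after which the trigger is finished and later elements for that window are dropped. The usual way to fire on every element is to wrap it, as in Repeatedly.forever(AfterPane.elementCountAtLeast(1)). The toy model below is plain Java, not Beam's trigger implementation. It only contrasts the two firing behaviors in discarding mode, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of an element-count trigger on one window; not Beam's trigger machinery.
final class TriggerModel {
    // repeated == false: one-shot trigger; once it fires it is finished and later elements are dropped.
    // repeated == true: the trigger re-arms after every firing, like wrapping it in Repeatedly.forever.
    static List<List<String>> firePanes(List<String> elements, int countAtLeast, boolean repeated) {
        List<List<String>> panes = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        boolean finished = false;
        for (String element : elements) {
            if (finished) {
                continue; // finished trigger: the window is closed, so this element is dropped
            }
            buffer.add(element);
            if (buffer.size() >= countAtLeast) {
                panes.add(new ArrayList<>(buffer));
                buffer.clear(); // discarding mode: the next pane starts empty
                if (!repeated) {
                    finished = true;
                }
            }
        }
        return panes;
    }
}
```

For three input elements, the one-shot version emits a single pane and drops the rest. The repeated version emits one pane per element.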
 


Re: exclusive window per event

Posted by Ben Chambers <bc...@google.com>.
If your goal is to just process events as they come in, you should be able
to do something like:

 Source -> DoFn -> DoFn -> Sink

You only need to GroupByKey if you want all the elements associated with a
specific key to be processed together -- which it sounds like you don't. If
you do need a GroupByKey, you could then use a trigger (for example, one that
fires on every element) so that the grouping emits after every element.
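Ben's Source -> DoFn -> DoFn -> Sink shape can be modeled in plain Java to show why it needs neither windows nor triggers. Every stage handles one element at a time, so no step ever waits for other events. This also illustrates Robert's point that simple per-event maps may not need an explode-and-recombine step at all. The names Tagged, explode and process are hypothetical. In Beam, each stage would presumably be a ParDo, or MapElements/FlatMapElements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of "Source -> DoFn -> DoFn -> Sink": every stage is element-wise.
final class ElementwisePipeline {
    // Hypothetical element, tagged with the id of the event it came from.
    static final class Tagged {
        final String eventId;
        final int value;

        Tagged(String eventId, int value) {
            this.eventId = eventId;
            this.value = value;
        }
    }

    // First "DoFn": explode one event (id plus list of values) into tagged elements.
    static List<Tagged> explode(String eventId, List<Integer> values) {
        List<Tagged> out = new ArrayList<>();
        for (int v : values) {
            out.add(new Tagged(eventId, v));
        }
        return out;
    }

    // Second "DoFn" plus "Sink": transform each element on its own and append the result.
    static void process(String eventId, List<Integer> values,
                        Function<Tagged, String> secondStage, List<String> sink) {
        for (Tagged t : explode(eventId, values)) {
            sink.add(secondStage.apply(t)); // no grouping, so nothing waits on other events
        }
    }
}
```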


Re: exclusive window per event

Posted by Antony Mayi <an...@yahoo.com>.
I thought group-by-key in streaming mode cannot run without windowing? This is what I get when removing my Window.into():
java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
my case is:
receiving streaming events
each event is a list of elements
I need to process the elements of each event independently of the elements of other events, hence my approach of putting each event in its own window, then expanding it (since event = list) using flat map and then running a group-by type of transformation on that window (which would keep it isolated from other events' elements).
thx for ideas,
a.
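The exception text states the rule itself. GroupByKey, and therefore Combine.globally, which uses it internally, refuses an unbounded input that is still in the global window with no trigger. Either Window.into or Window.triggering lifts that restriction. Below is a plain-Java restatement of the condition as the message describes it. It is a simplified model, not Beam's actual validation code, and the enum and class names are hypothetical.

```java
// Hypothetical descriptors of a PCollection's boundedness and windowing.
enum Boundedness { BOUNDED, UNBOUNDED }

enum WindowKind { GLOBAL, NON_GLOBAL }

final class GroupByKeyPrecondition {
    // Rejects exactly the combination named in the error message.
    static void check(Boundedness input, WindowKind windowing, boolean hasExplicitTrigger) {
        if (input == Boundedness.UNBOUNDED && windowing == WindowKind.GLOBAL && !hasExplicitTrigger) {
            throw new IllegalStateException("GroupByKey cannot be applied to non-bounded PCollection"
                    + " in the GlobalWindow without a trigger. Use a Window.into or Window.triggering"
                    + " transform prior to GroupByKey.");
        }
    }

    // Convenience wrapper: true when the model accepts the configuration.
    static boolean accepts(Boundedness input, WindowKind windowing, boolean hasExplicitTrigger) {
        try {
            check(input, windowing, hasExplicitTrigger);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }
}
```

In this model, adding either a non-global WindowFn or an explicit trigger is enough. That is why Ben's trigger suggestion is an alternative to giving each event its own window.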


Re: exclusive window per event

Posted by Ben Chambers <bc...@apache.org>.
Can you elaborate on your use case? If your goal is to just group things,
you can assign a key to each element and then apply a group by key. You
shouldn't need to use windowing for that.
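Ben's suggestion can be sketched as a plain-Java model of GroupByKey semantics, where each key maps to the list of values seen for it. If every exploded element carries its event id as the key, values from different events can never land in the same group, whatever their timestamps. The class and key names are hypothetical. In a bounded pipeline this needs no windowing. With an unbounded source, the GroupByKey restriction Antony ran into still applies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class GroupByKeyModel {
    // Groups (key, value) pairs into key -> values, the shape a GroupByKey emits per window.
    static Map<String, List<Integer>> groupByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>(); // sorted only for readable output
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }
}
```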
