Posted to user@beam.apache.org by Jesse Anderson <je...@smokinghand.com> on 2016/05/17 18:44:25 UTC

Removing a Window From a PCollection

Is there a way to remove windowing from a PCollection?

Let's say I had the following code:
      PCollection<String> windowedWords = parsed
          .apply(Window.<String>into(
            FixedWindows.of(Duration.standardSeconds(30))));

      PCollection<KV<String, Long>> eventCounts =
          windowedWords.apply(Count.perElement());

      // Don't window anymore for the GroupByKey
      PCollection<KV<String, Iterable<Long>>> grouped =
          eventCounts.apply(GroupByKey.<String, Long>create());

      PCollection<String> formattedCounts =
          grouped.apply(ParDo.of(new TimedFN()));

I've added the window and performed the counts. In the GroupByKey, I no
longer want windowing to apply; I want to group by key across all windows.
The Iterable that comes back from the GroupByKey only has the one Long
from its window instead of all N Longs from all windows. How do I remove
the window so that the GroupByKey groups across all time?

Thanks,

Jesse

Re: Removing a Window From a PCollection

Posted by Frances Perry <fj...@google.com>.
To add some more color...

If you are doing a GroupByKey on a PCollection<KV<K, V>>, you will get one
KV<K, Iterable<V>> per key, per window, and per pane (a.k.a. trigger firing).
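To see why each Iterable in the question holds a single Long, here is a minimal plain-Java model of that grouping. It is not Beam code, and PerWindowGrouping, WindowedKv and groupByKeyAndWindow are hypothetical names. It assumes one pane per window and identifies each window by its end timestamp: the effective grouping key is (key, window), and since Count.perElement has already produced exactly one count per key per window, every group ends up with exactly one value.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of GroupByKey on a windowed collection.
// It only mimics the grouping semantics; it is NOT the Beam API.
class PerWindowGrouping {

  // One windowed element: key, value, and the window it was assigned to
  // (identified here by the window's end timestamp in millis).
  static final class WindowedKv {
    final String key;
    final long value;
    final long windowEnd;

    WindowedKv(String key, long value, long windowEnd) {
      this.key = key;
      this.value = value;
      this.windowEnd = windowEnd;
    }
  }

  // Groups by (key, window): one output group per key per window.
  // Panes are not modeled (assumes a single default firing per window).
  static Map<String, List<Long>> groupByKeyAndWindow(List<WindowedKv> input) {
    Map<String, List<Long>> groups = new LinkedHashMap<>();
    for (WindowedKv kv : input) {
      String groupId = kv.key + "@" + kv.windowEnd;
      groups.computeIfAbsent(groupId, id -> new ArrayList<>()).add(kv.value);
    }
    return groups;
  }
}
```

With counts of 44, 44 and 2 for the same key in three consecutive 30-second windows, the model yields three groups of one Long each, which matches the behavior described in the question.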

And regardless, every time you call context.output in a ParDo, you are
adding a new element to the output PCollection. PCollections have no notion
of ordering, so once you stick multiple elements into a PCollection, any
relationship between them is lost.

So in your case, if you want to keep the relationship between all the
values in a given key/window/pane, you do need to keep them in the same
element for future processing. But note that this means any downstream
writes will treat them as a single element. In this case, that works for
you because you are presumably using TextIO.Write, which happens to be
newline-delimited, and you are also putting newlines within an element. But
if you were to try to parse it back again using TextIO.Read, each line would
be an independent element, and you'd have to set timestamps and rewindow to
get the association back again. And if you switch to a different sink that
isn't newline-delimited, you might need to do different formatting before
writing.
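The TextIO round trip described above can be modeled in a few lines of plain Java. This is a sketch with hypothetical names (NewlineRoundTrip, writeElements, readLines), not Beam's TextIO: a newline-delimited sink appends '\n' after each element, and a line-oriented reader turns every line back into a separate element.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java model of a newline-delimited text sink and a
// line-oriented reader. It is NOT Beam's TextIO.
class NewlineRoundTrip {

  // The sink writes each element followed by '\n', so newlines embedded
  // inside an element end up in the file unchanged.
  static String writeElements(List<String> elements) {
    StringBuilder file = new StringBuilder();
    for (String element : elements) {
      file.append(element).append('\n');
    }
    return file.toString();
  }

  // The reader makes every line its own element, regardless of which
  // written element the line originally came from.
  static List<String> readLines(String file) {
    if (file.isEmpty()) {
      return List.of();
    }
    return Arrays.asList(file.split("\n"));
  }
}
```

Writing one element that contains two lines and reading it back produces two independent elements, which is why the association has to be rebuilt after reading.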

On Tue, May 17, 2016 at 1:47 PM, Jesse Anderson <je...@smokinghand.com>
wrote:

Re: Removing a Window From a PCollection

Posted by Jesse Anderson <je...@smokinghand.com>.
Somewhat related, in case anyone hits this while Googling. If you want the
results of your GroupByKey to be written out grouped together, like:
54.148.33.jdj Hits:44 At:2015-03-31T04:00:29.999Z
54.148.33.jdj Hits:44 At:2015-03-31T04:00:59.999Z
54.148.33.jdj Hits:2 At:2015-03-31T04:01:29.999Z
107.22.225.dea Hits:18 At:2015-03-31T04:00:29.999Z
107.22.225.dea Hits:18 At:2015-03-31T04:00:59.999Z
107.22.225.dea Hits:1 At:2015-03-31T04:01:29.999Z
190.29.67.djc Hits:1 At:2015-03-31T04:00:29.999Z
190.29.67.djc Hits:1 At:2015-03-31T04:00:59.999Z

Simply doing a "context.output" for each line is not guaranteed to write
them out one after another. You'll need to do a single "context.output"
call with multiple "\n" in the string.

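A minimal sketch of that formatting step, assuming each grouped value carries a count and its window's end timestamp. HitsFormatter, WindowCount and format are hypothetical names, and this is plain Java rather than a real DoFn; inside a DoFn, the returned string would be passed to a single context.output call.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper (plain Java, not a Beam DoFn): builds ONE multi-line
// string per key, so all lines can be emitted with a single output call.
class HitsFormatter {

  // One per-window result for a key: the count and the window's end time.
  static final class WindowCount {
    final long hits;
    final String windowEnd; // ISO-8601, e.g. "2015-03-31T04:00:29.999Z"

    WindowCount(long hits, String windowEnd) {
      this.hits = hits;
      this.windowEnd = windowEnd;
    }
  }

  // Sorts by window end, then joins one line per window with '\n'.
  static String format(String key, List<WindowCount> counts) {
    List<WindowCount> sorted = new ArrayList<>(counts);
    // Timestamps in this fixed ISO-8601 format sort chronologically as text.
    sorted.sort(Comparator.comparing((WindowCount c) -> c.windowEnd));
    StringJoiner lines = new StringJoiner("\n");
    for (WindowCount c : sorted) {
      lines.add(key + " Hits:" + c.hits + " At:" + c.windowEnd);
    }
    return lines.toString();
  }
}
```

The sort is there because GroupByKey does not promise any particular order for the values in its Iterable.
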
On Tue, May 17, 2016 at 11:57 AM Jesse Anderson <je...@smokinghand.com>
wrote:

Re: Removing a Window From a PCollection

Posted by Jesse Anderson <je...@smokinghand.com>.
That's what I needed. Thanks Frances.

On Tue, May 17, 2016 at 11:54 AM Frances Perry <fj...@google.com> wrote:

Re: Removing a Window From a PCollection

Posted by Frances Perry <fj...@google.com>.
Re-window into the GlobalWindow (which covers all time).

pc.apply(Window.<T>into(new GlobalWindows()))

The GlobalWindow is the default. It works for bounded collections (a.k.a.
batch mode) because once all the data has been processed, the system
fast-forwards to the end of time. And if you are processing an unbounded
PCollection with aggregations (a.k.a. streaming mode), you have to either set
a different windowing scheme or use triggers so that we aren't waiting
until the end of time for results.
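Applied to the question's pipeline, this would mean adding eventCounts.apply(Window.<KV<String, Long>>into(new GlobalWindows())) between Count.perElement and GroupByKey. The plain-Java model below sketches the effect; GlobalRewindow, WindowedCount, intoGlobalWindow and groupByKey are hypothetical names, and it mimics the semantics only, not the Beam API. Once every count is reassigned to the single global window, grouping collects the counts from all the fixed windows under one key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of re-windowing into the global window
// before grouping. It mimics the semantics only; it is NOT the Beam API.
class GlobalRewindow {

  // Sentinel standing in for the single window that covers all time.
  static final String GLOBAL_WINDOW = "GLOBAL";

  // A per-window count: key, count, and the window it belongs to.
  static final class WindowedCount {
    final String key;
    final long count;
    final String window;

    WindowedCount(String key, long count, String window) {
      this.key = key;
      this.count = count;
      this.window = window;
    }
  }

  // Reassigns every element to the global window; keys and counts are unchanged.
  static List<WindowedCount> intoGlobalWindow(List<WindowedCount> input) {
    List<WindowedCount> out = new ArrayList<>();
    for (WindowedCount wc : input) {
      out.add(new WindowedCount(wc.key, wc.count, GLOBAL_WINDOW));
    }
    return out;
  }

  // Groups by (key, window); after intoGlobalWindow this is grouping by key alone.
  static Map<String, List<Long>> groupByKey(List<WindowedCount> input) {
    Map<String, List<Long>> groups = new LinkedHashMap<>();
    for (WindowedCount wc : input) {
      groups.computeIfAbsent(wc.key + "@" + wc.window, id -> new ArrayList<>())
          .add(wc.count);
    }
    return groups;
  }
}
```

Grouping the two per-window counts directly gives two single-value groups; grouping after intoGlobalWindow gives one group holding both counts.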

Hope that helps!

On Tue, May 17, 2016 at 11:44 AM, Jesse Anderson <je...@smokinghand.com>
wrote:
