Posted to user@beam.apache.org by Pradyumna Achar <pr...@gmail.com> on 2021/01/29 03:59:57 UTC

Merging panes of a window

Hello,

I am running into a strange situation trying to use windows and FileIO
properly.

I have a KafkaIO source, followed by a DoFn that assigns timestamps based
on a field in the input record using outputWithTimestamp. After that, I
apply FixedWindows of 1 hour duration on these elements. I need to write
these windows to disk in parts, with a constraint that each part be of a
certain size (except the last part).

So, I made the FixedWindows trigger repeatedly, once per element, and
implemented a stateful DoFn that collects these elements until the size
limit is reached and then outputs them as an Iterable.
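The size-bounded buffering step above can be modelled in plain Java. This is a minimal sketch, not Beam code. The class name and the size function are hypothetical. In an actual stateful DoFn, the buffer and the byte counter would live in Beam state (for example a BagState and a ValueState<Long>, scoped per key and window) rather than in fields.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.ToLongFunction;

/**
 * Hypothetical plain-Java model of a stateful DoFn that buffers elements
 * until a byte limit is reached. Not Beam API: in Beam, `buffer` and
 * `bufferedBytes` would be per-key, per-window state cells.
 */
final class SizeBoundedBatcher<T> {
    private final long maxBytes;
    private final ToLongFunction<T> sizeOf;   // assumed way to measure an element
    private final List<T> buffer = new ArrayList<>();
    private long bufferedBytes = 0;

    SizeBoundedBatcher(long maxBytes, ToLongFunction<T> sizeOf) {
        if (maxBytes <= 0) throw new IllegalArgumentException("maxBytes must be positive");
        this.maxBytes = maxBytes;
        this.sizeOf = sizeOf;
    }

    /** Buffers one element; returns a full batch once the size limit is reached. */
    Optional<List<T>> add(T element) {
        buffer.add(element);
        bufferedBytes += sizeOf.applyAsLong(element);
        return bufferedBytes >= maxBytes ? Optional.of(drain()) : Optional.empty();
    }

    /** Emits whatever is left (the possibly smaller "last part"), if anything. */
    Optional<List<T>> flush() {
        return buffer.isEmpty() ? Optional.empty() : Optional.of(drain());
    }

    /** Copies the buffer out as an immutable batch and resets the counters. */
    private List<T> drain() {
        List<T> batch = Collections.unmodifiableList(new ArrayList<>(buffer));
        buffer.clear();
        bufferedBytes = 0;
        return batch;
    }
}
```

With a 10-byte limit and string length as the size, adding "aaaa", "bbbb" and then "cc" emits one three-element batch. A later flush() emits the remainder. In Beam, that final flush would plausibly be driven by an event-time timer at the end of the window. That is an assumption about the design, not something stated in this thread.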

However, now I see that these elements are in separate panes. FileIO's
behavior is that "writing happens by default per window and pane" (per the
javadoc), and this is what I am observing too: I get a bunch of files
instead of one file for the Iterable.

Is there any way I can make FileIO write that Iterable to a single file?

Thank you
- Pradyumna

Re: Merging panes of a window

Posted by Pradyumna Achar <pr...@gmail.com>.
Thanks, I tried the alternate approach, but it doesn't work:

 - The custom WindowFn hack ensures that FileIO writes all elements of the
Iterable into a single file, and the elementCountAtLeast(1) trigger on the
custom WindowFn makes FileIO do this early, without waiting for the end of
the interval.

 - If I output KVs whose key is some identifier for the Iterable, the first
goal is met (FileIO writes all elements of a given Iterable into a single
file). The second goal, writing early without waiting for the end of the
interval, is not met, because FileIO does not operate on Iterables but
needs individual records.

   (I can't feed a PCollection<KV<Long, Iterable<Type>>> to FileIO; instead
I have to flatten it to a PCollection<KV<Long, Type>>. There is no way to
trigger early firings on the window such that a pane is guaranteed to
contain all elements of a given key whenever at least one element in that
pane has that key.)


Re: Merging panes of a window

Posted by Reuven Lax <re...@google.com>.
I think it's possible to accomplish this without a custom WindowFn. Your
naming function in FileIO should be able to access the element itself, not
just the window. I would have your stateful DoFn attach a sequence number
to each element (store an extra ValueState or CombiningState, and increment
it on every element). Then attach this number to the output element. You
could do this by outputting a KV (so your output would be KV<Long,
Iterable<Type>>) or you could use Beam schemas for your output type - your
choice. Then have your naming function use this sequence number to name the
output files.
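The sequence-number idea can be sketched in plain Java. This is a hedged model, not FileIO's API. `NumberedBatch`, `BatchSequencer` and the file-name format are all hypothetical. In Beam, the counter would be a ValueState<Long> scoped per key and window. The name would be built inside the FileIO naming function from the sequence number carried by the element, for example via FileIO.writeDynamic(), whose destination is computed per element.

```java
import java.util.List;

/** Hypothetical output type: one size-bounded batch plus its sequence number. */
final class NumberedBatch<T> {
    final long sequence;
    final List<T> elements;

    NumberedBatch(long sequence, List<T> elements) {
        this.sequence = sequence;
        this.elements = elements;
    }
}

/** Models the extra counter state; in Beam there would be one per key and window. */
final class BatchSequencer {
    private long next = 0;

    /** Tags a batch with the current sequence number, then increments the counter. */
    <T> NumberedBatch<T> tag(List<T> batch) {
        return new NumberedBatch<>(next++, batch);
    }
}

/** Hypothetical naming scheme: window bounds (epoch millis) plus a zero-padded sequence number. */
final class BatchFileNames {
    static String fileName(long windowStartMillis, long windowEndMillis, long sequence, String suffix) {
        return String.format("window-%d-%d-part-%05d%s",
                windowStartMillis, windowEndMillis, sequence, suffix);
    }
}
```

In this model, two batches that happen to share a pane still get distinct names, because the name depends on the batch's own number rather than on the pane. For example, the second batch of the first hourly window maps to "window-0-3600000-part-00001.txt".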

On Fri, Jan 29, 2021 at 2:32 AM Pradyumna Achar <pr...@gmail.com>
wrote:


Re: Merging panes of a window

Posted by Pradyumna Achar <pr...@gmail.com>.
Thank you, that worked.
One small glitch was that the trigger is not guaranteed to fire for every
element (AfterPane.elementCountAtLeast(1) might fire after one element or
after more). When it fires for more than one of those iterables, I get files
that are double, triple, etc. the intended size, which is undesirable.
To overcome this, I created another window class similar to
IntervalWindow, called BespokeIntervalWindow. It is the same as
IntervalWindow except that it has an extra UUID field, in addition to the
"start" and "end" fields, that keeps each such window distinct. I also
wrote a WindowFn that gets the existing IntervalWindow from the
AssignContext and assigns an equivalent BespokeIntervalWindow to each
iterable in its assignWindows method. I then set the
AfterPane.elementCountAtLeast(1) trigger on this WindowFn so that it does
not wait until the end of the interval to emit the pane.
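The idea behind BespokeIntervalWindow can be illustrated with a plain-Java stand-in. This is a sketch under stated assumptions:

- It is not Beam's IntervalWindow.
- It uses epoch milliseconds instead of Joda Instants.
- A real Beam window would extend BoundedWindow and would need a Coder returned from the WindowFn's windowCoder().

What it shows is that including a random UUID in equals() and hashCode() keeps two windows with identical bounds distinct. That is what stops the runner from treating them as the same window.

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical stand-in for the BespokeIntervalWindow described in the thread. */
final class BespokeIntervalWindow {
    final long startMillis; // inclusive
    final long endMillis;   // exclusive
    final UUID id;          // makes otherwise-identical windows distinct

    BespokeIntervalWindow(long startMillis, long endMillis, UUID id) {
        this.startMillis = startMillis;
        this.endMillis = endMillis;
        this.id = Objects.requireNonNull(id);
    }

    /** Wraps existing interval bounds with a fresh random id, as done once per iterable. */
    static BespokeIntervalWindow of(long startMillis, long endMillis) {
        return new BespokeIntervalWindow(startMillis, endMillis, UUID.randomUUID());
    }

    /** Mirrors IntervalWindow: the largest timestamp in the window is end - 1 ms. */
    long maxTimestampMillis() {
        return endMillis - 1;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof BespokeIntervalWindow)) return false;
        BespokeIntervalWindow w = (BespokeIntervalWindow) o;
        // The id participates in equality, so equal bounds alone are not enough.
        return startMillis == w.startMillis && endMillis == w.endMillis && id.equals(w.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(startMillis, endMillis, id);
    }
}
```

Two windows created with of(0, 3600000) compare unequal and land in different hash buckets. A window rebuilt with the same id compares equal, which is what a Coder round trip would need to preserve.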

Re: Merging panes of a window

Posted by Reuven Lax <re...@google.com>.
You should be able to simply rewindow _after_ the Stateful DoFn. In this
window you do want to trigger on every element, as now each element is an
iterable.

On Thu, Jan 28, 2021 at 10:20 PM Pradyumna Achar <pr...@gmail.com>
wrote:


Re: Merging panes of a window

Posted by Pradyumna Achar <pr...@gmail.com>.
Thank you. I tried it out and it does pass elements to the stateful DoFn as
and when they arrive instead of waiting for the window to finish, the way
you describe.

However, with this, the FileIO does not write the file until the whole
window is done, even though the stateful DoFn outputs iterables in chunks
of 100MB. It only receives a single ON_TIME pane and writes a big file of
the entire window.

I'm wondering if there is a way to mark each iterable as a separate pane,
or something like that.


Re: Merging panes of a window

Posted by Reuven Lax <re...@google.com>.
On Thu, Jan 28, 2021 at 8:42 PM Pradyumna Achar <pr...@gmail.com>
wrote:

> hmm, no I need the triggers on the window for two reasons:
>
>   1. Say I get ~6GB of data for each of those hourly windows, and I let
> the window fire only after the watermark naturally crosses, I would need to
> store that 6GB in memory. Whereas if I let early firings happen often, and
> let the stateful DoFn output whenever it has received 100MB worth of data,
> the memory requirement comes down significantly.
>

I think you're misunderstanding. Stateful DoFns receive elements as they
arrive, regardless of triggering and windowing. The only semantic meaning
of windows for stateful DoFns is garbage collection: there is separate
state per window, and when a window expires, the state for that window is
garbage collected. The stateful DoFn will not wait until the end of the
window to process the elements.
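The per-window state semantics described above can be modelled in plain Java. This is a simplified sketch, not runner code:

- The class and method names are hypothetical.
- Windows are identified by their end timestamp only.
- Garbage collection is approximated as happening once the watermark reaches the window end plus the allowed lateness.

Each element updates its (key, window) state immediately. Nothing waits for the window to close.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical model of a runner keeping one piece of state per (key, window). */
final class PerWindowState {
    /** Identifies a state cell: the element's key plus the end of its window. */
    private static final class Cell {
        final String key;
        final long windowEndMillis;

        Cell(String key, long windowEndMillis) {
            this.key = key;
            this.windowEndMillis = windowEndMillis;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Cell)) return false;
            Cell c = (Cell) o;
            return key.equals(c.key) && windowEndMillis == c.windowEndMillis;
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, windowEndMillis);
        }
    }

    private final long allowedLatenessMillis;
    private final Map<Cell, Long> bytesSoFar = new HashMap<>();

    PerWindowState(long allowedLatenessMillis) {
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    /** Processes an element as soon as it arrives and returns the updated byte count. */
    long onElement(String key, long windowEndMillis, long sizeBytes) {
        return bytesSoFar.merge(new Cell(key, windowEndMillis), sizeBytes, Long::sum);
    }

    /** Drops state for every window considered expired at this watermark (simplified rule). */
    void onWatermark(long watermarkMillis) {
        bytesSoFar.keySet().removeIf(c -> c.windowEndMillis + allowedLatenessMillis <= watermarkMillis);
    }

    /** Number of (key, window) cells still holding state. */
    int liveCells() {
        return bytesSoFar.size();
    }
}
```

Windows here only decide when state is discarded. The running total for a window is available after every element, which is the point of the paragraph above.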

The behaviour you describe is how GroupByKey (and similar aggregating
transforms, such as count and other combiners) work. However it is not true
that all that data needs to be stored in memory. Generally runners shuffle
the data using disk or an external shuffle service. The grouped data is
then read by downstream transforms, but generally does not have to fit in
memory.


Re: Merging panes of a window

Posted by Pradyumna Achar <pr...@gmail.com>.
hmm, no, I need the triggers on the window for two reasons:

 1. Say I get ~6GB of data for each of those hourly windows. If I let the
window fire only after the watermark passes the end of the window, I would
need to store that 6GB in memory. If I instead let early firings happen
often, and let the stateful DoFn output whenever it has received 100MB
worth of data, the memory requirement comes down significantly.

 2. The window size could be larger than an hour, maybe a day. Early
firings would let 100MB pieces of the data be written and picked up by
downstream systems with lower latency, instead of waiting for everything
to arrive.

GroupIntoBatches doesn't work on sizes (in terms of bytes) AFAIK.

Re: Merging panes of a window

Posted by Reuven Lax <re...@google.com>.
There is no need for a trigger here at all. The stateful DoFn will process
elements as they arrive, so you don't need to set any triggering.

Also, have you seen the GroupIntoBatches transform? It may already do what
you are trying to do.

On Thu, Jan 28, 2021 at 8:00 PM Pradyumna Achar <pr...@gmail.com>
wrote:
