You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Neha Sharma <ne...@gmail.com> on 2020/04/16 05:28:36 UTC

Global window with Bounded Source

Hello,

I have a use case where I have a bounded source and I am reading Avro files
from Google Cloud Storage. I am also using group by transform.The amount of
data is huge and I need to process the data in sequential order.

But as Bounded source reads everything it seemed to be a good idea fixed
window on top of the global window. But it does not seem to be working as
expected.

Can you please tell me how to handle such scenarios where a bounded source
with large dataset can be broken down into smaller chunks for processing,
using windows such that the window for a key should always be processed in
order.


Regards,
Neha

Re: Global window with Bounded Source

Posted by Neha Sharma <ne...@gmail.com>.
Hi Luke,

I really appreciate the help.
Thank you very much.

I will try the mentioned options and see which one fits to the requirement.

Regards,
Neha

On Thu, Apr 16, 2020, 11:43 PM Luke Cwik <lc...@google.com> wrote:

> If you have a timestamp in the record that if sorted would give you the
> correct ordering then you can use a pipeline like:
> ReadFromFilesWithTimestamps -> KV<Key, Data> ->
> ParDo(@RequiresTimeSortedInput StatefulDoFn) -> ...
>
> The important parts here are that:
> * Your runner must support the @RequiresTimeSortedInput[1] annotation
> (very few runners support this to my knowledge since this has become
> available in Apache Beam 2.20).
> * ReadFromFilesWithTimestamps needs to assign the timestamp to each record
> associated with its position based upon the order in which you want the
> data arrive to the StatefulDoFn
> * For each key, the StatefulDoFn would create a "flush" timer that occurs
> at the end of the global window. For each element in @ProcessElement, it
> would update state with the newly combined version, in the flush
> timers @OnTimer method, you would output the value from state and clear
> state.
> * A shuffle/groupbykey will be inserted for you automatically before the
> StatefulDoFn
>
> Another option is to use the SortValues[2] transform. It has several
> caveats but worthwhile over implementing your own sorting algorithm so
> reading that page is useful.
>
> A third option is if you have many files (and all the keys could be stored
> in memory (possibly with a disk based backup), is to instead parallelize
> processing of the files but each one file is wholly handled by a
> single @ProcessElement call. So you would be responsible for reading in the
> input, storing all the keys in "memory" and updating them as you see them
> and finally when the file is done produce all the output you have buffered.
> You could do this with the FileIO[3] transform since it produces a
> PCollection of ReadableFile so you could process each individual file
> separately.
>
> 1:
> https://github.com/apache/beam/blob/c3bd4854e879da65060de8cd259865a9b34742c7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L815
> 2: https://beam.apache.org/documentation/sdks/java-extensions/#sorter
> 3:
> https://beam.apache.org/releases/javadoc/2.19.0/index.html?org/apache/beam/sdk/io/FileIO.html
>
> On Thu, Apr 16, 2020 at 1:21 PM Vincent Marquez <vi...@gmail.com>
> wrote:
>
>> I actually ran into the same issue, and would love some guidance!
>>
>>  I had a list of avro files within folders in GCS, each folder
>>  representing a single day, and I needed to de-dupe events per day (by a
>> key).  I didn't want a GroupByKey to hold billions of events when it didn't
>> matter, so I added a timestamp to each folder, then tried windowing.   I
>> thought perhaps the windowed event passed into the ReadAll would mean
>> windows could proceed down the pipeline without having to load the entire
>> batch of files into memory.
>>
>> I was wrong, and saw the same behavior of Neha.  Is there a better way of
>> doing this?  I don't see a *technical* reason why the dataflow runner
>> couldn't be implemented to have this behavior, but I could be mistaken.
>>
>
> A lot of applications which only work via truly sequential processing
> don't have much value added by using Apache Beam and you're likely better
> off with using another framework or a simple for loop. You really want to
> find a parallel processing paradigm within your data to get the benefits
> that data parallel frameworks provide.
>
>
>>
>>
>> *~Vincent*
>>
>>
>> On Thu, Apr 16, 2020 at 10:13 AM Neha Sharma <ne...@gmail.com>
>> wrote:
>>
>>> Hi Luke,
>>>
>>> It is the order the record appears in the source file.
>>>
>>> Basically each record corresponding to a key depends on the previous
>>> occurrence of the same key and hence parallel processing does not seem to
>>> be a good idea.
>>>
>>> Is there a possibility where bounded source + fixed window based on the
>>> timestamp in the record can be used to somehow batch the whole data into
>>> smaller chunks for processing and at the same time can maintain the
>>> ordering provided a sorting based on the timestamp?
>>>
>>> Something like this:
>>>
>>> Read from Bounded Source ->
>>> Fixed window to make smaller batches ->
>>> Sorting based on timestamp ->
>>> Processing
>>>
>>>
>>> Regards,
>>> Neha
>>>
>>>
>>> On Thu, Apr 16, 2020, 6:57 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> What do you mean by in sequential order, order across files, keys, ...?
>>>> Is this an ordering that is based on data such as a timestamp of the
>>>> record or the order in which the records appear in the source files?
>>>> Do you have a lot of keys or very few?
>>>>
>>>> If you want to process all the data across all the files in sequential
>>>> order with no parallelism then Apache Beam may not provide much value since
>>>> its basis is all about parallel data processing.
>>>>
>>>> On Wed, Apr 15, 2020 at 10:30 PM Neha Sharma <ne...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have a use case where I have a bounded source and I am reading Avro
>>>>> files from Google Cloud Storage. I am also using group by transform.The
>>>>> amount of data is huge and I need to process the data in sequential order.
>>>>>
>>>>> But as Bounded source reads everything it seemed to be a good idea
>>>>> fixed window on top of the global window. But it does not seem to be
>>>>> working as expected.
>>>>>
>>>>> Can you please tell me how to handle such scenarios where a bounded
>>>>> source with large dataset can be broken down into smaller chunks for
>>>>> processing, using windows such that the window for a key should always be
>>>>> processed in order.
>>>>>
>>>>>
>>>>> Regards,
>>>>> Neha
>>>>>
>>>>

Re: Global window with Bounded Source

Posted by Luke Cwik <lc...@google.com>.
If you have a timestamp in the record that if sorted would give you the
correct ordering then you can use a pipeline like:
ReadFromFilesWithTimestamps -> KV<Key, Data> ->
ParDo(@RequiresTimeSortedInput StatefulDoFn) -> ...

The important parts here are that:
* Your runner must support the @RequiresTimeSortedInput[1] annotation (very
few runners support this to my knowledge since this has become available in
Apache Beam 2.20).
* ReadFromFilesWithTimestamps needs to assign the timestamp to each record
associated with its position based upon the order in which you want the
data arrive to the StatefulDoFn
* For each key, the StatefulDoFn would create a "flush" timer that occurs
at the end of the global window. For each element in @ProcessElement, it
would update state with the newly combined version, in the flush
timers @OnTimer method, you would output the value from state and clear
state.
* A shuffle/groupbykey will be inserted for you automatically before the
StatefulDoFn

Another option is to use the SortValues[2] transform. It has several
caveats but worthwhile over implementing your own sorting algorithm so
reading that page is useful.

A third option is if you have many files (and all the keys could be stored
in memory (possibly with a disk based backup), is to instead parallelize
processing of the files but each one file is wholly handled by a
single @ProcessElement call. So you would be responsible for reading in the
input, storing all the keys in "memory" and updating them as you see them
and finally when the file is done produce all the output you have buffered.
You could do this with the FileIO[3] transform since it produces a
PCollection of ReadableFile so you could process each individual file
separately.

1:
https://github.com/apache/beam/blob/c3bd4854e879da65060de8cd259865a9b34742c7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L815
2: https://beam.apache.org/documentation/sdks/java-extensions/#sorter
3:
https://beam.apache.org/releases/javadoc/2.19.0/index.html?org/apache/beam/sdk/io/FileIO.html

On Thu, Apr 16, 2020 at 1:21 PM Vincent Marquez <vi...@gmail.com>
wrote:

> I actually ran into the same issue, and would love some guidance!
>
>  I had a list of avro files within folders in GCS, each folder
>  representing a single day, and I needed to de-dupe events per day (by a
> key).  I didn't want a GroupByKey to hold billions of events when it didn't
> matter, so I added a timestamp to each folder, then tried windowing.   I
> thought perhaps the windowed event passed into the ReadAll would mean
> windows could proceed down the pipeline without having to load the entire
> batch of files into memory.
>
> I was wrong, and saw the same behavior of Neha.  Is there a better way of
> doing this?  I don't see a *technical* reason why the dataflow runner
> couldn't be implemented to have this behavior, but I could be mistaken.
>

A lot of applications which only work via truly sequential processing don't
have much value added by using Apache Beam and you're likely better off
with using another framework or a simple for loop. You really want to find
a parallel processing paradigm within your data to get the benefits that
data parallel frameworks provide.


>
>
> *~Vincent*
>
>
> On Thu, Apr 16, 2020 at 10:13 AM Neha Sharma <ne...@gmail.com>
> wrote:
>
>> Hi Luke,
>>
>> It is the order the record appears in the source file.
>>
>> Basically each record corresponding to a key depends on the previous
>> occurrence of the same key and hence parallel processing does not seem to
>> be a good idea.
>>
>> Is there a possibility where bounded source + fixed window based on the
>> timestamp in the record can be used to somehow batch the whole data into
>> smaller chunks for processing and at the same time can maintain the
>> ordering provided a sorting based on the timestamp?
>>
>> Something like this:
>>
>> Read from Bounded Source ->
>> Fixed window to make smaller batches ->
>> Sorting based on timestamp ->
>> Processing
>>
>>
>> Regards,
>> Neha
>>
>>
>> On Thu, Apr 16, 2020, 6:57 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> What do you mean by in sequential order, order across files, keys, ...?
>>> Is this an ordering that is based on data such as a timestamp of the
>>> record or the order in which the records appear in the source files?
>>> Do you have a lot of keys or very few?
>>>
>>> If you want to process all the data across all the files in sequential
>>> order with no parallelism then Apache Beam may not provide much value since
>>> its basis is all about parallel data processing.
>>>
>>> On Wed, Apr 15, 2020 at 10:30 PM Neha Sharma <ne...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have a use case where I have a bounded source and I am reading Avro
>>>> files from Google Cloud Storage. I am also using group by transform.The
>>>> amount of data is huge and I need to process the data in sequential order.
>>>>
>>>> But as Bounded source reads everything it seemed to be a good idea
>>>> fixed window on top of the global window. But it does not seem to be
>>>> working as expected.
>>>>
>>>> Can you please tell me how to handle such scenarios where a bounded
>>>> source with large dataset can be broken down into smaller chunks for
>>>> processing, using windows such that the window for a key should always be
>>>> processed in order.
>>>>
>>>>
>>>> Regards,
>>>> Neha
>>>>
>>>

Re: Global window with Bounded Source

Posted by Vincent Marquez <vi...@gmail.com>.
I actually ran into the same issue, and would love some guidance!

 I had a list of avro files within folders in GCS, each folder
 representing a single day, and I needed to de-dupe events per day (by a
key).  I didn't want a GroupByKey to hold billions of events when it didn't
matter, so I added a timestamp to each folder, then tried windowing.   I
thought perhaps the windowed event passed into the ReadAll would mean
windows could proceed down the pipeline without having to load the entire
batch of files into memory.

I was wrong, and saw the same behavior of Neha.  Is there a better way of
doing this?  I don't see a *technical* reason why the dataflow runner
couldn't be implemented to have this behavior, but I could be mistaken.


*~Vincent*


On Thu, Apr 16, 2020 at 10:13 AM Neha Sharma <ne...@gmail.com>
wrote:

> Hi Luke,
>
> It is the order the record appears in the source file.
>
> Basically each record corresponding to a key depends on the previous
> occurrence of the same key and hence parallel processing does not seem to
> be a good idea.
>
> Is there a possibility where bounded source + fixed window based on the
> timestamp in the record can be used to somehow batch the whole data into
> smaller chunks for processing and at the same time can maintain the
> ordering provided a sorting based on the timestamp?
>
> Something like this:
>
> Read from Bounded Source ->
> Fixed window to make smaller batches ->
> Sorting based on timestamp ->
> Processing
>
>
> Regards,
> Neha
>
>
> On Thu, Apr 16, 2020, 6:57 PM Luke Cwik <lc...@google.com> wrote:
>
>> What do you mean by in sequential order, order across files, keys, ...?
>> Is this an ordering that is based on data such as a timestamp of the
>> record or the order in which the records appear in the source files?
>> Do you have a lot of keys or very few?
>>
>> If you want to process all the data across all the files in sequential
>> order with no parallelism then Apache Beam may not provide much value since
>> its basis is all about parallel data processing.
>>
>> On Wed, Apr 15, 2020 at 10:30 PM Neha Sharma <ne...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I have a use case where I have a bounded source and I am reading Avro
>>> files from Google Cloud Storage. I am also using group by transform.The
>>> amount of data is huge and I need to process the data in sequential order.
>>>
>>> But as Bounded source reads everything it seemed to be a good idea fixed
>>> window on top of the global window. But it does not seem to be working as
>>> expected.
>>>
>>> Can you please tell me how to handle such scenarios where a bounded
>>> source with large dataset can be broken down into smaller chunks for
>>> processing, using windows such that the window for a key should always be
>>> processed in order.
>>>
>>>
>>> Regards,
>>> Neha
>>>
>>

Re: Global window with Bounded Source

Posted by Neha Sharma <ne...@gmail.com>.
Hi Luke,

It is the order the record appears in the source file.

Basically each record corresponding to a key depends on the previous
occurrence of the same key and hence parallel processing does not seem to
be a good idea.

Is there a possibility where bounded source + fixed window based on the
timestamp in the record can be used to somehow batch the whole data into
smaller chunks for processing and at the same time can maintain the
ordering provided a sorting based on the timestamp?

Something like this:

Read from Bounded Source ->
Fixed window to make smaller batches ->
Sorting based on timestamp ->
Processing


Regards,
Neha


On Thu, Apr 16, 2020, 6:57 PM Luke Cwik <lc...@google.com> wrote:

> What do you mean by in sequential order, order across files, keys, ...?
> Is this an ordering that is based on data such as a timestamp of the
> record or the order in which the records appear in the source files?
> Do you have a lot of keys or very few?
>
> If you want to process all the data across all the files in sequential
> order with no parallelism then Apache Beam may not provide much value since
> its basis is all about parallel data processing.
>
> On Wed, Apr 15, 2020 at 10:30 PM Neha Sharma <ne...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I have a use case where I have a bounded source and I am reading Avro
>> files from Google Cloud Storage. I am also using group by transform.The
>> amount of data is huge and I need to process the data in sequential order.
>>
>> But as Bounded source reads everything it seemed to be a good idea fixed
>> window on top of the global window. But it does not seem to be working as
>> expected.
>>
>> Can you please tell me how to handle such scenarios where a bounded
>> source with large dataset can be broken down into smaller chunks for
>> processing, using windows such that the window for a key should always be
>> processed in order.
>>
>>
>> Regards,
>> Neha
>>
>

Re: Global window with Bounded Source

Posted by Luke Cwik <lc...@google.com>.
What do you mean by in sequential order, order across files, keys, ...?
Is this an ordering that is based on data such as a timestamp of the record
or the order in which the records appear in the source files?
Do you have a lot of keys or very few?

If you want to process all the data across all the files in sequential
order with no parallelism then Apache Beam may not provide much value since
its basis is all about parallel data processing.

On Wed, Apr 15, 2020 at 10:30 PM Neha Sharma <ne...@gmail.com>
wrote:

> Hello,
>
> I have a use case where I have a bounded source and I am reading Avro
> files from Google Cloud Storage. I am also using group by transform.The
> amount of data is huge and I need to process the data in sequential order.
>
> But as Bounded source reads everything it seemed to be a good idea fixed
> window on top of the global window. But it does not seem to be working as
> expected.
>
> Can you please tell me how to handle such scenarios where a bounded source
> with large dataset can be broken down into smaller chunks for processing,
> using windows such that the window for a key should always be processed in
> order.
>
>
> Regards,
> Neha
>