Posted to user@beam.apache.org by Frantisek Csajka <cs...@gmail.com> on 2019/11/22 10:35:04 UTC

Fanout and OOMs on Dataflow while streaming to BQ

Hi Beamers,

We are facing OutOfMemory errors with a streaming pipeline on Dataflow. We
are unable to get rid of them, not even with bigger worker instances. Any
advice will be appreciated.

The scenario is the following.
- Files are being written to a bucket in GCS. A notification is set up on
the bucket, so for each file there is a Pub/Sub message
- Notifications are consumed by our pipeline
- Files from notifications are read by the pipeline
- Each file contains several events (so there is quite a big fanout)
- Events are written to BigQuery (streaming); see the sketch below
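
A simplified Java sketch of this pipeline shape (not our exact code;
ExtractGcsPathFn and FileToEventsFn are placeholders for our real parsing
logic, and imports/options setup are omitted):

    // Simplified sketch of the pipeline shape (Java SDK).
    Pipeline p = Pipeline.create(options);

    p.apply("ReadNotifications",
            PubsubIO.readStrings().fromSubscription(notificationSubscription))
     .apply("ExtractGcsPath", ParDo.of(new ExtractGcsPathFn()))
     .apply("MatchFiles", FileIO.matchAll())
     .apply("ReadFiles", FileIO.readMatches())
     // one file expands into many events here (the big fanout)
     .apply("ParseEvents", ParDo.of(new FileToEventsFn()))
     .apply("WriteToBigQuery",
            BigQueryIO.writeTableRows()
                .to(outputTable)
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

    p.run();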

Everything works fine if there are only a few notifications, but if files
arrive at a high rate, or if there is a large backlog in the notification
subscription, events get stuck in the BQ write and an OOM is eventually
thrown.

Using larger workers does not help, because if the backlog is large,
larger instances will throw OOMs as well, just later...

As far as we can see, there is a checkpointing/reshuffle step inside the BQ
write, and that's where the elements get stuck. It looks like the Pub/Sub
read consumes too many elements, and because of the fanout the grouping in
the reshuffle then runs out of memory.
Is there any way to apply backpressure to the Pub/Sub read? Is it possible
to use a smaller window in the Reshuffle? How does the Reshuffle actually work?
Any advice?

Thanks in advance,
Frantisek

Re: Fanout and OOMs on Dataflow while streaming to BQ

Posted by Frantisek Csajka <cs...@gmail.com>.
Thanks Luke! No more questions so far :)

Regards,
Frantisek


Re: Fanout and OOMs on Dataflow while streaming to BQ

Posted by Luke Cwik <lc...@google.com>.
The Beam Python SDK is the furthest along on SplittableDoFn support: it
supports bounded SplittableDoFns, and unbounded support is being worked on
right now (maybe a month or so away). The Beam Java and Go SDKs have some
code related to supporting SDFs, but it is not yet mature and will change
quite a bit over the next few months to follow what is being done in the
Python SDK, at which point SDFs will become available for general use there as well.

This doc best describes the motivation and the problem space:
https://s.apache.org/beam-breaking-fusion




Re: Fanout and OOMs on Dataflow while streaming to BQ

Posted by Reza Rokni <re...@google.com>.
Reshuffle does a GBK if I recall correctly, so I guess that was not a
solution for your use-case :-(
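
For reference, applying it is just one transform (a rough Java illustration;
Event and events are placeholders). Internally it assigns random keys, does a
GroupByKey, and then drops the keys again, which is why a large fanned-out
backlog ends up buffered at that grouping step:

    // Rough illustration only; Event and events are placeholders.
    PCollection<Event> redistributed =
        events.apply("BreakFusion", Reshuffle.viaRandomKey());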


Re: Fanout and OOMs on Dataflow while streaming to BQ

Posted by Frantisek Csajka <cs...@gmail.com>.
Hi,

Sorry for the late reply, I was not near a computer during the weekend.
Many thanks for all the advice and suggestions!

erik.lincoln@ntconcepts.com, jklukas@mozilla.com:
Thanks for the detailed answers! The separation of pipelines using Pub/Sub
works fine, although it would be nice if it could be done in one pipeline
(fewer resources needed). Streaming Engine worked well for us in a similar
scenario, but as the amount of data can be large, Streaming Engine can be
expensive for us :/

kenn@apache.org:
Never worked with SDF, but it sounds interesting, thanks! Are there any good
docs with examples to start with?

rez@google.com:
Yes, we were considering breaking the (possible) fusion, but we have not
tried exactly what you are suggesting; we were experimenting with the
Reshuffle transform. We will try your suggestion with an explicit GBK. Thanks!
Also, since Reshuffle is deprecated, is GBK the preferred way to break
fusion, or will there be a dedicated fusion-breaking transform in the future?

Thank you all again, it helped a lot,
Frantisek


Re: Fanout and OOMs on Dataflow while streaming to BQ

Posted by Reza Rokni <re...@google.com>.
Hi,

One thing that you could explore, as the fanout is large:

The Dataflow runner will optimise various transforms using fusion where
possible (link
<https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#fusion-optimization>).
There are cases, like transforms with a high fanout, where this may not be
desirable. You can manually break fusion by putting in a shuffle step (also
covered in that doc). This comes at the cost of an extra shuffle, of course,
so you will need to experiment to see if it meets your needs.

In your fanout transform, generate a KV where the key is some hash that
partitions the elements; use something like O(10k) for the key space. Then
do a GBK and then ungroup (rough sketch below).
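
A rough Java sketch of what I mean (the Event type and the events
PCollection are placeholders; note that in a streaming pipeline the GBK
needs a trigger that actually fires, similar to what Reshuffle sets up
internally):

    PCollection<Event> unfused = events
        .apply("AddRandomKey",
            WithKeys.of((Event e) -> ThreadLocalRandom.current().nextInt(10_000))
                .withKeyType(TypeDescriptors.integers()))
        // Without this, a GroupByKey on the global window would buffer forever
        // in streaming mode; this mirrors what Reshuffle configures.
        .apply("TriggerPerElement",
            Window.<KV<Integer, Event>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .discardingFiredPanes()
                .withAllowedLateness(Duration.ZERO))
        .apply("BreakFusion", GroupByKey.create())
        .apply("DropKeys", Values.create())
        .apply("Ungroup", Flatten.iterables());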

Cheers
Reza




Re: Fanout and OOMs on Dataflow while streaming to BQ

Posted by Kenneth Knowles <ke...@apache.org>.
+Lukasz Cwik <lc...@google.com> +Chamikara Jayalath <ch...@google.com>

It sounds like your high-fanout transform that listens for new files on
Pubsub would be a good fit for Splittable DoFn (SDF). It seems like a
fairly common use case that could be a useful general contribution to Beam.
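
To make the idea concrete, here is a very rough sketch of a splittable
file-reading DoFn (the Java SDF API is still evolving, so treat the
annotations and signatures as illustrative; fileSizeOf, readRecordAt and
nextRecordOffset are placeholders for real file access):

    // The element is a GCS path from a notification; the restriction is a byte
    // range within the file, so the runner can split and checkpoint inside a
    // single large file instead of holding all of its events at once.
    class ReadFileAsEventsFn extends DoFn<String, String> {

      @GetInitialRestriction
      public OffsetRange getInitialRestriction(@Element String path) throws IOException {
        return new OffsetRange(0, fileSizeOf(path));
      }

      @ProcessElement
      public void process(
          @Element String path,
          RestrictionTracker<OffsetRange, Long> tracker,
          OutputReceiver<String> out) throws IOException {
        long offset = tracker.currentRestriction().getFrom();
        while (tracker.tryClaim(offset)) {
          out.output(readRecordAt(path, offset));
          offset = nextRecordOffset(path, offset);
        }
      }
    }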

Kenn


Re: Fanout and OOMs on Dataflow while streaming to BQ

Posted by Jeff Klukas <jk...@mozilla.com>.
We also had throughput issues writing to BQ in a streaming pipeline, and we
mitigated them by provisioning a large quantity of SSD storage to improve
I/O throughput to disk for checkpoints.

I also second Erik's suggestion to look into Streaming Engine. We are
currently looking into migrating our streaming use cases to Streaming Engine
after we had success with improved BQ write throughput on batch workloads by
using the Shuffle service (the batch-mode analogue of Streaming Engine).
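
As a rough illustration (not an exact configuration; values and region are
placeholders, and the flag names are worth double-checking against the
current Dataflow docs), the relevant options look something like this:

    // Illustrative Dataflow options: SSD worker disks, Streaming Engine for
    // streaming jobs, and the Shuffle service for batch jobs.
    String[] args = new String[] {
        "--runner=DataflowRunner",
        "--region=us-central1",
        "--enableStreamingEngine",               // Streaming Engine (streaming jobs)
        "--experiments=shuffle_mode=service",    // Shuffle service (batch jobs)
        "--workerDiskType=compute.googleapis.com/projects//zones//diskTypes/pd-ssd",
        "--diskSizeGb=200"
    };
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(DataflowPipelineOptions.class);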




RE: Fanout and OOMs on Dataflow while streaming to BQ

Posted by Erik Lincoln <er...@ntconcepts.com>.
Hi Frantisek,

Some advice from making a similar pipeline and struggling with throughput and latency:

  1.  Break up your pipeline into multiple pipelines. Dataflow only auto-scales based on input throughput. If you’re microbatching events in files, the job will only autoscale to meet the volume of files, not the volume of events added to the pipeline from the files. A better flow is:
      a.  Pipeline 1: Receive GCS notifications, read files, and then output file contents as Pubsub messages, either per event or in microbatches from the file
      b.  Pipeline 2: Receive events from Pubsub, do your transforms, then write to BQ
  2.  Use the Streaming Engine service (if you can run in a region supporting it): https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine
  3.  BQ streaming can be slower than a load job if you have a very high volume (millions of events a minute). If your event volume is high, you might need to consider further microbatching loads into BQ from GCS (see the sketch below).
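
For the last point, one hedged sketch (settings are illustrative, not a
recommendation for your exact volume; events stands for a PCollection of
TableRows) is to keep the streaming pipeline but have BigQueryIO write via
periodic load jobs instead of streaming inserts:

    // Sketch: batch load jobs from a streaming pipeline (FILE_LOADS) instead of
    // streaming inserts; triggering frequency and shard count are illustrative.
    events.apply("WriteToBigQuery",
        BigQueryIO.writeTableRows()
            .to(outputTable)
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withTriggeringFrequency(Duration.standardMinutes(10))
            .withNumFileShards(100)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));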

Hope this helps,
Erik
