Posted to user@beam.apache.org by Robert Bradshaw <ro...@google.com> on 2021/06/02 18:10:41 UTC

Re: Is there a way (settings) to limit the number of elements per worker machine

If you want to control the total number of elements being processed
across all workers at a time, you can do this by assigning random keys
of the form RandomInteger() % TotalDesiredConcurrency followed by a
GroupByKey.
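
A minimal sketch of the keying idea in the Beam Python SDK follows. The names TOTAL_DESIRED_CONCURRENCY, assign_random_key and limit_global_concurrency are made up for illustration, and the cap of 10 is an arbitrary assumption. After the GroupByKey each key's values arrive as one group, so a downstream step that loops over a group should handle that group's elements one after another, with at most TOTAL_DESIRED_CONCURRENCY groups in flight.

```python
import random

# Assumed cap on how many groups may be processed at once (pick your own).
TOTAL_DESIRED_CONCURRENCY = 10


def assign_random_key(element, total=TOTAL_DESIRED_CONCURRENCY):
    """Pair the element with a random key in the range [0, total)."""
    return (random.randrange(total), element)


def limit_global_concurrency(pcoll, total=TOTAL_DESIRED_CONCURRENCY):
    """Key a PCollection randomly, then group it into at most `total` groups."""
    # Imported lazily so the helper above can be tried without Beam installed.
    import apache_beam as beam

    return (
        pcoll
        | "AssignRandomKey" >> beam.Map(assign_random_key, total=total)
        | "GroupByKey" >> beam.GroupByKey()
    )
```

The output of limit_global_concurrency is a PCollection of (key, iterable of elements) pairs, so the expensive step would iterate over each iterable rather than receive single elements.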

If you want to control the number of elements being processed in
parallel per VM, you can use the fact that Dataflow assigns one work
item per core, so an n1-standard-4 would process 4 elements in
parallel, an n1-highmem-2 would process 2 elements in parallel, etc.
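
As a sketch of the machine-type approach, the flags below select a two-core, high-memory worker in the Beam Python SDK. The helper name dataflow_worker_flags is hypothetical, and the project, region and temp location flags a real job needs are left out on purpose.

```python
def dataflow_worker_flags(machine_type="n1-highmem-2"):
    """Return pipeline flags that select the Dataflow worker VM shape."""
    return [
        "--runner=DataflowRunner",
        # Fewer cores per VM means fewer work items in parallel per VM.
        f"--machine_type={machine_type}",
    ]


# With Beam installed, the flags could be handed to the pipeline like this:
#   from apache_beam.options.pipeline_options import PipelineOptions
#   options = PipelineOptions(dataflow_worker_flags())
flags = dataflow_worker_flags()
```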

You could also control this explicitly by using a global (per worker)
semaphore in your code. If you do this you may want to precede your
rate-limited DoFn with a Reshuffle to ensure fair (and dynamic) work
distribution. This should be much easier than trying to coordinate
multiple parallel pipelines.
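
A minimal sketch of the semaphore approach in Python follows. MAX_CONCURRENT, run_limited and apply_rate_limited are illustrative names, and the cap of 2 is an assumption. The sketch also assumes the code lives in an importable module installed on the workers, so that the module-level semaphore is created once per worker process; if a VM runs several SDK processes, the limit applies per process rather than per VM.

```python
import threading

# Assumed cap on concurrent elements per worker process (pick your own).
MAX_CONCURRENT = 2
_SEMAPHORE = threading.BoundedSemaphore(MAX_CONCURRENT)


def run_limited(element, work_fn):
    """Run work_fn(element) while holding one of MAX_CONCURRENT permits."""
    with _SEMAPHORE:
        return work_fn(element)


def apply_rate_limited(pcoll, work_fn):
    """Reshuffle, then apply work_fn under the per-process semaphore."""
    # Imported lazily so run_limited can be tried without Beam installed.
    import apache_beam as beam

    return (
        pcoll
        | "Reshuffle" >> beam.Reshuffle()
        | "RateLimited" >> beam.Map(run_limited, work_fn)
    )
```

Threads that cannot get a permit block inside run_limited, which is why the Reshuffle beforehand matters: it lets the runner redistribute waiting elements instead of leaving them fused to a busy worker.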

On Fri, May 28, 2021 at 5:16 AM Eila Oriel Research
<ei...@orielresearch.org> wrote:
>
> Thanks Robert.
> I found the following explanation for the number of threads for 4 cores:
> You have 4 CPU sockets, each CPU can have, up to, 12 cores and each core can have two threads. Your max thread count is, 4 CPU x 12 cores x 2 threads per core, so 12 x 4 x 2 is 96
> Can I limit the threads using the pipeline options in some way? 10-20 elements per worker will work for me.
>
> My current practice to work around that issue is to limit the number of elements in each Dataflow pipeline (providing ~10 elements for each pipeline).
> Once I have completed around 200 elements of processing = 20 pipelines (Google does not allow more than 25 Dataflow pipelines per region) with 10 elements each, I launch the next 20 pipelines.
>
> This of course misses the benefit of serverless.
>
> Any idea how to work around this?
>
> Best,
> Eila
>
>
> On Mon, May 17, 2021 at 1:27 PM Robert Bradshaw <ro...@google.com> wrote:
>>
>> Note that workers generally process one element per thread at a time. The number of threads defaults to the number of cores of the VM that you're using.
>>
>> On Mon, May 17, 2021 at 10:18 AM Brian Hulette <bh...@google.com> wrote:
>>>
>>> What type of files are you reading? If they can be split and read by multiple workers this might be a good candidate for a Splittable DoFn (SDF).
>>>
>>> Brian
>>>
>>> On Wed, May 12, 2021 at 6:18 AM Eila Oriel Research <ei...@orielresearch.org> wrote:
>>>>
>>>> Hi,
>>>> I am running out of resources on the worker machines.
>>>> The reasons are:
>>>> 1. Every PCollection element is a reference to a LARGE file that is copied onto the worker.
>>>> 2. The worker runs calculations on the copied file using a software library that consumes memory / storage / compute resources.
>>>>
>>>> I have changed the workers' CPU count and memory size, but at some point I run out of resources with this method as well.
>>>> I am looking to limit the number of PCollection elements that are being processed in parallel on each worker at a time.
>>>>
>>>> Many thanks for any advice,
>>>> Best wishes,
>>>> --
>>>> Eila

Re: Is there a way (settings) to limit the number of elements per worker machine

Posted by OrielResearch Eila Arich-Landkof <ei...@orielresearch.org>.
Hi Robert,
Thank you. I usually work with the custom worker configuration options.
I will configure the workers with a low number of cores and large memory and see if that solves my problem.

Thanks so much,
—
Eila
www.orielresearch.com
https://www.meetup.com/Deep-Learning-In-Production 



Re: Is there a way (settings) to limit the number of elements per worker machine

Posted by Vincent Marquez <vi...@gmail.com>.
On Wed, Jun 2, 2021 at 11:27 AM Robert Bradshaw <ro...@google.com> wrote:

> On Wed, Jun 2, 2021 at 11:18 AM Vincent Marquez
> <vi...@gmail.com> wrote:
> >
> >
> > Is there a risk here of having an OOM error due to 'build up' of in
> memory elements from a streaming input?  Or do the runners have some
> concept of throttling bundles based on progress of stages further down the
> pipeline?
>
> For streaming pipelines, hundreds of threads (aka work items) are
> allocated for each worker, so limiting the number of concurrent items
> per worker is harder there.
>
>
Hmm, I did notice today that far more DoFns are instantiated in a
streaming job than I expected. This seems like it would cause all sorts
of problems. For instance, if one were to use readAll for, say, JDBC or
Redis or any number of connectors, each of which sets up connections to
some endpoint, could a single worker end up with hundreds or thousands
of JDBC connections? I would think this would definitely make some of
the readAll transforms less usable in a streaming pipeline if scaling
out the number of workers would overload the source machines.

Is this behavior documented somewhere?  Is this true for all runners?

--Vincent





Re: Is there a way (settings) to limit the number of elements per worker machine

Posted by Robert Bradshaw <ro...@google.com>.
On Wed, Jun 2, 2021 at 11:18 AM Vincent Marquez
<vi...@gmail.com> wrote:
>
>
> Is there a risk here of having an OOM error due to 'build up' of in memory elements from a streaming input?  Or do the runners have some concept of throttling bundles based on progress of stages further down the pipeline?

For streaming pipelines, hundreds of threads (aka work items) are
allocated for each worker, so limiting the number of concurrent items
per worker is harder there.


Re: Is there a way (settings) to limit the number of elements per worker machine

Posted by Vincent Marquez <vi...@gmail.com>.
Is there a risk here of having an OOM error due to 'build up' of in memory
elements from a streaming input?  Or do the runners have some concept of
throttling bundles based on progress of stages further down the pipeline?






~Vincent