Posted to user@beam.apache.org by André Rocha Silva <a....@portaltelemedicina.com.br> on 2019/12/31 13:25:27 UTC

Cloud dataflow is not scaling

Hi!

I have a Cloud Dataflow job that is not scaling.

The job sequence is the following:
1 -  [io] Read from a file in the bucket (1 element out)
2 - [ParDo] With the file information, get a query from a database (10,000
elements out)
3 - [ParDo] Works with the elements

But when I read from a file that already contains the same database query
result, it scales to 60+ workers:
1 -  [io] Read from a file in the bucket (10,000 elements out)
2 - [ParDo] Works with the elements

Do I have to develop an I/O connector so that Apache Beam knows how many
elements it's dealing with?
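
In code, the pipeline looks roughly like this (Python SDK; the bucket path,
DoFns, and query placeholder below are illustrative):

import apache_beam as beam

class QueryDatabaseFn(beam.DoFn):
    """Illustrative: expands the one file record into ~10,000 query rows."""
    def process(self, file_info):
        rows = []  # placeholder for the real database call
        for row in rows:
            yield row

class ProcessFn(beam.DoFn):
    """Illustrative: the real per-element work goes here."""
    def process(self, element):
        yield element

with beam.Pipeline() as p:
    (p
     | 'ReadFile' >> beam.io.ReadFromText('gs://my-bucket/input.txt')  # step 1: 1 element out
     | 'QueryDatabase' >> beam.ParDo(QueryDatabaseFn())                # step 2: 10,000 elements out
     | 'Process' >> beam.ParDo(ProcessFn()))                           # step 3: works with the elements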

Best regards
André Rocha Silva

Re: Cloud dataflow is not scaling

Posted by Reza Rokni <re...@google.com>.
I suspect this is due to fusion of the steps between the fan-out and the
DoFn.

Don't have the link to hand, but look in the docs for 'breaking fusion'.
Essentially via a Reshuffle or a side input.

Cheers

Reza


Re: Cloud dataflow is not scaling

Posted by André Rocha Silva <a....@portaltelemedicina.com.br>.
Thank you very much!!

I simply added a beam.Reshuffle() between steps 2 and 3 and it worked!
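For reference, the working pipeline now looks roughly like this (same
illustrative names as in my first message):

import apache_beam as beam

with beam.Pipeline() as p:
    (p
     | 'ReadFile' >> beam.io.ReadFromText('gs://my-bucket/input.txt')  # step 1
     | 'QueryDatabase' >> beam.ParDo(QueryDatabaseFn())                # step 2
     | 'BreakFusion' >> beam.Reshuffle()                               # new: stops steps 2 and 3 being fused
     | 'Process' >> beam.ParDo(ProcessFn()))                           # step 3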


Re: Cloud dataflow is not scaling

Posted by Leonardo Miguel <le...@arquivei.com.br>.
Hi André,

As Reza pointed out, it may be due to fusion.

Dataflow autoscaling is THROUGHPUT_BASED, but the service may also apply
fusion as an optimization.
So what may happen is: steps 2 and 3 are executed fused, and throughput is
calculated based only on the output of step 1. Doing a reshuffle between 2
and 3 would prevent fusion, and Dataflow would understand that step 2
produces a large number of items that can be processed in step 3 in parallel.

A way of avoiding this behavior is making sure your ParDos are 1:1 (one
input item produces one output item). I don't know of a programmatic way of
telling Dataflow when a ParDo is 1:N, and I don't think there is one, so the
default approach is to prevent fusion using a Reshuffle or a side input.

If you are not familiar with fusion: Dataflow may optimize your
transformation graph so that a series of ParDos is executed as one large
transform. Steps 2 and 3 would then be executed together, and the output of
step 2 would not be parallelized. You can find more info in the Dataflow
docs.
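
For anyone unfamiliar with what the Reshuffle actually does, here is a rough
sketch (illustrative; beam.Reshuffle() effectively performs a keyed regroup
like this for you, while also handling timestamps and windowing):

import random
import apache_beam as beam

def break_fusion(pcoll):
    # Assign arbitrary keys, group by key (which forces the service to
    # materialize the elements, ending the fused stage), then drop the keys.
    return (pcoll
            | 'AddRandomKey' >> beam.Map(lambda x: (random.randint(0, 999), x))
            | 'GroupByKey' >> beam.GroupByKey()
            | 'DropKeys' >> beam.FlatMap(lambda kv: kv[1]))

In practice, just use beam.Reshuffle().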



-- 
[]s

Leonardo Alves Miguel
Data Engineer
(16) 3509-5515 | www.arquivei.com.br