Posted to user@beam.apache.org by Maulik Gandhi <mm...@gmail.com> on 2019/03/19 17:06:56 UTC

Scaling Beam pipeline on Data Flow - Join bounded and non-bounded source

Hi Beam Community,

I am working on a Beam processing pipeline that reads data from an
unbounded and a bounded source, and I want to leverage Beam state management
in my pipeline.  To put data in Beam state, I have to convert the data into
key-value pairs (e.g. KV<String, Object>).  Because I am reading from both an
unbounded and a bounded source, I am forced to apply Window + Triggering
before grouping the data by key.  I have chosen to use GlobalWindows().

I am able to kick off the Dataflow job, which runs my Beam pipeline.
I have noticed that Dataflow uses only 1 worker node to perform the work
and does not scale the job to use more worker nodes, thus not leveraging
the benefit of distributed processing.

I have posted the question on Stack Overflow:
https://stackoverflow.com/questions/55242684/join-bounded-and-non-bounded-source-data-flow-job-not-scaling
but I am also reaching out on the mailing list to get some help, or to learn
what I am missing.

Any help would be appreciated.

Thanks.
- Maulik

Re: Scaling Beam pipeline on Data Flow - Join bounded and non-bounded source

Posted by Maulik Gandhi <mm...@gmail.com>.
Now that I understand this more clearly, I think there are a couple of
things going on.  I will try to re-explain what I am trying to achieve.

- I am reading from 2 Sources
  - Bounded (AVRO from GCS)
  - Unbounded (AVRO from PubSub)
- I want to prime the Beam pipeline state with data from GCS (bounded
source), using UserId as the key, even when no data from PubSub is flowing
through.
- Later, when data from PubSub (unbounded source) starts flowing, I would
update/add to the Beam state, using the (same) UserId as the key.
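
As a rough sketch of the intent in the list above, the priming and update
steps can be modelled in plain Java with a map keyed by UserId.  This is an
illustration only: the HashMap merely stands in for Beam's per-key state, and
the class and method names (StatePrimingSketch, prime, update, stateFor) are
hypothetical, not Beam API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only: a HashMap stands in for Beam's per-key state.
// All names here are hypothetical and not part of the Beam SDK.
final class StatePrimingSketch {
    private final Map<String, List<String>> stateByUserId = new HashMap<>();

    // Step 1: load the bounded (GCS) records into state, keyed by UserId.
    void prime(Map<String, String> boundedRecords) {
        boundedRecords.forEach(this::update);
    }

    // Step 2: each unbounded (PubSub) record is appended to the state of its key.
    void update(String userId, String record) {
        stateByUserId.computeIfAbsent(userId, k -> new ArrayList<>()).add(record);
    }

    // Returns the records accumulated for a key, or an empty list if none.
    List<String> stateFor(String userId) {
        return stateByUserId.getOrDefault(userId, List.of());
    }

    public static void main(String[] args) {
        StatePrimingSketch sketch = new StatePrimingSketch();
        sketch.prime(Map.of("user-1", "profile-from-gcs"));
        sketch.update("user-1", "event-from-pubsub");
        System.out.println(sketch.stateFor("user-1"));
    }
}
```

In the sketch the state for a key exists as soon as priming is done, before
any streaming record for that key arrives, which is the behaviour described
in the first bullet.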

How big is your bounded source?
- 16.01 GiB of total data from AVRO files, but it can range from 10s to
100s of GBs.

How much pressure (messages per second) is your unbounded source
receiving?
- Initially no pressure, while priming the Beam state, but later there will
be data flowing through PubSub.

I also added parameters to the mvn command, as below, and got 7 (105/15)
workers, as per the guide:
https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#autoscaling

mvn compile exec:java -pl <project-name> \
-Dexec.mainClass=<class-name> \
-Dexec.args="--runner=DataflowRunner --project=<project-name> \
             --stagingLocation=gs://<gcs-location> \
             --maxNumWorkers=105 \
             --autoscalingAlgorithm=THROUGHPUT_BASED \
             --templateLocation=gs://<gcs-location>"
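
The 7 (105/15) above can be reproduced with simple arithmetic.  The sketch
below assumes, based only on the figures quoted in this message and the
linked guide, that the minimum number of workers for a streaming autoscaling
job is maxNumWorkers divided by 15.  The helper name and the choice to round
up are illustrative assumptions, not Dataflow API.

```java
// Hypothetical helper, not part of any Dataflow or Beam API.
final class MinWorkersSketch {
    // Divisor taken from the "105/15" in the message above.
    static final int DIVISOR = 15;

    static int minWorkers(int maxNumWorkers) {
        // Ceiling division; the rounding direction is an assumption.
        return (maxNumWorkers + DIVISOR - 1) / DIVISOR;
    }

    public static void main(String[] args) {
        System.out.println(minWorkers(105)); // 105 / 15 = 7
    }
}
```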

Thanks.
- Maulik


On Wed, Mar 20, 2019 at 3:17 AM Juan Carlos Garcia <jc...@gmail.com>
wrote:

> Your autoscaling algorithm is THROUGHPUT_BASED; it will kick in only
> when it feels the pipeline is not able to keep up with the incoming
> source. How big is your bounded source, and how much pressure (messages per
> second) is your unbounded source receiving?

Re: Scaling Beam pipeline on Data Flow - Join bounded and non-bounded source

Posted by Reza Rokni <re...@google.com>.
Hi,

How many keys do you have flowing through that global window? If the key
space is wide, is there any chance that you have a few very hot keys?
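
Hot keys matter here because Beam state is kept per key, so all records for
one key are processed together and a dominant key limits parallelism.  As a
minimal sketch of how one might check a sample of keys for skew, assuming the
keys have already been extracted into a list (the class and method names
below are hypothetical):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for inspecting a sample of keys; not Beam API.
final class HotKeySketch {
    // Counts how many records carry each key.
    static Map<String, Long> countPerKey(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    // Fraction of all records that belong to the single most frequent key.
    static double largestKeyShare(List<String> keys) {
        if (keys.isEmpty()) {
            return 0.0;
        }
        long max = countPerKey(keys).values().stream()
                .mapToLong(Long::longValue)
                .max()
                .orElse(0L);
        return (double) max / keys.size();
    }

    public static void main(String[] args) {
        List<String> sample = List.of("user-1", "user-1", "user-1", "user-2");
        System.out.println(countPerKey(sample).size() + " distinct keys");
        System.out.println("largest key share: " + largestKeyShare(sample));
    }
}
```

A share close to 1.0 would suggest that most of the work lands on one key,
whatever the number of workers.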

Cheers

Rez

On Thu, 21 Mar 2019, 04:04 Juan Carlos Garcia, <jc...@gmail.com> wrote:

> I would recommend going to the Compute Engine service and checking the VM
> where the pipeline is running; from there you might get more insight into
> whether you have a bottleneck in your pipeline (CPU, IO, network) that is
> preventing it from processing faster.

Re: Scaling Beam pipeline on Data Flow - Join bounded and non-bounded source

Posted by Juan Carlos Garcia <jc...@gmail.com>.
I would recommend going to the Compute Engine service and checking the VM
where the pipeline is running; from there you might get more insight into
whether you have a bottleneck in your pipeline (CPU, IO, network) that is
preventing it from processing faster.



Maulik Gandhi <mm...@gmail.com> wrote on Wed, 20 Mar 2019, 20:15:

> How big is your bounded source?
> - 16.01 GiB of total data from AVRO files, but it can range from 10s to
> 100s of GBs.
>
> How much pressure (messages per second) is your unbounded source
> receiving?
> - Initially no pressure, while priming the Beam state, but later there will
> be data flowing through PubSub.
>
> I also added parameters to the mvn command, as below, and got 7 (105/15)
> workers, as per the guide:
> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#autoscaling
>
> mvn compile exec:java -pl <project-name> \
> -Dexec.mainClass=<class-name> \
> -Dexec.args="--runner=DataflowRunner --project=<project-name> \
>              --stagingLocation=gs://<gcs-location> \
>              --maxNumWorkers=105 \
>              --autoscalingAlgorithm=THROUGHPUT_BASED \
>              --templateLocation=gs://<gcs-location>"
>
> Even though I got 7 worker nodes, when processing the GCS data (bounded
> source) and adding it to Beam state, I think the work was being performed
> on just 1 node, as it took more than 16 hours.
>
> Can someone point me to documentation on how to figure out how much data
> is being processed by each worker node (like reading GCS part AVRO files,
> input counts, etc.), rather than just the high-level count of input and
> output elements from a ParDo?
>
> Thanks.
> - Maulik

Re: Scaling Beam pipeline on Data Flow - Join bounded and non-bounded source

Posted by Maulik Gandhi <mm...@gmail.com>.
How big is your bounded source?
- 16.01 GiB of total data from AVRO files, but it can range from 10s to
100s of GBs.

How much pressure (messages per second) is your unbounded source
receiving?
- Initially no pressure, while priming the Beam state, but later there will
be data flowing through PubSub.

I also added parameters to the mvn command, as below, and got 7 (105/15)
workers, as per the guide:
https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#autoscaling

mvn compile exec:java -pl <project-name> \
-Dexec.mainClass=<class-name> \
-Dexec.args="--runner=DataflowRunner --project=<project-name> \
             --stagingLocation=gs://<gcs-location> \
             --maxNumWorkers=105 \
             --autoscalingAlgorithm=THROUGHPUT_BASED \
             --templateLocation=gs://<gcs-location>"

Even though I got 7 worker nodes, when processing the GCS data (bounded
source) and adding it to Beam state, I think the work was being performed on
just 1 node, as it took more than 16 hours.

Can someone point me to documentation on how to figure out how much data
is being processed by each worker node (like reading GCS part AVRO files,
input counts, etc.), rather than just the high-level count of input and
output elements from a ParDo?

Thanks.
- Maulik

On Wed, Mar 20, 2019 at 3:17 AM Juan Carlos Garcia <jc...@gmail.com>
wrote:

> Your autoscaling algorithm is THROUGHPUT_BASED; it will kick in only
> when it feels the pipeline is not able to keep up with the incoming
> source. How big is your bounded source, and how much pressure (messages per
> second) is your unbounded source receiving?

Re: Scaling Beam pipeline on Data Flow - Join bounded and non-bounded source

Posted by Juan Carlos Garcia <jc...@gmail.com>.
Your autoscaling algorithm is THROUGHPUT_BASED; it will kick in only when
it feels the pipeline is not able to keep up with the incoming source.
How big is your bounded source, and how much pressure (messages per second)
is your unbounded source receiving?

Maulik Gandhi <mm...@gmail.com> wrote on Tue, 19 Mar 2019, 21:06:

> Hi Juan,
>
> Thanks for replying.  I believe I am using the correct configuration.
>
> I have posted more details, with a code snippet and the Dataflow job
> template configuration, in the Stack Overflow post:
> https://stackoverflow.com/q/55242684/11226631
>
> Thanks.
> - Maulik

Re: Scaling Beam pipeline on Data Flow - Join bounded and non-bounded source

Posted by Maulik Gandhi <mm...@gmail.com>.
Hi Juan,

Thanks for replying.  I believe I am using the correct configuration.

I have posted more details, with a code snippet and the Dataflow job
template configuration, in the Stack Overflow post:
https://stackoverflow.com/q/55242684/11226631

Thanks.
- Maulik

On Tue, Mar 19, 2019 at 2:53 PM Juan Carlos Garcia <jc...@gmail.com>
wrote:

> Hi Maulik,
>
> Have you submitted your job with the correct configuration to enable
> autoscaling?
>
> --autoscalingAlgorithm=
> --maxNumWorkers=
>
> I am on my phone right now and can't tell if the flag names are 100%
> correct.

Re: Scaling Beam pipeline on Data Flow - Join bounded and non-bounded source

Posted by Juan Carlos Garcia <jc...@gmail.com>.
Hi Maulik,

Have you submitted your job with the correct configuration to enable
autoscaling?

--autoscalingAlgorithm=
--maxNumWorkers=

I am on my phone right now and can't tell if the flag names are 100%
correct.


Maulik Gandhi <mm...@gmail.com> wrote on Tue, 19 Mar 2019, 18:13:

> Hi Beam Community,
>
> I am working on a Beam processing pipeline that reads data from an
> unbounded and a bounded source, and I want to leverage Beam state management
> in my pipeline.  To put data in Beam state, I have to convert the data into
> key-value pairs (e.g. KV<String, Object>).  Because I am reading from both an
> unbounded and a bounded source, I am forced to apply Window + Triggering
> before grouping the data by key.  I have chosen to use GlobalWindows().
>
> I am able to kick off the Dataflow job, which runs my Beam pipeline.
> I have noticed that Dataflow uses only 1 worker node to perform the work
> and does not scale the job to use more worker nodes, thus not leveraging
> the benefit of distributed processing.
>
> I have posted the question on Stack Overflow:
> https://stackoverflow.com/questions/55242684/join-bounded-and-non-bounded-source-data-flow-job-not-scaling
> but I am also reaching out on the mailing list to get some help, or to learn
> what I am missing.
>
> Any help would be appreciated.
>
> Thanks.
> - Maulik
>