Posted to user@beam.apache.org by Mark Striebeck <ma...@gmail.com> on 2022/01/26 21:23:21 UTC

Synchronizing after parallel stages

In our Beam pipeline we write the results to MySQL in separate (parallel)
stages. Once all stages have finished writing successfully, we want to set
a "final" flag on all DB tables, so that any client using this data always
gets the latest data, but only once it has been completely processed.

I added the following to our MySQL write stage:

def finish_bundle(self):
    self._process(1)
    # Emit a marker element in the global window.
    yield beam.utils.windowed_value.WindowedValue(
        value='Finished writing pcollection',
        timestamp=beam.utils.timestamp.MIN_TIMESTAMP,
        windows=[beam.transforms.window.GlobalWindow()],
    )

So that they all yield this one value when they are done.

I collect all these results in a set and then do:

_ = (final_flags_set
       | "FlattenFinalFlags" >> beam.Flatten()
       | "CountFinalFlags" >> beam.combiners.Count.Globally()
       | "SetFinalFlag" >> beam.ParDo(
           self.SetFinalDoFn(env, self.TableName()), insertion_timestamp))

The idea is that Count.Globally() forces the pipeline to wait until all
write stages are done before the SetFinalFlag stage starts.

This works, but it adds several hours to our data pipeline (running on GCP
Dataflow). Without this, the pipeline finishes in 3-4 hours. With this, it
finishes in 6-7 hours.

a) Is there a better way to do this?
b) Any idea what is going on here, and how I could analyse/debug it?

Thanks
     Mark

Re: Synchronizing after parallel stages

Posted by Luke Cwik <lc...@google.com>.
a)
In the Beam Java SDK there is the Wait transform[1]. It's pretty small, but
it doesn't exist in Python. Consider re-implementing it in Python and, if
you do, also consider contributing it back so it becomes part of the Beam
Python SDK.

b)
My guess is that finish_bundle is being invoked many more times than you
think, and the global combine is taking a long time because it has so many
elements. You can look at the GroupByKey element/byte counters in the
Combine, and also at the stage progress for the job[2].
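
To make the bundle point concrete, here is a plain-Python simulation (not Beam code) of a runner that calls finish_bundle once per bundle. FakeWriteFn, run_in_bundles and the bundle sizes are invented for illustration; on a real runner the bundle boundaries are chosen by the runner and can be much smaller than expected.

```python
class FakeWriteFn:
    """Stand-in for the MySQL write DoFn. Not a real Beam DoFn."""

    def process(self, element):
        # Pretend to write the element; emit nothing downstream.
        return iter(())

    def finish_bundle(self):
        # Mirrors the original code: one marker per finished bundle.
        yield "Finished writing pcollection"


def run_in_bundles(fn, elements, bundle_size):
    """Simulate a runner that splits the input into fixed-size bundles."""
    outputs = []
    for start in range(0, len(elements), bundle_size):
        for element in elements[start:start + bundle_size]:
            outputs.extend(fn.process(element))
        # The runner calls finish_bundle after every bundle, not once per stage.
        outputs.extend(fn.finish_bundle())
    return outputs


# 1000 elements in bundles of 10 produce 100 markers, not 1.
flags = run_in_bundles(FakeWriteFn(), list(range(1000)), bundle_size=10)
```

The number of markers reaching Count.Globally() therefore scales with the number of bundles, which would be consistent with the element counters being far higher than the number of write stages.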

1:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Wait.java
2:
https://cloud.google.com/dataflow/docs/concepts/execution-details#stage_progress_for_batch_jobs
