Posted to dev@beam.apache.org by Ahmed Abualsaud via dev <de...@beam.apache.org> on 2022/09/01 20:43:04 UTC

A lesson about DoFn retries

Hi all,

TLDR: When writing IO connectors, be wary of how bundle retries can affect
the workflow.

A faulty implementation of a step in BigQuery batch loads was discovered
recently. I raised an issue [1] but also wanted to mention it here as a
potentially helpful lesson for those developing new/existing IO connectors.

For those unfamiliar with BigQueryIO file loads, a write that is too large
for a single load job [2] looks roughly like this:


   1. Take input rows and write them to temporary files.
   2. Load temporary files to temporary BQ tables.
   3. Delete temporary files.
   4. Copy the contents of temporary tables over to the final table.
   5. Delete temporary tables.


The faulty part here is that steps 4 and 5 are done in the same DoFn (4 in
processElement and 5 in finishBundle). If a bundle fails in the middle of
table deletion, say an error occurs when deleting the nth table, the whole
bundle is retried and the copy is performed again. But tables 1 through n-1
have already been deleted, so we get stuck trying to copy from non-existent
sources.
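
To make the failure mode concrete, here is a minimal sketch in plain Python.
It does not use any Beam or BigQuery API: FakeBigQuery, run_bundle and
run_with_retries are hypothetical names invented for illustration, and the
"retry the whole bundle" loop is only a simplified model of what a runner
does.

```python
class FakeBigQuery:
    """Hypothetical in-memory stand-in for BigQuery (not a real client)."""

    def __init__(self, temp_tables, fail_on_delete):
        self.tables = set(temp_tables)
        self.fail_on_delete = fail_on_delete  # deleting this table fails once
        self.copied = []

    def copy(self, source):
        if source not in self.tables:
            raise LookupError(f"source table {source} not found")
        self.copied.append(source)

    def delete(self, table):
        if table == self.fail_on_delete:
            self.fail_on_delete = None  # transient: only the first try fails
            raise RuntimeError(f"transient error deleting {table}")
        self.tables.discard(table)


def run_bundle(bq, bundle):
    for table in bundle:  # plays the role of processElement (step 4)
        bq.copy(table)
    for table in bundle:  # plays the role of finishBundle (step 5)
        bq.delete(table)


def run_with_retries(bq, bundle, max_attempts=3):
    """Retry the whole bundle on any failure; return the errors seen."""
    errors = []
    for _ in range(max_attempts):
        try:
            run_bundle(bq, bundle)
            break
        except Exception as exc:
            errors.append(exc)
    return errors


bundle = ["tmp_1", "tmp_2", "tmp_3"]
bq = FakeBigQuery(bundle, fail_on_delete="tmp_3")
errors = run_with_retries(bq, bundle)
for exc in errors:
    print(type(exc).__name__, exc)
```

In this model the first attempt fails on the transient delete error, and
every retry then fails while copying tmp_1, because that table was already
deleted by the first attempt.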

The solution lies in keeping the retry of each step separate. A good
example of this is in how steps 2 and 3 are implemented [3]. They are
separated into different DoFns and step 3 can start only after step 2
completes successfully. This way, any failure in step 3 does not go back to
affect step 2.
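
As a rough illustration of separate retries, the sketch below runs the copy
and the delete as two stages that are retried independently. It is plain
Python with no Beam API; FakeBigQuery and retry are hypothetical helpers, and
two assumptions are baked in: the runner really does retry the two stages
separately, and deleting an already-deleted table is a harmless no-op.

```python
class FakeBigQuery:
    """Hypothetical in-memory stand-in for BigQuery (not a real client)."""

    def __init__(self, temp_tables, fail_on_delete):
        self.tables = set(temp_tables)
        self.fail_on_delete = fail_on_delete  # deleting this table fails once
        self.copied = []

    def copy(self, source):
        if source not in self.tables:
            raise LookupError(f"source table {source} not found")
        self.copied.append(source)

    def delete(self, table):
        if table == self.fail_on_delete:
            self.fail_on_delete = None  # transient: only the first try fails
            raise RuntimeError(f"transient error deleting {table}")
        self.tables.discard(table)  # assumed idempotent: missing table is fine


def retry(step, items, max_attempts=3):
    """Run one stage over all items, retrying only that stage on failure.

    Returns the number of attempts that were needed.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            for item in items:
                step(item)
            return attempt
        except Exception:
            if attempt == max_attempts:
                raise


bundle = ["tmp_1", "tmp_2", "tmp_3"]
bq = FakeBigQuery(bundle, fail_on_delete="tmp_3")

copy_attempts = retry(bq.copy, bundle)      # stage for step 4
delete_attempts = retry(bq.delete, bundle)  # stage for step 5, runs after 4
print(copy_attempts, delete_attempts, sorted(bq.tables))
```

Here the transient delete failure only causes the delete stage to run again;
the copy stage is never re-entered, so it never sees a missing source table.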

That's all, thanks for your attention :)

Ahmed

[1] https://github.com/apache/beam/issues/22920

[2] https://github.com/apache/beam/blob/f921a2f1996cf906d994a9d62aeb6978bab09dd5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L100-L105

[3] https://github.com/apache/beam/blob/149ed074428ff9b5344169da7d54e8ee271aaba1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L437-L454

Re: A lesson about DoFn retries

Posted by Jan Lukavský <je...@seznam.cz>.
Yes, all portable runners use fusion; it is built into the machinery
that translates a Pipeline into its protobuf representation. It is needed
to run the pipeline efficiently; otherwise there would be too many calls
between the runner and the SDK harness, which is why the translation
creates fused "executable stages".
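
A toy model of the idea, in plain Python: consecutive steps are fused into
one stage until a GroupByKey is reached, which starts a new stage. This is
only a sketch of the concept for a linear pipeline, not Beam's actual fusion
algorithm; the fuse function and the step names are hypothetical.

```python
def fuse(steps):
    """Split a linear list of (name, kind) steps into stages at GroupByKey."""
    stages, current = [], []
    for name, kind in steps:
        if kind == "GroupByKey":
            # The shuffle is a stage boundary: close the current stage.
            if current:
                stages.append(current)
            current = []
        else:
            current.append(name)
    if current:
        stages.append(current)
    return stages


# Hypothetical step names, loosely modeled on steps 2 and 3 of the thread.
with_gbk = [
    ("LoadTempTables", "ParDo"),
    ("GroupResults", "GroupByKey"),
    ("DeleteTempFiles", "ParDo"),
]
without_gbk = [
    ("LoadTempTables", "ParDo"),
    ("DeleteTempFiles", "ParDo"),
]

print(fuse(with_gbk))
print(fuse(without_gbk))
```

With the GroupByKey the two ParDos land in different stages; without it they
are fused into a single stage and would be retried together.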

On 9/3/22 04:34, Ahmed Abualsaud via dev wrote:
> Yes, you’re right. I forgot to mention that important piece of
> information 😅 Thanks for catching it. The GBK keeps the DoFns
> separate during pipeline execution.
>
> From what I’ve learned, fusion is a Dataflow thing. Do other runners do
> it too?

Re: A lesson about DoFn retries

Posted by Ahmed Abualsaud via dev <de...@beam.apache.org>.
Yes, you’re right. I forgot to mention that important piece of information
😅 Thanks for catching it. The GBK keeps the DoFns separate during pipeline
execution.

From what I’ve learned, fusion is a Dataflow thing. Do other runners do it
too?

On Thu, Sep 1, 2022 at 6:08 PM Brian Hulette <bh...@google.com> wrote:

> Thanks for sharing the learnings Ahmed!
>
>> The solution lies in keeping the retry of each step separate. A good
>> example of this is in how steps 2 and 3 are implemented [3]. They are
>> separated into different DoFns and step 3 can start only after step 2
>> completes successfully. This way, any failure in step 3 does not go back to
>> affect step 2.
>
> Is it enough just that they're in different DoFns? I thought the key was
> that the DoFns are separated by a GroupByKey, so they will be in different
> fused stages, which are retried independently.
>
> Brian

Re: A lesson about DoFn retries

Posted by Brian Hulette via dev <de...@beam.apache.org>.
Thanks for sharing the learnings Ahmed!

> The solution lies in keeping the retry of each step separate. A good
> example of this is in how steps 2 and 3 are implemented [3]. They are
> separated into different DoFns and step 3 can start only after step 2
> completes successfully. This way, any failure in step 3 does not go back to
> affect step 2.

Is it enough just that they're in different DoFns? I thought the key was
that the DoFns are separated by a GroupByKey, so they will be in different
fused stages, which are retried independently.

Brian
