You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Moritz Mack <mm...@talend.com> on 2023/05/15 15:38:15 UTC

[DISCUSS] Idempotent initialization of file systems

Hi all,

I was just looking into an old issue again, SerializablePipelineOptions calling FileSystems.setDefaultPipelineOptions on deserialization [1]. This applies to various runners including Flink and Spark, but not Dataflow as far as I know.

Problem:

Current initialization of FileSystems through FileSystems.setDefaultPipelineOptions is rather problematic and prone to race conditions, especially when triggered on deserialization of SerializablePipelineOptions (see [1], [2], [3]).

Even further, there’s also an inherent risk of leaking resources that way: Without a well-defined lifecycle for file systems, existing ones are just silently replaced on every invocation of FileSystems.setDefaultPipelineOptions without adequately closing attached resources. Particularly with S3FileSystem this is troublesome as it might leak threads [4].

Possible solutions:

In the best-case pipeline options would be read only as soon as a pipeline is running, making it simple and safe to initialize file systems just once per running pipeline on each worker. Though, that’s likely not the case for some user pipelines and even runners do mutate pipeline options.

Also, as far as I can see, removing FileSystems.setDefaultPipelineOptions from deserialization in SerializablePipelineOptions is unlikely to happen any time soon as it requires a coordinated push across various runners and it’s not obvious where and when initialization is supposed to happen for each runner.

With the above in mind, it would be at least possible to safely limit repeated initialization of file systems to cases when necessary if tracking the revision of pipeline options (using monotonically increasing revision numbers for every update), see this draft PR [5].

Happy to hear your thoughts or alternative approaches on this.

Best,
Moritz

[1] https://github.com/apache/beam/issues/18430
[2] https://issues.apache.org/jira/browse/BEAM-14465
[3] https://issues.apache.org/jira/browse/BEAM-14355
[4] https://github.com/apache/beam/issues/26321
[5] https://github.com/apache/beam/pull/26694

As a recipient of an email from the Talend Group, your personal data will be processed by our systems. Please see our Privacy Notice <https://www.talend.com/privacy-policy/> for more information about our collection and use of your personal information, our security practices, and your data protection rights, including any rights you may have to object to automated-decision making or profiling we use to analyze support or marketing related communications. To manage or discontinue promotional communications, use the communication preferences portal<https://info.talend.com/emailpreferencesen.html>. To exercise your data protection rights, use the privacy request form<https://talend.my.onetrust.com/webform/ef906c5a-de41-4ea0-ba73-96c079cdd15a/b191c71d-f3cb-4a42-9815-0c3ca021704cl>. Contact us here <https://www.talend.com/contact/> or by mail to either of our co-headquarters: Talend, Inc.: 400 South El Camino Real, Ste 1400, San Mateo, CA 94402; Talend SAS: 5/7 rue Salomon De Rothschild, 92150 Suresnes, France

Re: [DISCUSS] Idempotent initialization of file systems

Posted by Moritz Mack <mm...@talend.com>.
Thanks for your thoughts, Robert.

On 15.05.23, 19:23, "Robert Bradshaw via dev" <de...@beam.apache.org> wrote:
On Mon, May 15, 2023 at 8:38 AM Moritz Mack <mm...@talend.com> wrote:
>
> Hi all,
>
> I was just looking into an old issue again, SerializablePipelineOptions calling FileSystems.setDefaultPipelineOptions on deserialization [1]. This applies to various runners including Flink and Spark, but not Dataflow as far as I know.
>
> Problem:
>
> Current initialization of FileSystems through FileSystems.setDefaultPipelineOptions is rather problematic and prone to race conditions, especially when triggered on deserialization of SerializablePipelineOptions (see [1], [2], [3]).
>
> Even further, there’s also an inherent risk of leaking resources that way: Without a well-defined lifecycle for file systems, existing ones are just silently replaced on every invocation of FileSystems.setDefaultPipelineOptions without adequately closing attached resources. Particularly with S3FileSystem this is troublesome as it might leak threads [4].
>
> Possible solutions:
>
> In the best-case pipeline options would be read only as soon as a pipeline is running, making it simple and safe to initialize file systems just once per running pipeline on each worker. Though, that’s likely not the case for some user pipelines and even runners do mutate pipeline options.

Would it be possible to validate this? E.g. what if we made it illegal
to call this more than once (or, at least, without a corresponding
unset operation that could be used for tests)?

I’m not totally sure what you would like to validate? If the repeated initialization of file systems is happening due to repeated calls to FileSystems.setDefaultPipelineOptions? Or if Pipeline options could be read only as soon as the pipeline is running?

If you check [2] and [3], the AWS S3 logs give a clear hint how often the file systems got re-initialized, the logs mentioned in the tickets happen once per initialization of the S3 FS (and were eventually removed treating symptoms instead of causes):  In the case of the Go Flink integration tests that was almost 80k times, running Python wordcount on Beam it was happening > 600 times.

I spend some more time investigating if Pipeline options could be locked during execution. That requires a lot of effort changing runners, but that part is possible. However, test cases strongly suggest being able to mutate pipeline options while running is an intended feature. So I fear that is not a possible way to go, especially without knowing how users are using this.

> Also, as far as I can see, removing FileSystems.setDefaultPipelineOptions from deserialization in SerializablePipelineOptions is unlikely to happen any time soon as it requires a coordinated push across various runners and it’s not obvious where and when initialization is supposed to happen for each runner.
>
> With the above in mind, it would be at least possible to safely limit repeated initialization of file systems to cases when necessary if tracking the revision of pipeline options (using monotonically increasing revision numbers for every update), see this draft PR [5].

If the above is too hard to untangle, this could be a reasonable workaround.

> Happy to hear your thoughts or alternative approaches on this.

Thanks for taking this on.

> [1] https://urldefense.com/v3/__https://github.com/apache/beam/issues/18430__;!!CiXD_PY!XxgOF5pAnsINPQPDKwXysutqmNXLKvftxsb-nQ044zqdqcSbolKU0y0S1_vWKSt26H03-h3DR158$<https://urldefense.com/v3/__https:/github.com/apache/beam/issues/18430__;!!CiXD_PY!XxgOF5pAnsINPQPDKwXysutqmNXLKvftxsb-nQ044zqdqcSbolKU0y0S1_vWKSt26H03-h3DR158$>
>
> [2] https://urldefense.com/v3/__https://issues.apache.org/jira/browse/BEAM-14465__;!!CiXD_PY!XxgOF5pAnsINPQPDKwXysutqmNXLKvftxsb-nQ044zqdqcSbolKU0y0S1_vWKSt26H03-rHqRzUX$<https://urldefense.com/v3/__https:/issues.apache.org/jira/browse/BEAM-14465__;!!CiXD_PY!XxgOF5pAnsINPQPDKwXysutqmNXLKvftxsb-nQ044zqdqcSbolKU0y0S1_vWKSt26H03-rHqRzUX$>
>
> [3] https://urldefense.com/v3/__https://issues.apache.org/jira/browse/BEAM-14355__;!!CiXD_PY!XxgOF5pAnsINPQPDKwXysutqmNXLKvftxsb-nQ044zqdqcSbolKU0y0S1_vWKSt26H03-gZuD0Wd$<https://urldefense.com/v3/__https:/issues.apache.org/jira/browse/BEAM-14355__;!!CiXD_PY!XxgOF5pAnsINPQPDKwXysutqmNXLKvftxsb-nQ044zqdqcSbolKU0y0S1_vWKSt26H03-gZuD0Wd$>
>
> [4] https://urldefense.com/v3/__https://github.com/apache/beam/issues/26321__;!!CiXD_PY!XxgOF5pAnsINPQPDKwXysutqmNXLKvftxsb-nQ044zqdqcSbolKU0y0S1_vWKSt26H03-lHpKaWW$<https://urldefense.com/v3/__https:/github.com/apache/beam/issues/26321__;!!CiXD_PY!XxgOF5pAnsINPQPDKwXysutqmNXLKvftxsb-nQ044zqdqcSbolKU0y0S1_vWKSt26H03-lHpKaWW$>
>
> [5] https://urldefense.com/v3/__https://github.com/apache/beam/pull/26694__;!!CiXD_PY!XxgOF5pAnsINPQPDKwXysutqmNXLKvftxsb-nQ044zqdqcSbolKU0y0S1_vWKSt26H03-mzyCEi5$<https://urldefense.com/v3/__https:/github.com/apache/beam/pull/26694__;!!CiXD_PY!XxgOF5pAnsINPQPDKwXysutqmNXLKvftxsb-nQ044zqdqcSbolKU0y0S1_vWKSt26H03-mzyCEi5$>

As a recipient of an email from the Talend Group, your personal data will be processed by our systems. Please see our Privacy Notice <https://www.talend.com/privacy-policy/> for more information about our collection and use of your personal information, our security practices, and your data protection rights, including any rights you may have to object to automated-decision making or profiling we use to analyze support or marketing related communications. To manage or discontinue promotional communications, use the communication preferences portal<https://info.talend.com/emailpreferencesen.html>. To exercise your data protection rights, use the privacy request form<https://talend.my.onetrust.com/webform/ef906c5a-de41-4ea0-ba73-96c079cdd15a/b191c71d-f3cb-4a42-9815-0c3ca021704cl>. Contact us here <https://www.talend.com/contact/> or by mail to either of our co-headquarters: Talend, Inc.: 400 South El Camino Real, Ste 1400, San Mateo, CA 94402; Talend SAS: 5/7 rue Salomon De Rothschild, 92150 Suresnes, France

Re: [DISCUSS] Idempotent initialization of file systems

Posted by Robert Bradshaw via dev <de...@beam.apache.org>.
On Mon, May 15, 2023 at 8:38 AM Moritz Mack <mm...@talend.com> wrote:
>
> Hi all,
>
> I was just looking into an old issue again, SerializablePipelineOptions calling FileSystems.setDefaultPipelineOptions on deserialization [1]. This applies to various runners including Flink and Spark, but not Dataflow as far as I know.
>
> Problem:
>
> Current initialization of FileSystems through FileSystems.setDefaultPipelineOptions is rather problematic and prone to race conditions, especially when triggered on deserialization of SerializablePipelineOptions (see [1], [2], [3]).
>
> Even further, there’s also an inherent risk of leaking resources that way: Without a well-defined lifecycle for file systems, existing ones are just silently replaced on every invocation of FileSystems.setDefaultPipelineOptions without adequately closing attached resources. Particularly with S3FileSystem this is troublesome as it might leak threads [4].
>
> Possible solutions:
>
> In the best-case pipeline options would be read only as soon as a pipeline is running, making it simple and safe to initialize file systems just once per running pipeline on each worker. Though, that’s likely not the case for some user pipelines and even runners do mutate pipeline options.

Would it be possible to validate this? E.g. what if we made it illegal
to call this more than once (or, at least, without a corresponding
unset operation that could be used for tests)?

> Also, as far as I can see, removing FileSystems.setDefaultPipelineOptions from deserialization in SerializablePipelineOptions is unlikely to happen any time soon as it requires a coordinated push across various runners and it’s not obvious where and when initialization is supposed to happen for each runner.
>
> With the above in mind, it would be at least possible to safely limit repeated initialization of file systems to cases when necessary if tracking the revision of pipeline options (using monotonically increasing revision numbers for every update), see this draft PR [5].

If the above is too hard to untangle, this could be a reasonable workaround.

> Happy to hear your thoughts or alternative approaches on this.

Thanks for taking this on.

> [1] https://github.com/apache/beam/issues/18430
>
> [2] https://issues.apache.org/jira/browse/BEAM-14465
>
> [3] https://issues.apache.org/jira/browse/BEAM-14355
>
> [4] https://github.com/apache/beam/issues/26321
>
> [5] https://github.com/apache/beam/pull/26694
>
> As a recipient of an email from the Talend Group, your personal data will be processed by our systems. Please see our Privacy Notice for more information about our collection and use of your personal information, our security practices, and your data protection rights, including any rights you may have to object to automated-decision making or profiling we use to analyze support or marketing related communications. To manage or discontinue promotional communications, use the communication preferences portal. To exercise your data protection rights, use the privacy request form. Contact us here or by mail to either of our co-headquarters: Talend, Inc.: 400 South El Camino Real, Ste 1400, San Mateo, CA 94402; Talend SAS: 5/7 rue Salomon De Rothschild, 92150 Suresnes, France