Posted to dev@beam.apache.org by Romain Manni-Bucau <rm...@gmail.com> on 2018/08/01 05:03:53 UTC

Re: Cleanup resources on pipeline cancelation

Hi Andrew,

IIRC, sources should clean up their resources within each method, since they don't
have a better lifecycle. Readers can create anything longer-lived and release it
at close time.
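A minimal plain-Java sketch of the pattern Romain describes, assuming nothing beyond the JDK. `TempResource`, `SketchSource`, and `SketchReader` are hypothetical stand-ins, not Beam classes; they only mirror the shape of the `split()` / `start()` / `close()` lifecycle. Whatever the source allocates inside a single method is released before that method returns, and whatever a reader holds for its whole lifetime is released in `close()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical external resource (think temp file or subscription); not a Beam API.
class TempResource implements AutoCloseable {
  static int open = 0; // live resources, tracked only for illustration
  final String name;

  TempResource(String name) {
    this.name = name;
    open++;
  }

  @Override
  public void close() {
    open--;
  }
}

// Simplified source: anything split() creates is released before split() returns.
class SketchSource {
  List<String> split(int n) {
    try (TempResource metadata = new TempResource("split-metadata")) {
      List<String> shards = new ArrayList<>();
      for (int i = 0; i < n; i++) {
        shards.add("shard-" + i);
      }
      return shards;
    } // metadata.close() runs here, even if an exception was thrown
  }
}

// Simplified reader: a longer-lived resource is acquired in start(), released in close().
class SketchReader implements AutoCloseable {
  private TempResource connection;

  boolean start() {
    connection = new TempResource("connection");
    return true;
  }

  @Override
  public void close() {
    if (connection != null) {
      connection.close();
      connection = null; // makes a second close() harmless
    }
  }
}

class LifecycleDemo {
  public static void main(String[] args) {
    List<String> shards = new SketchSource().split(3);
    System.out.println(shards + " open=" + TempResource.open); // [shard-0, shard-1, shard-2] open=0
    try (SketchReader reader = new SketchReader()) {
      reader.start();
      System.out.println("open while reading=" + TempResource.open); // open while reading=1
    }
    System.out.println("open after close=" + TempResource.open); // open after close=0
  }
}
```

As the rest of the thread points out, `close()` is per reader, so a pipeline-wide resource such as a shared subscription fits neither bucket.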


On Wed, Aug 1, 2018, 00:31 Andrew Pilloud <ap...@google.com> wrote:

> Some of our IOs create external resources that need to be cleaned up when
> a pipeline is terminated. It looks like the
> org.apache.beam.sdk.io.UnboundedSource interface is called on creation, but
> there is no call for cleanup. For example, PubsubIO creates a Pubsub
> subscription in createReader()/split() and it should be deleted at shutdown.
> Does anyone have ideas on how I might make this happen?
>
> (I filed https://issues.apache.org/jira/browse/BEAM-5051 tracking the
> PubSub specific issue.)
>
> Andrew
>

Re: Cleanup resources on pipeline cancelation

Posted by Reuven Lax <re...@google.com>.
Romain is correct: you would need some global reference counting here to
use the close() callback. The problem is that the input subscription is a
pipeline-wide resource, not a per-reader resource.
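A sketch of the reference counting Reuven and Romain mention, under two loud assumptions: every reader runs in the same JVM (so an `AtomicInteger` can hold the count), and no reader is created after the count reaches zero. `SharedSubscription` is hypothetical. Across real workers the count would have to live in shared, durable state, which is the piece Beam's source API does not provide.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical pipeline-wide resource shared by N parallel readers.
// Assumption: all readers live in one JVM; otherwise the count needs external state.
class SharedSubscription {
  private final AtomicInteger refCount = new AtomicInteger();
  private volatile boolean deleted = false;

  // Called when a reader starts using the subscription.
  void acquire() {
    refCount.incrementAndGet();
  }

  // Called from each reader's close(); only the last release deletes the resource.
  void release() {
    if (refCount.decrementAndGet() == 0) {
      deleted = true; // stand-in for a real delete call
    }
  }

  boolean isDeleted() {
    return deleted;
  }
}

class RefCountDemo {
  public static void main(String[] args) {
    SharedSubscription sub = new SharedSubscription();
    int readers = 3;
    for (int i = 0; i < readers; i++) {
      sub.acquire();
    }
    sub.release();
    sub.release();
    System.out.println(sub.isDeleted()); // false: one reader is still open
    sub.release();
    System.out.println(sub.isDeleted()); // true: the last reader closed
  }
}
```

A runner that re-creates readers from checkpoints after all earlier readers have closed would break the second assumption and delete the subscription too early.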

On Thu, Aug 2, 2018 at 10:07 AM Romain Manni-Bucau <rm...@gmail.com>
wrote:

>
>
> On Thu, Aug 2, 2018, 18:32 Andrew Pilloud <ap...@google.com> wrote:
>
>> The subscriptions I want to clean up are ones that are implicitly created
>> by the PubsubIO. These subscriptions are created and then leaked; they aren't
>> reused in future pipelines, so the data loss issues are moot here. I agree
>> that we don't want to tear down user-supplied subscriptions.
>>
>> I've been doing some more digging; it looks like the Source.Reader
>> interface has a close() callback
>> <https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/io/Source.Reader.html#close-->.
>> Is that a place I might be able to do cleanup? (It appears this is hooked
>> up to the RichFunction.close() callback on Flink and called from the Direct
>> Runner, but possibly not called from other runners.)
>>
>
>
> It is called after the parallelization (you can have N>1 readers in
> parallel), so yes, it works if you have some global reference counting to
> clean up only once; otherwise it will be hard.

Re: Cleanup resources on pipeline cancelation

Posted by Romain Manni-Bucau <rm...@gmail.com>.
On Thu, Aug 2, 2018, 18:32 Andrew Pilloud <ap...@google.com> wrote:

> The subscriptions I want to clean up are ones that are implicitly created
> by the PubsubIO. These subscriptions are created and then leaked; they aren't
> reused in future pipelines, so the data loss issues are moot here. I agree
> that we don't want to tear down user-supplied subscriptions.
>
> I've been doing some more digging; it looks like the Source.Reader
> interface has a close() callback
> <https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/io/Source.Reader.html#close-->.
> Is that a place I might be able to do cleanup? (It appears this is hooked
> up to the RichFunction.close() callback on Flink and called from the Direct
> Runner, but possibly not called from other runners.)
>


It is called after the parallelization (you can have N>1 readers in parallel),
so yes, it works if you have some global reference counting to clean up only
once; otherwise it will be hard.


Re: Cleanup resources on pipeline cancelation

Posted by Andrew Pilloud <ap...@google.com>.
The subscriptions I want to clean up are ones that are implicitly created
by the PubsubIO. These subscriptions are created and then leaked; they aren't
reused in future pipelines, so the data loss issues are moot here. I agree
that we don't want to tear down user-supplied subscriptions.

I've been doing some more digging; it looks like the Source.Reader
interface has a close() callback
<https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/io/Source.Reader.html#close-->.
Is that a place I might be able to do cleanup? (It appears this is hooked
up to the RichFunction.close() callback on Flink and called from the Direct
Runner, but possibly not called from other runners.)
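One way to respect the "implicit subscriptions only" rule inside `close()`, sketched with a hypothetical `SubscriptionClient` interface (not the Pub/Sub or PubsubIO API) and a recording fake for the demo. The reader remembers whether it created the subscription, deletes only in that case, and tolerates `close()` being called twice. It assumes each reader owns its own subscription; for a subscription shared by parallel readers, the reference counting discussed in this thread would still be needed. It is also best effort by construction, since a runner may never invoke `close()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical client interface; not the real Pub/Sub client API.
interface SubscriptionClient {
  String createRandomSubscription(String topic);
  void deleteSubscription(String subscription);
}

class OwnershipAwareReader implements AutoCloseable {
  private final SubscriptionClient client;
  private final String subscription;
  private final boolean createdByReader; // true only for implicitly created subscriptions
  private boolean closed = false;

  OwnershipAwareReader(SubscriptionClient client, String topic, String userSubscription) {
    this.client = client;
    if (userSubscription != null) {
      this.subscription = userSubscription; // user supplied: never delete
      this.createdByReader = false;
    } else {
      this.subscription = client.createRandomSubscription(topic);
      this.createdByReader = true;
    }
  }

  String subscription() {
    return subscription;
  }

  @Override
  public void close() {
    // Best effort: some runners may never call close().
    if (!closed && createdByReader) {
      client.deleteSubscription(subscription);
    }
    closed = true;
  }
}

// Fake client that records deletions; the "_beam_tmp" suffix is made up.
class RecordingClient implements SubscriptionClient {
  final List<String> deleted = new ArrayList<>();

  public String createRandomSubscription(String topic) {
    return topic + "_beam_tmp";
  }

  public void deleteSubscription(String subscription) {
    deleted.add(subscription);
  }
}

class OwnershipDemo {
  public static void main(String[] args) {
    RecordingClient client = new RecordingClient();
    try (OwnershipAwareReader implicit = new OwnershipAwareReader(client, "events", null);
        OwnershipAwareReader user = new OwnershipAwareReader(client, "events", "my-sub")) {
      System.out.println(implicit.subscription() + ", " + user.subscription()); // events_beam_tmp, my-sub
    }
    System.out.println(client.deleted); // [events_beam_tmp]
  }
}
```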

Andrew

On Thu, Aug 2, 2018 at 1:07 AM Reuven Lax <re...@google.com> wrote:

> Actually I think SDF is the right way to fix this. The SDF can set a timer
> at infinity (which will only fire when the pipeline shuts down). I believe
> that SDF support is being added to the portability layer now, so eventually
> all portable runners will support it, and maybe we can live with the status
> quo until then.

Re: Cleanup resources on pipeline cancelation

Posted by Reuven Lax <re...@google.com>.
Actually, I think SDF (Splittable DoFn) is the right way to fix this. The SDF can set a timer
at infinity (which will only fire when the pipeline shuts down). I believe
that SDF support is being added to the portability layer now, so eventually
all portable runners will support it, and maybe we can live with the status
quo until then.
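A toy model of the "timer at infinity" idea, assuming simple event-time semantics: a timer fires once the watermark reaches its timestamp, and `Long.MAX_VALUE` stands in for the end of time. `ToyTimerService` is not Beam's timer API. Whether a given runner actually advances the watermark that far when a job is cancelled, as opposed to drained or finished, is runner-specific, so treat the shutdown trigger as an assumption.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy event-time timer service; not Beam's API.
class ToyTimerService {
  static final long END_OF_TIME = Long.MAX_VALUE; // stand-in for "infinity"

  private static final class Timer {
    final long timestamp;
    final Runnable callback;

    Timer(long timestamp, Runnable callback) {
      this.timestamp = timestamp;
      this.callback = callback;
    }
  }

  private final List<Timer> pending = new ArrayList<>();

  void setTimer(long timestamp, Runnable callback) {
    pending.add(new Timer(timestamp, callback));
  }

  // Fires, once, every timer whose timestamp is <= the new watermark.
  void advanceWatermarkTo(long watermark) {
    List<Timer> due = new ArrayList<>();
    for (Iterator<Timer> it = pending.iterator(); it.hasNext(); ) {
      Timer t = it.next();
      if (t.timestamp <= watermark) {
        due.add(t);
        it.remove();
      }
    }
    // Run callbacks after iterating, so a callback may safely set new timers.
    for (Timer t : due) {
      t.callback.run();
    }
  }
}

class CleanupTimerDemo {
  public static void main(String[] args) {
    ToyTimerService timers = new ToyTimerService();
    boolean[] cleanedUp = {false};
    // Hypothetical cleanup action, e.g. deleting an implicitly created subscription.
    timers.setTimer(ToyTimerService.END_OF_TIME, () -> cleanedUp[0] = true);
    timers.advanceWatermarkTo(1_000L); // normal processing: nothing fires
    System.out.println(cleanedUp[0]); // false
    timers.advanceWatermarkTo(ToyTimerService.END_OF_TIME); // input finished
    System.out.println(cleanedUp[0]); // true
  }
}
```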

On Wed, Aug 1, 2018 at 9:59 PM Romain Manni-Bucau <rm...@gmail.com>
wrote:

> I agree, Reuven. But leaking in a source doesn't give any guarantee
> regarding the execution, since it depends on the runner, and the current API
> will not provide you that feature. Using a reference-counting state can
> work better but would require an SDF migration (and will hit runner support
> issues :().

Re: Cleanup resources on pipeline cancelation

Posted by Romain Manni-Bucau <rm...@gmail.com>.
I agree, Reuven. But leaking in a source doesn't give any guarantee regarding
the execution, since it depends on the runner, and the current API will not
provide you that feature. Using a reference-counting state can work better
but would require an SDF migration (and will hit runner support issues :().


On Thu, Aug 2, 2018, 05:39 Reuven Lax <re...@google.com> wrote:

> Hi Romain,
>
> Andrew's example actually wouldn't work for that. With Google Cloud
> Pub/Sub (the example source he referenced), if there is no subscription to
> a topic, all publishes to that topic are dropped on the floor; if you don't
> want to lose data, you are expected to keep the subscription around
> continuously. In this example, leaking a subscription is probably
> preferable to losing data (especially since Pub/Sub itself garbage collects
> subscriptions that have been inactive for a long time).
>
> The answer might be that Beam does not have a good lifecycle story here,
> and something needs to be built.
>
> Reuven

Re: Cleanup resources on pipeline cancelation

Posted by Reuven Lax <re...@google.com>.
Hi Romain,

Andrew's example actually wouldn't work for that. With Google Cloud Pub/Sub
(the example source he referenced), if there is no subscription to a topic,
all publishes to that topic are dropped on the floor; if you don't want to
lose data, you are expected to keep the subscription around continuously.
In this example, leaking a subscription is probably preferable to losing
data (especially since Pub/Sub itself garbage collects subscriptions that
have been inactive for a long time).
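A toy model of the Pub/Sub behavior Reuven describes, showing why deleting a subscription at shutdown can lose data when a later pipeline reuses it: each published message goes to every subscription that exists at publish time and is dropped when there is none. `ToyTopic` is purely illustrative, not a Pub/Sub client, and ignores other real Pub/Sub behavior, including the expiration of inactive subscriptions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy topic: delivers to existing subscriptions, drops messages when there are none.
class ToyTopic {
  private final Map<String, Deque<String>> subscriptions = new HashMap<>();
  int dropped = 0;

  void createSubscription(String name) {
    subscriptions.putIfAbsent(name, new ArrayDeque<>());
  }

  // Deleting a subscription also discards its unread backlog.
  void deleteSubscription(String name) {
    subscriptions.remove(name);
  }

  void publish(String message) {
    if (subscriptions.isEmpty()) {
      dropped++; // nobody is subscribed: the message is lost
      return;
    }
    for (Deque<String> backlog : subscriptions.values()) {
      backlog.addLast(message);
    }
  }

  // Returns and clears the backlog; empty if the subscription does not exist.
  List<String> pull(String name) {
    Deque<String> backlog = subscriptions.get(name);
    if (backlog == null) {
      return new ArrayList<>();
    }
    List<String> out = new ArrayList<>(backlog);
    backlog.clear();
    return out;
  }
}

class DropDemo {
  public static void main(String[] args) {
    ToyTopic topic = new ToyTopic();
    topic.createSubscription("pipeline-sub");
    topic.publish("a");
    topic.deleteSubscription("pipeline-sub"); // "cleanup" at shutdown
    topic.publish("b"); // published while no pipeline is running
    topic.createSubscription("pipeline-sub"); // pipeline restarted
    topic.publish("c");
    System.out.println(topic.pull("pipeline-sub") + " dropped=" + topic.dropped); // [c] dropped=1
  }
}
```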

The answer might be that Beam does not have a good lifecycle story here,
and something needs to be built.

Reuven

On Tue, Jul 31, 2018 at 10:04 PM Romain Manni-Bucau <rm...@gmail.com>
wrote:

> Hi Andrew,
>
> IIRC, sources should clean up their resources within each method, since they don't
> have a better lifecycle. Readers can create anything longer-lived and release it
> at close time.

Re: Cleanup resources on pipeline cancelation

Posted by Chamikara Jayalath <ch...@google.com>.
Hi Andrew,

Beam currently does not have a generalized cleanup story, so the answer has
usually been ad hoc. For bounded sources we can (1) clean up any resources
created for splitting after splitting, and (2) clean up resources created for a
given reader when the reader finishes (the last advance() call).
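A sketch of Cham's second point for bounded sources: per-reader resources are released as soon as `advance()` reports that the input is exhausted, and `close()` repeats the release idempotently in case the reader is abandoned early. `Handle` and `ExhaustingReader` are hypothetical; the `start()` / `advance()` / `getCurrent()` / `close()` names follow the shape of Beam's reader lifecycle, but this is not Beam code.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical per-reader resource.
class Handle {
  boolean released = false;

  void release() {
    released = true;
  }
}

class ExhaustingReader implements AutoCloseable {
  private final Iterator<String> records;
  private Handle handle;
  private String current;

  ExhaustingReader(List<String> records) {
    this.records = records.iterator();
  }

  boolean start() {
    handle = new Handle(); // acquire the per-reader resource
    return advance();
  }

  boolean advance() {
    if (records.hasNext()) {
      current = records.next();
      return true;
    }
    releaseHandle(); // last advance(): nothing more to read
    return false;
  }

  String getCurrent() {
    return current;
  }

  Handle handle() {
    return handle;
  }

  private void releaseHandle() {
    if (handle != null && !handle.released) {
      handle.release();
    }
  }

  @Override
  public void close() {
    releaseHandle(); // no-op if advance() already released it
  }
}

class BoundedDemo {
  public static void main(String[] args) {
    ExhaustingReader reader = new ExhaustingReader(List.of("x", "y"));
    StringBuilder seen = new StringBuilder();
    for (boolean more = reader.start(); more; more = reader.advance()) {
      seen.append(reader.getCurrent());
    }
    System.out.println(seen + " released=" + reader.handle().released); // xy released=true
    reader.close(); // harmless second release
  }
}
```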

I'm not sure what the proper solution for UnboundedSources is, and it might
not even make sense to add cleanup logic to an unbounded source that is
never expected to end. We might need something more generic (for example, a
mechanism to collect temporary resources and delete such resources at
pipeline termination).
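A minimal sketch of the "collect temporary resources and delete them at termination" idea. `TempResourceRegistry` is hypothetical; Beam has no such API. IOs would register a deleter whenever they create something implicitly, and whatever observes pipeline termination would call `cleanupAll()`. Deletion runs in reverse creation order and keeps going past individual failures, returning the names it could not delete. In a distributed pipeline the registry itself would need to be durable and visible to that observer, which this single-process sketch does not address.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical registry of temporary resources created implicitly by IOs.
class TempResourceRegistry {
  private final List<String> names = new ArrayList<>();
  private final List<Runnable> deleters = new ArrayList<>();

  synchronized void register(String name, Runnable deleter) {
    names.add(name);
    deleters.add(deleter);
  }

  // Deletes in reverse creation order; one failure does not stop the rest.
  // Returns the names whose deletion threw.
  synchronized List<String> cleanupAll() {
    List<String> failed = new ArrayList<>();
    for (int i = deleters.size() - 1; i >= 0; i--) {
      try {
        deleters.get(i).run();
      } catch (RuntimeException e) {
        failed.add(names.get(i));
      }
    }
    names.clear();
    deleters.clear();
    return failed;
  }
}

class RegistryDemo {
  public static void main(String[] args) {
    TempResourceRegistry registry = new TempResourceRegistry();
    List<String> log = new ArrayList<>();
    registry.register("subscription-1", () -> log.add("deleted subscription-1"));
    registry.register("temp-bucket", () -> {
      throw new IllegalStateException("already gone");
    });
    registry.register("subscription-2", () -> log.add("deleted subscription-2"));
    List<String> failed = registry.cleanupAll(); // e.g. from a termination hook
    System.out.println(log); // [deleted subscription-2, deleted subscription-1]
    System.out.println(failed); // [temp-bucket]
  }
}
```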

Thanks,
Cham


On Tue, Jul 31, 2018 at 10:04 PM Romain Manni-Bucau <rm...@gmail.com>
wrote:

> Hi Andrew,
>
> IIRC, sources should clean up their resources within each method, since they don't
> have a better lifecycle. Readers can create anything longer-lived and release it
> at close time.