Posted to dev@beam.apache.org by Lukasz Cwik <lc...@google.com> on 2018/12/03 16:53:46 UTC

Re: Graceful shutdown of long-running Beam pipeline on Flink

There are proposals for pipeline drain[1] and also for snapshot and
update[2] for Apache Beam. We would love contributions in this space.

1:
https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8
2:
https://docs.google.com/document/d/1UWhnYPgui0gUYOsuGcCjLuoOUlGA4QaY91n8p3wz9MY

On Mon, Dec 3, 2018 at 7:05 AM Wayne Collins <wa...@dades.ca> wrote:

> Hi JC,
>
> Thanks for the quick response!
> I had hoped for an in-pipeline solution for runner portability but it is
> nice to know we're not the only ones stepping outside to interact with
> runner management. :-)
>
> Wayne
>
>
> On 2018-12-03 01:23, Juan Carlos Garcia wrote:
>
> Hi Wayne,
>
> We have the same setup and we do daily updates to our pipeline.
>
> We do it with the flink command-line tool, driven from a Jenkins job.
>
> Basically, our deployment job does the following:
>
> 1. Detect whether the pipeline is running (it matches by job name).
>
> 2. If found, do a flink cancel with a savepoint (we use HDFS for
> checkpoints / savepoints) under a given directory.
>
> 3. Use the flink run command for the new job, specifying the savepoint
> from step 2.
>
> I don't think there is any support to achieve the same from within the
> pipeline. You need to do this externally as explained above.
>
> Best regards,
> JC
>
>
> On Mon, Dec 3, 2018, 00:46 Wayne Collins <wa...@dades.ca> wrote:
>
>> Hi all,
>> We have a number of Beam pipelines processing unbounded streams sourced
>> from Kafka on the Flink runner and are very happy with both the platform
>> and performance!
>>
>> The problem is with shutting down the pipelines...for version upgrades,
>> system maintenance, load management, etc. it would be nice to be able to
>> gracefully shut these down under software control, but we haven't been able to
>> find a way to do so. We're in good shape on checkpointing and then cleanly
>> recovering but shutdowns are all destructive to Flink or the Flink
>> TaskManager.
>>
>> Methods tried:
>>
>> 1) Calling cancel on FlinkRunnerResult returned from pipeline.run()
>> This would be our preferred method but p.run() doesn't return until
>> termination and even if it did, the runner code simply throws:
>> "throw new UnsupportedOperationException("FlinkRunnerResult does not
>> support cancel.");"
>> so this doesn't appear to be a near-term option.
>>
>> 2) Inject a "termination" message into the pipeline via Kafka
>> This does get through, but calling exit() from a stage in the pipeline
>> also terminates the Flink TaskManager.
>>
>> 3) Inject a "sleep" message, then manually restart the cluster
>> This is our current method: we pause the data at the source, flood all
>> branches of the pipeline with a "we're going down" msg so the stages can do
>> a bit of housekeeping, then hard-stop the entire environment and re-launch
>> with the new version.
>>
>> Is there a "Best Practice" method for gracefully terminating an unbounded
>> pipeline from within the pipeline or from the mainline that launches it?
>>
>> Thanks!
>> Wayne
>>
>> --
>> Wayne Collins
>> dades.ca Inc.
>> mailto:wayneco@dades.ca
>> cell:416-898-5137
>>
>>
> --
> Wayne Collins
> dades.ca Inc.
> mailto:wayneco@dades.ca
> cell:416-898-5137
>
>

Re: Graceful shutdown of long-running Beam pipeline on Flink

Posted by Thomas Weise <th...@apache.org>.
As noted, there is currently no support for Flink savepoints through the
Beam API.

However, it is now possible to restore from a savepoint with a Flink runner
specific pipeline option:

https://issues.apache.org/jira/browse/BEAM-5396
https://github.com/apache/beam/pull/7169#issuecomment-443283332

This was just merged - we are going to use it for the Python pipelines.
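A minimal sketch of how a launcher script might pass that option, assuming the
Java SDK flag spellings `--savepointPath`, `--flinkMaster`, `--jobName` and
`--streaming` (check them against the Beam version in use). The helper name
`flink_pipeline_args` and the example values are hypothetical; the function
only builds the argument list and does not start a pipeline.

```python
from typing import List, Optional


def flink_pipeline_args(
    job_name: str,
    flink_master: str,
    savepoint_path: Optional[str] = None,
) -> List[str]:
    """Build command-line pipeline options for a Beam job on the Flink runner.

    The flag names are assumptions based on the Java SDK's pipeline options;
    verify them for your Beam release.
    """
    args = [
        "--runner=FlinkRunner",
        f"--jobName={job_name}",
        f"--flinkMaster={flink_master}",
        "--streaming=true",
    ]
    # Only request a restore when a savepoint path is actually supplied,
    # so the same launcher works for a first-time (stateless) start.
    if savepoint_path:
        args.append(f"--savepointPath={savepoint_path}")
    return args


if __name__ == "__main__":
    # Hypothetical values, for illustration only.
    print(flink_pipeline_args(
        "my-pipeline",
        "localhost:8081",
        "hdfs:///savepoints/savepoint-example",
    ))
```

Keeping the savepoint path optional lets one deployment script cover both a
fresh start and an upgrade that resumes from saved state.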

Thomas


On Mon, Dec 3, 2018 at 8:54 AM Lukasz Cwik <lc...@google.com> wrote:

> There are proposals for pipeline drain[1] and also for snapshot and
> update[2] for Apache Beam. We would love contributions in this space.
>
> 1:
> https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8
> 2:
> https://docs.google.com/document/d/1UWhnYPgui0gUYOsuGcCjLuoOUlGA4QaY91n8p3wz9MY
>
> On Mon, Dec 3, 2018 at 7:05 AM Wayne Collins <wa...@dades.ca> wrote:
>
>> Hi JC,
>>
>> Thanks for the quick response!
>> I had hoped for an in-pipeline solution for runner portability but it is
>> nice to know we're not the only ones stepping outside to interact with
>> runner management. :-)
>>
>> Wayne
>>
>>
>> On 2018-12-03 01:23, Juan Carlos Garcia wrote:
>>
>> Hi Wayne,
>>
>> We have the same setup and we do daily updates to our pipeline.
>>
>> We do it with the flink command-line tool, driven from a Jenkins job.
>>
>> Basically, our deployment job does the following:
>>
>> 1. Detect whether the pipeline is running (it matches by job name).
>>
>> 2. If found, do a flink cancel with a savepoint (we use HDFS for
>> checkpoints / savepoints) under a given directory.
>>
>> 3. Use the flink run command for the new job, specifying the
>> savepoint from step 2.
>>
>> I don't think there is any support to achieve the same from within the
>> pipeline. You need to do this externally as explained above.
>>
>> Best regards,
>> JC
>>
>>
>> On Mon, Dec 3, 2018, 00:46 Wayne Collins <wa...@dades.ca> wrote:
>>
>>> Hi all,
>>> We have a number of Beam pipelines processing unbounded streams sourced
>>> from Kafka on the Flink runner and are very happy with both the platform
>>> and performance!
>>>
>>> The problem is with shutting down the pipelines...for version upgrades,
>>> system maintenance, load management, etc. it would be nice to be able to
>>> gracefully shut these down under software control, but we haven't been able to
>>> find a way to do so. We're in good shape on checkpointing and then cleanly
>>> recovering but shutdowns are all destructive to Flink or the Flink
>>> TaskManager.
>>>
>>> Methods tried:
>>>
>>> 1) Calling cancel on FlinkRunnerResult returned from pipeline.run()
>>> This would be our preferred method but p.run() doesn't return until
>>> termination and even if it did, the runner code simply throws:
>>> "throw new UnsupportedOperationException("FlinkRunnerResult does not
>>> support cancel.");"
>>> so this doesn't appear to be a near-term option.
>>>
>>> 2) Inject a "termination" message into the pipeline via Kafka
>>> This does get through, but calling exit() from a stage in the pipeline
>>> also terminates the Flink TaskManager.
>>>
>>> 3) Inject a "sleep" message, then manually restart the cluster
>>> This is our current method: we pause the data at the source, flood all
>>> branches of the pipeline with a "we're going down" msg so the stages can do
>>> a bit of housekeeping, then hard-stop the entire environment and re-launch
>>> with the new version.
>>>
>>> Is there a "Best Practice" method for gracefully terminating an
>>> unbounded pipeline from within the pipeline or from the mainline that
>>> launches it?
>>>
>>> Thanks!
>>> Wayne
>>>
>>> --
>>> Wayne Collins
>>> dades.ca Inc.
>>> mailto:wayneco@dades.ca
>>> cell:416-898-5137
>>>
>>>
>> --
>> Wayne Collins
>> dades.ca Inc.
>> mailto:wayneco@dades.ca
>> cell:416-898-5137
>>
>>

Re: Graceful shutdown of long-running Beam pipeline on Flink

Posted by Wayne Collins <wa...@dades.ca>.
Excellent proposals!
They go beyond our requirements but would provide a great foundation for
runner-agnostic life cycle management of pipelines.

Will jump into discussion on the other side...
Thanks!
Wayne


On 2018-12-03 11:53, Lukasz Cwik wrote:
> There are proposals for pipeline drain[1] and also for snapshot and
> update[2] for Apache Beam. We would love contributions in this space.
>
> 1: https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8
> 2: https://docs.google.com/document/d/1UWhnYPgui0gUYOsuGcCjLuoOUlGA4QaY91n8p3wz9MY
>
> On Mon, Dec 3, 2018 at 7:05 AM Wayne Collins <wayneco@dades.ca
> <ma...@dades.ca>> wrote:
>
>     Hi JC,
>
>     Thanks for the quick response!
>     I had hoped for an in-pipeline solution for runner portability but
>     it is nice to know we're not the only ones stepping outside to
>     interact with runner management. :-)
>
>     Wayne
>
>
>     On 2018-12-03 01:23, Juan Carlos Garcia wrote:
>>     Hi Wayne, 
>>
>>     We have the same setup and we do daily updates to our pipeline.
>>
>>     We do it with the flink command-line tool, driven from a Jenkins job.
>>
>>     Basically, our deployment job does the following:
>>
>>     1. Detect whether the pipeline is running (it matches by job name).
>>
>>     2. If found, do a flink cancel with a savepoint (we use HDFS for
>>     checkpoints / savepoints) under a given directory.
>>
>>     3. Use the flink run command for the new job, specifying the
>>     savepoint from step 2.
>>
>>     I don't think there is any support to achieve the same from
>>     within the pipeline. You need to do this externally as explained
>>     above. 
>>
>>     Best regards, 
>>     JC
>>
>>
>>     On Mon, Dec 3, 2018, 00:46 Wayne Collins <wayneco@dades.ca
>>     <ma...@dades.ca>> wrote:
>>
>>         Hi all,
>>         We have a number of Beam pipelines processing unbounded
>>         streams sourced from Kafka on the Flink runner and are very
>>         happy with both the platform and performance!
>>
>>         The problem is with shutting down the pipelines...for version
>>         upgrades, system maintenance, load management, etc. it would
>>         be nice to be able to gracefully shut these down under
>>         software control, but we haven't been able to find a way to do
>>         so. We're in good shape on checkpointing and then cleanly
>>         recovering but shutdowns are all destructive to Flink or the
>>         Flink TaskManager.
>>
>>         Methods tried:
>>
>>         1) Calling cancel on FlinkRunnerResult returned from
>>         pipeline.run()
>>         This would be our preferred method but p.run() doesn't return
>>         until termination and even if it did, the runner code simply
>>         throws:
>>         "throw new UnsupportedOperationException("FlinkRunnerResult
>>         does not support cancel.");"
>>         so this doesn't appear to be a near-term option.
>>
>>         2) Inject a "termination" message into the pipeline via Kafka
>>         This does get through, but calling exit() from a stage in the
>>         pipeline also terminates the Flink TaskManager.
>>
>>         3) Inject a "sleep" message, then manually restart the cluster
>>         This is our current method: we pause the data at the source,
>>         flood all branches of the pipeline with a "we're going down"
>>         msg so the stages can do a bit of housekeeping, then
>>         hard-stop the entire environment and re-launch with the new
>>         version.
>>
>>         Is there a "Best Practice" method for gracefully terminating
>>         an unbounded pipeline from within the pipeline or from the
>>         mainline that launches it?
>>
>>         Thanks!
>>         Wayne
>>
>>         -- 
>>         Wayne Collins
>>         dades.ca <http://dades.ca> Inc.
>>         mailto:wayneco@dades.ca
>>         cell:416-898-5137
>>
>
>     -- 
>     Wayne Collins
>     dades.ca <http://dades.ca> Inc.
>     mailto:wayneco@dades.ca
>     cell:416-898-5137
>

-- 
Wayne Collins
dades.ca Inc.
mailto:wayneco@dades.ca
cell:416-898-5137


Re: Graceful shutdown of long-running Beam pipeline on Flink

Posted by Maximilian Michels <mx...@apache.org>.
Thank you for sharing these, Lukasz!

Great question, Wayne!

As for pipeline shutdown, Flink users typically take a snapshot and then 
cancel the pipeline with Flink tools.

The Beam tooling needs to be improved to support cancelling as well. If 
snapshotting is enabled, the Beam job could also be restored from a 
snapshot instead of explicitly taking a savepoint.

Related issue for cancelling:
https://issues.apache.org/jira/browse/BEAM-593
I think we should address this soon, for the next release.
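The externally driven cycle described in this thread (find the job by name,
cancel it with a savepoint, start the new version from that savepoint) could
be sketched as below. This is only an illustration under assumptions: the
`flink list -r`, `flink cancel -s` and `flink run -d -s` invocations reflect
the Flink CLI as it existed around this time (later Flink releases also offer
a `stop` command that takes a savepoint), the shape of the `flink list` output
line is assumed, and all function names and example values are hypothetical.
The functions only parse text and build argument lists; running them through
`subprocess` is left to the deployment job.

```python
import re
from typing import List, Optional

# Assumed shape of one line of `flink list -r` output:
#   "<date> <time> : <32-hex job id> : <job name> (RUNNING)"
JOB_LINE = re.compile(r":\s*([0-9a-f]{32})\s*:\s*(.+?)\s*\((\w+)\)\s*$")


def find_running_job_id(list_output: str, job_name: str) -> Optional[str]:
    """Return the ID of the running job with the given name, or None."""
    for line in list_output.splitlines():
        match = JOB_LINE.search(line)
        if match and match.group(2) == job_name and match.group(3) == "RUNNING":
            return match.group(1)
    return None


def cancel_with_savepoint_cmd(job_id: str, savepoint_dir: str) -> List[str]:
    """Build the command that cancels a job and writes a savepoint first."""
    return ["flink", "cancel", "-s", savepoint_dir, job_id]


def run_from_savepoint_cmd(savepoint_path: str, jar: str) -> List[str]:
    """Build the command that starts the new job, detached, from a savepoint."""
    return ["flink", "run", "-d", "-s", savepoint_path, jar]


if __name__ == "__main__":
    # Hypothetical CLI output, for illustration only.
    sample = (
        "------------------ Running/Restarting Jobs -------------------\n"
        "03.12.2018 16:53:46 : 0123456789abcdef0123456789abcdef : my-pipeline (RUNNING)\n"
    )
    job_id = find_running_job_id(sample, "my-pipeline")
    if job_id is not None:
        print(cancel_with_savepoint_cmd(job_id, "hdfs:///savepoints"))
```

The savepoint path that the cancel command reports would then be fed to
`run_from_savepoint_cmd` together with the new pipeline jar.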

Thanks,
Max


On 03.12.18 17:53, Lukasz Cwik wrote:
> There are proposals for pipeline drain[1] and also for snapshot and
> update[2] for Apache Beam. We would love contributions in this space.
> 
> 1: 
> https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8
> 2: 
> https://docs.google.com/document/d/1UWhnYPgui0gUYOsuGcCjLuoOUlGA4QaY91n8p3wz9MY
> 
> On Mon, Dec 3, 2018 at 7:05 AM Wayne Collins <wayneco@dades.ca 
> <ma...@dades.ca>> wrote:
> 
>     Hi JC,
> 
>     Thanks for the quick response!
>     I had hoped for an in-pipeline solution for runner portability but
>     it is nice to know we're not the only ones stepping outside to
>     interact with runner management. :-)
> 
>     Wayne
> 
> 
>     On 2018-12-03 01:23, Juan Carlos Garcia wrote:
>>     Hi Wayne,
>>
>>     We have the same setup and we do daily updates to our pipeline.
>>
>>     We do it with the flink command-line tool, driven from a Jenkins job.
>>
>>     Basically, our deployment job does the following:
>>
>>     1. Detect whether the pipeline is running (it matches by job name).
>>
>>     2. If found, do a flink cancel with a savepoint (we use HDFS for
>>     checkpoints / savepoints) under a given directory.
>>
>>     3. Use the flink run command for the new job, specifying the
>>     savepoint from step 2.
>>
>>     I don't think there is any support to achieve the same from within
>>     the pipeline. You need to do this externally as explained above.
>>
>>     Best regards,
>>     JC
>>
>>
>>     On Mon, Dec 3, 2018, 00:46 Wayne Collins <wayneco@dades.ca
>>     <ma...@dades.ca>> wrote:
>>
>>         Hi all,
>>         We have a number of Beam pipelines processing unbounded
>>         streams sourced from Kafka on the Flink runner and are very
>>         happy with both the platform and performance!
>>
>>         The problem is with shutting down the pipelines...for version
>>         upgrades, system maintenance, load management, etc. it would
>>         be nice to be able to gracefully shut these down under
>>         software control, but we haven't been able to find a way to do so.
>>         We're in good shape on checkpointing and then cleanly
>>         recovering but shutdowns are all destructive to Flink or the
>>         Flink TaskManager.
>>
>>         Methods tried:
>>
>>         1) Calling cancel on FlinkRunnerResult returned from
>>         pipeline.run()
>>         This would be our preferred method but p.run() doesn't return
>>         until termination and even if it did, the runner code simply
>>         throws:
>>         "throw new UnsupportedOperationException("FlinkRunnerResult
>>         does not support cancel.");"
>>         so this doesn't appear to be a near-term option.
>>
>>         2) Inject a "termination" message into the pipeline via Kafka
>>         This does get through, but calling exit() from a stage in the
>>         pipeline also terminates the Flink TaskManager.
>>
>>         3) Inject a "sleep" message, then manually restart the cluster
>>         This is our current method: we pause the data at the source,
>>         flood all branches of the pipeline with a "we're going down"
>>         msg so the stages can do a bit of housekeeping, then hard-stop
>>         the entire environment and re-launch with the new version.
>>
>>         Is there a "Best Practice" method for gracefully terminating
>>         an unbounded pipeline from within the pipeline or from the
>>         mainline that launches it?
>>
>>         Thanks!
>>         Wayne
>>
>>         -- 
>>         Wayne Collins
>>         dades.ca  <http://dades.ca>  Inc.
>>         mailto:wayneco@dades.ca
>>         cell:416-898-5137
>>
> 
>     -- 
>     Wayne Collins
>     dades.ca  <http://dades.ca>  Inc.
>     mailto:wayneco@dades.ca
>     cell:416-898-5137
> 
