Posted to user@beam.apache.org by Adlae D'Orazio <ad...@liminalinsights.com> on 2023/07/06 20:07:12 UTC

[Question] check if pipeline is still running in pipeline runner

Hello,


I am using an Apache Flink cluster to run a streaming pipeline that I've
created using Apache Beam. This streaming pipeline should be the only one
of its type running on the Flink cluster, and I need some help with how to
ensure that is the case.


A Dockerized pipeline runner program submits the streaming pipeline, and
if the pipeline exits (e.g. because of an error), then the pipeline
runner program exits and is re-run, so that the pipeline is submitted
again and continues running.


The problem I am running into is that if the pipeline runner program
exits while the streaming pipeline is still running (e.g. because the
job server went down and came back up), then I need to check, in the
pipeline runner program, whether the pipeline is still running or has
gone down before resubmitting it.


My first thought was to give the pipeline a specific job name that would
be visible via Flink's REST API, so that I could query the API for that
name to see whether the job was already running. I'm having trouble
doing this: I seem to be able to set a job name in Beam, but that job
name does not seem to be accessible via Flink's REST API once the
pipeline is run on Flink. From researching this problem, I found this
<https://github.com/apache/beam/blob/9a11e28ce79e3b243a13fbf148f2ba26b8c14107/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java#L340>
method, which initializes an AppName. This seems promising, but it is
written in Java and I am looking to do the same in Python.
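
For reference, here is roughly what I am doing to set the name (a
minimal sketch; the Flink address and job name below are placeholders):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Sketch: submit with an explicit job name. --job_name is a standard
# Beam pipeline option; the Flink address and the name are placeholders.
options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=localhost:8081',
    '--streaming',
    '--job_name=my-streaming-pipeline',
])

with beam.Pipeline(options=options) as pipeline:
    _ = pipeline | beam.Create([1, 2, 3])  # stand-in for the real pipeline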


Is there a way to specify the Flink job name via the Beam Python SDK? Or is
there a simpler way to know that a particular Beam pipeline is running, and
therefore not resubmit it?


Please let me know if you have any suggestions - either about how to
execute the approaches I've described or about a simpler solution that I
am overlooking. Thank you for your help!


Best,

Adlae D'Orazio

Re: [Question] check if pipeline is still running in pipeline runner

Posted by Jan Lukavský <je...@seznam.cz>.
Hi,

when the jobmanager (JM) goes down, it should be brought back up (if
configured as HA, running on k8s, ...), and it should recover all
running jobs. If this does not happen, it means that:

  a) either the JM is not in an HA configuration, or

  b) it is unable to recover after failure, which typically means that
there is some problem reading job metadata from external storage, or
some other persistent error.

Either way, monitoring the job for simple presence in the JM might not
be sufficient. What I personally prefer is monitoring the lag of the
job's output watermark behind current processing time. It is possible to
do this at Beam's application level, using a looping event-time timer
that outputs the current input watermark every T seconds. This can
capture even the case when the job is running but unable to make any
progress.
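
A rough sketch of such a looping timer in the Beam Python SDK (untested;
the DoFn name and the period are placeholders, and the input must be a
keyed PCollection, because state and timers require keyed input):

import time

import apache_beam as beam
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import TimerSpec, on_timer
from apache_beam.utils.timestamp import Duration

class WatermarkHeartbeat(beam.DoFn):
    # An event-time timer fires when the input watermark passes its
    # target; re-arming it in the callback produces one heartbeat per
    # period (the "T seconds" above).
    HEARTBEAT = TimerSpec('heartbeat', TimeDomain.WATERMARK)
    PERIOD = Duration(seconds=10)  # placeholder period

    def process(self,
                element,
                timestamp=beam.DoFn.TimestampParam,
                timer=beam.DoFn.TimerParam(HEARTBEAT)):
        # Arm (or re-arm) the timer; once set, it keeps looping below.
        timer.set(timestamp + self.PERIOD)
        yield element

    @on_timer(HEARTBEAT)
    def on_heartbeat(self,
                     timestamp=beam.DoFn.TimestampParam,
                     timer=beam.DoFn.TimerParam(HEARTBEAT)):
        # The firing timestamp approximates the input watermark, so its
        # distance from wall-clock time is the watermark lag.
        lag_seconds = time.time() - timestamp.micros / 1e6
        yield ('watermark_lag_seconds', lag_seconds)
        timer.set(timestamp + self.PERIOD)

# Usage: key the stream first, e.g.
#   _ = events | beam.Map(lambda e: (0, e)) | beam.ParDo(WatermarkHeartbeat())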

Best,

  Jan

On 7/7/23 23:34, Adlae D'Orazio wrote:
> Hi Jan,
>
> Thank you for your response! Apologies that this wasn't clear, but 
> we're actually looking at what would happen if the job server /were/
> to go down. So what we are more interested in is understanding /how/
> to actually monitor that the job is running. We won't know the job id 
> so we can't use that to query the REST API. Like I said, we were 
> looking into that method that initializes an AppName, but that was 
> written in Java. What do you think we should do? Thank you so much for 
> your help!
>
> Best,
>
> Adlae D'Orazio

Re: [Question] check if pipeline is still running in pipeline runner

Posted by Lydian <ly...@gmail.com>.
I am using the Lyft Flink operator (on k8s), and it is able to monitor
the submitted job's status for us. It shows both cluster and job health.
One issue we've seen so far is that sometimes a task keeps failing and
retrying, but this is not detected by the Flink operator. However, Flink
itself comes with a metric for the task restart count, and we use that
to help us catch issues like that.

- flink k8s operator:
https://github.com/lyft/flinkk8soperator
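
For example, a rough sketch of reading that restart count through
Flink's REST API (untested; the address is a placeholder, and the job id
can be looked up via the /jobs/overview endpoint):

import requests

FLINK = 'http://localhost:8081'  # placeholder Flink REST address

def restart_count(job_id):
    # numRestarts is a job-level Flink metric; /jobs/<jobid>/metrics
    # returns the requested metrics as a list of {id, value} objects.
    resp = requests.get(
        f'{FLINK}/jobs/{job_id}/metrics',
        params={'get': 'numRestarts'},
        timeout=10,
    )
    metrics = resp.json()
    return int(metrics[0]['value']) if metrics else 0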

On Fri, Jul 7, 2023 at 2:34 PM Adlae D'Orazio <ad...@liminalinsights.com>
wrote:

> Hi Jan,
>
> Thank you for your response! Apologies that this wasn't clear, but we're
> actually looking at what would happen if the job server *were* to go
> down. So what we are more interested in is understanding *how* to
> actually monitor that the job is running. We won't know the job id so we
> can't use that to query the REST API. Like I said, we were looking into
> that method that initializes an AppName, but that was written in Java. What
> do you think we should do? Thank you so much for your help!
>
> Best,
>
> Adlae D'Orazio
>

--
Sincerely,
Lydian Lee

Re: [Question] check if pipeline is still running in pipeline runner

Posted by Adlae D'Orazio <ad...@liminalinsights.com>.
Hi Jan,

Thank you for your response! Apologies that this wasn't clear, but we're
actually looking at what would happen if the job server *were* to go down.
So what we are more interested in is understanding *how* to actually
monitor that the job is running. We won't know the job id so we can't use
that to query the REST API. Like I said, we were looking into that method
that initializes an AppName, but that was written in Java. What do you
think we should do? Thank you so much for your help!

Best,

Adlae D'Orazio

On Fri, Jul 7, 2023 at 1:28 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> if I understand correctly, you have a 'program runner' (sometimes called a
> driver), which is supposed to be long-running, watching whether the
> submitted Pipeline runs or not, and resubmitting the job if it does not. If
> my understanding is correct, I would suggest looking into the reasons why
> the pipeline terminates in the first place. Flink is designed to be
> fault-tolerant after job submission, both to application-level errors (e.g.
> transient user code errors, failures of external dependencies, etc.) and to
> failures of the Flink runtime itself (taskmanagers or the jobmanager). The
> most frequent case where this does not work is some sort of
> misconfiguration (typically an inability to restore jobs after a failure of
> the jobmanager). Having said that, it is a good idea to _monitor_ that your
> job runs (and ideally that it makes progress, because the mere fact that a
> job 'runs' does not imply that), but a job that is permanently gone should
> require manual action. Simple resubmission of the job is not what I would
> expect to work well.
>
> Best,
>
>  Jan

Re: [Question] check if pipeline is still running in pipeline runner

Posted by Jan Lukavský <je...@seznam.cz>.
Hi,

if I understand correctly, you have a 'program runner' (sometimes called
a driver), which is supposed to be long-running, watching whether the
submitted Pipeline runs or not, and resubmitting the job if it does not.
If my understanding is correct, I would suggest looking into the reasons
why the pipeline terminates in the first place. Flink is designed to be
fault-tolerant after job submission, both to application-level errors
(e.g. transient user code errors, failures of external dependencies,
etc.) and to failures of the Flink runtime itself (taskmanagers or the
jobmanager). The most frequent case where this does not work is some
sort of misconfiguration (typically an inability to restore jobs after a
failure of the jobmanager). Having said that, it is a good idea to
_monitor_ that your job runs (and ideally that it makes progress,
because the mere fact that a job 'runs' does not imply that), but a job
that is permanently gone should require manual action. Simple
resubmission of the job is not what I would expect to work well.
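
To illustrate the monitoring part, a minimal sketch against Flink's REST
API (untested; the address, job name, and alerting hook are
placeholders):

import time

import requests

FLINK = 'http://localhost:8081'     # placeholder Flink REST address
JOB_NAME = 'my-streaming-pipeline'  # placeholder job name

def job_is_running(name):
    # /jobs/overview reports every job's name and state.
    jobs = requests.get(f'{FLINK}/jobs/overview', timeout=10).json()['jobs']
    return any(j['name'] == name and j['state'] == 'RUNNING' for j in jobs)

while True:
    if not job_is_running(JOB_NAME):
        # Alert a human rather than blindly resubmitting; a permanently
        # gone job usually needs investigation first.
        notify_operator(JOB_NAME)  # hypothetical alerting hook
    time.sleep(60)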

Best,

  Jan
