Posted to user@flink.apache.org by Luke Xiong <le...@gmail.com> on 2023/05/11 18:57:25 UTC

How to know when a pipeline ends

Hi,

My flink job needs to do something when the pipeline execution has ended.
The job code is like this:

createSomeStream().applySomeOperators();
env.execute(jobName);
doSomeCleanupTasks();

It looks like doSomeCleanupTasks() can be called while the pipeline is
still running. The job is for processing a bounded stream, so it doesn't
run forever. Is it possible to achieve this so doSomeCleanupTasks is called
only when the pipeline has processed all the data? This happens when the
runtime mode is STREAMING. Would running it in BATCH mode make any
difference?

Regards,
Luke

Re: How to know when a pipeline ends

Posted by Alexey Novakov via user <us...@flink.apache.org>.
Hi Luke,

Did you try using *env.registerJobListener()* to be notified when the job
execution finishes?

https://coderstea.in/post/big-data/flink-job-listener-run-a-task-after-flink-job-is-completed/
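A minimal sketch of that approach might look like this (the class name, job name, and cleanup method are illustrative, not from the original post):

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import javax.annotation.Nullable;

public class CleanupOnFinishJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(@Nullable JobClient jobClient,
                                       @Nullable Throwable throwable) {
                // Invoked once the job has been submitted (or submission failed).
            }

            @Override
            public void onJobExecuted(@Nullable JobExecutionResult result,
                                      @Nullable Throwable throwable) {
                // Invoked when execution has finished (or failed) on the client
                // side; only clean up on success.
                if (throwable == null) {
                    doSomeCleanupTasks();
                }
            }
        });

        // createSomeStream().applySomeOperators();
        env.execute("my-job");
    }

    private static void doSomeCleanupTasks() {
        // cleanup logic goes here
    }
}
```

One caveat: the listener runs on the client, so with a detached submission onJobExecuted may fire before the job has actually finished.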

Best regards,
Alex

Re: How to know when a pipeline ends

Posted by Jiadong lu <ar...@gmail.com>.
Hi Luke.

I agree with Shammon's suggestion regarding your query. Additionally, I 
would like to provide some hints that might assist you further:

1. To create a PackagedProgram, you can utilize the 
PackagedProgram.Builder class.
2. Building a JobGraph can be achieved by employing the 
PackagedProgramUtils.createJobGraph method.
3. Initializing a RestClusterClient with your Flink cluster 
configuration will allow you to interact with the cluster.
4. By submitting the jobgraph, you will obtain a JobID.
5. Finally, you can use the JobID to communicate with your job within 
the Flink cluster.
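The steps above can be sketched roughly as follows (the jar path, host, port, and parallelism are placeholders, and the exact builder and utility signatures may differ between Flink versions):

```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;

import java.io.File;

public class ExternalSubmitter {

    public static void main(String[] args) throws Exception {
        // 1. Build a PackagedProgram from the job jar.
        PackagedProgram program = PackagedProgram.newBuilder()
                .setJarFile(new File("/path/to/job.jar"))
                .build();

        // Cluster configuration (address and port are placeholders).
        Configuration config = new Configuration();
        config.setString(JobManagerOptions.ADDRESS, "localhost");
        config.setInteger(RestOptions.PORT, 8081);

        // 2. Compile the program into a JobGraph.
        JobGraph jobGraph =
                PackagedProgramUtils.createJobGraph(program, config, 1, false);

        // 3. Create a RestClusterClient to interact with the cluster.
        try (RestClusterClient<StandaloneClusterId> client =
                new RestClusterClient<>(config, StandaloneClusterId.getInstance())) {

            // 4. Submit the JobGraph and obtain a JobID.
            JobID jobId = client.submitJob(jobGraph).get();

            // 5. Use the JobID to track the job until it terminates.
            JobStatus status;
            do {
                Thread.sleep(1000);
                status = client.getJobStatus(jobId).get();
            } while (!status.isGloballyTerminalState());

            // Cleanup can safely run here: the job has reached a terminal state.
        }
    }
}
```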

I hope these suggestions prove beneficial to you in your current 
endeavor. Should you require any further assistance, please do not 
hesitate to reach out. The approach I outlined above is how I currently 
manage my Flink jobs.

Best,
Jiadong Lu

Re: How to know when a pipeline ends

Posted by Luke Xiong <le...@gmail.com>.
Hi Weihua and Shammon,

Thanks for the pointers. I tried both; unfortunately, neither works.

With "execution.attached" enabled, there doesn't seem to be any difference
from the default settings: doSomeCleanupTasks() is called right away while
the pipeline is still running. And env.executeAsync().getJobStatus() causes
an exception:
    org.apache.flink.util.FlinkRuntimeException: The Job Status cannot be
requested when in Web Submission.

FYI, I am using 1.15 and the job is submitted with */jars/:jarid/run*

Regards,
Luke

Re: How to know when a pipeline ends

Posted by Weihua Hu <hu...@gmail.com>.
Hi, Luke

You can enable "execution.attached", then env.execute() will wait until the
job is finished.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-attached
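For example, the option can be set programmatically when the environment is created (a sketch; note this only takes effect when the environment is built on the client, so it may be ignored under web submission where the cluster provides the environment):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
// Equivalent to setting "execution.attached: true" in the configuration.
conf.setBoolean(DeploymentOptions.ATTACHED, true);

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);
```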

Best,
Weihua


Re: How to know when a pipeline ends

Posted by Shammon FY <zj...@gmail.com>.
Hi Luke,

Maybe you can get a 'JobClient' after submitting the job and check the job
status with 'JobClient.getJobStatus()'
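As a sketch of this idea (the job name is illustrative, and depending on how the job is submitted, the client may not support status queries):

```java
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
// createSomeStream().applySomeOperators();

// Submit without blocking and keep a handle to the running job.
JobClient jobClient = env.executeAsync("my-job");

// Option A: poll the status until the job reaches a terminal state.
while (!jobClient.getJobStatus().get().isGloballyTerminalState()) {
    Thread.sleep(1000);
}

// Option B: block on the final result instead of polling.
// jobClient.getJobExecutionResult().get();

// Only now is it safe to run cleanup.
doSomeCleanupTasks();
```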

Best,
Shammon FY

