Posted to user@flink.apache.org by Luke Xiong <le...@gmail.com> on 2023/05/11 18:57:25 UTC
How to know when a pipeline ends
Hi,
My flink job needs to do something when the pipeline execution has ended.
The job code is like this:
createSomeStream().applySomeOperators();
env.execute(jobName);
doSomeCleanupTasks();
It looks like doSomeCleanupTasks() can be called while the pipeline is
still running. The job is for processing a bounded stream, so it doesn't
run forever. Is it possible to achieve this so doSomeCleanupTasks is called
only when the pipeline has processed all the data? This happens when the
runtime mode is STREAMING. Would running it in BATCH mode make any
difference?
Regards,
Luke
Re: How to know when a pipeline ends
Posted by Alexey Novakov via user <us...@flink.apache.org>.
Hi Luke,
Did you try *env.registerJobListener()* to get notified when job
execution completes?
https://coderstea.in/post/big-data/flink-job-listener-run-a-task-after-flink-job-is-completed/
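
A minimal sketch of what that could look like (doSomeCleanupTasks is the
placeholder from your snippet; the anonymous class implements
org.apache.flink.core.execution.JobListener, and whether onJobExecuted fires
as expected may depend on how the job is submitted):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CleanupViaListener {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                // Called once the job has been submitted (throwable is
                // non-null if submission failed); nothing to clean up yet.
            }

            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
                // Called when job execution finishes; throwable is non-null on failure.
                if (throwable == null) {
                    doSomeCleanupTasks();
                }
            }
        });

        env.fromElements(1, 2, 3).print(); // stand-in for the real pipeline
        env.execute("job-with-listener");
    }

    private static void doSomeCleanupTasks() { /* placeholder from the question */ }
}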
Best regards,
Alex
On Fri, May 12, 2023 at 8:01 PM Luke Xiong <le...@gmail.com> wrote:
> Hi Weihua and Shammon,
>
> Thanks for the pointers. I tried both; unfortunately, neither works.
>
> By enabling "execution.attached", there doesn't seem to be any difference
> from the default settings: doSomeCleanupTasks() is called right away
> while the pipeline is still running; and env.executeAsync().getJobStatus()
> causes an exception:
> org.apache.flink.util.FlinkRuntimeException: The Job Status cannot be
> requested when in Web Submission.
>
> FYI, I am using 1.15 and the job is submitted with */jars/:jarid/run*
>
> Regards,
> Luke
>
> On Fri, May 12, 2023 at 1:32 AM Weihua Hu <hu...@gmail.com> wrote:
>
>>
>> Hi, Luke
>>
>> You can enable "execution.attached", then env.execute() will wait until
>> the job is finished.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-attached
>>
>> Best,
>> Weihua
>>
>>
>> On Fri, May 12, 2023 at 8:59 AM Shammon FY <zj...@gmail.com> wrote:
>>
>>> Hi Luke,
>>>
>>> Maybe you can get a 'JobClient' after submitting the job and check the
>>> job status with 'JobClient.getJobStatus()'.
>>>
>>> Best,
>>> Shammon FY
>>>
>>>
>>> On Fri, May 12, 2023 at 2:58 AM Luke Xiong <le...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> My flink job needs to do something when the pipeline execution has
>>>> ended. The job code is like this:
>>>>
>>>> createSomeStream().applySomeOperators();
>>>> env.execute(jobName);
>>>> doSomeCleanupTasks();
>>>>
>>>> It looks like doSomeCleanupTasks() can be called while the pipeline is
>>>> still running. The job is for processing a bounded stream, so it doesn't
>>>> run forever. Is it possible to achieve this so doSomeCleanupTasks is called
>>>> only when the pipeline has processed all the data? This happens when the
>>>> runtime mode is STREAMING. Would running it in BATCH mode make any
>>>> difference?
>>>>
>>>> Regards,
>>>> Luke
>>>>
>>>>
>>>>
Re: How to know when a pipeline ends
Posted by Jiadong Lu <ar...@gmail.com>.
Hi Luke.
I hope this email finds you well. I agree with Shammon's suggested
solution. In addition, here are a few hints that might assist you further:
1. To create a PackagedProgram, you can utilize the
PackagedProgram.Builder class.
2. Building a JobGraph can be achieved by employing the
PackagedProgramUtils.createJobGraph method.
3. Initializing a RestClusterClient with your Flink cluster
configuration will allow you to interact with the cluster.
4. By submitting the JobGraph, you will obtain a JobID.
5. Finally, you can use the JobID to communicate with your job within
the Flink cluster.
I hope these suggestions prove useful in your current endeavor. Should you
require any further assistance, please do not hesitate to reach out. The
approach outlined above is how I currently manage my Flink jobs; a rough
sketch follows below.
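
A rough sketch of those five steps, assuming a standalone cluster; the jar
path, entry-point class name, and REST address are illustrative:

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;

import java.io.File;

public class SubmitAndTrack {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.ADDRESS, "localhost"); // assumed cluster address
        conf.setInteger(RestOptions.PORT, 8081);

        // 1. Build a PackagedProgram from the job jar (path and entry point assumed).
        PackagedProgram program = PackagedProgram.newBuilder()
                .setJarFile(new File("/path/to/job.jar"))
                .setEntryPointClassName("com.example.MyJob")
                .build();

        // 2. Turn the program into a JobGraph (parallelism 1, no output suppression).
        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, conf, 1, false);

        // 3. Create a RestClusterClient for the cluster.
        try (RestClusterClient<StandaloneClusterId> client =
                     new RestClusterClient<>(conf, StandaloneClusterId.getInstance())) {
            // 4. Submit the JobGraph and obtain a JobID.
            JobID jobId = client.submitJob(jobGraph).get();

            // 5. Use the JobID to poll the job until it reaches a terminal state.
            JobStatus status;
            do {
                Thread.sleep(1000);
                status = client.getJobStatus(jobId).get();
            } while (!status.isGloballyTerminalState());

            System.out.println("Job finished with status " + status);
        }
    }
}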
Best,
Jiadong Lu
On 2023/5/13 2:00, Luke Xiong wrote:
> Hi Weihua and Shammon,
>
> Thanks for the pointers. I tried both; unfortunately, neither works.
>
> By enabling "execution.attached", there doesn't seem to be any
> difference from the default settings: doSomeCleanupTasks() is called
> right away while the pipeline is still running; and
> env.executeAsync().getJobStatus() causes an exception:
> org.apache.flink.util.FlinkRuntimeException: The Job Status cannot
> be requested when in Web Submission.
>
> FYI, I am using 1.15 and the job is submitted with */jars/:jarid/run*
>
> Regards,
> Luke
>
> On Fri, May 12, 2023 at 1:32 AM Weihua Hu <huweihua.ckl@gmail.com> wrote:
>
>
> Hi, Luke
>
> You can enable "execution.attached", then env.execute() will wait
> until the job is finished.
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-attached
>
> Best,
> Weihua
>
>
> On Fri, May 12, 2023 at 8:59 AM Shammon FY <zjureel@gmail.com> wrote:
>
> Hi Luke,
>
> Maybe you can get a 'JobClient' after submitting the job and check the
> job status with 'JobClient.getJobStatus()'.
>
> Best,
> Shammon FY
>
>
> On Fri, May 12, 2023 at 2:58 AM Luke Xiong <leixure@gmail.com> wrote:
>
> Hi,
>
> My flink job needs to do something when the pipeline
> execution has ended. The job code is like this:
>
> createSomeStream().applySomeOperators();
> env.execute(jobName);
> doSomeCleanupTasks();
>
> It looks like doSomeCleanupTasks() can be called while the
> pipeline is still running. The job is for processing a
> bounded stream, so it doesn't run forever. Is it possible to
> achieve this so doSomeCleanupTasks is called only when the
> pipeline has processed all the data? This happens when the
> runtime mode is STREAMING. Would running it in BATCH mode
> make any difference?
>
> Regards,
> Luke
>
>
Re: How to know when a pipeline ends
Posted by Luke Xiong <le...@gmail.com>.
Hi Weihua and Shammon,
Thanks for the pointers. I tried both; unfortunately, neither works.
By enabling "execution.attached", there doesn't seem to be any difference
from the default settings: doSomeCleanupTasks() is called right away while
the pipeline is still running; and env.executeAsync().getJobStatus() causes
an exception:
org.apache.flink.util.FlinkRuntimeException: The Job Status cannot be
requested when in Web Submission.
FYI, I am using 1.15 and the job is submitted with */jars/:jarid/run*
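
One possible workaround: since the job is submitted through the REST API
anyway, its state can be polled from outside via GET /jobs/:jobid, using the
job id returned by */jars/:jarid/run*. A rough sketch, assuming the REST
endpoint is at localhost:8081 (the JSON check is deliberately crude; a real
client would parse the response):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PollJobViaRest {
    public static void main(String[] args) throws Exception {
        String jobId = args[0]; // the id returned by POST /jars/:jarid/run
        HttpClient http = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/jobs/" + jobId)) // assumed address
                .build();

        while (true) {
            String body = http.send(request, HttpResponse.BodyHandlers.ofString()).body();
            // The response JSON carries a "state" field such as RUNNING or FINISHED.
            if (body.contains("\"state\":\"FINISHED\"")
                    || body.contains("\"state\":\"FAILED\"")
                    || body.contains("\"state\":\"CANCELED\"")) {
                break;
            }
            Thread.sleep(1000);
        }
        // Safe to run cleanup here, outside the job itself.
    }
}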
Regards,
Luke
On Fri, May 12, 2023 at 1:32 AM Weihua Hu <hu...@gmail.com> wrote:
>
> Hi, Luke
>
> You can enable "execution.attached", then env.execute() will wait until
> the job is finished.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-attached
>
> Best,
> Weihua
>
>
> On Fri, May 12, 2023 at 8:59 AM Shammon FY <zj...@gmail.com> wrote:
>
>> Hi Luke,
>>
>> Maybe you can get a 'JobClient' after submitting the job and check the
>> job status with 'JobClient.getJobStatus()'.
>>
>> Best,
>> Shammon FY
>>
>>
>> On Fri, May 12, 2023 at 2:58 AM Luke Xiong <le...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> My flink job needs to do something when the pipeline execution has
>>> ended. The job code is like this:
>>>
>>> createSomeStream().applySomeOperators();
>>> env.execute(jobName);
>>> doSomeCleanupTasks();
>>>
>>> It looks like doSomeCleanupTasks() can be called while the pipeline is
>>> still running. The job is for processing a bounded stream, so it doesn't
>>> run forever. Is it possible to achieve this so doSomeCleanupTasks is called
>>> only when the pipeline has processed all the data? This happens when the
>>> runtime mode is STREAMING. Would running it in BATCH mode make any
>>> difference?
>>>
>>> Regards,
>>> Luke
>>>
>>>
>>>
Re: How to know when a pipeline ends
Posted by Weihua Hu <hu...@gmail.com>.
Hi, Luke
You can enable "execution.attached", then env.execute() will wait until the
job is finished.
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-attached
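
For completeness, a minimal sketch of setting this programmatically;
DeploymentOptions.ATTACHED is the typed counterpart of "execution.attached"
(whether it takes effect can depend on how the job is deployed):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AttachedExecution {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set(DeploymentOptions.ATTACHED, true); // same as execution.attached: true

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);

        env.fromElements(1, 2, 3).print(); // stand-in for the real pipeline

        // In attached mode, execute() blocks until the job reaches a terminal state.
        env.execute("attached-job");

        doSomeCleanupTasks(); // reached only after the job has finished
    }

    private static void doSomeCleanupTasks() { /* placeholder from the question */ }
}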
Best,
Weihua
On Fri, May 12, 2023 at 8:59 AM Shammon FY <zj...@gmail.com> wrote:
> Hi Luke,
>
> Maybe you can get a 'JobClient' after submitting the job and check the
> job status with 'JobClient.getJobStatus()'.
>
> Best,
> Shammon FY
>
>
> On Fri, May 12, 2023 at 2:58 AM Luke Xiong <le...@gmail.com> wrote:
>
>> Hi,
>>
>> My flink job needs to do something when the pipeline execution has ended.
>> The job code is like this:
>>
>> createSomeStream().applySomeOperators();
>> env.execute(jobName);
>> doSomeCleanupTasks();
>>
>> It looks like doSomeCleanupTasks() can be called while the pipeline is
>> still running. The job is for processing a bounded stream, so it doesn't
>> run forever. Is it possible to achieve this so doSomeCleanupTasks is called
>> only when the pipeline has processed all the data? This happens when the
>> runtime mode is STREAMING. Would running it in BATCH mode make any
>> difference?
>>
>> Regards,
>> Luke
>>
>>
>>
Re: How to know when a pipeline ends
Posted by Shammon FY <zj...@gmail.com>.
Hi Luke,
Maybe you can get a 'JobClient' after submitting the job and check the job
status with 'JobClient.getJobStatus()'.
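
A minimal sketch of that idea, assuming executeAsync() is used to obtain the
JobClient and the pipeline is a stand-in:

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WaitViaJobClient {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3).print(); // stand-in for the real pipeline

        // executeAsync() returns immediately with a handle to the running job.
        JobClient client = env.executeAsync("async-job");

        // Poll until the job reaches a terminal state (FINISHED, FAILED, CANCELED).
        JobStatus status;
        do {
            Thread.sleep(1000);
            status = client.getJobStatus().get();
        } while (!status.isGloballyTerminalState());

        doSomeCleanupTasks(); // the pipeline has processed all bounded input
    }

    private static void doSomeCleanupTasks() { /* placeholder from the question */ }
}

Alternatively, client.getJobExecutionResult().get() blocks until the job
completes.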
Best,
Shammon FY
On Fri, May 12, 2023 at 2:58 AM Luke Xiong <le...@gmail.com> wrote:
> Hi,
>
> My flink job needs to do something when the pipeline execution has ended.
> The job code is like this:
>
> createSomeStream().applySomeOperators();
> env.execute(jobName);
> doSomeCleanupTasks();
>
> It looks like doSomeCleanupTasks() can be called while the pipeline is
> still running. The job is for processing a bounded stream, so it doesn't
> run forever. Is it possible to achieve this so doSomeCleanupTasks is called
> only when the pipeline has processed all the data? This happens when the
> runtime mode is STREAMING. Would running it in BATCH mode make any
> difference?
>
> Regards,
> Luke
>
>
>