Posted to dev@flink.apache.org by 季文昊 <pr...@gmail.com> on 2020/09/23 12:31:09 UTC

[DISCUSS] Support registering custom JobStatusListeners when scheduling a job

Hi there,

I'm working on a Flink platform at my company, which provides a service to
provision and manage multiple dedicated Flink clusters. The problem is that,
once a job has been submitted through our platform, we want to sync its
status without delay whenever it changes.

Since we want these updates to be timely and our services to stay stateless,
polling a job's status periodically is not a good solution. I have not found
any proper way to have the JobManager push changes directly to our platform,
other than changing the source code to register an additional
`JobStatusListener` in the method
`org.apache.flink.runtime.jobmaster.JobMaster#startScheduling`.

I wonder if we can enhance `JobStatusListener` a little bit so that a Flink
user can register a custom JobStatusListener at startup.

To be specific, we can have a `JobStatusListenerFactory` interface that is
discovered through a corresponding `ServiceLoader<JobStatusListenerFactory>`,
where the JobStatusListenerFactory will have the following method:
 - JobStatusListener createJobStatusListener(Properties properties);

Custom listeners will be created during the JobMaster#startScheduling
method.

Anyone who would like to implement a custom JobStatusListener would
package all the related classes into a standalone jar with a
`META-INF/services/org.apache.flink.runtime.executiongraph.JobStatusListener`
file and place it under the `lib/` directory.
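
For illustration, the factory-plus-`ServiceLoader` proposal above could look
roughly like the following Java sketch. It is not taken from Flink: the
`JobStatusListener` here is a simplified stand-in with an assumed signature,
`JobStatusListenerFactory` is the interface proposed in this mail, and
`ListenerLoadingSketch` and `loadListeners` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.ServiceLoader;

final class ListenerLoadingSketch {
    private ListenerLoadingSketch() {}

    // Simplified stand-in for Flink's JobStatusListener (assumed signature).
    interface JobStatusListener {
        void jobStatusChanges(String jobId, String newStatus, long timestamp);
    }

    // The factory proposed in this mail; hypothetical, not an existing Flink API.
    interface JobStatusListenerFactory {
        JobStatusListener createJobStatusListener(Properties properties);
    }

    // Discovers every registered factory and creates one listener per factory.
    static List<JobStatusListener> loadListeners(Properties properties) {
        List<JobStatusListener> listeners = new ArrayList<>();
        for (JobStatusListenerFactory factory
                : ServiceLoader.load(JobStatusListenerFactory.class)) {
            listeners.add(factory.createJobStatusListener(properties));
        }
        return listeners;
    }

    public static void main(String[] args) {
        List<JobStatusListener> listeners = loadListeners(new Properties());
        System.out.println("Loaded " + listeners.size() + " custom listener(s)");
    }
}
```

`ServiceLoader` looks for a provider-configuration file under
`META-INF/services/` whose name is the fully qualified name of the type
passed to `ServiceLoader.load`, so with this lookup the file would be named
after the factory interface and list the implementing factory classes, one
per line. When no such file is on the classpath, the loop finds no factories
and the returned list is empty.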

In addition, I found a Jira ticket similar to what I'm asking for,
FLINK-17104, but I do not see any comments or updates on it yet. I hope
someone can help me move this feature forward or give me some suggestions.

Thanks,
Wenhao

Re: [DISCUSS] Support registering custom JobStatusListeners when scheduling a job

Posted by Wenhao Ji <pr...@gmail.com>.
Hi Till,

Since it has been a while, I would like to restart this discussion.

Would you please share some thoughts on this? Would it still become a
stability problem if we created a "surefire" listener that keeps
exceptions thrown by custom listeners from propagating? [1]
By the way, since you mentioned that the outcome should be a design
along with the POC, shall we create a FLIP for this where I can
describe the design of the change? Or should we continue the
discussion and reach a consensus before creating a FLIP?
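
As a rough illustration of the "surefire" listener idea, here is a minimal
Java sketch of a wrapper that catches whatever a custom listener throws. It
is not the code from the linked commit: `JobStatusListener` is a simplified
stand-in with an assumed signature, and `SafeListenerSketch`,
`SafeJobStatusListener` and `failures()` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class SafeListenerSketch {
    private SafeListenerSketch() {}

    // Simplified stand-in for Flink's JobStatusListener (assumed signature).
    interface JobStatusListener {
        void jobStatusChanges(String jobId, String newStatus, long timestamp);
    }

    // Wraps a custom listener so that exceptions it throws are recorded
    // instead of propagating to the caller (the JobMaster in a real cluster).
    static final class SafeJobStatusListener implements JobStatusListener {
        private final JobStatusListener delegate;
        private final List<Exception> failures = new ArrayList<>();

        SafeJobStatusListener(JobStatusListener delegate) {
            this.delegate = delegate;
        }

        @Override
        public void jobStatusChanges(String jobId, String newStatus, long timestamp) {
            try {
                delegate.jobStatusChanges(jobId, newStatus, timestamp);
            } catch (Exception e) {
                // A real implementation would log this; the sketch just keeps it.
                failures.add(e);
            }
        }

        List<Exception> failures() {
            return failures;
        }
    }

    public static void main(String[] args) {
        SafeJobStatusListener safe = new SafeJobStatusListener((jobId, status, ts) -> {
            throw new IllegalStateException("listener bug for " + jobId);
        });
        safe.jobStatusChanges("job-1", "RUNNING", System.currentTimeMillis());
        System.out.println("Swallowed failures: " + safe.failures().size());
    }
}
```

A wrapper like this only contains exceptions. It does not protect the caller
from a listener that blocks, leaks memory, or throws an `Error`, so it
addresses only part of the stability concern.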

[1] https://github.com/predatorray/flink/commit/2cab8bb1119162213632db984d2eb7529b8140e7#diff-4815dccc0cbf42d48f1668d4a076d19ec96c196bc562c3d2edae4d7bf9b9bd89R72

Thanks,
Wenhao

On Sat, Apr 10, 2021 at 9:32 PM Wenhao Ji <pr...@gmail.com> wrote:
>
> Hi Till,
>
> Thanks for taking time out of your busy schedule.
> I have created a POC for this feature.
>
> The change to the Flink source code would look like this commit: https://github.com/predatorray/flink/commit/2cab8bb1119162213632db984d2eb7529b8140e7
> Generally, the idea is that custom `JobStatusListener`s will be loaded via their `JobStatusListenerFactory` implementations using the `PluginManager`. They will be created and initialized during the construction of the `ClusterEntrypoint`. `JobMaster` will register these listeners when it starts scheduling.
>
> Finally, we can implement our own plugins. I have also written an example of a JobStatusListener plugin, which simply prints the job status changes: https://github.com/predatorray/flink-example-listener-plugin
>
> Hope you will have time to review the code and idea.
>
> Thanks again!
>
> Wenhao
>

Re: [DISCUSS] Support registering custom JobStatusListeners when scheduling a job

Posted by Wenhao Ji <pr...@gmail.com>.
Hi Till,

Thanks for taking time out of your busy schedule.
I have created a POC for this feature.

The change to the Flink source code would look like this commit:
https://github.com/predatorray/flink/commit/2cab8bb1119162213632db984d2eb7529b8140e7
Generally, the idea is that custom `JobStatusListener`s will be loaded via
their `JobStatusListenerFactory` implementations using the `PluginManager`.
They will be created and initialized during the construction of the
`ClusterEntrypoint`. `JobMaster` will register these listeners when it
starts scheduling.

Finally, we can implement our own plugins. I have also written an example
of a JobStatusListener plugin, which simply prints the job status changes:
https://github.com/predatorray/flink-example-listener-plugin
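
The two-step lifecycle described above (listeners created once from their
factories, then registered when scheduling starts) and a listener that only
prints status changes can be sketched as follows. This is not the code from
the linked repositories: the interfaces are simplified stand-ins with assumed
signatures, a plain list replaces the `PluginManager` lookup, and
`PrintingJobStatusListener` and `FakeJobMaster` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class PrintingListenerPluginSketch {
    private PrintingListenerPluginSketch() {}

    // Simplified stand-ins (assumed signatures, not Flink's real interfaces).
    interface JobStatusListener {
        void jobStatusChanges(String jobId, String newStatus, long timestamp);
    }

    interface JobStatusListenerFactory {
        JobStatusListener createJobStatusListener();
    }

    // Example plugin listener: prints each status change and keeps a copy.
    static final class PrintingJobStatusListener implements JobStatusListener {
        final List<String> printed = new ArrayList<>();

        @Override
        public void jobStatusChanges(String jobId, String newStatus, long timestamp) {
            String line = "Job " + jobId + " switched to " + newStatus + " at " + timestamp;
            printed.add(line);
            System.out.println(line);
        }
    }

    // Hypothetical stand-in for the JobMaster: holds listeners and notifies them.
    static final class FakeJobMaster {
        private final String jobId;
        private final List<JobStatusListener> listeners = new ArrayList<>();

        FakeJobMaster(String jobId) {
            this.jobId = jobId;
        }

        void registerJobStatusListener(JobStatusListener listener) {
            listeners.add(listener);
        }

        void transitionTo(String newStatus, long timestamp) {
            for (JobStatusListener listener : listeners) {
                listener.jobStatusChanges(jobId, newStatus, timestamp);
            }
        }
    }

    public static void main(String[] args) {
        // Step 1: at entrypoint construction, each factory creates its listener once.
        List<JobStatusListenerFactory> factories = new ArrayList<>();
        factories.add(PrintingJobStatusListener::new);
        List<JobStatusListener> listeners = new ArrayList<>();
        for (JobStatusListenerFactory factory : factories) {
            listeners.add(factory.createJobStatusListener());
        }

        // Step 2: when scheduling starts, the job master registers the listeners.
        FakeJobMaster jobMaster = new FakeJobMaster("job-42");
        for (JobStatusListener listener : listeners) {
            jobMaster.registerJobStatusListener(listener);
        }

        jobMaster.transitionTo("RUNNING", 1L);
        jobMaster.transitionTo("FINISHED", 2L);
    }
}
```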

Hope you will have time to review the code and idea.

Thanks again!

Wenhao

On Wed, Mar 10, 2021 at 11:23 PM Till Rohrmann <tr...@apache.org> wrote:

> Hi Wenhao,
>
> Aljoscha might not be as responsive as before. Surely you can create a POC
> to evaluate different approaches. But the outcome should be a design which
> we discuss before starting to implement the code properly. At the moment
> the community might be a bit busy with the upcoming feature freeze (just
> for your information).
>
> Cheers,
> Till
>

Re: [DISCUSS] Support registering custom JobStatusListeners when scheduling a job

Posted by Till Rohrmann <tr...@apache.org>.
Hi Wenhao,

Aljoscha might not be as responsive as before. Surely you can create a POC
to evaluate different approaches. But the outcome should be a design which
we discuss before starting to implement the code properly. At the moment
the community might be a bit busy with the upcoming feature freeze (just
for your information).

Cheers,
Till

On Wed, Mar 10, 2021 at 3:40 PM Wenhao Ji <pr...@gmail.com> wrote:

> Hi Till. Indeed, there is no proper solution right now other than polling.
> It is painful to have such code in our platform, since keeping the polling
> running periodically consumes a lot of resources when there are hundreds of
> Flink clusters to maintain. Most polls are actually useless, as the job
> status seldom changes. Polling also means the status is not synchronized
> in a timely manner.
>
> Aljoscha, can I submit a PR first so that we can review it and discuss
> whether it would introduce any stability problems or potential risks?
>
> Thanks,
> Wenhao
>

Re: [DISCUSS] Support registering custom JobStatusListeners when scheduling a job

Posted by Wenhao Ji <pr...@gmail.com>.
Hi Till. Indeed, there is no proper solution right now other than polling.
It is painful to have such code in our platform, since keeping the polling
running periodically consumes a lot of resources when there are hundreds of
Flink clusters to maintain. Most polls are actually useless, as the job
status seldom changes. Polling also means the status is not synchronized
in a timely manner.
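
To make the cost argument concrete, here is a small back-of-the-envelope
sketch in Java. The numbers in `main` (300 clusters, a 5-second interval, 30
status changes per hour) are made up for illustration and are not
measurements from any real platform; `PollingCostSketch` and its methods are
hypothetical names.

```java
final class PollingCostSketch {
    private PollingCostSketch() {}

    // Status requests issued per hour when every cluster is polled at a fixed
    // interval (whole polls only, so the division rounds down).
    static long pollsPerHour(int clusters, int intervalSeconds) {
        return (long) clusters * (3600L / intervalSeconds);
    }

    // Fraction of polls that observe no change, assuming each status change
    // is picked up by exactly one poll.
    static double uselessFraction(long pollsPerHour, long statusChangesPerHour) {
        long useful = Math.min(statusChangesPerHour, pollsPerHour);
        return 1.0 - (double) useful / pollsPerHour;
    }

    public static void main(String[] args) {
        long polls = pollsPerHour(300, 5);            // illustrative numbers only
        double useless = uselessFraction(polls, 30);  // illustrative numbers only
        System.out.println("Polls per hour: " + polls);
        System.out.println("Fraction of polls seeing no change: " + useless);
    }
}
```

With a push-based listener, the number of notifications would instead track
the number of status changes, independent of the polling interval.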

Aljoscha, can I submit a PR first so that we can review it and discuss
whether it would introduce any stability problems or potential risks?

Thanks,
Wenhao

On Sat, Jan 9, 2021 at 12:31 AM Jeff Zhang <zj...@gmail.com> wrote:

> Hi Till,
>
> IIUC, in application mode we already allow running user code in the JobManager.
>

Re: [DISCUSS] Support registering custom JobStatusListeners when scheduling a job

Posted by Jeff Zhang <zj...@gmail.com>.
Hi Till,

IIUC, in application mode we already allow running user code in the JobManager.

On Fri, Jan 8, 2021 at 9:53 PM Till Rohrmann <tr...@apache.org> wrote:

> At the moment, this requirement has not come up very often. In general, I
> am always a bit cautious when adding functionality which executes user code
> in the JobManager because it can easily become a stability problem. On the
> other hand, I can't think of a different solution other than polling the
> job status atm.
>
> Cheers,
> Till
>


-- 
Best Regards

Jeff Zhang

Re: [DISCUSS] Support registering custom JobStatusListeners when scheduling a job

Posted by Till Rohrmann <tr...@apache.org>.
At the moment, this requirement has not come up very often. In general, I
am always a bit cautious when adding functionality which executes user code
in the JobManager because it can easily become a stability problem. On the
other hand, I can't think of a different solution other than polling the
job status atm.

Cheers,
Till

On Fri, Jan 8, 2021 at 1:23 PM Aljoscha Krettek <al...@apache.org> wrote:

> Till or Chesnay (cc'ed), have you thought about adding a hook on the
> JobMaster/JobManager to allow external systems to get push notifications
> about submitted jobs?
>
> If they are ok with such a feature, would you maybe be interested in
> implementing it yourself, Wenhao?
>
> Best,
> Aljoscha
>
> On 2020/09/28 11:14, 季文昊 wrote:
> >Hi Aljoscha,
> >
> >Yes, that is not enough, since the `JobListener`s are called only once,
> >when `execute()` or `executeAsync()` is called. To keep the status in
> >sync, I would still have to call `JobClient#getJobStatus` periodically.
> >
> >On Fri, Sep 25, 2020 at 8:12 PM Aljoscha Krettek <al...@apache.org>
> >wrote:
> >
> >> Hi,
> >>
> >> I understand from your email that
> >> `StreamExecutionEnvironment.registerJobListener()` would not be enough
> >> for you because you want to be notified of changes on the cluster side,
> >> correct? That is, when the job status changes on the master.
> >>
> >> Best,
> >> Aljoscha
> >>
> >> On 23.09.20 14:31, 季文昊 wrote:
> >> > Hi there,
> >> >
> >> > I'm working on a Flink platform in my corp, which provides a service to
> >> > provision and manage multiple dedicated Flink clusters. The problem is that
> >> > we want to sync a job status without delay after its submission through our
> >> > platform as long as it has been changed.
> >> >
> >> > Since we want to update this in-time and make our services stateless,
> >> > pulling a job's status periodically is not a good solution. I do not find
> >> > any proper way to achieve this by letting a job manager push changes
> >> > directly to our platform except changing the source code, which registers
> >> > an additional `JobStatusListener` in the method
> >> > `org.apache.flink.runtime.jobmaster.JobMaster#startScheduling`.
> >> >
> >> > I wonder if we can enhance `JobStatusListener` a little bit so that a Flink
> >> > user can register his custom JobStatusListener at the startup.
> >> >
> >> > To be specific, we can have a `JobStatusListenerFactory` interface and its
> >> > corresponding `ServiceLoader<JobStatusListenerFactory>`, where
> >> > the JobStatusListenerFactory will have the following method:
> >> >   - JobStatusListener createJobStatusListener(Properties properties);
> >> >
> >> > Custom listeners will be created during the JobMaster#startScheduling
> >> > method.
> >> >
> >> > If someone would like to implement his own JobStatusListener, he will
> >> > package all the related classes into a standalone jar with a
> >> > `META-INF/services/org.apache.flink.runtime.executiongraph.JobStatusListener`
> >> > file and place it under the `lib/` directory.
> >> >
> >> > In addition, I find that there is a Jira ticket similar to what I'm
> >> > asking: FLINK-17104 but I do not see any comment or update yet. Hope anyone
> >> > could help me move on this feature or give me some suggestions about it.
> >> >
> >> > Thanks,
> >> > Wenhao
> >> >
> >>
> >>
>

Re: [DISCUSS] Support registering custom JobStatusListeners when scheduling a job

Posted by Aljoscha Krettek <al...@apache.org>.
Till or Chesnay (cc'ed), have you thought about adding a hook on the
JobMaster/JobManager to allow external systems to get push notifications
about submitted jobs?

If they are ok with such a feature, would you maybe be interested in
implementing it yourself, Wenhao?

Best,
Aljoscha

On 2020/09/28 11:14, 季文昊 wrote:
>Hi Aljoscha,
>
>Yes, that is not enough, since the `JobListener`s are called only once when
>`execute()` or `executeAsync()` is called. And in order to sync the status,
>I also have to call `JobClient#getJobStatus` periodically.
>
>On Fri, Sep 25, 2020 at 8:12 PM Aljoscha Krettek <al...@apache.org>
>wrote:
>
>> Hi,
>>
>> I understand from your email that
>> `StreamExecutionEnvironment.registerJobListener()` would not be enough
>> for you because you want to be notified of changes on the cluster side,
>> correct? That is, when the job status changes on the master.
>>
>> Best,
>> Aljoscha
>>
>> On 23.09.20 14:31, 季文昊 wrote:
>> > Hi there,
>> >
>> > I'm working on a Flink platform in my corp, which provides a service to
>> > provision and manage multiple dedicated Flink clusters. The problem is that
>> > we want to sync a job status without delay after its submission through our
>> > platform as long as it has been changed.
>> >
>> > Since we want to update this in-time and make our services stateless,
>> > pulling a job's status periodically is not a good solution. I do not find
>> > any proper way to achieve this by letting a job manager push changes
>> > directly to our platform except changing the source code, which registers
>> > an additional `JobStatusListener` in the method
>> > `org.apache.flink.runtime.jobmaster.JobMaster#startScheduling`.
>> >
>> > I wonder if we can enhance `JobStatusListener` a little bit so that a Flink
>> > user can register his custom JobStatusListener at the startup.
>> >
>> > To be specific, we can have a `JobStatusListenerFactory` interface and its
>> > corresponding `ServiceLoader<JobStatusListenerFactory>`, where
>> > the JobStatusListenerFactory will have the following method:
>> >   - JobStatusListener createJobStatusListener(Properties properties);
>> >
>> > Custom listeners will be created during the JobMaster#startScheduling
>> > method.
>> >
>> > If someone would like to implement his own JobStatusListener, he will
>> > package all the related classes into a standalone jar with a
>> > `META-INF/services/org.apache.flink.runtime.executiongraph.JobStatusListener`
>> > file and place it under the `lib/` directory.
>> >
>> > In addition, I find that there is a Jira ticket similar to what I'm
>> > asking: FLINK-17104 but I do not see any comment or update yet. Hope anyone
>> > could help me move on this feature or give me some suggestions about it.
>> >
>> > Thanks,
>> > Wenhao
>> >
>>
>>

Re: [DISCUSS] Support registering custom JobStatusListeners when scheduling a job

Posted by 季文昊 <pr...@gmail.com>.
Hi Aljoscha,

Yes, that is not enough, since the `JobListener`s are called only once when
`execute()` or `executeAsync()` is called. And in order to sync the status,
I also have to call `JobClient#getJobStatus` periodically.
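
To make the polling workaround concrete, here is a minimal, self-contained
sketch of the loop I mean. It does not depend on Flink: the `Status` enum is a
reduced, hypothetical stand-in for Flink's `JobStatus`, and `statusSource`
stands in for a call such as `JobClient#getJobStatus`, modelled here as a
supplier of futures. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class JobStatusPollingSketch {

    /** Hypothetical, reduced stand-in for Flink's JobStatus; only a few states are modelled. */
    enum Status {
        CREATED, RUNNING, FINISHED, FAILED;

        boolean isTerminal() {
            return this == FINISHED || this == FAILED;
        }
    }

    /**
     * Polls the status source until a terminal status is seen. Each time the
     * status differs from the previous poll, it is passed to onChange and
     * recorded in the returned list.
     */
    static List<Status> pollUntilTerminal(
            Supplier<CompletableFuture<Status>> statusSource,
            long intervalMillis,
            Consumer<Status> onChange) {
        List<Status> observed = new ArrayList<>();
        Status last = null;
        while (true) {
            Status current = statusSource.get().join();
            if (current != last) {
                observed.add(current);
                onChange.accept(current);
                last = current;
            }
            if (current.isTerminal()) {
                return observed;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Stop polling if the thread is interrupted, keeping the interrupt flag set.
                Thread.currentThread().interrupt();
                return observed;
            }
        }
    }

    public static void main(String[] args) {
        // Fake status source that replays a fixed sequence instead of asking a cluster.
        Iterator<Status> fake = List.of(
                Status.CREATED, Status.RUNNING, Status.RUNNING, Status.FINISHED).iterator();
        pollUntilTerminal(
                () -> CompletableFuture.completedFuture(fake.next()),
                10L,
                status -> System.out.println("status changed: " + status));
    }
}
```

The drawbacks are the ones I mentioned: a change is only noticed at the next
poll, any status that comes and goes between two polls is missed, and the
polling service has to keep running (and remember which jobs to poll) for the
whole lifetime of every job.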

On Fri, Sep 25, 2020 at 8:12 PM Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
>
> I understand from your email that
> `StreamExecutionEnvironment.registerJobListener()` would not be enough
> for you because you want to be notified of changes on the cluster side,
> correct? That is, when the job status changes on the master.
>
> Best,
> Aljoscha
>
> On 23.09.20 14:31, 季文昊 wrote:
> > Hi there,
> >
> > I'm working on a Flink platform in my corp, which provides a service to
> > provision and manage multiple dedicated Flink clusters. The problem is that
> > we want to sync a job status without delay after its submission through our
> > platform as long as it has been changed.
> >
> > Since we want to update this in-time and make our services stateless,
> > pulling a job's status periodically is not a good solution. I do not find
> > any proper way to achieve this by letting a job manager push changes
> > directly to our platform except changing the source code, which registers
> > an additional `JobStatusListener` in the method
> > `org.apache.flink.runtime.jobmaster.JobMaster#startScheduling`.
> >
> > I wonder if we can enhance `JobStatusListener` a little bit so that a Flink
> > user can register his custom JobStatusListener at the startup.
> >
> > To be specific, we can have a `JobStatusListenerFactory` interface and its
> > corresponding `ServiceLoader<JobStatusListenerFactory>`, where
> > the JobStatusListenerFactory will have the following method:
> >   - JobStatusListener createJobStatusListener(Properties properties);
> >
> > Custom listeners will be created during the JobMaster#startScheduling
> > method.
> >
> > If someone would like to implement his own JobStatusListener, he will
> > package all the related classes into a standalone jar with a
> > `META-INF/services/org.apache.flink.runtime.executiongraph.JobStatusListener`
> > file and place it under the `lib/` directory.
> >
> > In addition, I find that there is a Jira ticket similar to what I'm
> > asking: FLINK-17104 but I do not see any comment or update yet. Hope anyone
> > could help me move on this feature or give me some suggestions about it.
> >
> > Thanks,
> > Wenhao
> >
>
>

Re: [DISCUSS] Support registering custom JobStatusListeners when scheduling a job

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

I understand from your email that
`StreamExecutionEnvironment.registerJobListener()` would not be enough
for you because you want to be notified of changes on the cluster side,
correct? That is, when the job status changes on the master.

Best,
Aljoscha

On 23.09.20 14:31, 季文昊 wrote:
> Hi there,
> 
> I'm working on a Flink platform in my corp, which provides a service to
> provision and manage multiple dedicated Flink clusters. The problem is that
> we want to sync a job status without delay after its submission through our
> platform as long as it has been changed.
> 
> Since we want to update this in-time and make our services stateless,
> pulling a job's status periodically is not a good solution. I do not find
> any proper way to achieve this by letting a job manager push changes
> directly to our platform except changing the source code, which registers
> an additional `JobStatusListener` in the method
> `org.apache.flink.runtime.jobmaster.JobMaster#startScheduling`.
> 
> I wonder if we can enhance `JobStatusListener` a little bit so that a Flink
> user can register his custom JobStatusListener at the startup.
> 
> To be specific, we can have a `JobStatusListenerFactory` interface and its
> corresponding `ServiceLoader<JobStatusListenerFactory>`, where
> the JobStatusListenerFactory will have the following method:
>   - JobStatusListener createJobStatusListener(Properties properties);
> 
> Custom listeners will be created during the JobMaster#startScheduling
> method.
> 
> If someone would like to implement his own JobStatusListener, he will
> package all the related classes into a standalone jar with a
> `META-INF/services/org.apache.flink.runtime.executiongraph.JobStatusListener`
> file and place it under the `lib/` directory.
> 
> In addition, I find that there is a Jira ticket similar to what I'm
> asking: FLINK-17104 but I do not see any comment or update yet. Hope anyone
> could help me move on this feature or give me some suggestions about it.
> 
> Thanks,
> Wenhao
>
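
For readers of the archive, the factory-plus-`ServiceLoader` idea quoted above
could look roughly like the sketch below. It is only an illustration under
stated assumptions, not Flink code: the two interfaces are simplified,
hypothetical stand-ins (the real `JobStatusListener` has a different
signature), and the class name, the `platform.url` property, and the
`notifyListeners` helper with its exception guard are invented here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.ServiceLoader;

final class ListenerFactorySketch {

    /** Simplified, hypothetical stand-in for Flink's JobStatusListener. */
    interface JobStatusListener {
        void jobStatusChanges(String jobId, String newStatus, long timestamp);
    }

    /** The factory interface as proposed in the thread. */
    interface JobStatusListenerFactory {
        JobStatusListener createJobStatusListener(Properties properties);
    }

    /** Creates one listener per factory; the factories may come from a ServiceLoader. */
    static List<JobStatusListener> createListeners(
            Iterable<JobStatusListenerFactory> factories, Properties properties) {
        List<JobStatusListener> listeners = new ArrayList<>();
        for (JobStatusListenerFactory factory : factories) {
            listeners.add(factory.createJobStatusListener(properties));
        }
        return listeners;
    }

    /**
     * Notifies every listener. A listener that throws is logged and skipped so
     * that it cannot prevent the remaining listeners from being called
     * (an assumption of this sketch, not part of the proposal).
     */
    static void notifyListeners(
            List<JobStatusListener> listeners, String jobId, String newStatus) {
        long now = System.currentTimeMillis();
        for (JobStatusListener listener : listeners) {
            try {
                listener.jobStatusChanges(jobId, newStatus, now);
            } catch (RuntimeException e) {
                System.err.println("listener failed: " + e);
            }
        }
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("platform.url", "http://localhost:8080/status"); // hypothetical key

        // ServiceLoader is Iterable; with no provider file on the classpath it yields nothing.
        List<JobStatusListener> listeners = createListeners(
                ServiceLoader.load(JobStatusListenerFactory.class), properties);
        notifyListeners(listeners, "job-1", "RUNNING");
        System.out.println("discovered listeners: " + listeners.size());
    }
}
```

One detail to keep in mind: `ServiceLoader` looks up the provider-configuration
file named after the service type it loads, so with
`ServiceLoader<JobStatusListenerFactory>` the file under `META-INF/services/`
would need to carry the fully qualified name of the factory interface rather
than that of `JobStatusListener`.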