Posted to dev@beam.apache.org by enrico canzonieri <ec...@gmail.com> on 2019/08/15 23:22:57 UTC

JobServer JobInvocation for Flink portable runner blocks on job completion

Hello,

While launching Flink jobs using the PortableRunner, I noticed that jobs
are marked by Beam as RUNNING before they are actually running on Flink. The
issue doesn't seem specific to the Flink runner, though:
https://github.com/apache/beam/blob/e5e9cab009ef81045b587b2b582c72cf2522df9b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java#L90

I'd assume the TODO in the code refers exactly to this issue. For Flink
specifically, I guess one of the problems is that jobs are submitted using a
blocking API from the BeamFlinkRemoteStreamEnvironment
<https://github.com/apache/beam/blob/ae83448597f64474c3f5754d7b8e3f6b02347a6b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L361>.
So we essentially never return from the job submission unless the job
completes (which never happens for streaming), is cancelled, or fails. A
possible solution is to set the Flink client to detached mode, return a
JobSubmissionResult instead of a JobExecutionResult, and then have a Future
or event loop that tracks the actual job execution and updates the job
state accordingly.
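
Something along these lines is the rough idea, sketched against the Flink
1.8-era client API (imports omitted; updateBeamJobState() is a made-up
placeholder for mapping Flink's JobStatus onto Beam's job state, and the
exact getJobStatus() signature would need to be verified):

// Submit detached, then track the job state asynchronously instead of
// blocking until the job completes.
client.setDetached(true);
JobSubmissionResult submission = client.submitJob(jobGraph, userCodeClassLoader);
JobID jobId = submission.getJobID();

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Void> tracker = executor.submit(() -> {
  while (true) {
    JobStatus status = client.getJobStatus(jobId).get();
    updateBeamJobState(status);  // placeholder: map Flink status to Beam state
    if (status.isGloballyTerminalState()) {
      return null;  // FINISHED, CANCELED or FAILED: stop tracking
    }
    Thread.sleep(1000);
  }
});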

Playing around with the code, this seems possible, but it could require a
large change, possibly including duplicating the entire Flink
RemoteStreamEnvironment
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java>
inside Beam to customize it even further than it already is.

Is there any ticket tracking this already?

Cheers,
Enrico

Re: JobServer JobInvocation for Flink portable runner blocks on job completion

Posted by enrico canzonieri <ec...@gmail.com>.
Although it is not directly related to the issue described in this thread, I
think that if we followed the Portable Jar approach described in
https://lists.apache.org/thread.html/2122928a0a5f678d475ec15af538eb7303f73557870af174b1fdef7e@%3Cdev.beam.apache.org%3E
we could solve this issue by simply removing the dependency on the job
server for starting Beam jobs.

Re: JobServer JobInvocation for Flink portable runner blocks on job completion

Posted by Thomas Weise <th...@apache.org>.
In our current deployment this is solved by a wrapper script that checks
the Flink REST API and waits until the job has started or a timeout is
reached.
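
In Java, that check is roughly the following (a sketch only, using Java 9+
for InputStream#readAllBytes; the /jobs/overview endpoint and its
{"jobs":[{"name":...,"state":...}]} response shape should be verified
against the Flink version in use, and a real script would parse the JSON
instead of matching strings):

// Poll the Flink REST API until a job with the expected name reports
// RUNNING, or give up once the timeout is reached.
static boolean waitForJob(String jobManagerUrl, String jobName, long timeoutMillis)
    throws Exception {
  long deadline = System.currentTimeMillis() + timeoutMillis;
  while (System.currentTimeMillis() < deadline) {
    URL url = new URL(jobManagerUrl + "/jobs/overview");
    try (InputStream in = url.openStream()) {
      String body = new String(in.readAllBytes(), StandardCharsets.UTF_8);
      // Naive string match, good enough for a sketch.
      if (body.contains("\"name\":\"" + jobName + "\"")
          && body.contains("\"state\":\"RUNNING\"")) {
        return true;
      }
    }
    Thread.sleep(2000);
  }
  return false;  // job never showed up as RUNNING within the timeout
}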

Even considering the current Flink client API restrictions, the hack in
JobInvocation is bad. The status should be set to RUNNING by the
executor/runner, where it can be handled according to how the respective
client API works.

Thomas

Re: JobServer JobInvocation for Flink portable runner blocks on job completion

Posted by enrico canzonieri <ec...@gmail.com>.
I agree that the job status is better checked from Flink itself. Our model
is similar to what Thomas described in the design docs shared with this
mailing list for running Beam on Flink in K8s. We launch a job server just
to submit the Beam job, and then we monitor the job directly from
Flink.

The problem I'm facing is that when submitting the Beam job, the Python
process terminates immediately after calling pipeline.run(). At that moment
the job is not yet running on Flink, and it doesn't even appear on the Flink
Job Manager. Our operator expects the job to appear in Flink immediately
after calling run (this is the behavior when using the Flink CLI), so the
operator believes the job hasn't started yet (or that submission has failed)
and tries to launch a new one. The end result is that within a few seconds
we have two jobs running instead of one.

Looking at the code, I think that calling pipeline.run() should return only
once the job has actually transitioned to a state other than STARTING. I
think it's ok for the method to hang if the job can't successfully
transition to any of the other states (RUNNING, DONE, FAILED, CANCELLED).
E.g., if there are not enough resources in the cluster, it's ok for the job
to be stuck in STARTING and eventually (after some timeout) transition to
FAILED.
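
In pseudo-Java, the behavior I'd expect from run() is something like the
sketch below (JobState, getCurrentState() and setState() are placeholders
for whatever the job service actually exposes):

// Block while the job is still STARTING; fail after a timeout, otherwise
// return as soon as any other state is reached.
JobState waitUntilStarted(long startTimeoutMillis) throws InterruptedException {
  long deadline = System.currentTimeMillis() + startTimeoutMillis;
  JobState state = getCurrentState();
  while (state == JobState.STARTING) {
    if (System.currentTimeMillis() > deadline) {
      setState(JobState.FAILED);  // never left STARTING: give up
      return JobState.FAILED;
    }
    Thread.sleep(500);
    state = getCurrentState();
  }
  return state;  // RUNNING, DONE, FAILED or CANCELLED
}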

In our first iteration the workaround was to use a longer sleep, to wait
for the job to (hopefully) appear in Flink. I may be missing a better
workaround here, so please feel free to recommend better options.

Cheers,
Enrico

Re: JobServer JobInvocation for Flink portable runner blocks on job completion

Posted by Thomas Weise <th...@apache.org>.
This gets into the gory details of the Flink client API, and there is
plenty of room for improvement. There is relevant discussion happening on
the Flink dev list right now.

But before delving into that and what a workaround could look like in Beam:
do you really want to rely on the status from the Beam job server? In our
deployment we only use the job server to submit the pipeline and then
terminate it. Any monitoring is based on Flink itself.

You can obtain the job status through the Flink REST API, and the metrics
through Flink as well. In any case, to assert that a job was successfully
launched it isn't sufficient to check that it is RUNNING. There is still
the possibility that it never really starts, due to resource issues,
recovery from a savepoint not completing, and so on. To get an accurate
"LAUNCHED OK" signal, it is necessary to check Flink metrics.

Cheers,
Thomas

Re: JobServer JobInvocation for Flink portable runner blocks on job completion

Posted by Maximilian Michels <mx...@apache.org>.
Hi Enrico,

There is an old ticket for this:
https://jira.apache.org/jira/browse/BEAM-593. It hasn't been prioritized
because submitting applications "detached" is still possible using the
Flink CLI or in portable pipelines. That comes with some of the drawbacks
you described, e.g. inaccurate pipeline status and metrics retrieval.

Things have since improved with respect to asynchronous client
communication, with the introduction of the RestClusterClient. The tricky
part is still to make this work across all deployment scenarios
(local/remote). I think there are two options:

1) Wrap the entire blocking execution of the Flink API, and simply
propagate errors to the Beam application and update its status.

2) Retrieve the JobGraph from the batch/streaming API, submit the JobGraph
with the RestClusterClient to remote clusters, and spin up a Flink
MiniCluster in case of local execution (a rough sketch follows below).
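
For option 2, roughly (a sketch against the Flink 1.8-era client API; the
exact constructors and signatures are worth double-checking):

// Extract the JobGraph from the streaming API and submit it detached via
// RestClusterClient, so submission returns without waiting for completion.
StreamGraph streamGraph = streamExecutionEnvironment.getStreamGraph();
JobGraph jobGraph = streamGraph.getJobGraph();

RestClusterClient<String> client =
    new RestClusterClient<>(flinkConfiguration, "beam-cluster-id");
client.setDetached(true);
JobSubmissionResult result = client.submitJob(jobGraph, userCodeClassLoader);
// For local execution, a MiniCluster would be started instead and the same
// JobGraph handed to MiniCluster#submitJob.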

On this note, there is a plan to add support for submitting jobs
asynchronously, directly from the ExecutionEnvironment, in the next Flink
version.

Cheers,
Max
