Posted to issues@flink.apache.org by "Wenlong Lyu (Jira)" <ji...@apache.org> on 2020/02/25 14:01:00 UTC

[jira] [Updated] (FLINK-16279) Per job Yarn application leak in normal execution mode.

     [ https://issues.apache.org/jira/browse/FLINK-16279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenlong Lyu updated FLINK-16279:
--------------------------------
    Description: 
I ran a job in YARN per-job mode using {{env.executeAsync}}. The job failed, but the YARN cluster was not destroyed.

After investigating the code, I found that when running in attached mode ({{ExecutionMode.NORMAL}}), MiniDispatcher never completes {{shutDownFuture}} until it has received a job result request from the job client:
{code}
		if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
			// terminate the MiniDispatcher once we served the first JobResult successfully
			jobResultFuture.thenAccept((JobResult result) -> {
				ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
						ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;

				LOG.debug("Shutting down per-job cluster because someone retrieved the job result.");
				shutDownFuture.complete(status);
			});
		} 
{code}
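To make the coupling in this snippet concrete, here is a minimal, self-contained model of the NORMAL-mode path. It assumes, as the snippet's comment ("once we served the first JobResult") suggests, that the callback is attached when the dispatcher serves a job result request. {{NormalModeShutdownSketch}}, {{MiniDispatcherModel}}, {{jobTerminated}} and {{requestJobResult}} are hypothetical stand-ins, not Flink classes, and a plain boolean replaces {{JobResult}}.

{code:java}
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical, simplified model of the NORMAL-mode shutdown coupling; not Flink code.
 * A boolean "failed" flag stands in for JobResult.getSerializedThrowable().isPresent().
 */
public class NormalModeShutdownSketch {

    public enum ApplicationStatus { SUCCEEDED, FAILED }

    public static class MiniDispatcherModel {
        // Completed when the job reaches a terminal state (true = failed).
        private final CompletableFuture<Boolean> jobOutcome = new CompletableFuture<>();
        // Completing this future is what would tear down the per-job cluster.
        private final CompletableFuture<ApplicationStatus> shutDownFuture = new CompletableFuture<>();

        public void jobTerminated(boolean failed) {
            jobOutcome.complete(failed);
        }

        // Mirrors the snippet: the shutdown callback is only attached when a result is requested.
        public CompletableFuture<Boolean> requestJobResult() {
            CompletableFuture<Boolean> result = jobOutcome.thenApply(failed -> failed);
            result.thenAccept(failed -> shutDownFuture.complete(
                    failed ? ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED));
            return result;
        }

        public CompletableFuture<ApplicationStatus> getShutDownFuture() {
            return shutDownFuture;
        }
    }

    public static void main(String[] args) {
        MiniDispatcherModel dispatcher = new MiniDispatcherModel();
        dispatcher.jobTerminated(true);
        // The job is over, but nobody has asked for its result yet: no shutdown.
        System.out.println("shut down before request: " + dispatcher.getShutDownFuture().isDone());
        dispatcher.requestJobResult();
        System.out.println("shut down after request: " + dispatcher.getShutDownFuture().join());
    }
}
{code}

Running {{main}} prints {{shut down before request: false}} followed by {{shut down after request: FAILED}}: in this model, job termination alone never completes the shutdown future.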

However, when the job is submitted asynchronously (via {{env.executeAsync}}), the job client may never send that request: once users see from the job client that the job has failed, they may never request the result. In that case {{shutDownFuture}} is never completed and the per-job YARN application leaks.
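The async scenario can be sketched from the client side. The model below assumes, as described in this report, that serving a result request is the only thing that releases the per-job cluster, and that a status query does not. {{AsyncClientLeakSketch}}, {{PerJobCluster}} and {{observe}} are hypothetical names for illustration, not Flink's {{JobClient}} API.

{code:java}
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical model of the async-submission scenario; not Flink code.
 * Assumption: only a served result request releases the per-job cluster.
 */
public class AsyncClientLeakSketch {

    public enum JobStatus { RUNNING, FAILED }

    /** Simplified per-job cluster holding a single job. */
    public static class PerJobCluster {
        private JobStatus status = JobStatus.RUNNING;
        private final CompletableFuture<Void> shutDown = new CompletableFuture<>();

        public void failJob() {
            status = JobStatus.FAILED;
        }

        // Status queries never touch the shutdown path.
        public JobStatus getJobStatus() {
            return status;
        }

        // Serving the result of a terminated job is what releases the cluster.
        public JobStatus requestJobResult() {
            if (status == JobStatus.FAILED) {
                shutDown.complete(null);
            }
            return status;
        }

        public boolean isShutDown() {
            return shutDown.isDone();
        }
    }

    /** A client that notices the failure and optionally fetches the result anyway. */
    public static void observe(PerJobCluster cluster, boolean fetchResultOnFailure) {
        if (cluster.getJobStatus() == JobStatus.FAILED && !fetchResultOnFailure) {
            return; // the user saw the failure and never asked for the result
        }
        cluster.requestJobResult();
    }

    public static void main(String[] args) {
        PerJobCluster leaked = new PerJobCluster();
        leaked.failJob();
        observe(leaked, false);
        System.out.println("status-only client, cluster shut down: " + leaked.isShutDown());

        PerJobCluster released = new PerJobCluster();
        released.failJob();
        observe(released, true);
        System.out.println("result-fetching client, cluster shut down: " + released.isShutDown());
    }
}
{code}

In this model the status-only client leaves the cluster running ({{false}}), while the client that also fetches the result releases it ({{true}}).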


> Per job Yarn application leak in normal execution mode.
> -------------------------------------------------------
>
>                 Key: FLINK-16279
>                 URL: https://issues.apache.org/jira/browse/FLINK-16279
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.0
>            Reporter: Wenlong Lyu
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)