Posted to issues@flink.apache.org by "Libin Qin (Jira)" <ji...@apache.org> on 2020/08/05 09:51:00 UTC

[jira] [Closed] (FLINK-18803) JobGraph cannot be GC when submit via RemoteStreamEnvironment in attach mode

     [ https://issues.apache.org/jira/browse/FLINK-18803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Libin Qin closed FLINK-18803.
-----------------------------
    Resolution: Won't Do

> JobGraph cannot be GC when submit via RemoteStreamEnvironment in attach mode 
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-18803
>                 URL: https://issues.apache.org/jira/browse/FLINK-18803
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission
>    Affects Versions: 1.7.2
>            Reporter: Libin Qin
>            Priority: Minor
>         Attachments: image-2020-08-03-18-01-56-062.png, image-2020-08-03-18-02-22-519.png, image-2020-08-03-18-03-29-748.png, image-2020-08-03-18-08-50-811.png, image-2020-08-03-18-10-08-353.png, image-2020-08-03-18-12-29-467.png
>
>
> When a job is submitted via RemoteStreamEnvironment in attached mode, the client submission thread blocks on "jobResultFuture.get()" in the "submitJob" method of RestClusterClient.java while still holding the local variable jobGraph. If the job is complex, with many vertices and edges, or if the client submits many jobs, the retained JobGraphs become large and the client may run out of memory (OOM). I think there is no need for the client to hold on to the JobGraph while it waits for the job result.
> The largest objects on the client heap are shown below (this job has 408 tasks):
> !image-2020-08-03-18-03-29-748.png!
>  
> !image-2020-08-03-18-08-50-811.png!
>  
>  
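A minimal Java sketch of the retention pattern described above. `HeldDuringWait`, `BigGraph` and `submitAndWait` are hypothetical stand-ins, not Flink classes, and `join()` stands in for `jobResultFuture.get()`. Because `graph` is still read after the blocking call (here, to label a failure), the waiting frame keeps the whole graph strongly reachable until the job finishes, however large the graph is.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class HeldDuringWait {
    // Hypothetical stand-in for a large JobGraph; not Flink's class.
    static final class BigGraph {
        final String id;
        final byte[] payload;

        BigGraph(String id, int payloadBytes) {
            this.id = id;
            this.payload = new byte[payloadBytes];
        }
    }

    // Anti-pattern: `graph` is read again after the blocking wait (to label an
    // error), so this frame keeps the whole graph strongly reachable for as
    // long as the thread is parked in join().
    static String submitAndWait(BigGraph graph, CompletableFuture<String> jobResult) {
        try {
            return jobResult.join(); // blocks until the job finishes
        } catch (CompletionException e) {
            return "failed: " + graph.id;
        }
    }

    public static void main(String[] args) {
        BigGraph graph = new BigGraph("job-1", 1 << 20);
        CompletableFuture<String> result = new CompletableFuture<>();
        result.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(submitAndWait(graph, result)); // prints "failed: job-1"
    }
}
```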
> Perhaps we can null it out once the submission has succeeded:
>  
> {code:java}
> // Pass a Supplier instead of a ready-built JobGraph so that this caller's
> // frame never holds a strong reference to the graph.
> public JobSubmissionResult run(FlinkPlan compiledPlan,
>       List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
>       throws ProgramInvocationException {
>    return submitJob(() -> getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, savepointSettings), classLoader);
> }
>
> public JobSubmissionResult submitJob(Supplier<JobGraph> jobGraphSupplier, ClassLoader classLoader) throws ProgramInvocationException {
>    JobGraph jobGraph = jobGraphSupplier.get();
>    // Keep only the JobID; everything after submission uses it instead of jobGraph.
>    JobID jobID = jobGraph.getJobID();
>    log.info("Submitting job {} (detached: {}).", jobID, isDetached());
>    final CompletableFuture<JobSubmissionResult> jobSubmissionFuture = submitJob(jobGraph);
>    JobSubmissionResult result;
>    try {
>       result = jobSubmissionFuture.get();
>       // Help GC: drop this frame's reference so the JobGraph is not retained
>       // while the thread blocks on the job result below.
>       jobGraph = null;
>    } catch (Exception e) {
>       throw new ProgramInvocationException("Could not submit job",
>          jobID, ExceptionUtils.stripExecutionException(e));
>    }
>    if (isDetached()) {
>       return result;
>    } else {
>       // Only the JobID is needed to request the result.
>       final CompletableFuture<JobResult> jobResultFuture = requestJobResult(jobID);
>       final JobResult jobResult;
>       try {
>          jobResult = jobResultFuture.get();
>       } catch (Exception e) {
>          throw new ProgramInvocationException("Could not retrieve the execution result.",
>             jobID, ExceptionUtils.stripExecutionException(e));
>       }
>       try {
>          this.lastJobExecutionResult = jobResult.toJobExecutionResult(classLoader);
>          return lastJobExecutionResult;
>       } catch (JobExecutionException | IOException | ClassNotFoundException e) {
>          throw new ProgramInvocationException("Job failed.", jobID, e);
>       }
>    }
> }
> {code}
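An alternative to assigning `null`, sketched under the same assumptions (hypothetical `BigGraph` stand-in, `join()` in place of `get()`): build and submit the graph in a helper method and return only its ID, so the graph is never referenced from the frame that blocks. This does not depend on how the JVM treats dead locals; a JIT-compiled method may already consider a local unreachable after its last use, while an interpreted frame may keep it alive until the method returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class ConfinedSubmission {
    // Hypothetical stand-in for JobGraph; after submission only `id` is needed.
    static final class BigGraph {
        final String id;
        final byte[] payload = new byte[1 << 20];

        BigGraph(String id) {
            this.id = id;
        }
    }

    // Builds and "submits" the graph. The graph never escapes this frame, so it
    // becomes unreachable as soon as the method returns; only the id survives.
    static String buildAndSubmit(Supplier<BigGraph> graphSupplier) {
        BigGraph graph = graphSupplier.get();
        // A real client would serialize and upload `graph` here (omitted).
        return graph.id;
    }

    // The long blocking wait holds only the small id, never the graph,
    // so no explicit `graph = null` is needed.
    static String submitAndWait(Supplier<BigGraph> graphSupplier,
                                CompletableFuture<String> jobResult) {
        String jobId = buildAndSubmit(graphSupplier);
        return jobId + ": " + jobResult.join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> result = CompletableFuture.completedFuture("FINISHED");
        System.out.println(submitAndWait(() -> new BigGraph("job-1"), result)); // prints "job-1: FINISHED"
    }
}
```

The design choice mirrors the Supplier in `run` above: the frame that lives longest is given the least state.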
>  
> With this change, the heap dump shows that the JobGraph has been garbage-collected:
> !image-2020-08-03-18-10-08-353.png!
>  
>  
>  
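To check collectability in a test rather than by inspecting heap dumps, a `java.lang.ref.WeakReference` probe can be used. A minimal sketch with hypothetical names (`CollectabilityProbe`, `clearedAfterGc`, `allocateAndRelease`), using an 8 MiB byte array as a stand-in for a JobGraph. `System.gc()` is only a hint, so a `false` result from `clearedAfterGc` is inconclusive rather than proof of a leak; `Reference.reachabilityFence` (Java 9+) keeps the pinned array strongly reachable for the comparison.

```java
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;

public class CollectabilityProbe {
    // Requests GC up to `attempts` times and reports whether the weakly held
    // object was cleared. System.gc() is only a hint to the JVM.
    static boolean clearedAfterGc(WeakReference<?> ref, int attempts) {
        for (int i = 0; i < attempts && ref.get() != null; i++) {
            System.gc();
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return ref.get() == null;
    }

    // Allocates a large stand-in for a JobGraph; the strong reference goes out
    // of scope on return, leaving only the weak reference.
    static WeakReference<byte[]> allocateAndRelease() {
        byte[] big = new byte[8 << 20]; // 8 MiB
        return new WeakReference<>(big);
    }

    public static void main(String[] args) {
        WeakReference<byte[]> released = allocateAndRelease();
        System.out.println("released: collected=" + clearedAfterGc(released, 10));

        byte[] held = new byte[8 << 20];
        WeakReference<byte[]> pinned = new WeakReference<>(held);
        System.out.println("held: collected=" + clearedAfterGc(pinned, 3));
        Reference.reachabilityFence(held); // keeps `held` strongly reachable until here
    }
}
```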



--
This message was sent by Atlassian Jira
(v8.3.4#803005)