Posted to dev@flink.apache.org by "Libin Qin (Jira)" <ji...@apache.org> on 2020/08/03 10:17:00 UTC

[jira] [Created] (FLINK-18803) JobGraph cannot be GC when submit via RemoteStreamEnvironment in attach mode

Libin Qin created FLINK-18803:
---------------------------------

             Summary: JobGraph cannot be GC when submit via RemoteStreamEnvironment in attach mode 
                 Key: FLINK-18803
                 URL: https://issues.apache.org/jira/browse/FLINK-18803
             Project: Flink
          Issue Type: Bug
          Components: Client / Job Submission
    Affects Versions: 1.7.2
            Reporter: Libin Qin
         Attachments: image-2020-08-03-18-01-56-062.png, image-2020-08-03-18-02-22-519.png, image-2020-08-03-18-03-29-748.png, image-2020-08-03-18-08-50-811.png, image-2020-08-03-18-10-08-353.png, image-2020-08-03-18-12-29-467.png

When a job is submitted through RemoteStreamEnvironment in attached mode, the client submission thread blocks on "jobResultFuture.get()" in the "submitJob" method of RestClusterClient.java. While it is blocked, the thread still holds the local variable jobGraph, so the JobGraph cannot be garbage collected. If the job is complex, with many vertices and edges, or if the client submits a large number of jobs, the retained JobGraph objects become large and the client may run out of memory (OOM). I think there is no need for the client to keep holding the JobGraph.

The biggest objects in the client heap are shown below. This job has more than 408 tasks.

!image-2020-08-03-18-03-29-748.png!

 

!image-2020-08-03-18-08-50-811.png!

 

 

Perhaps we can null it out after the submission succeeds:

 
{code:java}
public JobSubmissionResult run(FlinkPlan compiledPlan,
      List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
      throws ProgramInvocationException {
   return submitJob(() -> getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, savepointSettings), classLoader);
}


public JobSubmissionResult submitJob(Supplier<JobGraph> jobGraphSupplier, ClassLoader classLoader) throws ProgramInvocationException {
   JobGraph jobGraph = jobGraphSupplier.get();
   JobID jobID = jobGraph.getJobID();
   log.info("Submitting job {} (detached: {}).", jobID, isDetached());

   final CompletableFuture<JobSubmissionResult> jobSubmissionFuture = submitJob(jobGraph);
   JobSubmissionResult result;
   try {
      result = jobSubmissionFuture.get();
      // Help GC: drop the local reference to the JobGraph once submission has succeeded
      jobGraph = null;
   } catch (Exception e) {
      throw new ProgramInvocationException("Could not submit job",
         jobID, ExceptionUtils.stripExecutionException(e));
   }

   if (isDetached()) {
      return result;
   } else {
      final CompletableFuture<JobResult> jobResultFuture = requestJobResult(jobID);
      final JobResult jobResult;
      try {
         jobResult = jobResultFuture.get();
      } catch (Exception e) {
         throw new ProgramInvocationException("Could not retrieve the execution result.",
            jobID, ExceptionUtils.stripExecutionException(e));
      }

      try {
         this.lastJobExecutionResult = jobResult.toJobExecutionResult(classLoader);
         return lastJobExecutionResult;
      } catch (JobExecutionException e) {
         throw new ProgramInvocationException("Job failed.", jobID, e);
      } catch (IOException | ClassNotFoundException e) {
         throw new ProgramInvocationException("Job failed.", jobID, e);
      }
   }
}
{code}
 

With this change, we can see that the JobGraph has been garbage collected:

!image-2020-08-03-18-10-08-353.png!
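
An alternative to nulling out the local variable might be to confine the graph to a helper method, so that its stack frame is already gone when the blocking wait starts. The following is only a minimal sketch of that idea, not Flink code: the names ScopedSubmission, Graph, submit and submitAndWait are hypothetical stand-ins, and the submission itself is simulated with an already completed future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class ScopedSubmission {

    /** Hypothetical stand-in for a large JobGraph (not a Flink class). */
    static final class Graph {
        final String id;
        final byte[] payload;

        Graph(String id, int payloadBytes) {
            this.id = id;
            this.payload = new byte[payloadBytes];
        }
    }

    /**
     * Builds and "submits" the graph. The graph is referenced only by a local
     * variable of this method, so once the method returns, this frame no longer
     * keeps it reachable. Only the small job id escapes.
     */
    static String submit(Supplier<Graph> graphSupplier) {
        Graph graph = graphSupplier.get();
        // Assumption: stand-in for the real asynchronous submission call.
        CompletableFuture<String> ack = CompletableFuture.completedFuture(graph.id);
        return ack.join();
    }

    /**
     * Attached-mode style wait. This frame holds only the job id and the
     * result future while it blocks, never the graph itself.
     */
    static String submitAndWait(Supplier<Graph> graphSupplier,
                                CompletableFuture<String> jobResult) {
        String jobId = submit(graphSupplier);
        return jobId + ":" + jobResult.join();
    }

    public static void main(String[] args) {
        String outcome = submitAndWait(
                () -> new Graph("job-1", 1024),
                CompletableFuture.completedFuture("FINISHED"));
        System.out.println(outcome);
    }
}
```

This only helps if the supplier creates the graph on demand, as in the example. A supplier that captures an already built graph would itself keep the graph reachable for as long as the caller holds the supplier.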

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)