Posted to issues@flink.apache.org by "Yun Gao (JIRA)" <ji...@apache.org> on 2019/07/30 17:27:00 UTC

[jira] [Comment Edited] (FLINK-13487) TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall failed on Travis

    [ https://issues.apache.org/jira/browse/FLINK-13487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16896346#comment-16896346 ] 

Yun Gao edited comment on FLINK-13487 at 7/30/19 5:26 PM:
----------------------------------------------------------

Many thanks to [~SleePy] and [~zjwang] for their help with this issue! A more detailed explanation of the cause follows.

From the logs recorded in the attached _error_log_, we can see that:
 # The TM registers with the JM twice.
 # When the task is submitted, the jobManagerTable does not contain the corresponding JobId, which causes the test to fail.

The basic process of the test is:

 
{code:java}
// Step 1 
taskManagerServices.getJobLeaderService().addJob(jobId, jobMasterAddress); 

// Step 2
jobManagerLeaderRetriever.notifyListener(jobMasterAddress, UUID.randomUUID());

// Step 3 
taskExecutorGateway.requestSlot() 

// Step 4 
taskExecutorGateway.submitTask(){code}
 

The two registrations are triggered by steps 1/2 and step 3 respectively. However, since registration is asynchronous, the task submission is executed before either registration has finished. To fix this problem, we need to postpone the submission until the slot has been offered to the JM. In a real deployment this is guaranteed because the JM does not start deploying the task before the slot offer succeeds.
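
For illustration, the idea behind the fix can be sketched as below. This is only a toy model, not Flink code: the class SlotOfferGateSketch, the slotOffered future, the registerAsync method and the 100 ms delay are all assumptions made up for this example. The point is simply that the submission waits on a future that is completed once the (simulated) slot offer has gone through.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model only: none of these types or methods are Flink classes.
class SlotOfferGateSketch {

    // Hypothetical stand-in for the TaskExecutor's jobManagerTable.
    private final Map<String, String> jobManagerTable = new ConcurrentHashMap<>();

    // Completed once the simulated slot offer has reached the JobMaster.
    private final CompletableFuture<Void> slotOffered = new CompletableFuture<>();

    // Simulates the asynchronous registration followed by the slot offer.
    void registerAsync(String jobId, String jobMasterAddress, ExecutorService executor) {
        executor.execute(() -> {
            try {
                Thread.sleep(100); // simulated slow asynchronous part
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                slotOffered.completeExceptionally(e);
                return;
            }
            jobManagerTable.put(jobId, jobMasterAddress);
            slotOffered.complete(null);
        });
    }

    // Fails if no JobManager is known for the job, like the error in this issue.
    String submitTask(String jobId) {
        if (!jobManagerTable.containsKey(jobId)) {
            throw new IllegalStateException("no JobManager associated for the job " + jobId);
        }
        return "submitted";
    }

    // The test flow with the gate: wait for the slot offer, then submit.
    static String runGated() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            SlotOfferGateSketch tm = new SlotOfferGateSketch();
            tm.registerAsync("job-1", "jm-address", executor);
            tm.slotOffered.get(10, TimeUnit.SECONDS); // postpone the submission
            return tm.submitTask("job-1");
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runGated());
    }
}
```

Without the wait on slotOffered, submitTask in this model would race with the executor thread, which mirrors the failure seen in the test.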

 

A more detailed description is as follows for reference:

In step 1, the job id is added to the JobLeaderService on the TM side, which triggers the leader retrieval process. Since the leader retrieval service is an instance of SettableLeaderRetrieveService and the address has not been set at this point, the leader retrieval has no effect yet.

In step 2, the SettableLeaderRetrieveService is notified of the actual JobMaster address, and the TM starts to register with the JM. Part of the registration is done asynchronously by the execution thread pool of the RpcService. In this case, however, the asynchronous part might take longer than usual.

In step 3, the slot request first allocates the slot, and the TM then finds that the registration has not yet succeeded. It therefore calls jobLeaderService#addJob again. Since the JM address has been set by now, this triggers the second registration. This registration is also executed asynchronously, and the slot request returns successfully.
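
Steps 1 to 3 can be traced with a small model to see where the two registrations come from. Everything here is hypothetical and heavily simplified (the SettableRetriever class, the registrationAttempts counter and the registered flag are invented for this sketch and do not match the real Flink classes); it only mirrors the behaviour described above: starting the retriever without an address does nothing, while starting it with an address already set notifies the listener immediately.

```java
import java.util.function.Consumer;

// Toy model only: a simplified stand-in, not the Flink implementation.
class DoubleRegistrationSketch {

    // Hypothetical settable leader retriever: notifies only when both
    // a listener and an address are present.
    static class SettableRetriever {
        private String leaderAddress;
        private Consumer<String> listener;

        void start(Consumer<String> newListener) {
            listener = newListener;
            if (leaderAddress != null) {
                listener.accept(leaderAddress);
            }
        }

        void notifyListener(String address) {
            leaderAddress = address;
            if (listener != null) {
                listener.accept(address);
            }
        }
    }

    private final SettableRetriever retriever = new SettableRetriever();
    private int registrationAttempts = 0;
    // Never set to true here: models the asynchronous part still being in flight.
    private boolean registered = false;

    // Models jobLeaderService#addJob: (re)starts leader retrieval for the job.
    void addJob(String jobId) {
        retriever.start(address -> registrationAttempts++);
    }

    // Models the slot request: re-adds the job if registration has not succeeded.
    void requestSlot(String jobId) {
        if (!registered) {
            addJob(jobId);
        }
    }

    // Returns the number of registration attempts after steps 1, 2 and 3.
    static int[] run() {
        DoubleRegistrationSketch tm = new DoubleRegistrationSketch();
        int[] attempts = new int[3];
        tm.addJob("job-1");                        // step 1: no address yet
        attempts[0] = tm.registrationAttempts;
        tm.retriever.notifyListener("jm-address"); // step 2: first registration
        attempts[1] = tm.registrationAttempts;
        tm.requestSlot("job-1");                   // step 3: second registration
        attempts[2] = tm.registrationAttempts;
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

In this model run() yields 0, 1 and 2 attempts after the three steps, matching the two registrations seen in the log.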

In step 4, the submission starts before either registration has succeeded, and the error occurs.

Then the finally block in the test code executes and shuts down the TM, the test ends, and the original submission failure exception is thrown. This is why the exception is printed after the TM has stopped. Meanwhile, the Akka thread of the TM continues to run, since the exception in the Akka thread is caught; one of the registrations finally succeeds and triggers the slot offer again. This is why we also see the slot offer event in the error log.

The above exception can be reproduced reliably if we add a sleep in RetryingRegistration#register, before the call to "completionFuture.complete(Tuple2.of(gateway, success))" in the asynchronous executor thread. The two jobLeaderService#addJob calls are also visible in this case. If we additionally add a sleep to the finally block of the test method, we also see the slot offer event after the submission failure.
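
The effect of delaying the completion can also be shown in isolation. The sketch below is an assumption-laden toy, not the RetryingRegistration code: it uses a CountDownLatch in place of the sleep so that the outcome is deterministic, and the names DelayedRegistrationSketch, register and submitTask are hypothetical. While the latch is closed, the completion future cannot be completed, so an early submission always fails; once the latch is released and the future has completed, the same submission succeeds.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model only: a latch replaces the injected sleep to make the order deterministic.
class DelayedRegistrationSketch {

    // Hypothetical stand-in for the asynchronous part of a registration attempt.
    static CompletableFuture<String> register(ExecutorService executor, CountDownLatch gate) {
        CompletableFuture<String> completionFuture = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                gate.await(); // plays the role of the injected delay
                completionFuture.complete("registered");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                completionFuture.completeExceptionally(e);
            }
        });
        return completionFuture;
    }

    // In this model a submission only succeeds once the registration is complete.
    static String submitTask(CompletableFuture<String> registration) {
        return registration.isDone() ? "submitted" : "failed: no JobManager associated";
    }

    // Returns the result of submitting before and after the registration completes.
    static String[] run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch gate = new CountDownLatch(1);
        try {
            CompletableFuture<String> registration = register(executor, gate);
            String early = submitTask(registration); // latch still closed
            gate.countDown();                        // let the registration finish
            registration.get(10, TimeUnit.SECONDS);
            String late = submitTask(registration);
            return new String[] {early, late};
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        String[] results = run();
        System.out.println(results[0]);
        System.out.println(results[1]);
    }
}
```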

 

 



> TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall failed on Travis
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-13487
>                 URL: https://issues.apache.org/jira/browse/FLINK-13487
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task, Tests
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Yun Gao
>            Priority: Blocker
>             Fix For: 1.9.0
>
>         Attachments: error_log.png
>
>
> https://api.travis-ci.org/v3/job/564925114/log.txt
> {code}
> 21:14:47.090 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 5.754 s <<< FAILURE! - in org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest
> 21:14:47.090 [ERROR] testPartitionReleaseAfterReleaseCall(org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest)  Time elapsed: 0.136 s  <<< ERROR!
> java.util.concurrent.ExecutionException: org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: Could not submit task because there is no JobManager associated for the job 2a0ab40cb53241799b71ff6fd2f53d3d.
> 	at org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest.testPartitionRelease(TaskExecutorPartitionLifecycleTest.java:331)
> 	at org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest.testPartitionReleaseAfterReleaseCall(TaskExecutorPartitionLifecycleTest.java:201)
> Caused by: org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: Could not submit task because there is no JobManager associated for the job 2a0ab40cb53241799b71ff6fd2f53d3d.
> {code}


