Posted to user@flink.apache.org by Till Rohrmann <tr...@apache.org> on 2019/07/01 14:49:47 UTC

Re: [FLINK-10868] job cannot be exited immediately if job manager is timed out for some reason

Hi Anyang,

as far as I can tell, FLINK-10868 has not been merged into Flink yet. Thus,
I cannot say much about how well it works. The case you are describing
should be properly handled in the version that gets merged, though. I guess
what needs to happen is that once the JM reconnects to the RM, it should
synchronize its pending slot requests with the slot requests registered on
the RM. But this should be a follow-up issue to FLINK-10868, because it
would widen the scope too much.
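
A minimal sketch of the synchronization described above, not Flink code: on
reconnect, the JM's pending requests are compared with the requests the RM
has registered. Allocation IDs are modeled as plain strings, and the class
and method names (SlotRequestSync, lostOnResourceManager,
staleOnResourceManager) are hypothetical.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration only; these names do not exist in Flink.
class SlotRequestSync {

    /** Requests the JM is still waiting for but the RM no longer tracks. */
    static Set<String> lostOnResourceManager(Set<String> pendingOnJobManager,
                                             Set<String> registeredOnResourceManager) {
        Set<String> lost = new TreeSet<>(pendingOnJobManager);
        lost.removeAll(registeredOnResourceManager);
        return lost;
    }

    /** Requests the RM still tracks but the JM no longer waits for. */
    static Set<String> staleOnResourceManager(Set<String> pendingOnJobManager,
                                              Set<String> registeredOnResourceManager) {
        Set<String> stale = new TreeSet<>(registeredOnResourceManager);
        stale.removeAll(pendingOnJobManager);
        return stale;
    }

    public static void main(String[] args) {
        Set<String> jmPending = new TreeSet<>(Arrays.asList("alloc-1", "alloc-2", "alloc-3"));
        Set<String> rmRegistered = new TreeSet<>(Arrays.asList("alloc-3", "alloc-4"));

        // The JM would have to fail or re-send these requests.
        System.out.println("lost on RM:  " + lostOnResourceManager(jmPending, rmRegistered));
        // The RM could cancel these requests.
        System.out.println("stale on RM: " + staleOnResourceManager(jmPending, rmRegistered));
    }
}
```

With such a comparison, a request that the RM cancelled while the JM was
disconnected would show up in the first set, so the JM could fail it instead
of waiting indefinitely.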

Cheers,
Till

On Wed, Jun 26, 2019 at 10:52 AM Anyang Hu <hu...@gmail.com> wrote:

> Hi ZhenQiu && Rohrmann:
>
> I have backported FLINK-10868 to flink-1.5. Most of my jobs (all batch
> jobs) now exit immediately once the number of failed container requests
> reaches the upper limit, but some jobs still do not exit immediately. The
> logs show that in these jobs the job manager timed out first, for unknown
> reasons. Code segment 1 is executed after the job manager has timed out but
> before it has reconnected, so I suspect that the job manager is out of sync
> and that the notifyAllocationFailure() method in code segment 2 is not
> executed.
>
>
> I'm wondering whether you have encountered similar problems and whether
> there is a solution. To make these jobs exit immediately, I am currently
> considering calling onFatalError() when jobManagerRegistration == null, so
> that the process exits at once. It is not yet clear whether this drastic
> approach has any side effects.
>
>
> Thanks,
> Anyang
>
>
> code segment 1 in ResourceManager.java:
>
> private void cancelAllPendingSlotRequests(Exception cause) {
>    slotManager.cancelAllPendingSlotRequests(cause);
> }
>
>
> code segment 2 in ResourceManager.java:
>
> @Override
> public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
>    validateRunsInMainThread();
>    log.info("Slot request with allocation id {} for job {} failed.", allocationId, jobId, cause);
>
>    JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
>    if (jobManagerRegistration != null) {
>       jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
>    }
> }
>
>

Re: [FLINK-10868] job cannot be exited immediately if job manager is timed out for some reason

Posted by Anyang Hu <hu...@gmail.com>.
Thanks for your replies.

To Peter:
I had already increased heartbeat.timeout to 3 minutes, but the job manager
timeout still occurs. For now I have added the following logic: when the JM
has timed out, onFatalError is called, which ensures that the job fails and
exits quickly. Does this approach have side effects?


@Override
public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
   validateRunsInMainThread();
   log.info("Slot request with allocation id {} for job {} failed.", allocationId, jobId, cause);

   JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
   if (jobManagerRegistration != null) {
      jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
   } else {
      ResourceManagerException exception =
         new ResourceManagerException("Job Manager is lost, can not notify allocation failure.");
      onFatalError(exception);
   }
}
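
For comparison, here is a minimal sketch of one alternative to exiting the
process: remember failures that cannot be delivered and replay them when the
job manager registers again. This is a self-contained toy model, not Flink
code. Job and allocation IDs are plain strings, a Consumer stands in for the
job manager gateway, and all names (BufferedFailureNotifier,
registerJobManager, disconnectJobManager) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical toy model of the resource manager side; not a Flink class.
class BufferedFailureNotifier {
    // jobId -> callback standing in for the job manager gateway
    private final Map<String, Consumer<String>> registrations = new HashMap<>();
    // jobId -> allocation ids whose failure could not be delivered yet
    private final Map<String, List<String>> undelivered = new HashMap<>();

    void notifyAllocationFailure(String jobId, String allocationId) {
        Consumer<String> gateway = registrations.get(jobId);
        if (gateway != null) {
            gateway.accept(allocationId);
        } else {
            // No registration: keep the failure instead of dropping it.
            undelivered.computeIfAbsent(jobId, k -> new ArrayList<>()).add(allocationId);
        }
    }

    void registerJobManager(String jobId, Consumer<String> gateway) {
        registrations.put(jobId, gateway);
        // Replay failures that arrived while the job manager was away.
        List<String> missed = undelivered.remove(jobId);
        if (missed != null) {
            missed.forEach(gateway);
        }
    }

    void disconnectJobManager(String jobId) {
        registrations.remove(jobId);
    }

    public static void main(String[] args) {
        BufferedFailureNotifier rm = new BufferedFailureNotifier();
        List<String> received = new ArrayList<>();

        rm.registerJobManager("job-1", received::add);
        rm.disconnectJobManager("job-1");                // e.g. heartbeat timeout
        rm.notifyAllocationFailure("job-1", "alloc-7");  // buffered, not dropped
        System.out.println("before reconnect: " + received);

        rm.registerJobManager("job-1", received::add);   // reconnect
        System.out.println("after reconnect:  " + received);
    }
}
```

In this model the failure for alloc-7 reaches the job manager only after it
re-registers, so nothing is lost and no process has to be terminated.
Whether this fits the real ResourceManager depends on details the thread
does not cover, such as whether the job manager reconnects at all.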



Yours,
Anyang


On Tue, Jul 2, 2019 at 2:43 AM Peter Huang <hu...@gmail.com> wrote:

> Hi Anyang,
>
> Thanks for raising the question. I didn't test the PR in batch mode, so
> your observation helps me improve the implementation. From my
> understanding, if the heartbeat from the RM to a job manager times out, the
> job manager connection is closed, so it will not be reconnected. Are you
> running the batch jobs in a per-job cluster or a session cluster? To
> temporarily mitigate the issue you are facing, you can probably tune
> heartbeat.timeout (default 50s) to a larger value.
>
>
> Best Regards
> Peter Huang
>

Re: [FLINK-10868] job cannot be exited immediately if job manager is timed out for some reason

Posted by Peter Huang <hu...@gmail.com>.
Hi Anyang,

Thanks for raising the question. I didn't test the PR in batch mode, so
your observation helps me improve the implementation. From my
understanding, if the heartbeat from the RM to a job manager times out, the
job manager connection is closed, so it will not be reconnected. Are you
running the batch jobs in a per-job cluster or a session cluster? To
temporarily mitigate the issue you are facing, you can probably tune
heartbeat.timeout (default 50s) to a larger value.


Best Regards
Peter Huang
