You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Niklas Semmler (Jira)" <ji...@apache.org> on 2022/01/12 21:33:00 UTC

[jira] [Commented] (FLINK-25277) Introduce explicit shutdown signalling between TaskManager and JobManager

    [ https://issues.apache.org/jira/browse/FLINK-25277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474973#comment-17474973 ] 

Niklas Semmler commented on FLINK-25277:
----------------------------------------

The [PR #18169 |https://github.com/apache/flink/pull/18169]introduces a minimal change to start the existing [TaskExecutor's onStop|https://github.com/apache/flink/blob/dbbf2a36111da1faea5c901e3b008cc788913bf8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java#L444] method when a SIGTERM is received. This reduces the shutdown time from ~20 seconds (waiting for timeout) to less than one second.

In an infrequently occurring scenario, the new shutdown hook introduces a race condition in the YARNResourceManagerDriver. When the TaskExecutor disconnects from the [ResourceManager|https://github.com/apache/flink/blob/dbbf2a36111da1faea5c901e3b008cc788913bf8/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L494], it [releases the TaskExecutor's YARN container|https://github.com/apache/flink/blob/dbbf2a36111da1faea5c901e3b008cc788913bf8/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java#L287]. After release attempt, it initates a [callback|https://github.com/apache/flink/blob/dbbf2a36111da1faea5c901e3b008cc788913bf8/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java#L575]. When ResourceManager terminates at the same time (due to a SIGTERM), then it also [terminates the YARNResourceManagerDriver|https://github.com/apache/flink/blob/dbbf2a36111da1faea5c901e3b008cc788913bf8/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java#L196]. If the release and termination process overlap, so that the termination starts before the container is fully released, YARN throws an exception.

To ensure that the container release process always completes before the termination process is started, I used a _java.util.concurrent.Phaser_ for synchronization. This only affects the rare cases where the two processes overlap.

The race condition occurred infrequently on Flink's Azure pipelines. I was unable to replicate it locally. However, after the addition of the synchronization mechanism, I did not see the race condition in about 100 executions.

Note, that the issue is somewhat superficial: By default, YARN issues a SIGKILL 250ms after the SIGTERM ([here|[https://hadoop.apache.org/docs/r2.8.5/hadoop-yarn/hadoop-yarn-common/yarn-default.xml]])

> Introduce explicit shutdown signalling between TaskManager and JobManager 
> --------------------------------------------------------------------------
>
>                 Key: FLINK-25277
>                 URL: https://issues.apache.org/jira/browse/FLINK-25277
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>    Affects Versions: 1.13.0, 1.14.0
>            Reporter: Niklas Semmler
>            Assignee: Niklas Semmler
>            Priority: Major
>              Labels: pull-request-available, reactive
>             Fix For: 1.15.0
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> We need to introduce shutdown signalling between TaskManager and JobManager for fast & graceful shutdown in reactive scheduler mode.
> In Flink 1.14 and earlier versions, the JobManager tracks the availability of a TaskManager using a hearbeat. This heartbeat is by default configured with an interval of 10 seconds and a timeout of 50 seconds [1]. Hence, the shutdown of a TaskManager is recognized only after about 50-60 seconds. This works fine for the static scheduling mode, where a TaskManager only disappears as part of a cluster shutdown or a job failure. However, in the reactive scheduler mode (FLINK-10407), TaskManagers are regularly added and removed from a running job. Here, the heartbeat-mechanisms incurs additional delays.
> To remove these delays, we add an explicit shutdown signal from the TaskManager to the JobManager.
>  
> [1]https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#heartbeat-timeout



--
This message was sent by Atlassian Jira
(v8.20.1#820001)