You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Till Rohrmann (Jira)" <ji...@apache.org> on 2020/07/22 21:19:00 UTC

[jira] [Comment Edited] (FLINK-18451) Flink HA on yarn may appear TaskManager double running when HA is restored

    [ https://issues.apache.org/jira/browse/FLINK-18451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163075#comment-17163075 ] 

Till Rohrmann edited comment on FLINK-18451 at 7/22/20, 9:18 PM:
-----------------------------------------------------------------

I think one problem of the described problem is the abnormal ZooKeeper connection. We don't invalidate the current leader in {{ZooKeeperLeaderRetrievalService}} if the ZooKeeper connection becomes suspended. This could explain why the {{TaskManager}} does not stop the running tasks. This is definitely something we should investigate and then also fix if it is indeed a problem.

Concerning the data consumption I am not entirely sure whether I fully understand the problematic scenario. Are you concerned about multiple Flink tasks reading from an upstream Flink task or about multiple sources reading from an external system? The former scenario should not be possible because after a restart, the producers will be deployed with a different {{ExecutionAttemptID}} which will prevent old tasks from reading their data. For the latter case, the external system Flink reads from needs to be replayable anyway to support at-least-once or higher processing guarantees. Hence, I believe that this should not be a problem.


was (Author: till.rohrmann):
I think one problem of the described problem is the abnormal ZooKeeper connection. We don't invalidate the current leader in {{ZooKeeperLeaderRetrievalService}} if the ZooKeeper connection becomes suspended. This could explain why the {{TaskManager}} does not stop the running tasks. This is definitely something we should investigate and then also fix if it is indeed a problem.

Concerning the data consumption I am not entirely sure whether I fully understand the problematic scenario. Are you concerned about multiple Flink tasks reading from an upstream Flink task or about multiple sources reading from an external system? The former scenario should not be possible because after a restart, the producers will be deployed with a different {{ExecutionAttemptID}} which will prevent old tasks from reading their data. For the latter case, the external system Flink reads from needs to be replayable anyway to support at-least-once or higher processing guarantees. Hence, I believe that this should not be a problem.

> Flink HA on yarn may appear TaskManager double running when HA is restored
> --------------------------------------------------------------------------
>
>                 Key: FLINK-18451
>                 URL: https://issues.apache.org/jira/browse/FLINK-18451
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / YARN
>    Affects Versions: 1.9.0
>            Reporter: ming li
>            Priority: Major
>              Labels: high-availability
>
> We found that when NodeManager is lost, the new JobManager will be restored by Yarn's ResourceManager, and the Leader node will be registered on Zookeeper. The original TaskManager will find the new JobManager through Zookeeper and close the old JobManager connection. At this time, all tasks of the TaskManager will fail. The new JobManager will directly perform job recovery and recover from the latest checkpoint.
> However, during the recovery process, when a TaskManager is abnormally connected to Zookeeper, it is not registered with the new JobManager in time. Before the following timeout:
> 1. Connect with Zookeeper
> 2. Heartbeat with JobManager/ResourceManager
> Task will continue to run (assuming that Task can run independently in TaskManager). Assuming that HA recovers fast enough, some Task double runs will occur at this time.
> Do we need to make a persistent record of the cluster resources we allocated during the runtime, and use it to judge all Task stops when HA is restored?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)