Posted to dev@flink.apache.org by Yang Wang <da...@gmail.com> on 2020/10/01 00:31:29 UTC

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

2. Yes, this is exactly what I mean: store the HA information relevant to
a specific component in a single ConfigMap and ensure that “Get(check the
leader)-and-Update(write back to the ConfigMap)” is a transactional
operation. Since we only store the job graph state handle (not the real
data) in the ConfigMap, I think 1 MB is big enough for the dispatcher-leader
ConfigMap (the biggest one, with multiple jobs). I roughly calculate that
we could have more than 1000 Flink jobs in a Flink session cluster.
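
A minimal sketch of the “Get(check the leader)-and-Update” step, only to
illustrate the idea. It uses a toy in-memory store instead of a real
Kubernetes client, so FakeApiServer, ConfigMap, LEADER_KEY and
check_and_update are all hypothetical names and not part of fabric8 or
Flink. The only assumption carried over from Kubernetes is that an update
is rejected when its resource version is stale.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict


class ConflictError(Exception):
    """Raised when a write carries a stale resource version."""


@dataclass
class ConfigMap:
    # Hypothetical in-memory stand-in for a Kubernetes ConfigMap.
    annotations: Dict[str, str] = field(default_factory=dict)
    data: Dict[str, str] = field(default_factory=dict)
    resource_version: int = 0


class FakeApiServer:
    """Toy store that rejects updates whose resource version is stale."""

    def __init__(self) -> None:
        self._stored = ConfigMap()

    def get(self) -> ConfigMap:
        # Return a copy so callers cannot mutate the stored object directly.
        s = self._stored
        return ConfigMap(dict(s.annotations), dict(s.data), s.resource_version)

    def replace(self, updated: ConfigMap) -> None:
        # Optimistic concurrency: accept only writes based on the latest version.
        if updated.resource_version != self._stored.resource_version:
            raise ConflictError("resource version mismatch")
        self._stored = ConfigMap(
            dict(updated.annotations),
            dict(updated.data),
            updated.resource_version + 1,
        )


LEADER_KEY = "leader"  # hypothetical annotation key


def check_and_update(
    api: FakeApiServer, identity: str, mutate: Callable[[Dict[str, str]], None]
) -> bool:
    """Write only if `identity` is the recorded leader and nobody wrote in between."""
    current = api.get()
    if current.annotations.get(LEADER_KEY) != identity:
        return False  # not the leader: do nothing
    mutate(current.data)
    try:
        api.replace(current)
    except ConflictError:
        return False  # someone else updated the ConfigMap first
    return True
```

Because the leader annotation and the HA data live in the same object, the
leadership check and the write are covered by the same resource version. A
caller that gets False back would re-read the ConfigMap and decide whether
to retry.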

3. Actually, K8s does have a stronger guarantee than YARN: a StatefulSet
provides at-most-one semantics as long as no manual force-deletion
happens[1]. However, based on the previous discussion, we have successfully
avoided the "lock-and-release" in the implementation, so I still insist on
using the current Deployment.


[1].
https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#force-deletion


Best,
Yang

On Wed, Sep 30, 2020 at 11:57 PM Till Rohrmann <tr...@apache.org> wrote:

> Thanks for the clarifications Yang Wang.
>
> 2. Keeping the HA information relevant for a component (Dispatcher,
> JobManager, ResourceManager) in a single ConfigMap sounds good. We should
> check that we don't exceed the 1 MB size limit with this approach though.
> The Dispatcher's ConfigMap would then contain the current leader, the
> running jobs and the pointers to the persisted JobGraphs. The JobManager's
> ConfigMap would then contain the current leader, the pointers to the
> checkpoints and the checkpoint ID counter, for example.
>
> 3. Ah ok, I somehow thought that K8s would give us stronger
> guarantees than Yarn in this regard. That's a pity.
>
> Cheers,
> Till
>
> On Wed, Sep 30, 2020 at 10:03 AM tison <wa...@gmail.com> wrote:
>
>> Thanks for your explanation. It would be fine as long as checking leadership
>> and actually writing the information is atomic.
>>
>> Best,
>> tison.
>>
>>
>> On Wed, Sep 30, 2020 at 3:57 PM Yang Wang <da...@gmail.com> wrote:
>>
>>> Thanks Till and tison for your comments.
>>>
>>> @Till Rohrmann <tr...@apache.org>
>>> 1. I am afraid we cannot do this if we are going to use the fabric8
>>> Kubernetes client SDK for the leader election. The official Kubernetes Java
>>> client[1] does not support it either, unless we implement a new
>>> LeaderElector in Flink based on the very basic Kubernetes API. But it seems
>>> that we would not gain much from this.
>>>
>>> 2. Yes, the implementation will be a little complicated if we want to
>>> completely eliminate the residual job graphs or checkpoints. Inspired by
>>> your suggestion, a different solution has come to my mind. We could
>>> use the same ConfigMap to store the JobManager leader, job graph,
>>> checkpoint counter, and checkpoints. Each job will have a specific
>>> ConfigMap for the HA metadata storage. Then it will be easier to guarantee
>>> that only the leader can write the ConfigMap, since
>>> “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
>>> transactional operation.
>>>
>>> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution.
>>> However, there is still a chance that two JobManagers are running and
>>> trying to get/delete a key in the same ConfigMap concurrently. Imagine that
>>> the kubelet (like the NodeManager in YARN) is down, so the JobManager pod
>>> cannot be deleted, and a new JobManager pod is launched. We are then in
>>> a similar situation as with Deployment(1) + ConfigMap + HDFS/S3. The only
>>> benefit is that we do not need to implement a leader election/retrieval
>>> service.
>>>
>>> @tison
>>> Actually, I do not think we will have such an issue in the Kubernetes HA
>>> service. In the Kubernetes LeaderElector[2], the leader information is
>>> stored in the annotation of the leader ConfigMap, so the old leader
>>> cannot wrongly override the leader information. Once a JobManager
>>> wants to write its leader information to the ConfigMap, it will check
>>> whether it is the leader now. If not, nothing will happen. Moreover, the
>>> Kubernetes Resource Version[3] ensures that no one else has snuck in and
>>> written a different update while the client was in the process of
>>> performing its update.
>>>
>>>
>>> [1].
>>> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
>>> [2].
>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>>> [3].
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> On Wed, Sep 30, 2020 at 3:21 PM tison <wa...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Generally +1 for a native k8s HA service.
>>>>
>>>> Regarding leader election and publishing leader information, a
>>>> discussion[1] pointed out that since these two actions are NOT atomic,
>>>> there will always be an edge case where a previous leader overwrites the
>>>> leader information, even with versioned writes. A versioned write only
>>>> helps by forcing a re-read when the version mismatches, so for versioned
>>>> writes to work, the information in the kv pair should help the contender
>>>> determine whether it is the current leader.
>>>>
>>>> The idea of writing the leader information on the contender node or
>>>> something equivalent makes sense, but the details depend on how it is
>>>> implemented. General problems are:
>>>>
>>>> 1. The TM might be a bit late in picking up the correct leader
>>>> information, but as long as the leader election process is short and
>>>> leadership is stable most of the time, it won't be a serious issue.
>>>> 2. The process by which the TM extracts the leader information might be a
>>>> bit more complex than directly watching a fixed key.
>>>>
>>>> The atomicity issue can be addressed if one leverages low-level APIs such
>>>> as lease & txn, but that requires more development effort. The ConfigMap
>>>> and the encapsulated interface, though, provide only a self-consistent
>>>> mechanism which doesn't promise more consistency for extensions.
>>>>
>>>> Best,
>>>> tison.
>>>>
>>>> [1]
>>>> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>>>>
>>>>
>>>>
>>>> On Tue, Sep 29, 2020 at 9:25 PM Till Rohrmann <tr...@apache.org> wrote:
>>>>
>>>>> For 1. I was wondering whether we can't write the leader connection
>>>>> information directly when trying to obtain the leadership (trying to
>>>>> update the leader key with one's own value)? This might be a little
>>>>> detail, though.
>>>>>
>>>>> 2. Alright, so we are having a similar mechanism as we have in
>>>>> ZooKeeper with the ephemeral lock nodes. I guess that this complicates
>>>>> the implementation a bit, unfortunately.
>>>>>
>>>>> 3. Wouldn't the StatefulSet solution also work without a PV? One could
>>>>> configure a different persistent storage like HDFS or S3 for storing the
>>>>> checkpoints and job blobs like in the ZooKeeper case. The current benefit
>>>>> I see is that we avoid having to implement this multi locking mechanism
>>>>> in the ConfigMaps using the annotations because we can be sure that
>>>>> there is only a single leader at a time if I understood the guarantees
>>>>> of K8s correctly.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <da...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> > Hi Till, thanks for your valuable feedback.
>>>>> >
>>>>> > 1. Yes, leader election and storing leader information will use the
>>>>> > same ConfigMap. When a contender successfully performs a versioned
>>>>> > annotation update operation on the ConfigMap, it means that it has
>>>>> > been elected as the leader. It will then write the leader information
>>>>> > in the callback of the leader elector[1]. The Kubernetes resource
>>>>> > version will help us prevent the leader ConfigMap from being wrongly
>>>>> > updated.
>>>>> >
>>>>> > 2. The lock and release is really a valid concern. Actually, in the
>>>>> > current design, we cannot guarantee that the node that tries to write
>>>>> > its ownership is the real leader: whoever writes last becomes the
>>>>> > owner. To address this issue, we need to store all the owners of the
>>>>> > key. Only when the owner list is empty can the specific key (i.e. a
>>>>> > checkpoint or job graph) be deleted. However, we may have a residual
>>>>> > checkpoint or job graph when the old JobManager crashed unexpectedly
>>>>> > and did not release the lock. To solve this problem completely, we
>>>>> > need a timestamp renewal mechanism for CompletedCheckpointStore and
>>>>> > JobGraphStore, which could help us detect the JobManager timeout and
>>>>> > then clean up the residual keys.
>>>>> >
>>>>> > 3. Frankly speaking, I am not against this solution. However, in my
>>>>> > opinion, it is more like a temporary proposal. We could use a
>>>>> > StatefulSet to avoid leader election and leader retrieval. But I am
>>>>> > not sure whether the TaskManager could properly handle the situation
>>>>> > of the same hostname with different IPs, which arises when the
>>>>> > JobManager fails and is relaunched. Also, we may still have two
>>>>> > JobManagers running in some corner cases (e.g. the kubelet is down but
>>>>> > the pod is running). Another concern is that we have a strong
>>>>> > dependency on the PersistentVolume (aka PV) in FileSystemHAService,
>>>>> > but a PV is not always available, especially in self-built Kubernetes
>>>>> > clusters. Moreover, the PV provider should guarantee that each PV can
>>>>> > only be mounted once. Since the native HA proposal covers all the
>>>>> > functionality of the StatefulSet proposal, I prefer the former.
>>>>> >
>>>>> >
>>>>> > [1].
>>>>> > https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>>>>> >
>>>>> > Best,
>>>>> > Yang
>>>>> >
>>>>> > On Mon, Sep 28, 2020 at 9:29 PM Till Rohrmann <tr...@apache.org> wrote:
>>>>> >
>>>>> >> Thanks for creating this FLIP Yang Wang. I believe that many of our
>>>>> >> users will like a ZooKeeper-less HA setup.
>>>>> >>
>>>>> >> +1 for not separating the leader information and the leader election
>>>>> >> if possible. Maybe it is even possible that the contender writes his
>>>>> >> leader information directly when trying to obtain the leadership by
>>>>> >> performing a versioned write operation.
>>>>> >>
>>>>> >> Concerning the lock and release operation I have a question: Can
>>>>> >> there be multiple owners for a given key-value pair in a ConfigMap?
>>>>> >> If not, how can we ensure that the node which writes his ownership is
>>>>> >> actually the leader w/o transactional support from K8s? In ZooKeeper
>>>>> >> we had the same problem (we should probably change it at some point
>>>>> >> to simply use a transaction which checks whether the writer is still
>>>>> >> the leader) and therefore introduced the ephemeral lock nodes. What
>>>>> >> they allow is that there can be multiple owners of a given ZNode at a
>>>>> >> time. The last owner will then be responsible for the cleanup of the
>>>>> >> node.
>>>>> >>
>>>>> >> I see the benefit of your proposal over the stateful set proposal
>>>>> >> because it can support multiple standby JMs. Given the problem of
>>>>> >> locking key-value pairs it might be simpler to start with this
>>>>> >> approach where we only have a single JM. This might already add a lot
>>>>> >> of benefits for our users. Was there a specific reason why you
>>>>> >> discarded this proposal (other than generality)?
>>>>> >>
>>>>> >> @Uce it would be great to hear your feedback on the proposal since
>>>>> >> you already implemented a K8s based HA service.
>>>>> >>
>>>>> >> Cheers,
>>>>> >> Till
>>>>> >>
>>>>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com>
>>>>> wrote:
>>>>> >>
>>>>> >>> Hi Xintong and Stephan,
>>>>> >>>
>>>>> >>> Thanks a lot for your attention on this FLIP. I will address the
>>>>> >>> comments inline.
>>>>> >>>
>>>>> >>> # Architecture -> One or two ConfigMaps
>>>>> >>>
>>>>> >>> Both of you are right. One ConfigMap will make the design and
>>>>> >>> implementation easier. Actually, in my POC code, I am using just one
>>>>> >>> ConfigMap (e.g. "k8s-ha-app1-restserver" for the rest server
>>>>> >>> component) for the leader election and storage. Once a JobManager
>>>>> >>> wins the election, it will update the ConfigMap with the leader
>>>>> >>> address and periodically renew the lock annotation to remain the
>>>>> >>> active leader. I will update the FLIP document, including the
>>>>> >>> architecture diagram, to avoid the misunderstanding.
>>>>> >>>
>>>>> >>>
>>>>> >>> # HA storage > Lock and release
>>>>> >>>
>>>>> >>> This is a valid concern. ZooKeeper ephemeral nodes are deleted by
>>>>> >>> the ZK server automatically when the client times out, which can
>>>>> >>> happen in a bad network environment or when the ZK client crashes
>>>>> >>> unexpectedly. For Kubernetes, we need to implement a similar
>>>>> >>> mechanism. First, when we want to lock a specific key in the
>>>>> >>> ConfigMap, we will put the owner identity, lease duration, and renew
>>>>> >>> time in the ConfigMap annotation. The annotation will be cleaned up
>>>>> >>> when releasing the lock. When we want to remove a job graph or
>>>>> >>> checkpoints, it should satisfy the following conditions. If not, the
>>>>> >>> delete operation cannot be done.
>>>>> >>> * The current instance is the owner of the key.
>>>>> >>> * The owner annotation is empty, which means the owner has released
>>>>> >>> the lock.
>>>>> >>> * The owner annotation timed out, which usually indicates the owner
>>>>> >>> died.
>>>>> >>>
>>>>> >>>
>>>>> >>> # HA storage > HA data clean up
>>>>> >>>
>>>>> >>> Sorry that I did not clearly describe how the HA related ConfigMap
>>>>> >>> is retained. Benefiting from the Kubernetes OwnerReference[1], we
>>>>> >>> set the owner of the flink-conf ConfigMap, service and TaskManager
>>>>> >>> pods to the JobManager Deployment. So when we want to destroy a
>>>>> >>> Flink cluster, we just need to delete the deployment[2]. For the HA
>>>>> >>> related ConfigMaps, we do not set the owner, so that they are
>>>>> >>> retained even when we delete the whole Flink cluster.
>>>>> >>>
>>>>> >>>
>>>>> >>> [1].
>>>>> >>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>>>> >>> [2].
>>>>> >>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>>>> >>>
>>>>> >>>
>>>>> >>> Best,
>>>>> >>> Yang
>>>>> >>>
>>>>> >>>
>>>>> >>> On Wed, Sep 16, 2020 at 8:16 PM Stephan Ewen <se...@apache.org> wrote:
>>>>> >>>
>>>>> >>>> This is a very cool feature proposal.
>>>>> >>>>
>>>>> >>>> One lesson learned from the ZooKeeper-based HA is that it is
>>>>> >>>> overly complicated to have the Leader RPC address in a different
>>>>> >>>> node than the LeaderLock. There is extra code needed to make sure
>>>>> >>>> these converge, and they can be temporarily out of sync.
>>>>> >>>>
>>>>> >>>> A much easier design would be to have the RPC address as payload
>>>>> >>>> in the lock entry (ZNode in ZK), the same way that the leader
>>>>> >>>> fencing token is stored as payload of the lock.
>>>>> >>>> I think for the design above it would mean having a single
>>>>> >>>> ConfigMap for both leader lock and leader RPC address discovery.
>>>>> >>>>
>>>>> >>>> This probably serves as a good design principle in general: do not
>>>>> >>>> divide information that is updated together over different
>>>>> >>>> resources.
>>>>> >>>>
>>>>> >>>> Best,
>>>>> >>>> Stephan
>>>>> >>>>
>>>>> >>>>
>>>>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <
>>>>> tonysong820@gmail.com>
>>>>> >>>> wrote:
>>>>> >>>>
>>>>> >>>>> Thanks for preparing this FLIP, @Yang.
>>>>> >>>>>
>>>>> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>>>> >>>>> built-in ConfigMap for Flink's HA services should significantly
>>>>> >>>>> reduce the maintenance overhead compared to deploying a ZK
>>>>> >>>>> cluster. I think this is an attractive feature for users.
>>>>> >>>>>
>>>>> >>>>> Concerning the proposed design, I have some questions. Might not
>>>>> >>>>> be problems, just trying to understand.
>>>>> >>>>>
>>>>> >>>>> ## Architecture
>>>>> >>>>>
>>>>> >>>>> Why does the leader election need two ConfigMaps (`lock for
>>>>> >>>>> contending leader`, and `leader RPC address`)? What happens if
>>>>> >>>>> the two ConfigMaps are not updated consistently? E.g., a TM
>>>>> >>>>> learns about a new JM becoming leader (lock for contending leader
>>>>> >>>>> updated), but still gets the old leader's address when trying to
>>>>> >>>>> read `leader RPC address`?
>>>>> >>>>>
>>>>> >>>>> ## HA storage > Lock and release
>>>>> >>>>>
>>>>> >>>>> It seems to me that the owner needs to explicitly release the
>>>>> >>>>> lock so that other peers can write/remove the stored object. What
>>>>> >>>>> if the previous owner failed to release the lock (e.g., dead
>>>>> >>>>> before releasing)? Would there be any problem?
>>>>> >>>>>
>>>>> >>>>> ## HA storage > HA data clean up
>>>>> >>>>>
>>>>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy
>>>>> >>>>> <ClusterID>`, how are the HA data retained?
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>> Thank you~
>>>>> >>>>>
>>>>> >>>>> Xintong Song
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <
>>>>> danrtsey.wy@gmail.com>
>>>>> >>>>> wrote:
>>>>> >>>>>
>>>>> >>>>>> Hi devs and users,
>>>>> >>>>>>
>>>>> >>>>>> I would like to start the discussion about FLIP-144[1], which
>>>>> >>>>>> will introduce a new native high availability service for
>>>>> >>>>>> Kubernetes.
>>>>> >>>>>>
>>>>> >>>>>> Currently, Flink provides a ZooKeeper HA service that is widely
>>>>> >>>>>> used in production environments. It can be integrated in
>>>>> >>>>>> standalone cluster, Yarn, and Kubernetes deployments. However,
>>>>> >>>>>> using the ZooKeeper HA in K8s incurs additional cost since we
>>>>> >>>>>> need to manage a ZooKeeper cluster. In the meantime, K8s
>>>>> >>>>>> provides some public APIs for leader election[2] and
>>>>> >>>>>> configuration storage (i.e. ConfigMap[3]). We could leverage
>>>>> >>>>>> these features and make running an HA-configured Flink cluster
>>>>> >>>>>> on K8s more convenient.
>>>>> >>>>>>
>>>>> >>>>>> Both the standalone on K8s and native K8s could benefit from the
>>>>> >>>>>> newly introduced KubernetesHaService.
>>>>> >>>>>>
>>>>> >>>>>> [1].
>>>>> >>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>>> >>>>>> [2].
>>>>> >>>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>>> >>>>>> [3].
>>>>> >>>>>> https://kubernetes.io/docs/concepts/configuration/configmap/
>>>>> >>>>>>
>>>>> >>>>>> Looking forward to your feedback.
>>>>> >>>>>>
>>>>> >>>>>> Best,
>>>>> >>>>>> Yang
>>>>> >>>>>>
>>>>> >>>>>
>>>>>
>>>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Yang Wang <da...@gmail.com>.
3. Makes sense to me. And we could add a new HA solution "StatefulSet + PV +
FileSystem" at any time in the future if we need it.

Since there are no more open questions, I will start the voting now.
Thanks all for your comments and feedback. Feel free to continue the
discussion if you have other concerns.


Best,
Yang

On Thu, Oct 1, 2020 at 4:52 PM Till Rohrmann <tr...@apache.org> wrote:

> 3. We could avoid force deletions from within Flink. If the user does it,
> then we don't give guarantees.
>
> I am fine with your current proposal. +1 for moving forward with it.
>
> Cheers,
> Till
>
> On Thu, Oct 1, 2020 at 2:32 AM Yang Wang <da...@gmail.com> wrote:
>
> > 2. Yes. This is exactly what I mean. Storing the HA information relevant
> > to a specific component in a single ConfigMap and ensuring that
> “Get(check
> > the leader)-and-Update(write back to the ConfigMap)” is a transactional
> > operation. Since we only store the job graph stateHandler(not the real
> > data) in the ConfigMap, I think 1MB is big enough for the dispater-leader
> > ConfigMap(the biggest one with multiple jobs). I roughly calculate that
> > could we have more than 1000 Flink jobs in a Flink session cluster.
> >
> > 3. Actually, K8s has a stronger guarantee than YARN. And the StatefulSet
> > could provide at most one semantics if no manually force-deletion
> > happened[1]. Based on the previous discussion, we have successfully
> avoided
> > the "lock-and-release" in the implementation. So I still insist on using
> > the current Deployment.
> >
> >
> > [1].
> >
> https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#force-deletion
> >
> >
> > Best,
> > Yang
> >
> > Till Rohrmann <tr...@apache.org> 于2020年9月30日周三 下午11:57写道:
> >
> >> Thanks for the clarifications Yang Wang.
> >>
> >> 2. Keeping the HA information relevant for a component (Dispatcher,
> >> JobManager, ResourceManager) in a single ConfigMap sounds good. We
> should
> >> check that we don't exceed the 1 MB size limit with this approach
> though.
> >> The Dispatcher's ConfigMap would then contain the current leader, the
> >> running jobs and the pointers to the persisted JobGraphs. The
> JobManager's
> >> ConfigMap would then contain the current leader, the pointers to the
> >> checkpoints and the checkpoint ID counter, for example.
> >>
> >> 3. Ah ok, I somehow thought that K8s would give us stronger
> >> guarantees than Yarn in this regard. That's a pity.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Wed, Sep 30, 2020 at 10:03 AM tison <wa...@gmail.com> wrote:
> >>
> >>> Thanks for your explanation. It would be fine if only checking
> >>> leadership & actually write information is atomic.
> >>>
> >>> Best,
> >>> tison.
> >>>
> >>>
> >>> Yang Wang <da...@gmail.com> 于2020年9月30日周三 下午3:57写道:
> >>>
> >>>> Thanks till and tison for your comments.
> >>>>
> >>>> @Till Rohrmann <tr...@apache.org>
> >>>> 1. I am afraid we could not do this if we are going to use fabric8
> >>>> Kubernetes client SDK for the leader election. The official
> Kubernetes Java
> >>>> client[1] also could not support it. Unless we implement a new
> >>>> LeaderElector in Flink based on the very basic Kubernetes API. But it
> seems
> >>>> that we could gain too much from this.
> >>>>
> >>>> 2. Yes, the implementation will be a little complicated if we want to
> >>>> completely eliminate the residual job graphs or checkpoints. Inspired
> by
> >>>> your suggestion, another different solution has come into my mind. We
> could
> >>>> use a same ConfigMap storing the JobManager leader, job graph,
> >>>> checkpoint-counter, checkpoint. Each job will have a specific
> ConfigMap for
> >>>> the HA meta storage. Then it will be easier to guarantee that only the
> >>>> leader could write the ConfigMap in a transactional operation. Since
> >>>> “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
> >>>> transactional operation.
> >>>>
> >>>> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution.
> >>>> However, we still have the chances that two JobManager are running and
> >>>> trying to get/delete a key in the same ConfigMap concurrently.
> Imagine that
> >>>> the kubelet(like NodeManager in YARN) is down, and then the JobManager
> >>>> could not be deleted. A new JobManager pod will be launched. We are
> just in
> >>>> the similar situation as Deployment(1) + ConfigMap + HDFS/S3. The only
> >>>> benefit is we do not need to implement a leader election/retrieval
> service.
> >>>>
> >>>> @tison
> >>>> Actually, I do not think we will have such issue in the Kubernetes HA
> >>>> service. In the Kubernetes LeaderElector[2], we have the leader
> information
> >>>> stored on the annotation of leader ConfigMap. So it would not happen
> the
> >>>> old leader could wrongly override the leader information. Once a
> JobManager
> >>>> want to write his leader information to the ConfigMap, it will check
> >>>> whether it is the leader now. If not, anything will happen. Moreover,
> the
> >>>> Kubernetes Resource Version[3] ensures that no one else has snuck in
> and
> >>>> written a different update while the client was in the process of
> >>>> performing its update.
> >>>>
> >>>>
> >>>> [1].
> >>>>
> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
> >>>> [2].
> >>>>
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
> >>>> <
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
> >
> >>>> [3].
> >>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
> >>>>
> >>>>
> >>>> Best,
> >>>> Yang
> >>>>
> >>>> tison <wa...@gmail.com> 于2020年9月30日周三 下午3:21写道:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> Generally +1 for a native k8s HA service.
> >>>>>
> >>>>> For leader election & publish leader information, there was a
> >>>>> discussion[1]
> >>>>> pointed out that since these two actions is NOT atomic, there will be
> >>>>> always
> >>>>> edge case where a previous leader overwrite leader information, even
> >>>>> with
> >>>>> versioned write. Versioned write helps on read again if version
> >>>>> mismatches
> >>>>> so if we want version write works, information in the kv pair should
> >>>>> help the
> >>>>> contender reflects whether it is the current leader.
> >>>>>
> >>>>> The idea of writes leader information on contender node or something
> >>>>> equivalent makes sense but the details depends on how it is
> >>>>> implemented.
> >>>>> General problems are that
> >>>>>
> >>>>> 1. TM might be a bit late before it updated correct leader
> information
> >>>>> but
> >>>>> only if the leader election process is short and leadership is stable
> >>>>> at most
> >>>>> time, it won't be a serious issue.
> >>>>> 2. The process TM extract leader information might be a bit more
> >>>>> complex
> >>>>> than directly watching a fixed key.
> >>>>>
> >>>>> Atomic issue can be addressed if one leverages low APIs such as lease
> >>>>> & txn
> >>>>> but it causes more developing efforts. ConfigMap and encapsulated
> >>>>> interface,
> >>>>> thought, provides only a self-consistent mechanism which doesn't
> >>>>> promise
> >>>>> more consistency for extension.
> >>>>>
> >>>>> Best,
> >>>>> tison.
> >>>>>
> >>>>> [1]
> >>>>>
> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
> >>>>>
> >>>>>
> >>>>>
> >>>>> Till Rohrmann <tr...@apache.org> 于2020年9月29日周二 下午9:25写道:
> >>>>>
> >>>>>> For 1. I was wondering whether we can't write the leader connection
> >>>>>> information directly when trying to obtain the leadership (trying to
> >>>>>> update
> >>>>>> the leader key with one's own value)? This might be a little detail,
> >>>>>> though.
> >>>>>>
> >>>>>> 2. Alright, so we are having a similar mechanism as we have in
> >>>>>> ZooKeeper
> >>>>>> with the ephemeral lock nodes. I guess that this complicates the
> >>>>>> implementation a bit, unfortunately.
> >>>>>>
> >>>>>> 3. Wouldn't the StatefulSet solution also work without a PV? One
> could
> >>>>>> configure a different persistent storage like HDFS or S3 for storing
> >>>>>> the
> >>>>>> checkpoints and job blobs like in the ZooKeeper case. The current
> >>>>>> benefit I
> >>>>>> see is that we avoid having to implement this multi locking
> mechanism
> >>>>>> in
> >>>>>> the ConfigMaps using the annotations because we can be sure that
> >>>>>> there is
> >>>>>> only a single leader at a time if I understood the guarantees of K8s
> >>>>>> correctly.
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Till
> >>>>>>
> >>>>>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <da...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>> > Hi Till, thanks for your valuable feedback.
> >>>>>> >
> >>>>>> > 1. Yes, leader election and storing leader information will use a
> >>>>>> same
> >>>>>> > ConfigMap. When a contender successfully performs a versioned
> >>>>>> annotation
> >>>>>> > update operation to the ConfigMap, it means that it has been
> >>>>>> elected as the
> >>>>>> > leader. And it will write the leader information in the callback
> of
> >>>>>> leader
> >>>>>> > elector[1]. The Kubernetes resource version will help us to avoid
> >>>>>> the
> >>>>>> > leader ConfigMap is wrongly updated.
> >>>>>> >
> >>>>>> > 2. The lock and release is really a valid concern. Actually in
> >>>>>> current
> >>>>>> > design, we could not guarantee that the node who tries to write
> his
> >>>>>> > ownership is the real leader. Who writes later, who is the owner.
> To
> >>>>>> > address this issue, we need to store all the owners of the key.
> >>>>>> Only when
> >>>>>> > the owner is empty, the specific key(means a checkpoint or job
> >>>>>> graph) could
> >>>>>> > be deleted. However, we may have a residual checkpoint or job
> graph
> >>>>>> when
> >>>>>> > the old JobManager crashed exceptionally and do not release the
> >>>>>> lock. To
> >>>>>> > solve this problem completely, we need a timestamp renew mechanism
> >>>>>> > for CompletedCheckpointStore and JobGraphStore, which could help
> us
> >>>>>> to the
> >>>>>> > check the JobManager timeout and then clean up the residual keys.
> >>>>>> >
> >>>>>> > 3. Frankly speaking, I am not against with this solution. However,
> >>>>>> in my
> >>>>>> > opinion, it is more like a temporary proposal. We could use
> >>>>>> StatefulSet to
> >>>>>> > avoid leader election and leader retrieval. But I am not sure
> >>>>>> whether
> >>>>>> > TaskManager could properly handle the situation that same hostname
> >>>>>> with
> >>>>>> > different IPs, because the JobManager failed and relaunched. Also
> >>>>>> we may
> >>>>>> > still have two JobManagers running in some corner cases(e.g.
> >>>>>> kubelet is
> >>>>>> > down but the pod is running). Another concern is we have a strong
> >>>>>> > dependency on the PersistentVolume(aka PV) in FileSystemHAService.
> >>>>>> But it
> >>>>>> > is not always true especially in self-build Kubernetes cluster.
> >>>>>> Moreover,
> >>>>>> > PV provider should guarantee that each PV could only be mounted
> >>>>>> once. Since
> >>>>>> > the native HA proposal could cover all the functionality of
> >>>>>> StatefulSet
> >>>>>> > proposal, that's why I prefer the former.
> >>>>>> >
> >>>>>> >
> >>>>>> > [1].
> >>>>>> >
> >>>>>>
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
> >>>>>> >
> >>>>>> > Best,
> >>>>>> > Yang
> >>>>>> >
> >>>>>> > On Mon, Sep 28, 2020 at 9:29 PM Till Rohrmann <tr...@apache.org> wrote:
> >>>>>> >
> >>>>>> >> Thanks for creating this FLIP, Yang Wang. I believe that many of our
> >>>>>> >> users will like a ZooKeeper-less HA setup.
> >>>>>> >>
> >>>>>> >> +1 for not separating the leader information and the leader election
> >>>>>> >> if possible. Maybe it is even possible for the contender to write its
> >>>>>> >> leader information directly when trying to obtain the leadership by
> >>>>>> >> performing a versioned write operation.
> >>>>>> >>
> >>>>>> >> Concerning the lock and release operation, I have a question: Can
> >>>>>> >> there be multiple owners for a given key-value pair in a ConfigMap?
> >>>>>> >> If not, how can we ensure that the node which writes its ownership is
> >>>>>> >> actually the leader w/o transactional support from K8s? In ZooKeeper
> >>>>>> >> we had the same problem (we should probably change it at some point
> >>>>>> >> to simply use a transaction which checks whether the writer is still
> >>>>>> >> the leader) and therefore introduced the ephemeral lock nodes. What
> >>>>>> >> they allow is that there can be multiple owners of a given ZNode at a
> >>>>>> >> time. The last owner will then be responsible for the cleanup of the
> >>>>>> >> node.
> >>>>>> >>
> >>>>>> >> I see the benefit of your proposal over the stateful set proposal
> >>>>>> >> because it can support multiple standby JMs. Given the problem of
> >>>>>> >> locking key-value pairs, it might be simpler to start with this
> >>>>>> >> approach where we only have a single JM. This might already add a lot
> >>>>>> >> of benefits for our users. Was there a specific reason why you
> >>>>>> >> discarded this proposal (other than generality)?
> >>>>>> >>
> >>>>>> >> @Uce it would be great to hear your feedback on the proposal since
> >>>>>> >> you already implemented a K8s based HA service.
> >>>>>> >>
> >>>>>> >> Cheers,
> >>>>>> >> Till
> >>>>>> >>
> >>>>>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <danrtsey.wy@gmail.com> wrote:
> >>>>>> >>
> >>>>>> >>> Hi Xintong and Stephan,
> >>>>>> >>>
> >>>>>> >>> Thanks a lot for your attention to this FLIP. I will address the
> >>>>>> >>> comments inline.
> >>>>>> >>>
> >>>>>> >>> # Architecture -> One or two ConfigMaps
> >>>>>> >>>
> >>>>>> >>> Both of you are right. One ConfigMap will make the design and
> >>>>>> >>> implementation easier. Actually, in my POC code, I am using just one
> >>>>>> >>> ConfigMap (e.g. "k8s-ha-app1-restserver" for the rest server
> >>>>>> >>> component) for both the leader election and storage. Once a
> >>>>>> >>> JobManager wins the election, it will update the ConfigMap with the
> >>>>>> >>> leader address and periodically renew the lock annotation to remain
> >>>>>> >>> the active leader. I will update the FLIP document, including the
> >>>>>> >>> architecture diagram, to avoid the misunderstanding.
> >>>>>> >>>
> >>>>>> >>>
> >>>>>> >>> # HA storage > Lock and release
> >>>>>> >>>
> >>>>>> >>> This is a valid concern. ZooKeeper ephemeral nodes are deleted by the
> >>>>>> >>> ZK server automatically when the client times out, which could
> >>>>>> >>> happen in a bad network environment or when the ZK client crashes
> >>>>>> >>> unexpectedly. For Kubernetes, we need to implement a similar
> >>>>>> >>> mechanism. First, when we want to lock a specific key in a ConfigMap,
> >>>>>> >>> we will put the owner identity, lease duration and renew time in the
> >>>>>> >>> ConfigMap annotation. The annotation will be cleaned up when
> >>>>>> >>> releasing the lock. When we want to remove a job graph or
> >>>>>> >>> checkpoints, at least one of the following conditions must be
> >>>>>> >>> satisfied. If not, the delete operation cannot be done.
> >>>>>> >>> * The current instance is the owner of the key.
> >>>>>> >>> * The owner annotation is empty, which means the owner has released
> >>>>>> >>>   the lock.
> >>>>>> >>> * The owner annotation has timed out, which usually indicates that
> >>>>>> >>>   the owner died.
> >>>>>> >>>
> >>>>>> >>>
> >>>>>> >>> # HA storage > HA data clean up
> >>>>>> >>>
> >>>>>> >>> Sorry that I did not clearly describe how the HA-related ConfigMap is
> >>>>>> >>> retained. Thanks to the Kubernetes OwnerReference[1], we set the
> >>>>>> >>> owner of the flink-conf ConfigMap, service and TaskManager pods to
> >>>>>> >>> the JobManager Deployment. So when we want to destroy a Flink
> >>>>>> >>> cluster, we just need to delete the deployment[2]. For the
> >>>>>> >>> HA-related ConfigMaps, we do not set the owner, so they are retained
> >>>>>> >>> even when we delete the whole Flink cluster.
> >>>>>> >>>
> >>>>>> >>>
> >>>>>> >>> [1].
> >>>>>> >>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
> >>>>>> >>> [2].
> >>>>>> >>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
> >>>>>> >>>
> >>>>>> >>>
> >>>>>> >>> Best,
> >>>>>> >>> Yang
> >>>>>> >>>
> >>>>>> >>>
> >>>>>> >>> On Wed, Sep 16, 2020 at 8:16 PM Stephan Ewen <se...@apache.org> wrote:
> >>>>>> >>>
> >>>>>> >>>> This is a very cool feature proposal.
> >>>>>> >>>>
> >>>>>> >>>> One lesson learned from the ZooKeeper-based HA is that it is overly
> >>>>>> >>>> complicated to have the Leader RPC address in a different node than
> >>>>>> >>>> the LeaderLock. There is extra code needed to make sure these
> >>>>>> >>>> converge, and they can be temporarily out of sync.
> >>>>>> >>>>
> >>>>>> >>>> A much easier design would be to have the RPC address as payload in
> >>>>>> >>>> the lock entry (ZNode in ZK), the same way that the leader fencing
> >>>>>> >>>> token is stored as payload of the lock.
> >>>>>> >>>> I think for the design above it would mean having a single ConfigMap
> >>>>>> >>>> for both leader lock and leader RPC address discovery.
> >>>>>> >>>>
> >>>>>> >>>> This probably serves as a good design principle in general: do not
> >>>>>> >>>> divide information that is updated together over different
> >>>>>> >>>> resources.
> >>>>>> >>>>
> >>>>>> >>>> Best,
> >>>>>> >>>> Stephan
> >>>>>> >>>>
> >>>>>> >>>>
> >>>>>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <tonysong820@gmail.com> wrote:
> >>>>>> >>>>
> >>>>>> >>>>> Thanks for preparing this FLIP, @Yang.
> >>>>>> >>>>>
> >>>>>> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
> >>>>>> >>>>> built-in ConfigMap for Flink's HA services should significantly
> >>>>>> >>>>> reduce the maintenance overhead compared to deploying a ZK cluster.
> >>>>>> >>>>> I think this is an attractive feature for users.
> >>>>>> >>>>>
> >>>>>> >>>>> Concerning the proposed design, I have some questions. They might
> >>>>>> >>>>> not be problems; I am just trying to understand.
> >>>>>> >>>>>
> >>>>>> >>>>> ## Architecture
> >>>>>> >>>>>
> >>>>>> >>>>> Why does the leader election need two ConfigMaps
> >>>>>> >>>>> (`lock for contending leader`, and `leader RPC address`)? What
> >>>>>> >>>>> happens if the two ConfigMaps are not updated consistently? E.g., a
> >>>>>> >>>>> TM learns about a new JM becoming leader (lock for contending
> >>>>>> >>>>> leader updated), but still gets the old leader's address when
> >>>>>> >>>>> trying to read `leader RPC address`?
> >>>>>> >>>>>
> >>>>>> >>>>> ## HA storage > Lock and release
> >>>>>> >>>>>
> >>>>>> >>>>> It seems to me that the owner needs to explicitly release the lock
> >>>>>> >>>>> so that other peers can write/remove the stored object. What if the
> >>>>>> >>>>> previous owner failed to release the lock (e.g., dead before
> >>>>>> >>>>> releasing)? Would there be any problem?
> >>>>>> >>>>>
> >>>>>> >>>>> ## HA storage > HA data clean up
> >>>>>> >>>>>
> >>>>>> >>>>> If the ConfigMap is destroyed on
> >>>>>> >>>>> `kubectl delete deploy <ClusterID>`, how are the HA data retained?
> >>>>>> >>>>>
> >>>>>> >>>>>
> >>>>>> >>>>> Thank you~
> >>>>>> >>>>>
> >>>>>> >>>>> Xintong Song
> >>>>>> >>>>>
> >>>>>> >>>>>
> >>>>>> >>>>>
> >>>>>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <danrtsey.wy@gmail.com> wrote:
> >>>>>> >>>>>
> >>>>>> >>>>>> Hi devs and users,
> >>>>>> >>>>>>
> >>>>>> >>>>>> I would like to start the discussion about FLIP-144[1], which will
> >>>>>> >>>>>> introduce a new native high availability service for Kubernetes.
> >>>>>> >>>>>>
> >>>>>> >>>>>> Flink currently provides a ZooKeeper HA service, which is widely
> >>>>>> >>>>>> used in production environments. It can be integrated into
> >>>>>> >>>>>> standalone cluster, YARN and Kubernetes deployments. However, using
> >>>>>> >>>>>> the ZooKeeper HA on K8s incurs additional cost since we need to
> >>>>>> >>>>>> manage a ZooKeeper cluster. In the meantime, K8s provides public
> >>>>>> >>>>>> APIs for leader election[2] and configuration storage (i.e.
> >>>>>> >>>>>> ConfigMap[3]). We could leverage these features and make running an
> >>>>>> >>>>>> HA-configured Flink cluster on K8s more convenient.
> >>>>>> >>>>>>
> >>>>>> >>>>>> Both standalone on K8s and native K8s deployments could benefit
> >>>>>> >>>>>> from the newly introduced KubernetesHaService.
> >>>>>> >>>>>>
> >>>>>> >>>>>> [1].
> >>>>>> >>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
> >>>>>> >>>>>> [2].
> >>>>>> >>>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
> >>>>>> >>>>>> [3].
> >>>>>> >>>>>> https://kubernetes.io/docs/concepts/configuration/configmap/
> >>>>>> >>>>>>
> >>>>>> >>>>>> Looking forward to your feedback.
> >>>>>> >>>>>>
> >>>>>> >>>>>> Best,
> >>>>>> >>>>>> Yang
> >>>>>> >>>>>>
> >>>>>> >>>>>
> >>>>>>
> >>>>>
>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Yang Wang <da...@gmail.com>.
3. Makes sense to me. We could also add a new HA solution, "StatefulSet +
PV + FileSystem", at any time in the future if we need it.

Since there are no more open questions, I will start the vote now.
Thanks, all, for your comments and feedback. Feel free to continue the
discussion if you have other concerns.


Best,
Yang
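
To make the "Get(check the leader)-and-Update(write back to the ConfigMap)"
idea discussed in this thread more concrete, here is a minimal sketch of a
leader-guarded versioned write. It is only an in-memory toy model under stated
assumptions, not the FLIP's implementation: FakeApiServer, write_if_leader and
the "leader" annotation key are hypothetical names, and no real Kubernetes
client is called. The only behavior it relies on is that an update based on a
stale resource version gets rejected.

```python
"""Toy model of a leader-guarded, versioned ConfigMap write (no real K8s calls)."""
from dataclasses import dataclass, field


class Conflict(Exception):
    """Raised when the resource version changed between get and update."""


@dataclass
class ConfigMap:
    resource_version: int = 0
    annotations: dict = field(default_factory=dict)
    data: dict = field(default_factory=dict)


class FakeApiServer:
    """Hypothetical stand-in for the API server, holding a single ConfigMap."""

    def __init__(self):
        self._cm = ConfigMap()

    def get(self):
        # Return a copy so callers cannot mutate the stored state directly.
        return ConfigMap(self._cm.resource_version,
                         dict(self._cm.annotations), dict(self._cm.data))

    def update(self, cm):
        # Optimistic concurrency: reject writes based on a stale version.
        if cm.resource_version != self._cm.resource_version:
            raise Conflict("stale resource version")
        self._cm = ConfigMap(cm.resource_version + 1,
                             dict(cm.annotations), dict(cm.data))


def write_if_leader(api, me, key, value, retries=3):
    """Get, check leadership, then update, all tied to one resource version."""
    for _ in range(retries):
        cm = api.get()
        if cm.annotations.get("leader") != me:
            return False  # not the leader: write nothing
        cm.data[key] = value
        try:
            api.update(cm)
            return True
        except Conflict:
            continue  # someone wrote in between: re-read and re-check
    return False
```

In this model, a contender that is not recorded in the "leader" annotation
never writes, and a leader that lost its leadership between the read and the
write hits a Conflict, re-reads, and then backs off.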

On Thu, Oct 1, 2020 at 4:52 PM Till Rohrmann <tr...@apache.org> wrote:

> 3. We could avoid force deletions from within Flink. If the user does it,
> then we don't give guarantees.
>
> I am fine with your current proposal. +1 for moving forward with it.
>
> Cheers,
> Till

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Till Rohrmann <tr...@apache.org>.
3. We could avoid force deletions from within Flink. If the user does it,
then we don't give guarantees.

I am fine with your current proposal. +1 for moving forward with it.

Cheers,
Till

On Thu, Oct 1, 2020 at 2:32 AM Yang Wang <da...@gmail.com> wrote:

> 2. Yes. This is exactly what I mean. Storing the HA information relevant
> to a specific component in a single ConfigMap and ensuring that “Get(check
> the leader)-and-Update(write back to the ConfigMap)” is a transactional
> operation. Since we only store the job graph state handle (not the real
> data) in the ConfigMap, I think 1 MB is big enough for the dispatcher-leader
> ConfigMap (the biggest one, with multiple jobs). By my rough calculation,
> we could have more than 1000 Flink jobs in a Flink session cluster.
>
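To make the size estimate in point 2 concrete, here is a back-of-the-envelope sketch in Java. The 1 MiB figure is the Kubernetes limit on ConfigMap data. The per-entry size (about 1000 bytes for a key plus a serialized state handle pointer) and the 8 KiB reserved for leader information are assumptions for illustration only, not numbers measured in the FLIP.

```java
/** Rough capacity estimate for a single ConfigMap; all sizes passed in are assumptions. */
final class ConfigMapCapacity {
    // Kubernetes limits the data stored in one ConfigMap to 1 MiB.
    static final long CONFIG_MAP_LIMIT_BYTES = 1024L * 1024L;

    /** Number of entries that fit, given an assumed entry size and reserved overhead. */
    static long maxEntries(long bytesPerEntry, long reservedBytes) {
        if (bytesPerEntry <= 0 || reservedBytes < 0 || reservedBytes > CONFIG_MAP_LIMIT_BYTES) {
            throw new IllegalArgumentException("invalid sizes");
        }
        return (CONFIG_MAP_LIMIT_BYTES - reservedBytes) / bytesPerEntry;
    }

    public static void main(String[] args) {
        // Hypothetical sizes: ~1000 bytes per job entry, 8 KiB reserved for
        // leader information and other metadata.
        System.out.println(maxEntries(1000L, 8L * 1024L));
    }
}
```

Under these assumed sizes the result stays slightly above 1000 entries, which is consistent with the rough figure quoted above. Larger state handles would lower it proportionally.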
> 3. Actually, K8s has a stronger guarantee than YARN. A StatefulSet
> provides at-most-one semantics as long as no manual force-deletion
> happens[1]. Based on the previous discussion, we have successfully avoided
> the "lock-and-release" in the implementation. So I still insist on using
> the current Deployment.
>
>
> [1].
> https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#force-deletion
>
>
> Best,
> Yang
>
> Till Rohrmann <tr...@apache.org> wrote on Wed, Sep 30, 2020 at 11:57 PM:
>
>> Thanks for the clarifications Yang Wang.
>>
>> 2. Keeping the HA information relevant for a component (Dispatcher,
>> JobManager, ResourceManager) in a single ConfigMap sounds good. We should
>> check that we don't exceed the 1 MB size limit with this approach though.
>> The Dispatcher's ConfigMap would then contain the current leader, the
>> running jobs and the pointers to the persisted JobGraphs. The JobManager's
>> ConfigMap would then contain the current leader, the pointers to the
>> checkpoints and the checkpoint ID counter, for example.
>>
>> 3. Ah ok, I somehow thought that K8s would give us stronger
>> guarantees than Yarn in this regard. That's a pity.
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 30, 2020 at 10:03 AM tison <wa...@gmail.com> wrote:
>>
>>> Thanks for your explanation. It would be fine as long as checking
>>> leadership and actually writing the information is atomic.
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> Yang Wang <da...@gmail.com> wrote on Wed, Sep 30, 2020 at 3:57 PM:
>>>
>>>> Thanks Till and tison for your comments.
>>>>
>>>> @Till Rohrmann <tr...@apache.org>
>>>> 1. I am afraid we could not do this if we are going to use the fabric8
>>>> Kubernetes client SDK for the leader election. The official Kubernetes Java
>>>> client[1] does not support it either, unless we implement a new
>>>> LeaderElector in Flink based on the very basic Kubernetes API. But it seems
>>>> that we would not gain much from this.
>>>>
>>>> 2. Yes, the implementation will be a little complicated if we want to
>>>> completely eliminate the residual job graphs or checkpoints. Inspired by
>>>> your suggestion, a different solution has come to my mind. We could
>>>> use the same ConfigMap to store the JobManager leader, job graph,
>>>> checkpoint counter, and checkpoints. Each job will have a specific ConfigMap
>>>> for the HA metadata storage. Then it will be easier to guarantee that only
>>>> the leader can write to the ConfigMap, since “Get(check the
>>>> leader)-and-Update(write back to the ConfigMap)” is a transactional
>>>> operation.
>>>>
>>>> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution.
>>>> However, there is still a chance that two JobManagers are running and
>>>> trying to get/delete a key in the same ConfigMap concurrently. Imagine that
>>>> the kubelet (like the NodeManager in YARN) is down, so the JobManager pod
>>>> cannot be deleted. A new JobManager pod will be launched. We are then in
>>>> a similar situation as with Deployment(1) + ConfigMap + HDFS/S3. The only
>>>> benefit is that we do not need to implement a leader election/retrieval service.
>>>>
>>>> @tison
>>>> Actually, I do not think we will have such an issue in the Kubernetes HA
>>>> service. In the Kubernetes LeaderElector[2], the leader information is
>>>> stored in the annotation of the leader ConfigMap. So it cannot happen that
>>>> the old leader wrongly overrides the leader information. Once a JobManager
>>>> wants to write its leader information to the ConfigMap, it will check
>>>> whether it is the leader now. If not, nothing will happen. Moreover, the
>>>> Kubernetes Resource Version[3] ensures that no one else has snuck in and
>>>> written a different update while the client was in the process of
>>>> performing its update.
>>>>
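The "check the leader, then write back under a resource version" step described above can be sketched as follows. This is a minimal in-memory model under stated assumptions, not the FLIP's implementation and not the fabric8 or official client API: `VersionedConfigMap`, `replace` and `checkAndUpdate` are hypothetical names, and the `leader` field stands in for the leader annotation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Sketch of "Get (check the leader) and Update (write back)" with optimistic versioning. */
final class GuardedUpdateSketch {

    /** In-memory stand-in for a ConfigMap with a resource version (hypothetical). */
    static final class VersionedConfigMap {
        private long resourceVersion = 0L;
        private String leader; // stands in for the leader annotation
        private Map<String, String> data = new HashMap<>();

        /** Immutable view of the ConfigMap at one resource version. */
        static final class Snapshot {
            final long resourceVersion;
            final String leader;
            final Map<String, String> data;

            Snapshot(long resourceVersion, String leader, Map<String, String> data) {
                this.resourceVersion = resourceVersion;
                this.leader = leader;
                this.data = Collections.unmodifiableMap(new HashMap<>(data));
            }
        }

        synchronized Snapshot get() {
            return new Snapshot(resourceVersion, leader, data);
        }

        /** Replaces the content only if nobody has written since expectedVersion was read. */
        synchronized boolean replace(long expectedVersion, String newLeader, Map<String, String> newData) {
            if (expectedVersion != resourceVersion) {
                return false; // conflict: someone else updated in between
            }
            leader = newLeader;
            data = new HashMap<>(newData);
            resourceVersion++;
            return true;
        }
    }

    /** Writes key/value only if self is the recorded leader and the version is unchanged. */
    static boolean checkAndUpdate(VersionedConfigMap configMap, String self, String key, String value) {
        VersionedConfigMap.Snapshot snapshot = configMap.get();
        if (!self.equals(snapshot.leader)) {
            return false; // not the leader: do nothing
        }
        Map<String, String> updated = new HashMap<>(snapshot.data);
        updated.put(key, value);
        return configMap.replace(snapshot.resourceVersion, snapshot.leader, updated);
    }

    public static void main(String[] args) {
        VersionedConfigMap configMap = new VersionedConfigMap();
        configMap.replace(0L, "jm-1", new HashMap<>()); // jm-1 becomes leader
        System.out.println(checkAndUpdate(configMap, "jm-1", "jobGraph-a", "hdfs:///ha/jobGraph-a"));
        System.out.println(checkAndUpdate(configMap, "jm-2", "jobGraph-a", "overwritten"));
    }
}
```

The point of the sketch is that the leadership check and the write are tied to the same snapshot: if leadership changes between the read and the write, the version no longer matches and the write is rejected.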
>>>>
>>>> [1].
>>>> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
>>>> [2].
>>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
>>>> <https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70>
>>>> [3].
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
>>>>
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>> tison <wa...@gmail.com> wrote on Wed, Sep 30, 2020 at 3:21 PM:
>>>>
>>>>> Hi,
>>>>>
>>>>> Generally +1 for a native k8s HA service.
>>>>>
>>>>> For leader election & publishing leader information, a discussion[1]
>>>>> pointed out that since these two actions are NOT atomic, there will
>>>>> always be an edge case where a previous leader overwrites the leader
>>>>> information, even with versioned writes. A versioned write only forces
>>>>> a re-read if the version mismatches, so for versioned writes to work,
>>>>> the information in the kv pair should let the contender determine
>>>>> whether it is the current leader.
>>>>>
>>>>> The idea of writing leader information on the contender node or something
>>>>> equivalent makes sense, but the details depend on how it is implemented.
>>>>> General problems are that
>>>>>
>>>>> 1. A TM might be a bit late in picking up the correct leader information,
>>>>> but as long as the leader election process is short and leadership is
>>>>> stable most of the time, it won't be a serious issue.
>>>>> 2. The process by which a TM extracts leader information might be a bit
>>>>> more complex than directly watching a fixed key.
>>>>>
>>>>> The atomicity issue can be addressed if one leverages low-level APIs such
>>>>> as lease & txn, but that requires more development effort. ConfigMap and
>>>>> the encapsulated interface, though, provide only a self-consistent
>>>>> mechanism which doesn't promise more consistency for extension.
>>>>>
>>>>> Best,
>>>>> tison.
>>>>>
>>>>> [1]
>>>>> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>>>>>
>>>>>
>>>>>
>>>>> Till Rohrmann <tr...@apache.org> wrote on Tue, Sep 29, 2020 at 9:25 PM:
>>>>>
>>>>>> For 1. I was wondering whether we can't write the leader connection
>>>>>> information directly when trying to obtain the leadership (trying to
>>>>>> update the leader key with one's own value)? This might be a little
>>>>>> detail, though.
>>>>>>
>>>>>> 2. Alright, so we are having a similar mechanism as we have in
>>>>>> ZooKeeper with the ephemeral lock nodes. I guess that this complicates
>>>>>> the implementation a bit, unfortunately.
>>>>>>
>>>>>> 3. Wouldn't the StatefulSet solution also work without a PV? One could
>>>>>> configure a different persistent storage like HDFS or S3 for storing
>>>>>> the checkpoints and job blobs like in the ZooKeeper case. The current
>>>>>> benefit I see is that we avoid having to implement this multi locking
>>>>>> mechanism in the ConfigMaps using the annotations because we can be
>>>>>> sure that there is only a single leader at a time if I understood the
>>>>>> guarantees of K8s correctly.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <da...@gmail.com> wrote:
>>>>>>
>>>>>> > Hi Till, thanks for your valuable feedback.
>>>>>> >
>>>>>> > 1. Yes, leader election and storing leader information will use the
>>>>>> > same ConfigMap. When a contender successfully performs a versioned
>>>>>> > annotation update operation on the ConfigMap, it means that it has been
>>>>>> > elected as the leader. It will then write the leader information in the
>>>>>> > callback of the leader elector[1]. The Kubernetes resource version
>>>>>> > helps us prevent the leader ConfigMap from being wrongly updated.
>>>>>> >
>>>>>> > 2. The lock and release is really a valid concern. Actually, in the
>>>>>> > current design, we cannot guarantee that the node that tries to write
>>>>>> > its ownership is the real leader. Whoever writes last is the owner. To
>>>>>> > address this issue, we need to store all the owners of the key. Only
>>>>>> > when the owner list is empty can the specific key (meaning a checkpoint
>>>>>> > or job graph) be deleted. However, we may have a residual checkpoint or
>>>>>> > job graph when the old JobManager crashes unexpectedly and does not
>>>>>> > release the lock. To solve this problem completely, we need a timestamp
>>>>>> > renew mechanism for CompletedCheckpointStore and JobGraphStore, which
>>>>>> > could help us check for a JobManager timeout and then clean up the
>>>>>> > residual keys.
>>>>>> >
>>>>>> > 3. Frankly speaking, I am not against this solution. However, in my
>>>>>> > opinion, it is more like a temporary proposal. We could use a
>>>>>> > StatefulSet to avoid leader election and leader retrieval. But I am not
>>>>>> > sure whether the TaskManager could properly handle the situation of the
>>>>>> > same hostname with different IPs after the JobManager failed and was
>>>>>> > relaunched. Also, we may still have two JobManagers running in some
>>>>>> > corner cases (e.g. the kubelet is down but the pod is running). Another
>>>>>> > concern is that we have a strong dependency on the PersistentVolume
>>>>>> > (aka PV) in FileSystemHAService, but a PV is not always available,
>>>>>> > especially in self-built Kubernetes clusters. Moreover, the PV provider
>>>>>> > should guarantee that each PV can only be mounted once. Since the
>>>>>> > native HA proposal covers all the functionality of the StatefulSet
>>>>>> > proposal, I prefer the former.
>>>>>> >
>>>>>> >
>>>>>> > [1].
>>>>>> >
>>>>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>>>>>> >
>>>>>> > Best,
>>>>>> > Yang
>>>>>> >
>>>>>> > Till Rohrmann <tr...@apache.org> wrote on Mon, Sep 28, 2020 at 9:29 PM:
>>>>>> >
>>>>>> >> Thanks for creating this FLIP Yang Wang. I believe that many of our
>>>>>> >> users will like a ZooKeeper-less HA setup.
>>>>>> >>
>>>>>> >> +1 for not separating the leader information and the leader election
>>>>>> >> if possible. Maybe it is even possible that the contender writes his
>>>>>> >> leader information directly when trying to obtain the leadership by
>>>>>> >> performing a versioned write operation.
>>>>>> >>
>>>>>> >> Concerning the lock and release operation I have a question: Can
>>>>>> >> there be multiple owners for a given key-value pair in a ConfigMap?
>>>>>> >> If not, how can we ensure that the node which writes his ownership is
>>>>>> >> actually the leader w/o transactional support from K8s? In ZooKeeper
>>>>>> >> we had the same problem (we should probably change it at some point
>>>>>> >> to simply use a transaction which checks whether the writer is still
>>>>>> >> the leader) and therefore introduced the ephemeral lock nodes. What
>>>>>> >> they allow is that there can be multiple owners of a given ZNode at a
>>>>>> >> time. The last owner will then be responsible for the cleanup of the
>>>>>> >> node.
>>>>>> >>
>>>>>> >> I see the benefit of your proposal over the stateful set proposal
>>>>>> >> because it can support multiple standby JMs. Given the problem of
>>>>>> >> locking key-value pairs it might be simpler to start with this
>>>>>> >> approach where we only have single JM. This might already add a lot
>>>>>> >> of benefits for our users. Was there a specific reason why you
>>>>>> >> discarded this proposal (other than generality)?
>>>>>> >>
>>>>>> >> @Uce it would be great to hear your feedback on the proposal since
>>>>>> >> you already implemented a K8s based HA service.
>>>>>> >>
>>>>>> >> Cheers,
>>>>>> >> Till
>>>>>> >>
>>>>>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com> wrote:
>>>>>> >>
>>>>>> >>> Hi Xintong and Stephan,
>>>>>> >>>
>>>>>> >>> Thanks a lot for your attention on this FLIP. I will address the
>>>>>> >>> comments inline.
>>>>>> >>>
>>>>>> >>> # Architecture -> One or two ConfigMaps
>>>>>> >>>
>>>>>> >>> Both of you are right. One ConfigMap will make the design and
>>>>>> >>> implementation easier. Actually, in my POC code, I am using just one
>>>>>> >>> ConfigMap (e.g. "k8s-ha-app1-restserver" for the rest server
>>>>>> >>> component) for the leader election and storage. Once a JobManager
>>>>>> >>> wins the election, it will update the ConfigMap with the leader
>>>>>> >>> address and periodically renew the lock annotation to remain the
>>>>>> >>> active leader. I will update the FLIP document, including the
>>>>>> >>> architecture diagram, to avoid the misunderstanding.
>>>>>> >>>
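The "win the election, write the address, periodically renew" cycle described above can be sketched in Java as follows. This is only an illustration under assumptions: `LeaderRecord` and `tryAcquireOrRenew` are hypothetical names, the record models the lock annotation plus the leader address in one place, and in a real setup the returned record would be written back with a versioned ConfigMap update, which is not shown here.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

/** Sketch of lease-based leader election with the leader address stored alongside the lock. */
final class LeaseElectionSketch {

    /** Models the lock annotation and leader address kept in a single ConfigMap (hypothetical). */
    static final class LeaderRecord {
        final String holder;
        final String address;
        final Instant renewTime;
        final Duration leaseDuration;

        LeaderRecord(String holder, String address, Instant renewTime, Duration leaseDuration) {
            this.holder = holder;
            this.address = address;
            this.renewTime = renewTime;
            this.leaseDuration = leaseDuration;
        }

        /** The lease is expired once a full lease duration has passed since the last renewal. */
        boolean isExpired(Instant now) {
            return !now.isBefore(renewTime.plus(leaseDuration));
        }
    }

    /**
     * Returns the record the candidate should write if it may acquire or renew the lease,
     * or empty if another holder still has a valid lease.
     */
    static Optional<LeaderRecord> tryAcquireOrRenew(
            LeaderRecord current, String candidate, String address,
            Duration leaseDuration, Instant now) {
        boolean free = current == null || current.isExpired(now);
        boolean mine = current != null && candidate.equals(current.holder);
        if (!free && !mine) {
            return Optional.empty();
        }
        return Optional.of(new LeaderRecord(candidate, address, now, leaseDuration));
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2020-09-17T00:00:00Z");
        Duration lease = Duration.ofSeconds(15);
        Optional<LeaderRecord> first = tryAcquireOrRenew(null, "jm-1", "jm-1:6123", lease, start);
        Optional<LeaderRecord> second =
                tryAcquireOrRenew(first.get(), "jm-2", "jm-2:6123", lease, start.plusSeconds(5));
        System.out.println(first.isPresent() + " " + second.isPresent());
    }
}
```

Because the holder and the address travel in the same record, a reader never sees a new leader paired with the old leader's address, which is the concern raised about the two-ConfigMap layout.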
>>>>>> >>>
>>>>>> >>> # HA storage > Lock and release
>>>>>> >>>
>>>>>> >>> This is a valid concern. ZooKeeper ephemeral nodes are deleted by the
>>>>>> >>> ZK server automatically when the client times out, which could happen
>>>>>> >>> in a bad network environment or when the ZK client crashes
>>>>>> >>> unexpectedly. For Kubernetes, we need to implement a similar
>>>>>> >>> mechanism. First, when we want to lock a specific key in the
>>>>>> >>> ConfigMap, we will put the owner identity, lease duration, and renew
>>>>>> >>> time in the ConfigMap annotation. The annotation will be cleaned up
>>>>>> >>> when releasing the lock. When we want to remove a job graph or
>>>>>> >>> checkpoints, it should satisfy the following conditions. If not, the
>>>>>> >>> delete operation cannot be done.
>>>>>> >>> * The current instance is the owner of the key.
>>>>>> >>> * The owner annotation is empty, which means the owner has released
>>>>>> >>> the lock.
>>>>>> >>> * The owner annotation has timed out, which usually indicates that the
>>>>>> >>> owner died.
>>>>>> >>>
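The delete conditions listed above can be written as a single predicate. The sketch below reads the three bullets as alternatives (any one of them permits the delete), which is my interpretation since they cannot all hold at once. `KeyLock` and `canDelete` are hypothetical names, and the fields mirror the owner identity, lease duration and renew time mentioned in the text.

```java
import java.time.Duration;
import java.time.Instant;

/** Sketch of the "may this instance delete the key?" check for a locked ConfigMap entry. */
final class KeyLockSketch {

    /** Models the lock annotation for one key (hypothetical structure). */
    static final class KeyLock {
        final String owner; // null or empty once the lock has been released
        final Instant renewTime;
        final Duration leaseDuration;

        KeyLock(String owner, Instant renewTime, Duration leaseDuration) {
            this.owner = owner;
            this.renewTime = renewTime;
            this.leaseDuration = leaseDuration;
        }
    }

    /** True if self owns the key, the lock was released, or the owner's lease timed out. */
    static boolean canDelete(KeyLock lock, String self, Instant now) {
        if (lock == null || lock.owner == null || lock.owner.isEmpty()) {
            return true; // owner annotation is empty: lock released
        }
        if (lock.owner.equals(self)) {
            return true; // current instance is the owner
        }
        // Owner annotation timed out, which usually indicates the owner died.
        return !now.isBefore(lock.renewTime.plus(lock.leaseDuration));
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2020-09-17T00:00:00Z");
        KeyLock held = new KeyLock("jm-1", start, Duration.ofSeconds(30));
        System.out.println(canDelete(held, "jm-2", start.plusSeconds(1)));
        System.out.println(canDelete(held, "jm-2", start.plusSeconds(31)));
    }
}
```

The timeout branch is what replaces ZooKeeper's automatic removal of ephemeral nodes: without it, a crashed owner would leave the key undeletable.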
>>>>>> >>>
>>>>>> >>> # HA storage > HA data clean up
>>>>>> >>>
>>>>>> >>> Sorry that I did not clearly describe how the HA related ConfigMap is
>>>>>> >>> retained. Thanks to the Kubernetes OwnerReference[1], we set the owner
>>>>>> >>> of the flink-conf ConfigMap, service and TaskManager pods to the
>>>>>> >>> JobManager Deployment. So when we want to destroy a Flink cluster, we
>>>>>> >>> just need to delete the deployment[2]. For the HA related ConfigMaps,
>>>>>> >>> we do not set the owner, so that they are retained even when we delete
>>>>>> >>> the whole Flink cluster.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> [1].
>>>>>> >>>
>>>>>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>>>>> >>> [2].
>>>>>> >>>
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> Best,
>>>>>> >>> Yang
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> Stephan Ewen <se...@apache.org> wrote on Wed, Sep 16, 2020 at 8:16 PM:
>>>>>> >>>
>>>>>> >>>> This is a very cool feature proposal.
>>>>>> >>>>
>>>>>> >>>> One lesson learned from the ZooKeeper-based HA is that it is overly
>>>>>> >>>> complicated to have the Leader RPC address in a different node than
>>>>>> >>>> the LeaderLock. There is extra code needed to make sure these
>>>>>> >>>> converge, and they can be temporarily out of sync.
>>>>>> >>>>
>>>>>> >>>> A much easier design would be to have the RPC address as payload in
>>>>>> >>>> the lock entry (ZNode in ZK), the same way that the leader fencing
>>>>>> >>>> token is stored as payload of the lock.
>>>>>> >>>> I think for the design above it would mean having a single ConfigMap
>>>>>> >>>> for both leader lock and leader RPC address discovery.
>>>>>> >>>>
>>>>>> >>>> This probably serves as a good design principle in general - not to
>>>>>> >>>> divide information that is updated together over different resources.
>>>>>> >>>>
>>>>>> >>>> Best,
>>>>>> >>>> Stephan
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <tonysong820@gmail.com> wrote:
>>>>>> >>>>
>>>>>> >>>>> Thanks for preparing this FLIP, @Yang.
>>>>>> >>>>>
>>>>>> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>>>>> >>>>> built-in ConfigMap for Flink's HA services should significantly
>>>>>> >>>>> reduce the maintenance overhead compared to deploying a ZK cluster.
>>>>>> >>>>> I think this is an attractive feature for users.
>>>>>> >>>>>
>>>>>> >>>>> Concerning the proposed design, I have some questions. Might not be
>>>>>> >>>>> problems, just trying to understand.
>>>>>> >>>>>
>>>>>> >>>>> ## Architecture
>>>>>> >>>>>
>>>>>> >>>>> Why does the leader election need two ConfigMaps
>>>>>> >>>>> (`lock for contending leader`, and `leader RPC address`)? What
>>>>>> >>>>> happens if the two ConfigMaps are not updated consistently? E.g., a
>>>>>> >>>>> TM learns about a new JM becoming leader (lock for contending leader
>>>>>> >>>>> updated), but still gets the old leader's address when trying to
>>>>>> >>>>> read `leader RPC address`?
>>>>>> >>>>>
>>>>>> >>>>> ## HA storage > Lock and release
>>>>>> >>>>>
>>>>>> >>>>> It seems to me that the owner needs to explicitly release the lock
>>>>>> >>>>> so that other peers can write/remove the stored object. What if the
>>>>>> >>>>> previous owner failed to release the lock (e.g., died before
>>>>>> >>>>> releasing it)? Would there be any problem?
>>>>>> >>>>>
>>>>>> >>>>> ## HA storage > HA data clean up
>>>>>> >>>>>
>>>>>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`,
>>>>>> >>>>> how is the HA data retained?
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>> Thank you~
>>>>>> >>>>>
>>>>>> >>>>> Xintong Song
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <danrtsey.wy@gmail.com> wrote:
>>>>>> >>>>>
>>>>>> >>>>>> Hi devs and users,
>>>>>> >>>>>>
>>>>>> >>>>>> I would like to start the discussion about FLIP-144[1], which will
>>>>>> >>>>>> introduce a new native high availability service for Kubernetes.
>>>>>> >>>>>>
>>>>>> >>>>>> Currently, Flink provides a ZooKeeper HA service, which is widely
>>>>>> >>>>>> used in production environments. It can be integrated into
>>>>>> >>>>>> standalone cluster, Yarn, and Kubernetes deployments. However, using
>>>>>> >>>>>> the ZooKeeper HA service on K8s incurs additional cost, since we
>>>>>> >>>>>> need to manage a ZooKeeper cluster. Meanwhile, K8s provides public
>>>>>> >>>>>> APIs for leader election[2] and configuration storage (i.e.
>>>>>> >>>>>> ConfigMap[3]). We could leverage these features to make running an
>>>>>> >>>>>> HA-configured Flink cluster on K8s more convenient.
>>>>>> >>>>>>
>>>>>> >>>>>> Both standalone on K8s and native K8s could benefit from the newly
>>>>>> >>>>>> introduced KubernetesHaService.
>>>>>> >>>>>>
>>>>>> >>>>>> [1]. https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>>>> >>>>>> [2]. https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>>>> >>>>>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>>>>>> >>>>>>
>>>>>> >>>>>> Looking forward to your feedback.
>>>>>> >>>>>>
>>>>>> >>>>>> Best,
>>>>>> >>>>>> Yang
>>>>>> >>>>>>
>>>>>> >>>>>
>>>>>>
>>>>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Till Rohrmann <tr...@apache.org>.
3. We could avoid force deletions from within Flink. If the user does it,
then we don't give guarantees.

I am fine with your current proposal. +1 for moving forward with it.

Cheers,
Till

On Thu, Oct 1, 2020 at 2:32 AM Yang Wang <da...@gmail.com> wrote:

> 2. Yes. This is exactly what I mean. Storing the HA information relevant
> to a specific component in a single ConfigMap and ensuring that “Get(check
> the leader)-and-Update(write back to the ConfigMap)” is a transactional
> operation. Since we only store the job graph stateHandler(not the real
> data) in the ConfigMap, I think 1MB is big enough for the dispater-leader
> ConfigMap(the biggest one with multiple jobs). I roughly calculate that
> could we have more than 1000 Flink jobs in a Flink session cluster.
>
> 3. Actually, K8s has a stronger guarantee than YARN. And the StatefulSet
> could provide at most one semantics if no manually force-deletion
> happened[1]. Based on the previous discussion, we have successfully avoided
> the "lock-and-release" in the implementation. So I still insist on using
> the current Deployment.
>
>
> [1].
> https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#force-deletion
>
>
> Best,
> Yang
>
> Till Rohrmann <tr...@apache.org> 于2020年9月30日周三 下午11:57写道:
>
>> Thanks for the clarifications Yang Wang.
>>
>> 2. Keeping the HA information relevant for a component (Dispatcher,
>> JobManager, ResourceManager) in a single ConfigMap sounds good. We should
>> check that we don't exceed the 1 MB size limit with this approach though.
>> The Dispatcher's ConfigMap would then contain the current leader, the
>> running jobs and the pointers to the persisted JobGraphs. The JobManager's
>> ConfigMap would then contain the current leader, the pointers to the
>> checkpoints and the checkpoint ID counter, for example.
>>
>> 3. Ah ok, I somehow thought that K8s would give us stronger
>> guarantees than Yarn in this regard. That's a pity.
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 30, 2020 at 10:03 AM tison <wa...@gmail.com> wrote:
>>
>>> Thanks for your explanation. It would be fine if only checking
>>> leadership & actually write information is atomic.
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> Yang Wang <da...@gmail.com> 于2020年9月30日周三 下午3:57写道:
>>>
>>>> Thanks till and tison for your comments.
>>>>
>>>> @Till Rohrmann <tr...@apache.org>
>>>> 1. I am afraid we could not do this if we are going to use fabric8
>>>> Kubernetes client SDK for the leader election. The official Kubernetes Java
>>>> client[1] also could not support it. Unless we implement a new
>>>> LeaderElector in Flink based on the very basic Kubernetes API. But it seems
>>>> that we could gain too much from this.
>>>>
>>>> 2. Yes, the implementation will be a little complicated if we want to
>>>> completely eliminate the residual job graphs or checkpoints. Inspired by
>>>> your suggestion, another different solution has come into my mind. We could
>>>> use a same ConfigMap storing the JobManager leader, job graph,
>>>> checkpoint-counter, checkpoint. Each job will have a specific ConfigMap for
>>>> the HA meta storage. Then it will be easier to guarantee that only the
>>>> leader could write the ConfigMap in a transactional operation. Since
>>>> “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
>>>> transactional operation.
>>>>
>>>> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution.
>>>> However, we still have the chances that two JobManager are running and
>>>> trying to get/delete a key in the same ConfigMap concurrently. Imagine that
>>>> the kubelet(like NodeManager in YARN) is down, and then the JobManager
>>>> could not be deleted. A new JobManager pod will be launched. We are just in
>>>> the similar situation as Deployment(1) + ConfigMap + HDFS/S3. The only
>>>> benefit is we do not need to implement a leader election/retrieval service.
>>>>
>>>> @tison
>>>> Actually, I do not think we will have such issue in the Kubernetes HA
>>>> service. In the Kubernetes LeaderElector[2], we have the leader information
>>>> stored on the annotation of leader ConfigMap. So it would not happen the
>>>> old leader could wrongly override the leader information. Once a JobManager
>>>> want to write his leader information to the ConfigMap, it will check
>>>> whether it is the leader now. If not, anything will happen. Moreover, the
>>>> Kubernetes Resource Version[3] ensures that no one else has snuck in and
>>>> written a different update while the client was in the process of
>>>> performing its update.
>>>>
>>>>
>>>> [1].
>>>> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
>>>> [2].
>>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
>>>> <https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70>
>>>> [3].
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
>>>>
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>> tison <wa...@gmail.com> 于2020年9月30日周三 下午3:21写道:
>>>>
>>>>> Hi,
>>>>>
>>>>> Generally +1 for a native k8s HA service.
>>>>>
>>>>> For leader election & publish leader information, there was a
>>>>> discussion[1]
>>>>> pointed out that since these two actions is NOT atomic, there will be
>>>>> always
>>>>> edge case where a previous leader overwrite leader information, even
>>>>> with
>>>>> versioned write. Versioned write helps on read again if version
>>>>> mismatches
>>>>> so if we want version write works, information in the kv pair should
>>>>> help the
>>>>> contender reflects whether it is the current leader.
>>>>>
>>>>> The idea of writes leader information on contender node or something
>>>>> equivalent makes sense but the details depends on how it is
>>>>> implemented.
>>>>> General problems are that
>>>>>
>>>>> 1. TM might be a bit late before it updated correct leader information
>>>>> but
>>>>> only if the leader election process is short and leadership is stable
>>>>> at most
>>>>> time, it won't be a serious issue.
>>>>> 2. The process TM extract leader information might be a bit more
>>>>> complex
>>>>> than directly watching a fixed key.
>>>>>
>>>>> Atomic issue can be addressed if one leverages low APIs such as lease
>>>>> & txn
>>>>> but it causes more developing efforts. ConfigMap and encapsulated
>>>>> interface,
>>>>> thought, provides only a self-consistent mechanism which doesn't
>>>>> promise
>>>>> more consistency for extension.
>>>>>
>>>>> Best,
>>>>> tison.
>>>>>
>>>>> [1]
>>>>> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>>>>>
>>>>>
>>>>>
>>>>> Till Rohrmann <tr...@apache.org> 于2020年9月29日周二 下午9:25写道:
>>>>>
>>>>>> For 1. I was wondering whether we can't write the leader connection
>>>>>> information directly when trying to obtain the leadership (trying to
>>>>>> update
>>>>>> the leader key with one's own value)? This might be a little detail,
>>>>>> though.
>>>>>>
>>>>>> 2. Alright, so we are having a similar mechanism as we have in
>>>>>> ZooKeeper
>>>>>> with the ephemeral lock nodes. I guess that this complicates the
>>>>>> implementation a bit, unfortunately.
>>>>>>
>>>>>> 3. Wouldn't the StatefulSet solution also work without a PV? One could
>>>>>> configure a different persistent storage like HDFS or S3 for storing
>>>>>> the
>>>>>> checkpoints and job blobs like in the ZooKeeper case. The current
>>>>>> benefit I
>>>>>> see is that we avoid having to implement this multi locking mechanism
>>>>>> in
>>>>>> the ConfigMaps using the annotations because we can be sure that
>>>>>> there is
>>>>>> only a single leader at a time if I understood the guarantees of K8s
>>>>>> correctly.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <da...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> > Hi Till, thanks for your valuable feedback.
>>>>>> >
>>>>>> > 1. Yes, leader election and storing leader information will use a
>>>>>> same
>>>>>> > ConfigMap. When a contender successfully performs a versioned
>>>>>> annotation
>>>>>> > update operation to the ConfigMap, it means that it has been
>>>>>> elected as the
>>>>>> > leader. And it will write the leader information in the callback of
>>>>>> leader
>>>>>> > elector[1]. The Kubernetes resource version will help us to avoid
>>>>>> the
>>>>>> > leader ConfigMap is wrongly updated.
>>>>>> >
>>>>>> > 2. The lock and release is really a valid concern. Actually in
>>>>>> current
>>>>>> > design, we could not guarantee that the node who tries to write his
>>>>>> > ownership is the real leader. Who writes later, who is the owner. To
>>>>>> > address this issue, we need to store all the owners of the key.
>>>>>> Only when
>>>>>> > the owner is empty, the specific key(means a checkpoint or job
>>>>>> graph) could
>>>>>> > be deleted. However, we may have a residual checkpoint or job graph
>>>>>> when
>>>>>> > the old JobManager crashed exceptionally and do not release the
>>>>>> lock. To
>>>>>> > solve this problem completely, we need a timestamp renew mechanism
>>>>>> > for CompletedCheckpointStore and JobGraphStore, which could help us
>>>>>> to the
>>>>>> > check the JobManager timeout and then clean up the residual keys.
>>>>>> >
>>>>>> > 3. Frankly speaking, I am not against with this solution. However,
>>>>>> in my
>>>>>> > opinion, it is more like a temporary proposal. We could use
>>>>>> StatefulSet to
>>>>>> > avoid leader election and leader retrieval. But I am not sure
>>>>>> whether
>>>>>> > TaskManager could properly handle the situation that same hostname
>>>>>> with
>>>>>> > different IPs, because the JobManager failed and relaunched. Also
>>>>>> we may
>>>>>> > still have two JobManagers running in some corner cases(e.g.
>>>>>> kubelet is
>>>>>> > down but the pod is running). Another concern is we have a strong
>>>>>> > dependency on the PersistentVolume(aka PV) in FileSystemHAService.
>>>>>> But it
>>>>>> > is not always true especially in self-build Kubernetes cluster.
>>>>>> Moreover,
>>>>>> > PV provider should guarantee that each PV could only be mounted
>>>>>> once. Since
>>>>>> > the native HA proposal could cover all the functionality of
>>>>>> StatefulSet
>>>>>> > proposal, that's why I prefer the former.
>>>>>> >
>>>>>> >
>>>>>> > [1].
>>>>>> >
>>>>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>>>>>> >
>>>>>> > Best,
>>>>>> > Yang
>>>>>> >
>>>>>> > On Mon, Sep 28, 2020 at 9:29 PM Till Rohrmann <tr...@apache.org> wrote:
>>>>>> >
>>>>>> >> Thanks for creating this FLIP Yang Wang. I believe that many of our
>>>>>> >> users will like a ZooKeeper-less HA setup.
>>>>>> >>
>>>>>> >> +1 for not separating the leader information and the leader election if
>>>>>> >> possible. Maybe it is even possible that the contender writes its leader
>>>>>> >> information directly when trying to obtain the leadership by performing
>>>>>> >> a versioned write operation.
>>>>>> >>
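A minimal sketch of such a versioned write, assuming an in-memory stand-in for a single ConfigMap rather than the Kubernetes client API. VersionedStore and tryWrite are hypothetical names; the version counter plays the role of the Kubernetes resourceVersion, which the API server checks to reject updates based on a stale read. Lease expiry and leadership handover are deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a single ConfigMap; not a Kubernetes client API.
final class VersionedStore {
    private final Map<String, String> data = new HashMap<>();
    private long version = 0; // plays the role of resourceVersion

    synchronized long version() {
        return version;
    }

    synchronized String get(String key) {
        return data.get(key);
    }

    // Applies the write only if the caller read the latest version and the
    // recorded leader is either absent or the writer itself.
    synchronized boolean tryWrite(long expectedVersion, String writerId, String key, String value) {
        if (expectedVersion != version) {
            return false; // someone else updated the store in between
        }
        String leader = data.get("leader");
        if (leader != null && !leader.equals(writerId)) {
            return false; // the writer is not the current leader
        }
        data.put("leader", writerId);
        data.put(key, value);
        version++;
        return true;
    }
}
```

Because the leader check and the write happen in one guarded step, a contender that lost the race cannot overwrite the leader information with stale data.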
>>>>>> >> Concerning the lock and release operation I have a question: Can there
>>>>>> >> be multiple owners for a given key-value pair in a ConfigMap? If not,
>>>>>> >> how can we ensure that the node which writes its ownership is actually
>>>>>> >> the leader w/o transactional support from K8s? In ZooKeeper we had the
>>>>>> >> same problem (we should probably change it at some point to simply use a
>>>>>> >> transaction which checks whether the writer is still the leader) and
>>>>>> >> therefore introduced the ephemeral lock nodes. What they allow is that
>>>>>> >> there can be multiple owners of a given ZNode at a time. The last owner
>>>>>> >> will then be responsible for the cleanup of the node.
>>>>>> >>
>>>>>> >> I see the benefit of your proposal over the stateful set proposal
>>>>>> >> because it can support multiple standby JMs. Given the problem of
>>>>>> >> locking key-value pairs it might be simpler to start with this approach
>>>>>> >> where we only have a single JM. This might already add a lot of benefits
>>>>>> >> for our users. Was there a specific reason why you discarded this
>>>>>> >> proposal (other than generality)?
>>>>>> >>
>>>>>> >> @Uce it would be great to hear your feedback on the proposal since you
>>>>>> >> already implemented a K8s based HA service.
>>>>>> >>
>>>>>> >> Cheers,
>>>>>> >> Till
>>>>>> >>
>>>>>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com> wrote:
>>>>>> >>
>>>>>> >>> Hi Xintong and Stephan,
>>>>>> >>>
>>>>>> >>> Thanks a lot for your attention to this FLIP. I will address the
>>>>>> >>> comments inline.
>>>>>> >>>
>>>>>> >>> # Architecture -> One or two ConfigMaps
>>>>>> >>>
>>>>>> >>> Both of you are right. One ConfigMap will make the design and
>>>>>> >>> implementation easier. Actually, in my POC code, I am using just one
>>>>>> >>> ConfigMap (e.g. "k8s-ha-app1-restserver" for the rest server component)
>>>>>> >>> for the leader election and storage. Once a JobManager wins the
>>>>>> >>> election, it will update the ConfigMap with the leader address and
>>>>>> >>> periodically renew the lock annotation to remain the active leader. I
>>>>>> >>> will update the FLIP document, including the architecture diagram, to
>>>>>> >>> avoid the misunderstanding.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> # HA storage > Lock and release
>>>>>> >>>
>>>>>> >>> This is a valid concern. A ZooKeeper ephemeral node is deleted by the
>>>>>> >>> ZK server automatically when the client times out, which could happen
>>>>>> >>> in a bad network environment or when the ZK client crashes
>>>>>> >>> unexpectedly. For Kubernetes, we need to implement a similar mechanism.
>>>>>> >>> First, when we want to lock a specific key in a ConfigMap, we will put
>>>>>> >>> the owner identity, lease duration, and renew time in the ConfigMap
>>>>>> >>> annotation. The annotation will be cleaned up when releasing the lock.
>>>>>> >>> When we want to remove a job graph or checkpoints, at least one of the
>>>>>> >>> following conditions must be satisfied. If not, the delete operation
>>>>>> >>> cannot be performed.
>>>>>> >>> * The current instance is the owner of the key.
>>>>>> >>> * The owner annotation is empty, which means the owner has released the
>>>>>> >>> lock.
>>>>>> >>> * The owner annotation has timed out, which usually indicates the owner
>>>>>> >>> died.
>>>>>> >>>
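A minimal sketch of the delete check described above, assuming the lock annotation is modeled as a plain Java object. LockAnnotation, its fields and canDelete are hypothetical names for illustration, not the FLIP's actual implementation.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of the lock annotation; field names are assumptions.
final class LockAnnotation {
    final String owner;           // null or empty means the lock was released
    final Duration leaseDuration; // how long a renewal stays valid
    final Instant renewTime;      // time of the last renewal

    LockAnnotation(String owner, Duration leaseDuration, Instant renewTime) {
        this.owner = owner;
        this.leaseDuration = leaseDuration;
        this.renewTime = renewTime;
    }

    // True if any of the three conditions listed above holds at time 'now'.
    boolean canDelete(String self, Instant now) {
        if (owner == null || owner.isEmpty()) {
            return true; // the owner has released the lock
        }
        if (owner.equals(self)) {
            return true; // the current instance owns the key
        }
        // Lease expired: the owner has not renewed within leaseDuration.
        return now.isAfter(renewTime.plus(leaseDuration));
    }
}
```

In a real deployment this check and the delete itself would have to be applied as one atomic update of the ConfigMap; the sketch only covers the decision logic.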
>>>>>> >>>
>>>>>> >>> # HA storage > HA data clean up
>>>>>> >>>
>>>>>> >>> Sorry that I did not clearly describe how the HA related ConfigMaps are
>>>>>> >>> retained. Benefiting from the Kubernetes OwnerReference[1], we set the
>>>>>> >>> owner of the flink-conf ConfigMap, service and TaskManager pods to the
>>>>>> >>> JobManager Deployment. So when we want to destroy a Flink cluster, we
>>>>>> >>> just need to delete the deployment[2]. For the HA related ConfigMaps,
>>>>>> >>> we do not set the owner so that they are retained even when we delete
>>>>>> >>> the whole Flink cluster.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> [1].
>>>>>> >>>
>>>>>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>>>>> >>> [2].
>>>>>> >>>
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> Best,
>>>>>> >>> Yang
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> On Wed, Sep 16, 2020 at 8:16 PM Stephan Ewen <se...@apache.org> wrote:
>>>>>> >>>
>>>>>> >>>> This is a very cool feature proposal.
>>>>>> >>>>
>>>>>> >>>> One lesson learned from the ZooKeeper-based HA is that it is overly
>>>>>> >>>> complicated to have the Leader RPC address in a different node than the
>>>>>> >>>> LeaderLock. There is extra code needed to make sure these converge and
>>>>>> >>>> they can be temporarily out of sync.
>>>>>> >>>>
>>>>>> >>>> A much easier design would be to have the RPC address as payload in the
>>>>>> >>>> lock entry (ZNode in ZK), the same way that the leader fencing token is
>>>>>> >>>> stored as payload of the lock.
>>>>>> >>>> I think for the design above it would mean having a single ConfigMap
>>>>>> >>>> for both leader lock and leader RPC address discovery.
>>>>>> >>>>
>>>>>> >>>> This probably serves as a good design principle in general - do not
>>>>>> >>>> divide information that is updated together over different resources.
>>>>>> >>>>
>>>>>> >>>> Best,
>>>>>> >>>> Stephan
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <tonysong820@gmail.com> wrote:
>>>>>> >>>>
>>>>>> >>>>> Thanks for preparing this FLIP, @Yang.
>>>>>> >>>>>
>>>>>> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>>>>> >>>>> built-in ConfigMap for Flink's HA services should significantly reduce
>>>>>> >>>>> the maintenance overhead compared to deploying a ZK cluster. I think
>>>>>> >>>>> this is an attractive feature for users.
>>>>>> >>>>>
>>>>>> >>>>> Concerning the proposed design, I have some questions. Might not be
>>>>>> >>>>> problems, just trying to understand.
>>>>>> >>>>>
>>>>>> >>>>> ## Architecture
>>>>>> >>>>>
>>>>>> >>>>> Why does the leader election need two ConfigMaps (`lock for contending
>>>>>> >>>>> leader`, and `leader RPC address`)? What happens if the two ConfigMaps
>>>>>> >>>>> are not updated consistently? E.g., a TM learns about a new JM becoming
>>>>>> >>>>> leader (lock for contending leader updated), but still gets the old
>>>>>> >>>>> leader's address when trying to read `leader RPC address`?
>>>>>> >>>>>
>>>>>> >>>>> ## HA storage > Lock and release
>>>>>> >>>>>
>>>>>> >>>>> It seems to me that the owner needs to explicitly release the lock so
>>>>>> >>>>> that other peers can write/remove the stored object. What if the
>>>>>> >>>>> previous owner failed to release the lock (e.g., dead before
>>>>>> >>>>> releasing)? Would there be any problem?
>>>>>> >>>>>
>>>>>> >>>>> ## HA storage > HA data clean up
>>>>>> >>>>>
>>>>>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`,
>>>>>> >>>>> how is the HA data retained?
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>> Thank you~
>>>>>> >>>>>
>>>>>> >>>>> Xintong Song
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <danrtsey.wy@gmail.com> wrote:
>>>>>> >>>>>
>>>>>> >>>>>> Hi devs and users,
>>>>>> >>>>>>
>>>>>> >>>>>> I would like to start the discussion about FLIP-144[1], which will
>>>>>> >>>>>> introduce a new native high availability service for Kubernetes.
>>>>>> >>>>>>
>>>>>> >>>>>> Currently, Flink provides a ZooKeeper HA service that is widely used
>>>>>> >>>>>> in production environments. It can be integrated in standalone
>>>>>> >>>>>> cluster, Yarn, and Kubernetes deployments. However, using the
>>>>>> >>>>>> ZooKeeper HA in K8s incurs additional cost since we need to manage a
>>>>>> >>>>>> ZooKeeper cluster. In the meantime, K8s provides public APIs for
>>>>>> >>>>>> leader election[2] and configuration storage (i.e. ConfigMap[3]). We
>>>>>> >>>>>> could leverage these features and make running an HA configured Flink
>>>>>> >>>>>> cluster on K8s more convenient.
>>>>>> >>>>>>
>>>>>> >>>>>> Both the standalone on K8s and native K8s could benefit from the newly
>>>>>> >>>>>> introduced KubernetesHaService.
>>>>>> >>>>>>
>>>>>> >>>>>> [1].
>>>>>> >>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>>>> >>>>>> [2].
>>>>>> >>>>>>
>>>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>>>> >>>>>> [3].
>>>>>> https://kubernetes.io/docs/concepts/configuration/configmap/
>>>>>> >>>>>>
>>>>>> >>>>>> Looking forward to your feedback.
>>>>>> >>>>>>
>>>>>> >>>>>> Best,
>>>>>> >>>>>> Yang
>>>>>> >>>>>>
>>>>>> >>>>>
>>>>>>
>>>>>