You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Yang Wang <da...@gmail.com> on 2020/09/15 03:25:36 UTC

[DISCUSS] FLIP-144: Native Kubernetes HA for Flink

 Hi devs and users,

I would like to start the discussion about FLIP-144[1], which will introduce
a new native high availability service for Kubernetes.

Currently, Flink has provided Zookeeper HA service and been widely used
in production environments. It could be integrated in standalone cluster,
Yarn, Kubernetes deployments. However, using the Zookeeper HA in K8s
will take additional cost since we need to manage a Zookeeper cluster.
In the meantime, K8s has provided some public API for leader election[2]
and configuration storage(i.e. ConfigMap[3]). We could leverage these
features and make running HA configured Flink cluster on K8s more
convenient.

Both the standalone on K8s and native K8s could benefit from the new
introduced KubernetesHaService.

[1].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
[2].
https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
[3]. https://kubernetes.io/docs/concepts/configuration/configmap/

Looking forward to your feedback.

Best,
Yang

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Yang Wang <da...@gmail.com>.
3. Make sense to me. And we could add a new HA solution "StatefulSet + PV +
FileSystem"
at any time if we need in the future.

Since there are no more open questions, I will start the voting now.
Thanks all for your comments and feedback. Feel feel to continue the
discussion if you get
other concerns.


Best,
Yang

Till Rohrmann <tr...@apache.org> 于2020年10月1日周四 下午4:52写道:

> 3. We could avoid force deletions from within Flink. If the user does it,
> then we don't give guarantees.
>
> I am fine with your current proposal. +1 for moving forward with it.
>
> Cheers,
> Till
>
> On Thu, Oct 1, 2020 at 2:32 AM Yang Wang <da...@gmail.com> wrote:
>
> > 2. Yes. This is exactly what I mean. Storing the HA information relevant
> > to a specific component in a single ConfigMap and ensuring that
> “Get(check
> > the leader)-and-Update(write back to the ConfigMap)” is a transactional
> > operation. Since we only store the job graph stateHandler(not the real
> > data) in the ConfigMap, I think 1MB is big enough for the dispater-leader
> > ConfigMap(the biggest one with multiple jobs). I roughly calculate that
> > could we have more than 1000 Flink jobs in a Flink session cluster.
> >
> > 3. Actually, K8s has a stronger guarantee than YARN. And the StatefulSet
> > could provide at most one semantics if no manually force-deletion
> > happened[1]. Based on the previous discussion, we have successfully
> avoided
> > the "lock-and-release" in the implementation. So I still insist on using
> > the current Deployment.
> >
> >
> > [1].
> >
> https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#force-deletion
> >
> >
> > Best,
> > Yang
> >
> > Till Rohrmann <tr...@apache.org> 于2020年9月30日周三 下午11:57写道:
> >
> >> Thanks for the clarifications Yang Wang.
> >>
> >> 2. Keeping the HA information relevant for a component (Dispatcher,
> >> JobManager, ResourceManager) in a single ConfigMap sounds good. We
> should
> >> check that we don't exceed the 1 MB size limit with this approach
> though.
> >> The Dispatcher's ConfigMap would then contain the current leader, the
> >> running jobs and the pointers to the persisted JobGraphs. The
> JobManager's
> >> ConfigMap would then contain the current leader, the pointers to the
> >> checkpoints and the checkpoint ID counter, for example.
> >>
> >> 3. Ah ok, I somehow thought that K8s would give us stronger
> >> guarantees than Yarn in this regard. That's a pity.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Wed, Sep 30, 2020 at 10:03 AM tison <wa...@gmail.com> wrote:
> >>
> >>> Thanks for your explanation. It would be fine if only checking
> >>> leadership & actually write information is atomic.
> >>>
> >>> Best,
> >>> tison.
> >>>
> >>>
> >>> Yang Wang <da...@gmail.com> 于2020年9月30日周三 下午3:57写道:
> >>>
> >>>> Thanks till and tison for your comments.
> >>>>
> >>>> @Till Rohrmann <tr...@apache.org>
> >>>> 1. I am afraid we could not do this if we are going to use fabric8
> >>>> Kubernetes client SDK for the leader election. The official
> Kubernetes Java
> >>>> client[1] also could not support it. Unless we implement a new
> >>>> LeaderElector in Flink based on the very basic Kubernetes API. But it
> seems
> >>>> that we could gain too much from this.
> >>>>
> >>>> 2. Yes, the implementation will be a little complicated if we want to
> >>>> completely eliminate the residual job graphs or checkpoints. Inspired
> by
> >>>> your suggestion, another different solution has come into my mind. We
> could
> >>>> use a same ConfigMap storing the JobManager leader, job graph,
> >>>> checkpoint-counter, checkpoint. Each job will have a specific
> ConfigMap for
> >>>> the HA meta storage. Then it will be easier to guarantee that only the
> >>>> leader could write the ConfigMap in a transactional operation. Since
> >>>> “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
> >>>> transactional operation.
> >>>>
> >>>> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution.
> >>>> However, we still have the chances that two JobManager are running and
> >>>> trying to get/delete a key in the same ConfigMap concurrently.
> Imagine that
> >>>> the kubelet(like NodeManager in YARN) is down, and then the JobManager
> >>>> could not be deleted. A new JobManager pod will be launched. We are
> just in
> >>>> the similar situation as Deployment(1) + ConfigMap + HDFS/S3. The only
> >>>> benefit is we do not need to implement a leader election/retrieval
> service.
> >>>>
> >>>> @tison
> >>>> Actually, I do not think we will have such issue in the Kubernetes HA
> >>>> service. In the Kubernetes LeaderElector[2], we have the leader
> information
> >>>> stored on the annotation of leader ConfigMap. So it would not happen
> the
> >>>> old leader could wrongly override the leader information. Once a
> JobManager
> >>>> want to write his leader information to the ConfigMap, it will check
> >>>> whether it is the leader now. If not, anything will happen. Moreover,
> the
> >>>> Kubernetes Resource Version[3] ensures that no one else has snuck in
> and
> >>>> written a different update while the client was in the process of
> >>>> performing its update.
> >>>>
> >>>>
> >>>> [1].
> >>>>
> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
> >>>> [2].
> >>>>
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
> >>>> <
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
> >
> >>>> [3].
> >>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
> >>>>
> >>>>
> >>>> Best,
> >>>> Yang
> >>>>
> >>>> tison <wa...@gmail.com> 于2020年9月30日周三 下午3:21写道:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> Generally +1 for a native k8s HA service.
> >>>>>
> >>>>> For leader election & publish leader information, there was a
> >>>>> discussion[1]
> >>>>> pointed out that since these two actions is NOT atomic, there will be
> >>>>> always
> >>>>> edge case where a previous leader overwrite leader information, even
> >>>>> with
> >>>>> versioned write. Versioned write helps on read again if version
> >>>>> mismatches
> >>>>> so if we want version write works, information in the kv pair should
> >>>>> help the
> >>>>> contender reflects whether it is the current leader.
> >>>>>
> >>>>> The idea of writes leader information on contender node or something
> >>>>> equivalent makes sense but the details depends on how it is
> >>>>> implemented.
> >>>>> General problems are that
> >>>>>
> >>>>> 1. TM might be a bit late before it updated correct leader
> information
> >>>>> but
> >>>>> only if the leader election process is short and leadership is stable
> >>>>> at most
> >>>>> time, it won't be a serious issue.
> >>>>> 2. The process TM extract leader information might be a bit more
> >>>>> complex
> >>>>> than directly watching a fixed key.
> >>>>>
> >>>>> Atomic issue can be addressed if one leverages low APIs such as lease
> >>>>> & txn
> >>>>> but it causes more developing efforts. ConfigMap and encapsulated
> >>>>> interface,
> >>>>> thought, provides only a self-consistent mechanism which doesn't
> >>>>> promise
> >>>>> more consistency for extension.
> >>>>>
> >>>>> Best,
> >>>>> tison.
> >>>>>
> >>>>> [1]
> >>>>>
> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
> >>>>>
> >>>>>
> >>>>>
> >>>>> Till Rohrmann <tr...@apache.org> 于2020年9月29日周二 下午9:25写道:
> >>>>>
> >>>>>> For 1. I was wondering whether we can't write the leader connection
> >>>>>> information directly when trying to obtain the leadership (trying to
> >>>>>> update
> >>>>>> the leader key with one's own value)? This might be a little detail,
> >>>>>> though.
> >>>>>>
> >>>>>> 2. Alright, so we are having a similar mechanism as we have in
> >>>>>> ZooKeeper
> >>>>>> with the ephemeral lock nodes. I guess that this complicates the
> >>>>>> implementation a bit, unfortunately.
> >>>>>>
> >>>>>> 3. Wouldn't the StatefulSet solution also work without a PV? One
> could
> >>>>>> configure a different persistent storage like HDFS or S3 for storing
> >>>>>> the
> >>>>>> checkpoints and job blobs like in the ZooKeeper case. The current
> >>>>>> benefit I
> >>>>>> see is that we avoid having to implement this multi locking
> mechanism
> >>>>>> in
> >>>>>> the ConfigMaps using the annotations because we can be sure that
> >>>>>> there is
> >>>>>> only a single leader at a time if I understood the guarantees of K8s
> >>>>>> correctly.
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Till
> >>>>>>
> >>>>>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <da...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>> > Hi Till, thanks for your valuable feedback.
> >>>>>> >
> >>>>>> > 1. Yes, leader election and storing leader information will use a
> >>>>>> same
> >>>>>> > ConfigMap. When a contender successfully performs a versioned
> >>>>>> annotation
> >>>>>> > update operation to the ConfigMap, it means that it has been
> >>>>>> elected as the
> >>>>>> > leader. And it will write the leader information in the callback
> of
> >>>>>> leader
> >>>>>> > elector[1]. The Kubernetes resource version will help us to avoid
> >>>>>> the
> >>>>>> > leader ConfigMap is wrongly updated.
> >>>>>> >
> >>>>>> > 2. The lock and release is really a valid concern. Actually in
> >>>>>> current
> >>>>>> > design, we could not guarantee that the node who tries to write
> his
> >>>>>> > ownership is the real leader. Who writes later, who is the owner.
> To
> >>>>>> > address this issue, we need to store all the owners of the key.
> >>>>>> Only when
> >>>>>> > the owner is empty, the specific key(means a checkpoint or job
> >>>>>> graph) could
> >>>>>> > be deleted. However, we may have a residual checkpoint or job
> graph
> >>>>>> when
> >>>>>> > the old JobManager crashed exceptionally and do not release the
> >>>>>> lock. To
> >>>>>> > solve this problem completely, we need a timestamp renew mechanism
> >>>>>> > for CompletedCheckpointStore and JobGraphStore, which could help
> us
> >>>>>> to the
> >>>>>> > check the JobManager timeout and then clean up the residual keys.
> >>>>>> >
> >>>>>> > 3. Frankly speaking, I am not against with this solution. However,
> >>>>>> in my
> >>>>>> > opinion, it is more like a temporary proposal. We could use
> >>>>>> StatefulSet to
> >>>>>> > avoid leader election and leader retrieval. But I am not sure
> >>>>>> whether
> >>>>>> > TaskManager could properly handle the situation that same hostname
> >>>>>> with
> >>>>>> > different IPs, because the JobManager failed and relaunched. Also
> >>>>>> we may
> >>>>>> > still have two JobManagers running in some corner cases(e.g.
> >>>>>> kubelet is
> >>>>>> > down but the pod is running). Another concern is we have a strong
> >>>>>> > dependency on the PersistentVolume(aka PV) in FileSystemHAService.
> >>>>>> But it
> >>>>>> > is not always true especially in self-build Kubernetes cluster.
> >>>>>> Moreover,
> >>>>>> > PV provider should guarantee that each PV could only be mounted
> >>>>>> once. Since
> >>>>>> > the native HA proposal could cover all the functionality of
> >>>>>> StatefulSet
> >>>>>> > proposal, that's why I prefer the former.
> >>>>>> >
> >>>>>> >
> >>>>>> > [1].
> >>>>>> >
> >>>>>>
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
> >>>>>> >
> >>>>>> > Best,
> >>>>>> > Yang
> >>>>>> >
> >>>>>> > Till Rohrmann <tr...@apache.org> 于2020年9月28日周一 下午9:29写道:
> >>>>>> >
> >>>>>> >> Thanks for creating this FLIP Yang Wang. I believe that many of
> >>>>>> our users
> >>>>>> >> will like a ZooKeeper-less HA setup.
> >>>>>> >>
> >>>>>> >> +1 for not separating the leader information and the leader
> >>>>>> election if
> >>>>>> >> possible. Maybe it is even possible that the contender writes his
> >>>>>> leader
> >>>>>> >> information directly when trying to obtain the leadership by
> >>>>>> performing a
> >>>>>> >> versioned write operation.
> >>>>>> >>
> >>>>>> >> Concerning the lock and release operation I have a question: Can
> >>>>>> there be
> >>>>>> >> multiple owners for a given key-value pair in a ConfigMap? If
> not,
> >>>>>> how can
> >>>>>> >> we ensure that the node which writes his ownership is actually
> the
> >>>>>> leader
> >>>>>> >> w/o transactional support from K8s? In ZooKeeper we had the same
> >>>>>> problem
> >>>>>> >> (we should probably change it at some point to simply use a
> >>>>>> >> transaction which checks whether the writer is still the leader)
> >>>>>> and
> >>>>>> >> therefore introduced the ephemeral lock nodes. What they allow is
> >>>>>> that
> >>>>>> >> there can be multiple owners of a given ZNode at a time. The last
> >>>>>> owner
> >>>>>> >> will then be responsible for the cleanup of the node.
> >>>>>> >>
> >>>>>> >> I see the benefit of your proposal over the stateful set proposal
> >>>>>> because
> >>>>>> >> it can support multiple standby JMs. Given the problem of locking
> >>>>>> key-value
> >>>>>> >> pairs it might be simpler to start with this approach where we
> >>>>>> only have
> >>>>>> >> single JM. This might already add a lot of benefits for our
> users.
> >>>>>> Was
> >>>>>> >> there a specific reason why you discarded this proposal (other
> than
> >>>>>> >> generality)?
> >>>>>> >>
> >>>>>> >> @Uce it would be great to hear your feedback on the proposal
> since
> >>>>>> you
> >>>>>> >> already implemented a K8s based HA service.
> >>>>>> >>
> >>>>>> >> Cheers,
> >>>>>> >> Till
> >>>>>> >>
> >>>>>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <danrtsey.wy@gmail.com
> >
> >>>>>> wrote:
> >>>>>> >>
> >>>>>> >>> Hi Xintong and Stephan,
> >>>>>> >>>
> >>>>>> >>> Thanks a lot for your attention on this FLIP. I will address the
> >>>>>> >>> comments inline.
> >>>>>> >>>
> >>>>>> >>> # Architecture -> One or two ConfigMaps
> >>>>>> >>>
> >>>>>> >>> Both of you are right. One ConfigMap will make the design and
> >>>>>> >>> implementation easier. Actually, in my POC codes,
> >>>>>> >>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for
> >>>>>> rest
> >>>>>> >>> server component) for the leader election
> >>>>>> >>> and storage. Once a JobManager win the election, it will update
> >>>>>> the
> >>>>>> >>> ConfigMap with leader address and periodically
> >>>>>> >>> renew the lock annotation to keep as the active leader. I will
> >>>>>> update
> >>>>>> >>> the FLIP document, including the architecture diagram,
> >>>>>> >>> to avoid the misunderstanding.
> >>>>>> >>>
> >>>>>> >>>
> >>>>>> >>> # HA storage > Lock and release
> >>>>>> >>>
> >>>>>> >>> This is a valid concern. Since for Zookeeper ephemeral nodes, it
> >>>>>> will be
> >>>>>> >>> deleted by the ZK server automatically when
> >>>>>> >>> the client is timeout. It could happen in a bad network
> >>>>>> environment or
> >>>>>> >>> the ZK client crashed exceptionally. For Kubernetes,
> >>>>>> >>> we need to implement a similar mechanism. First, when we want to
> >>>>>> lock a
> >>>>>> >>> specific key in ConfigMap, we will put the owner identify,
> >>>>>> >>> lease duration, renew time in the ConfigMap annotation. The
> >>>>>> annotation
> >>>>>> >>> will be cleaned up when releasing the lock. When
> >>>>>> >>> we want to remove a job graph or checkpoints, it should satisfy
> >>>>>> the
> >>>>>> >>> following conditions. If not, the delete operation could not be
> >>>>>> done.
> >>>>>> >>> * Current instance is the owner of the key.
> >>>>>> >>> * The owner annotation is empty, which means the owner has
> >>>>>> released the
> >>>>>> >>> lock.
> >>>>>> >>> * The owner annotation timed out, which usually indicate the
> >>>>>> owner died.
> >>>>>> >>>
> >>>>>> >>>
> >>>>>> >>> # HA storage > HA data clean up
> >>>>>> >>>
> >>>>>> >>> Sorry for that I do not describe how the HA related ConfigMap is
> >>>>>> >>> retained clearly. Benefit from the Kubernetes OwnerReference[1],
> >>>>>> >>> we set owner of the flink-conf configmap, service and
> TaskManager
> >>>>>> pods
> >>>>>> >>> to JobManager Deployment. So when we want to
> >>>>>> >>> destroy a Flink cluster, we just need to delete the
> >>>>>> deployment[2]. For
> >>>>>> >>> the HA related ConfigMaps, we do not set the owner
> >>>>>> >>> so that they could be retained even though we delete the whole
> >>>>>> Flink
> >>>>>> >>> cluster.
> >>>>>> >>>
> >>>>>> >>>
> >>>>>> >>> [1].
> >>>>>> >>>
> >>>>>>
> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
> >>>>>> >>> [2].
> >>>>>> >>>
> >>>>>>
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
> >>>>>> >>>
> >>>>>> >>>
> >>>>>> >>> Best,
> >>>>>> >>> Yang
> >>>>>> >>>
> >>>>>> >>>
> >>>>>> >>> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
> >>>>>> >>>
> >>>>>> >>>> This is a very cool feature proposal.
> >>>>>> >>>>
> >>>>>> >>>> One lesson-learned from the ZooKeeper-based HA is that it is
> >>>>>> overly
> >>>>>> >>>> complicated to have the Leader RPC address in a different node
> >>>>>> than the
> >>>>>> >>>> LeaderLock. There is extra code needed to make sure these
> >>>>>> converge and the
> >>>>>> >>>> can be temporarily out of sync.
> >>>>>> >>>>
> >>>>>> >>>> A much easier design would be to have the RPC address as
> payload
> >>>>>> in the
> >>>>>> >>>> lock entry (ZNode in ZK), the same way that the leader fencing
> >>>>>> token is
> >>>>>> >>>> stored as payload of the lock.
> >>>>>> >>>> I think for the design above it would mean having a single
> >>>>>> ConfigMap
> >>>>>> >>>> for both leader lock and leader RPC address discovery.
> >>>>>> >>>>
> >>>>>> >>>> This probably serves as a good design principle in general -
> not
> >>>>>> divide
> >>>>>> >>>> information that is updated together over different resources.
> >>>>>> >>>>
> >>>>>> >>>> Best,
> >>>>>> >>>> Stephan
> >>>>>> >>>>
> >>>>>> >>>>
> >>>>>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <
> >>>>>> tonysong820@gmail.com>
> >>>>>> >>>> wrote:
> >>>>>> >>>>
> >>>>>> >>>>> Thanks for preparing this FLIP, @Yang.
> >>>>>> >>>>>
> >>>>>> >>>>> In general, I'm +1 for this new feature. Leveraging
> Kubernetes's
> >>>>>> >>>>> buildtin ConfigMap for Flink's HA services should
> significantly
> >>>>>> reduce the
> >>>>>> >>>>> maintenance overhead compared to deploying a ZK cluster. I
> >>>>>> think this is an
> >>>>>> >>>>> attractive feature for users.
> >>>>>> >>>>>
> >>>>>> >>>>> Concerning the proposed design, I have some questions. Might
> >>>>>> not be
> >>>>>> >>>>> problems, just trying to understand.
> >>>>>> >>>>>
> >>>>>> >>>>> ## Architecture
> >>>>>> >>>>>
> >>>>>> >>>>> Why does the leader election need two ConfigMaps (`lock for
> >>>>>> contending
> >>>>>> >>>>> leader`, and `leader RPC address`)? What happens if the two
> >>>>>> ConfigMaps are
> >>>>>> >>>>> not updated consistently? E.g., a TM learns about a new JM
> >>>>>> becoming leader
> >>>>>> >>>>> (lock for contending leader updated), but still gets the old
> >>>>>> leader's
> >>>>>> >>>>> address when trying to read `leader RPC address`?
> >>>>>> >>>>>
> >>>>>> >>>>> ## HA storage > Lock and release
> >>>>>> >>>>>
> >>>>>> >>>>> It seems to me that the owner needs to explicitly release the
> >>>>>> lock so
> >>>>>> >>>>> that other peers can write/remove the stored object. What if
> >>>>>> the previous
> >>>>>> >>>>> owner failed to release the lock (e.g., dead before
> releasing)?
> >>>>>> Would there
> >>>>>> >>>>> be any problem?
> >>>>>> >>>>>
> >>>>>> >>>>> ## HA storage > HA data clean up
> >>>>>> >>>>>
> >>>>>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy
> >>>>>> <ClusterID>`,
> >>>>>> >>>>> how are the HA dada retained?
> >>>>>> >>>>>
> >>>>>> >>>>>
> >>>>>> >>>>> Thank you~
> >>>>>> >>>>>
> >>>>>> >>>>> Xintong Song
> >>>>>> >>>>>
> >>>>>> >>>>>
> >>>>>> >>>>>
> >>>>>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <
> >>>>>> danrtsey.wy@gmail.com>
> >>>>>> >>>>> wrote:
> >>>>>> >>>>>
> >>>>>> >>>>>> Hi devs and users,
> >>>>>> >>>>>>
> >>>>>> >>>>>> I would like to start the discussion about FLIP-144[1], which
> >>>>>> will
> >>>>>> >>>>>> introduce
> >>>>>> >>>>>> a new native high availability service for Kubernetes.
> >>>>>> >>>>>>
> >>>>>> >>>>>> Currently, Flink has provided Zookeeper HA service and been
> >>>>>> widely
> >>>>>> >>>>>> used
> >>>>>> >>>>>> in production environments. It could be integrated in
> >>>>>> standalone
> >>>>>> >>>>>> cluster,
> >>>>>> >>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA
> >>>>>> in K8s
> >>>>>> >>>>>> will take additional cost since we need to manage a Zookeeper
> >>>>>> cluster.
> >>>>>> >>>>>> In the meantime, K8s has provided some public API for leader
> >>>>>> >>>>>> election[2]
> >>>>>> >>>>>> and configuration storage(i.e. ConfigMap[3]). We could
> >>>>>> leverage these
> >>>>>> >>>>>> features and make running HA configured Flink cluster on K8s
> >>>>>> more
> >>>>>> >>>>>> convenient.
> >>>>>> >>>>>>
> >>>>>> >>>>>> Both the standalone on K8s and native K8s could benefit from
> >>>>>> the new
> >>>>>> >>>>>> introduced KubernetesHaService.
> >>>>>> >>>>>>
> >>>>>> >>>>>> [1].
> >>>>>> >>>>>>
> >>>>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
> >>>>>> >>>>>> [2].
> >>>>>> >>>>>>
> >>>>>>
> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
> >>>>>> >>>>>> [3].
> >>>>>> https://kubernetes.io/docs/concepts/configuration/configmap/
> >>>>>> >>>>>>
> >>>>>> >>>>>> Looking forward to your feedback.
> >>>>>> >>>>>>
> >>>>>> >>>>>> Best,
> >>>>>> >>>>>> Yang
> >>>>>> >>>>>>
> >>>>>> >>>>>
> >>>>>>
> >>>>>
>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Yang Wang <da...@gmail.com>.
3. Make sense to me. And we could add a new HA solution "StatefulSet + PV +
FileSystem"
at any time if we need in the future.

Since there are no more open questions, I will start the voting now.
Thanks all for your comments and feedback. Feel feel to continue the
discussion if you get
other concerns.


Best,
Yang

Till Rohrmann <tr...@apache.org> 于2020年10月1日周四 下午4:52写道:

> 3. We could avoid force deletions from within Flink. If the user does it,
> then we don't give guarantees.
>
> I am fine with your current proposal. +1 for moving forward with it.
>
> Cheers,
> Till
>
> On Thu, Oct 1, 2020 at 2:32 AM Yang Wang <da...@gmail.com> wrote:
>
> > 2. Yes. This is exactly what I mean. Storing the HA information relevant
> > to a specific component in a single ConfigMap and ensuring that
> “Get(check
> > the leader)-and-Update(write back to the ConfigMap)” is a transactional
> > operation. Since we only store the job graph stateHandler(not the real
> > data) in the ConfigMap, I think 1MB is big enough for the dispater-leader
> > ConfigMap(the biggest one with multiple jobs). I roughly calculate that
> > could we have more than 1000 Flink jobs in a Flink session cluster.
> >
> > 3. Actually, K8s has a stronger guarantee than YARN. And the StatefulSet
> > could provide at most one semantics if no manually force-deletion
> > happened[1]. Based on the previous discussion, we have successfully
> avoided
> > the "lock-and-release" in the implementation. So I still insist on using
> > the current Deployment.
> >
> >
> > [1].
> >
> https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#force-deletion
> >
> >
> > Best,
> > Yang
> >
> > Till Rohrmann <tr...@apache.org> 于2020年9月30日周三 下午11:57写道:
> >
> >> Thanks for the clarifications Yang Wang.
> >>
> >> 2. Keeping the HA information relevant for a component (Dispatcher,
> >> JobManager, ResourceManager) in a single ConfigMap sounds good. We
> should
> >> check that we don't exceed the 1 MB size limit with this approach
> though.
> >> The Dispatcher's ConfigMap would then contain the current leader, the
> >> running jobs and the pointers to the persisted JobGraphs. The
> JobManager's
> >> ConfigMap would then contain the current leader, the pointers to the
> >> checkpoints and the checkpoint ID counter, for example.
> >>
> >> 3. Ah ok, I somehow thought that K8s would give us stronger
> >> guarantees than Yarn in this regard. That's a pity.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Wed, Sep 30, 2020 at 10:03 AM tison <wa...@gmail.com> wrote:
> >>
> >>> Thanks for your explanation. It would be fine if only checking
> >>> leadership & actually write information is atomic.
> >>>
> >>> Best,
> >>> tison.
> >>>
> >>>
> >>> Yang Wang <da...@gmail.com> 于2020年9月30日周三 下午3:57写道:
> >>>
> >>>> Thanks till and tison for your comments.
> >>>>
> >>>> @Till Rohrmann <tr...@apache.org>
> >>>> 1. I am afraid we could not do this if we are going to use fabric8
> >>>> Kubernetes client SDK for the leader election. The official
> Kubernetes Java
> >>>> client[1] also could not support it. Unless we implement a new
> >>>> LeaderElector in Flink based on the very basic Kubernetes API. But it
> seems
> >>>> that we could gain too much from this.
> >>>>
> >>>> 2. Yes, the implementation will be a little complicated if we want to
> >>>> completely eliminate the residual job graphs or checkpoints. Inspired
> by
> >>>> your suggestion, another different solution has come into my mind. We
> could
> >>>> use a same ConfigMap storing the JobManager leader, job graph,
> >>>> checkpoint-counter, checkpoint. Each job will have a specific
> ConfigMap for
> >>>> the HA meta storage. Then it will be easier to guarantee that only the
> >>>> leader could write the ConfigMap in a transactional operation. Since
> >>>> “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
> >>>> transactional operation.
> >>>>
> >>>> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution.
> >>>> However, we still have the chances that two JobManager are running and
> >>>> trying to get/delete a key in the same ConfigMap concurrently.
> Imagine that
> >>>> the kubelet(like NodeManager in YARN) is down, and then the JobManager
> >>>> could not be deleted. A new JobManager pod will be launched. We are
> just in
> >>>> the similar situation as Deployment(1) + ConfigMap + HDFS/S3. The only
> >>>> benefit is we do not need to implement a leader election/retrieval
> service.
> >>>>
> >>>> @tison
> >>>> Actually, I do not think we will have such issue in the Kubernetes HA
> >>>> service. In the Kubernetes LeaderElector[2], we have the leader
> information
> >>>> stored on the annotation of leader ConfigMap. So it would not happen
> the
> >>>> old leader could wrongly override the leader information. Once a
> JobManager
> >>>> want to write his leader information to the ConfigMap, it will check
> >>>> whether it is the leader now. If not, anything will happen. Moreover,
> the
> >>>> Kubernetes Resource Version[3] ensures that no one else has snuck in
> and
> >>>> written a different update while the client was in the process of
> >>>> performing its update.
> >>>>
> >>>>
> >>>> [1].
> >>>>
> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
> >>>> [2].
> >>>>
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
> >>>> <
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
> >
> >>>> [3].
> >>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
> >>>>
> >>>>
> >>>> Best,
> >>>> Yang
> >>>>
> >>>> tison <wa...@gmail.com> 于2020年9月30日周三 下午3:21写道:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> Generally +1 for a native k8s HA service.
> >>>>>
> >>>>> For leader election & publish leader information, there was a
> >>>>> discussion[1]
> >>>>> pointed out that since these two actions is NOT atomic, there will be
> >>>>> always
> >>>>> edge case where a previous leader overwrite leader information, even
> >>>>> with
> >>>>> versioned write. Versioned write helps on read again if version
> >>>>> mismatches
> >>>>> so if we want version write works, information in the kv pair should
> >>>>> help the
> >>>>> contender reflects whether it is the current leader.
> >>>>>
> >>>>> The idea of writes leader information on contender node or something
> >>>>> equivalent makes sense but the details depends on how it is
> >>>>> implemented.
> >>>>> General problems are that
> >>>>>
> >>>>> 1. TM might be a bit late before it updated correct leader
> information
> >>>>> but
> >>>>> only if the leader election process is short and leadership is stable
> >>>>> at most
> >>>>> time, it won't be a serious issue.
> >>>>> 2. The process TM extract leader information might be a bit more
> >>>>> complex
> >>>>> than directly watching a fixed key.
> >>>>>
> >>>>> Atomic issue can be addressed if one leverages low APIs such as lease
> >>>>> & txn
> >>>>> but it causes more developing efforts. ConfigMap and encapsulated
> >>>>> interface,
> >>>>> thought, provides only a self-consistent mechanism which doesn't
> >>>>> promise
> >>>>> more consistency for extension.
> >>>>>
> >>>>> Best,
> >>>>> tison.
> >>>>>
> >>>>> [1]
> >>>>>
> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
> >>>>>
> >>>>>
> >>>>>
> >>>>> Till Rohrmann <tr...@apache.org> 于2020年9月29日周二 下午9:25写道:
> >>>>>
> >>>>>> For 1. I was wondering whether we can't write the leader connection
> >>>>>> information directly when trying to obtain the leadership (trying to
> >>>>>> update
> >>>>>> the leader key with one's own value)? This might be a little detail,
> >>>>>> though.
> >>>>>>
> >>>>>> 2. Alright, so we are having a similar mechanism as we have in
> >>>>>> ZooKeeper
> >>>>>> with the ephemeral lock nodes. I guess that this complicates the
> >>>>>> implementation a bit, unfortunately.
> >>>>>>
> >>>>>> 3. Wouldn't the StatefulSet solution also work without a PV? One
> could
> >>>>>> configure a different persistent storage like HDFS or S3 for storing
> >>>>>> the
> >>>>>> checkpoints and job blobs like in the ZooKeeper case. The current
> >>>>>> benefit I
> >>>>>> see is that we avoid having to implement this multi locking
> mechanism
> >>>>>> in
> >>>>>> the ConfigMaps using the annotations because we can be sure that
> >>>>>> there is
> >>>>>> only a single leader at a time if I understood the guarantees of K8s
> >>>>>> correctly.
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Till
> >>>>>>
> >>>>>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <da...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>> > Hi Till, thanks for your valuable feedback.
> >>>>>> >
> >>>>>> > 1. Yes, leader election and storing leader information will use a
> >>>>>> same
> >>>>>> > ConfigMap. When a contender successfully performs a versioned
> >>>>>> annotation
> >>>>>> > update operation to the ConfigMap, it means that it has been
> >>>>>> elected as the
> >>>>>> > leader. And it will write the leader information in the callback
> of
> >>>>>> leader
> >>>>>> > elector[1]. The Kubernetes resource version will help us to avoid
> >>>>>> the
> >>>>>> > leader ConfigMap is wrongly updated.
> >>>>>> >
> >>>>>> > 2. The lock and release is really a valid concern. Actually in
> >>>>>> current
> >>>>>> > design, we could not guarantee that the node who tries to write
> his
> >>>>>> > ownership is the real leader. Who writes later, who is the owner.
> To
> >>>>>> > address this issue, we need to store all the owners of the key.
> >>>>>> Only when
> >>>>>> > the owner is empty, the specific key(means a checkpoint or job
> >>>>>> graph) could
> >>>>>> > be deleted. However, we may have a residual checkpoint or job
> graph
> >>>>>> when
> >>>>>> > the old JobManager crashed exceptionally and do not release the
> >>>>>> lock. To
> >>>>>> > solve this problem completely, we need a timestamp renew mechanism
> >>>>>> > for CompletedCheckpointStore and JobGraphStore, which could help
> us
> >>>>>> to the
> >>>>>> > check the JobManager timeout and then clean up the residual keys.
> >>>>>> >
> >>>>>> > 3. Frankly speaking, I am not against with this solution. However,
> >>>>>> in my
> >>>>>> > opinion, it is more like a temporary proposal. We could use
> >>>>>> StatefulSet to
> >>>>>> > avoid leader election and leader retrieval. But I am not sure
> >>>>>> whether
> >>>>>> > TaskManager could properly handle the situation that same hostname
> >>>>>> with
> >>>>>> > different IPs, because the JobManager failed and relaunched. Also
> >>>>>> we may
> >>>>>> > still have two JobManagers running in some corner cases(e.g.
> >>>>>> kubelet is
> >>>>>> > down but the pod is running). Another concern is we have a strong
> >>>>>> > dependency on the PersistentVolume(aka PV) in FileSystemHAService.
> >>>>>> But it
> >>>>>> > is not always true especially in self-build Kubernetes cluster.
> >>>>>> Moreover,
> >>>>>> > PV provider should guarantee that each PV could only be mounted
> >>>>>> once. Since
> >>>>>> > the native HA proposal could cover all the functionality of
> >>>>>> StatefulSet
> >>>>>> > proposal, that's why I prefer the former.
> >>>>>> >
> >>>>>> >
> >>>>>> > [1].
> >>>>>> >
> >>>>>>
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
> >>>>>> >
> >>>>>> > Best,
> >>>>>> > Yang
> >>>>>> >
> >>>>>> > Till Rohrmann <tr...@apache.org> 于2020年9月28日周一 下午9:29写道:
> >>>>>> >
> >>>>>> >> Thanks for creating this FLIP Yang Wang. I believe that many of
> >>>>>> our users
> >>>>>> >> will like a ZooKeeper-less HA setup.
> >>>>>> >>
> >>>>>> >> +1 for not separating the leader information and the leader
> >>>>>> election if
> >>>>>> >> possible. Maybe it is even possible that the contender writes his
> >>>>>> leader
> >>>>>> >> information directly when trying to obtain the leadership by
> >>>>>> performing a
> >>>>>> >> versioned write operation.
> >>>>>> >>
> >>>>>> >> Concerning the lock and release operation I have a question: Can
> >>>>>> there be
> >>>>>> >> multiple owners for a given key-value pair in a ConfigMap? If
> not,
> >>>>>> how can
> >>>>>> >> we ensure that the node which writes his ownership is actually
> the
> >>>>>> leader
> >>>>>> >> w/o transactional support from K8s? In ZooKeeper we had the same
> >>>>>> problem
> >>>>>> >> (we should probably change it at some point to simply use a
> >>>>>> >> transaction which checks whether the writer is still the leader)
> >>>>>> and
> >>>>>> >> therefore introduced the ephemeral lock nodes. What they allow is
> >>>>>> that
> >>>>>> >> there can be multiple owners of a given ZNode at a time. The last
> >>>>>> owner
> >>>>>> >> will then be responsible for the cleanup of the node.
> >>>>>> >>
> >>>>>> >> I see the benefit of your proposal over the stateful set proposal
> >>>>>> because
> >>>>>> >> it can support multiple standby JMs. Given the problem of locking
> >>>>>> key-value
> >>>>>> >> pairs it might be simpler to start with this approach where we
> >>>>>> only have
> >>>>>> >> single JM. This might already add a lot of benefits for our
> users.
> >>>>>> Was
> >>>>>> >> there a specific reason why you discarded this proposal (other
> than
> >>>>>> >> generality)?
> >>>>>> >>
> >>>>>> >> @Uce it would be great to hear your feedback on the proposal
> since
> >>>>>> you
> >>>>>> >> already implemented a K8s based HA service.
> >>>>>> >>
> >>>>>> >> Cheers,
> >>>>>> >> Till
> >>>>>> >>
> >>>>>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <danrtsey.wy@gmail.com
> >
> >>>>>> wrote:
> >>>>>> >>
> >>>>>> >>> Hi Xintong and Stephan,
> >>>>>> >>>
> >>>>>> >>> Thanks a lot for your attention on this FLIP. I will address the
> >>>>>> >>> comments inline.
> >>>>>> >>>
> >>>>>> >>> # Architecture -> One or two ConfigMaps
> >>>>>> >>>
> >>>>>> >>> Both of you are right. One ConfigMap will make the design and
> >>>>>> >>> implementation easier. Actually, in my POC codes,
> >>>>>> >>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for
> >>>>>> rest
> >>>>>> >>> server component) for the leader election
> >>>>>> >>> and storage. Once a JobManager win the election, it will update
> >>>>>> the
> >>>>>> >>> ConfigMap with leader address and periodically
> >>>>>> >>> renew the lock annotation to keep as the active leader. I will
> >>>>>> update
> >>>>>> >>> the FLIP document, including the architecture diagram,
> >>>>>> >>> to avoid the misunderstanding.
> >>>>>> >>>
> >>>>>> >>>
> >>>>>> >>> # HA storage > Lock and release
> >>>>>> >>>
> >>>>>> >>> This is a valid concern. Since for Zookeeper ephemeral nodes, it
> >>>>>> will be
> >>>>>> >>> deleted by the ZK server automatically when
> >>>>>> >>> the client is timeout. It could happen in a bad network
> >>>>>> environment or
> >>>>>> >>> the ZK client crashed exceptionally. For Kubernetes,
> >>>>>> >>> we need to implement a similar mechanism. First, when we want to
> >>>>>> lock a
> >>>>>> >>> specific key in ConfigMap, we will put the owner identify,
> >>>>>> >>> lease duration, renew time in the ConfigMap annotation. The
> >>>>>> annotation
> >>>>>> >>> will be cleaned up when releasing the lock. When
> >>>>>> >>> we want to remove a job graph or checkpoints, it should satisfy
> >>>>>> the
> >>>>>> >>> following conditions. If not, the delete operation could not be
> >>>>>> done.
> >>>>>> >>> * Current instance is the owner of the key.
> >>>>>> >>> * The owner annotation is empty, which means the owner has
> >>>>>> released the
> >>>>>> >>> lock.
> >>>>>> >>> * The owner annotation timed out, which usually indicate the
> >>>>>> owner died.
> >>>>>> >>>
> >>>>>> >>>
> >>>>>> >>> # HA storage > HA data clean up
> >>>>>> >>>
> >>>>>> >>> Sorry for that I do not describe how the HA related ConfigMap is
> >>>>>> >>> retained clearly. Benefit from the Kubernetes OwnerReference[1],
> >>>>>> >>> we set owner of the flink-conf configmap, service and
> TaskManager
> >>>>>> pods
> >>>>>> >>> to JobManager Deployment. So when we want to
> >>>>>> >>> destroy a Flink cluster, we just need to delete the
> >>>>>> deployment[2]. For
> >>>>>> >>> the HA related ConfigMaps, we do not set the owner
> >>>>>> >>> so that they could be retained even though we delete the whole
> >>>>>> Flink
> >>>>>> >>> cluster.
> >>>>>> >>>
> >>>>>> >>>
> >>>>>> >>> [1].
> >>>>>> >>>
> >>>>>>
> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
> >>>>>> >>> [2].
> >>>>>> >>>
> >>>>>>
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
> >>>>>> >>>
> >>>>>> >>>
> >>>>>> >>> Best,
> >>>>>> >>> Yang
> >>>>>> >>>
> >>>>>> >>>
> >>>>>> >>> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
> >>>>>> >>>
> >>>>>> >>>> This is a very cool feature proposal.
> >>>>>> >>>>
> >>>>>> >>>> One lesson-learned from the ZooKeeper-based HA is that it is
> >>>>>> overly
> >>>>>> >>>> complicated to have the Leader RPC address in a different node
> >>>>>> than the
> >>>>>> >>>> LeaderLock. There is extra code needed to make sure these
> >>>>>> converge and the
> >>>>>> >>>> can be temporarily out of sync.
> >>>>>> >>>>
> >>>>>> >>>> A much easier design would be to have the RPC address as
> payload
> >>>>>> in the
> >>>>>> >>>> lock entry (ZNode in ZK), the same way that the leader fencing
> >>>>>> token is
> >>>>>> >>>> stored as payload of the lock.
> >>>>>> >>>> I think for the design above it would mean having a single
> >>>>>> ConfigMap
> >>>>>> >>>> for both leader lock and leader RPC address discovery.
> >>>>>> >>>>
> >>>>>> >>>> This probably serves as a good design principle in general -
> not
> >>>>>> divide
> >>>>>> >>>> information that is updated together over different resources.
> >>>>>> >>>>
> >>>>>> >>>> Best,
> >>>>>> >>>> Stephan
> >>>>>> >>>>
> >>>>>> >>>>
> >>>>>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <
> >>>>>> tonysong820@gmail.com>
> >>>>>> >>>> wrote:
> >>>>>> >>>>
> >>>>>> >>>>> Thanks for preparing this FLIP, @Yang.
> >>>>>> >>>>>
> >>>>>> >>>>> In general, I'm +1 for this new feature. Leveraging
> Kubernetes's
> >>>>>> >>>>> buildtin ConfigMap for Flink's HA services should
> significantly
> >>>>>> reduce the
> >>>>>> >>>>> maintenance overhead compared to deploying a ZK cluster. I
> >>>>>> think this is an
> >>>>>> >>>>> attractive feature for users.
> >>>>>> >>>>>
> >>>>>> >>>>> Concerning the proposed design, I have some questions. Might
> >>>>>> not be
> >>>>>> >>>>> problems, just trying to understand.
> >>>>>> >>>>>
> >>>>>> >>>>> ## Architecture
> >>>>>> >>>>>
> >>>>>> >>>>> Why does the leader election need two ConfigMaps (`lock for
> >>>>>> contending
> >>>>>> >>>>> leader`, and `leader RPC address`)? What happens if the two
> >>>>>> ConfigMaps are
> >>>>>> >>>>> not updated consistently? E.g., a TM learns about a new JM
> >>>>>> becoming leader
> >>>>>> >>>>> (lock for contending leader updated), but still gets the old
> >>>>>> leader's
> >>>>>> >>>>> address when trying to read `leader RPC address`?
> >>>>>> >>>>>
> >>>>>> >>>>> ## HA storage > Lock and release
> >>>>>> >>>>>
> >>>>>> >>>>> It seems to me that the owner needs to explicitly release the
> >>>>>> lock so
> >>>>>> >>>>> that other peers can write/remove the stored object. What if
> >>>>>> the previous
> >>>>>> >>>>> owner failed to release the lock (e.g., dead before
> releasing)?
> >>>>>> Would there
> >>>>>> >>>>> be any problem?
> >>>>>> >>>>>
> >>>>>> >>>>> ## HA storage > HA data clean up
> >>>>>> >>>>>
> >>>>>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy
> >>>>>> <ClusterID>`,
> >>>>>> >>>>> how are the HA dada retained?
> >>>>>> >>>>>
> >>>>>> >>>>>
> >>>>>> >>>>> Thank you~
> >>>>>> >>>>>
> >>>>>> >>>>> Xintong Song
> >>>>>> >>>>>
> >>>>>> >>>>>
> >>>>>> >>>>>
> >>>>>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <
> >>>>>> danrtsey.wy@gmail.com>
> >>>>>> >>>>> wrote:
> >>>>>> >>>>>
> >>>>>> >>>>>> Hi devs and users,
> >>>>>> >>>>>>
> >>>>>> >>>>>> I would like to start the discussion about FLIP-144[1], which
> >>>>>> will
> >>>>>> >>>>>> introduce
> >>>>>> >>>>>> a new native high availability service for Kubernetes.
> >>>>>> >>>>>>
> >>>>>> >>>>>> Currently, Flink has provided Zookeeper HA service and been
> >>>>>> widely
> >>>>>> >>>>>> used
> >>>>>> >>>>>> in production environments. It could be integrated in
> >>>>>> standalone
> >>>>>> >>>>>> cluster,
> >>>>>> >>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA
> >>>>>> in K8s
> >>>>>> >>>>>> will take additional cost since we need to manage a Zookeeper
> >>>>>> cluster.
> >>>>>> >>>>>> In the meantime, K8s has provided some public API for leader
> >>>>>> >>>>>> election[2]
> >>>>>> >>>>>> and configuration storage(i.e. ConfigMap[3]). We could
> >>>>>> leverage these
> >>>>>> >>>>>> features and make running HA configured Flink cluster on K8s
> >>>>>> more
> >>>>>> >>>>>> convenient.
> >>>>>> >>>>>>
> >>>>>> >>>>>> Both the standalone on K8s and native K8s could benefit from
> >>>>>> the new
> >>>>>> >>>>>> introduced KubernetesHaService.
> >>>>>> >>>>>>
> >>>>>> >>>>>> [1].
> >>>>>> >>>>>>
> >>>>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
> >>>>>> >>>>>> [2].
> >>>>>> >>>>>>
> >>>>>>
> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
> >>>>>> >>>>>> [3].
> >>>>>> https://kubernetes.io/docs/concepts/configuration/configmap/
> >>>>>> >>>>>>
> >>>>>> >>>>>> Looking forward to your feedback.
> >>>>>> >>>>>>
> >>>>>> >>>>>> Best,
> >>>>>> >>>>>> Yang
> >>>>>> >>>>>>
> >>>>>> >>>>>
> >>>>>>
> >>>>>
>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Till Rohrmann <tr...@apache.org>.
3. We could avoid force deletions from within Flink. If the user does it,
then we don't give guarantees.

I am fine with your current proposal. +1 for moving forward with it.

Cheers,
Till

On Thu, Oct 1, 2020 at 2:32 AM Yang Wang <da...@gmail.com> wrote:

> 2. Yes. This is exactly what I mean. Storing the HA information relevant
> to a specific component in a single ConfigMap and ensuring that “Get(check
> the leader)-and-Update(write back to the ConfigMap)” is a transactional
> operation. Since we only store the job graph stateHandler(not the real
> data) in the ConfigMap, I think 1MB is big enough for the dispater-leader
> ConfigMap(the biggest one with multiple jobs). I roughly calculate that
> could we have more than 1000 Flink jobs in a Flink session cluster.
>
> 3. Actually, K8s has a stronger guarantee than YARN. And the StatefulSet
> could provide at most one semantics if no manually force-deletion
> happened[1]. Based on the previous discussion, we have successfully avoided
> the "lock-and-release" in the implementation. So I still insist on using
> the current Deployment.
>
>
> [1].
> https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#force-deletion
>
>
> Best,
> Yang
>
> Till Rohrmann <tr...@apache.org> 于2020年9月30日周三 下午11:57写道:
>
>> Thanks for the clarifications Yang Wang.
>>
>> 2. Keeping the HA information relevant for a component (Dispatcher,
>> JobManager, ResourceManager) in a single ConfigMap sounds good. We should
>> check that we don't exceed the 1 MB size limit with this approach though.
>> The Dispatcher's ConfigMap would then contain the current leader, the
>> running jobs and the pointers to the persisted JobGraphs. The JobManager's
>> ConfigMap would then contain the current leader, the pointers to the
>> checkpoints and the checkpoint ID counter, for example.
>>
>> 3. Ah ok, I somehow thought that K8s would give us stronger
>> guarantees than Yarn in this regard. That's a pity.
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 30, 2020 at 10:03 AM tison <wa...@gmail.com> wrote:
>>
>>> Thanks for your explanation. It would be fine if only checking
>>> leadership & actually write information is atomic.
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> Yang Wang <da...@gmail.com> 于2020年9月30日周三 下午3:57写道:
>>>
>>>> Thanks till and tison for your comments.
>>>>
>>>> @Till Rohrmann <tr...@apache.org>
>>>> 1. I am afraid we could not do this if we are going to use fabric8
>>>> Kubernetes client SDK for the leader election. The official Kubernetes Java
>>>> client[1] also could not support it. Unless we implement a new
>>>> LeaderElector in Flink based on the very basic Kubernetes API. But it seems
>>>> that we could gain too much from this.
>>>>
>>>> 2. Yes, the implementation will be a little complicated if we want to
>>>> completely eliminate the residual job graphs or checkpoints. Inspired by
>>>> your suggestion, another different solution has come into my mind. We could
>>>> use a same ConfigMap storing the JobManager leader, job graph,
>>>> checkpoint-counter, checkpoint. Each job will have a specific ConfigMap for
>>>> the HA meta storage. Then it will be easier to guarantee that only the
>>>> leader could write the ConfigMap in a transactional operation. Since
>>>> “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
>>>> transactional operation.
>>>>
>>>> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution.
>>>> However, we still have the chances that two JobManager are running and
>>>> trying to get/delete a key in the same ConfigMap concurrently. Imagine that
>>>> the kubelet(like NodeManager in YARN) is down, and then the JobManager
>>>> could not be deleted. A new JobManager pod will be launched. We are just in
>>>> the similar situation as Deployment(1) + ConfigMap + HDFS/S3. The only
>>>> benefit is we do not need to implement a leader election/retrieval service.
>>>>
>>>> @tison
>>>> Actually, I do not think we will have such issue in the Kubernetes HA
>>>> service. In the Kubernetes LeaderElector[2], we have the leader information
>>>> stored on the annotation of leader ConfigMap. So it would not happen the
>>>> old leader could wrongly override the leader information. Once a JobManager
>>>> want to write his leader information to the ConfigMap, it will check
>>>> whether it is the leader now. If not, anything will happen. Moreover, the
>>>> Kubernetes Resource Version[3] ensures that no one else has snuck in and
>>>> written a different update while the client was in the process of
>>>> performing its update.
>>>>
>>>>
>>>> [1].
>>>> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
>>>> [2].
>>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
>>>> <https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70>
>>>> [3].
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
>>>>
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>> tison <wa...@gmail.com> 于2020年9月30日周三 下午3:21写道:
>>>>
>>>>> Hi,
>>>>>
>>>>> Generally +1 for a native k8s HA service.
>>>>>
>>>>> For leader election & publish leader information, there was a
>>>>> discussion[1]
>>>>> pointed out that since these two actions is NOT atomic, there will be
>>>>> always
>>>>> edge case where a previous leader overwrite leader information, even
>>>>> with
>>>>> versioned write. Versioned write helps on read again if version
>>>>> mismatches
>>>>> so if we want version write works, information in the kv pair should
>>>>> help the
>>>>> contender reflects whether it is the current leader.
>>>>>
>>>>> The idea of writes leader information on contender node or something
>>>>> equivalent makes sense but the details depends on how it is
>>>>> implemented.
>>>>> General problems are that
>>>>>
>>>>> 1. TM might be a bit late before it updated correct leader information
>>>>> but
>>>>> only if the leader election process is short and leadership is stable
>>>>> at most
>>>>> time, it won't be a serious issue.
>>>>> 2. The process TM extract leader information might be a bit more
>>>>> complex
>>>>> than directly watching a fixed key.
>>>>>
>>>>> Atomic issue can be addressed if one leverages low APIs such as lease
>>>>> & txn
>>>>> but it causes more developing efforts. ConfigMap and encapsulated
>>>>> interface,
>>>>> thought, provides only a self-consistent mechanism which doesn't
>>>>> promise
>>>>> more consistency for extension.
>>>>>
>>>>> Best,
>>>>> tison.
>>>>>
>>>>> [1]
>>>>> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>>>>>
>>>>>
>>>>>
>>>>> Till Rohrmann <tr...@apache.org> 于2020年9月29日周二 下午9:25写道:
>>>>>
>>>>>> For 1. I was wondering whether we can't write the leader connection
>>>>>> information directly when trying to obtain the leadership (trying to
>>>>>> update
>>>>>> the leader key with one's own value)? This might be a little detail,
>>>>>> though.
>>>>>>
>>>>>> 2. Alright, so we are having a similar mechanism as we have in
>>>>>> ZooKeeper
>>>>>> with the ephemeral lock nodes. I guess that this complicates the
>>>>>> implementation a bit, unfortunately.
>>>>>>
>>>>>> 3. Wouldn't the StatefulSet solution also work without a PV? One could
>>>>>> configure a different persistent storage like HDFS or S3 for storing
>>>>>> the
>>>>>> checkpoints and job blobs like in the ZooKeeper case. The current
>>>>>> benefit I
>>>>>> see is that we avoid having to implement this multi locking mechanism
>>>>>> in
>>>>>> the ConfigMaps using the annotations because we can be sure that
>>>>>> there is
>>>>>> only a single leader at a time if I understood the guarantees of K8s
>>>>>> correctly.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <da...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> > Hi Till, thanks for your valuable feedback.
>>>>>> >
>>>>>> > 1. Yes, leader election and storing leader information will use a
>>>>>> same
>>>>>> > ConfigMap. When a contender successfully performs a versioned
>>>>>> annotation
>>>>>> > update operation to the ConfigMap, it means that it has been
>>>>>> elected as the
>>>>>> > leader. And it will write the leader information in the callback of
>>>>>> leader
>>>>>> > elector[1]. The Kubernetes resource version will help us to avoid
>>>>>> the
>>>>>> > leader ConfigMap is wrongly updated.
>>>>>> >
>>>>>> > 2. The lock and release is really a valid concern. Actually in
>>>>>> current
>>>>>> > design, we could not guarantee that the node who tries to write his
>>>>>> > ownership is the real leader. Who writes later, who is the owner. To
>>>>>> > address this issue, we need to store all the owners of the key.
>>>>>> Only when
>>>>>> > the owner is empty, the specific key(means a checkpoint or job
>>>>>> graph) could
>>>>>> > be deleted. However, we may have a residual checkpoint or job graph
>>>>>> when
>>>>>> > the old JobManager crashed exceptionally and do not release the
>>>>>> lock. To
>>>>>> > solve this problem completely, we need a timestamp renew mechanism
>>>>>> > for CompletedCheckpointStore and JobGraphStore, which could help us
>>>>>> to the
>>>>>> > check the JobManager timeout and then clean up the residual keys.
>>>>>> >
>>>>>> > 3. Frankly speaking, I am not against with this solution. However,
>>>>>> in my
>>>>>> > opinion, it is more like a temporary proposal. We could use
>>>>>> StatefulSet to
>>>>>> > avoid leader election and leader retrieval. But I am not sure
>>>>>> whether
>>>>>> > TaskManager could properly handle the situation that same hostname
>>>>>> with
>>>>>> > different IPs, because the JobManager failed and relaunched. Also
>>>>>> we may
>>>>>> > still have two JobManagers running in some corner cases(e.g.
>>>>>> kubelet is
>>>>>> > down but the pod is running). Another concern is we have a strong
>>>>>> > dependency on the PersistentVolume(aka PV) in FileSystemHAService.
>>>>>> But it
>>>>>> > is not always true especially in self-build Kubernetes cluster.
>>>>>> Moreover,
>>>>>> > PV provider should guarantee that each PV could only be mounted
>>>>>> once. Since
>>>>>> > the native HA proposal could cover all the functionality of
>>>>>> StatefulSet
>>>>>> > proposal, that's why I prefer the former.
>>>>>> >
>>>>>> >
>>>>>> > [1].
>>>>>> >
>>>>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>>>>>> >
>>>>>> > Best,
>>>>>> > Yang
>>>>>> >
>>>>>> > Till Rohrmann <tr...@apache.org> 于2020年9月28日周一 下午9:29写道:
>>>>>> >
>>>>>> >> Thanks for creating this FLIP Yang Wang. I believe that many of
>>>>>> our users
>>>>>> >> will like a ZooKeeper-less HA setup.
>>>>>> >>
>>>>>> >> +1 for not separating the leader information and the leader
>>>>>> election if
>>>>>> >> possible. Maybe it is even possible that the contender writes his
>>>>>> leader
>>>>>> >> information directly when trying to obtain the leadership by
>>>>>> performing a
>>>>>> >> versioned write operation.
>>>>>> >>
>>>>>> >> Concerning the lock and release operation I have a question: Can
>>>>>> there be
>>>>>> >> multiple owners for a given key-value pair in a ConfigMap? If not,
>>>>>> how can
>>>>>> >> we ensure that the node which writes his ownership is actually the
>>>>>> leader
>>>>>> >> w/o transactional support from K8s? In ZooKeeper we had the same
>>>>>> problem
>>>>>> >> (we should probably change it at some point to simply use a
>>>>>> >> transaction which checks whether the writer is still the leader)
>>>>>> and
>>>>>> >> therefore introduced the ephemeral lock nodes. What they allow is
>>>>>> that
>>>>>> >> there can be multiple owners of a given ZNode at a time. The last
>>>>>> owner
>>>>>> >> will then be responsible for the cleanup of the node.
>>>>>> >>
>>>>>> >> I see the benefit of your proposal over the stateful set proposal
>>>>>> because
>>>>>> >> it can support multiple standby JMs. Given the problem of locking
>>>>>> key-value
>>>>>> >> pairs it might be simpler to start with this approach where we
>>>>>> only have
>>>>>> >> single JM. This might already add a lot of benefits for our users.
>>>>>> Was
>>>>>> >> there a specific reason why you discarded this proposal (other than
>>>>>> >> generality)?
>>>>>> >>
>>>>>> >> @Uce it would be great to hear your feedback on the proposal since
>>>>>> you
>>>>>> >> already implemented a K8s based HA service.
>>>>>> >>
>>>>>> >> Cheers,
>>>>>> >> Till
>>>>>> >>
>>>>>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com>
>>>>>> wrote:
>>>>>> >>
>>>>>> >>> Hi Xintong and Stephan,
>>>>>> >>>
>>>>>> >>> Thanks a lot for your attention on this FLIP. I will address the
>>>>>> >>> comments inline.
>>>>>> >>>
>>>>>> >>> # Architecture -> One or two ConfigMaps
>>>>>> >>>
>>>>>> >>> Both of you are right. One ConfigMap will make the design and
>>>>>> >>> implementation easier. Actually, in my POC codes,
>>>>>> >>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for
>>>>>> rest
>>>>>> >>> server component) for the leader election
>>>>>> >>> and storage. Once a JobManager win the election, it will update
>>>>>> the
>>>>>> >>> ConfigMap with leader address and periodically
>>>>>> >>> renew the lock annotation to keep as the active leader. I will
>>>>>> update
>>>>>> >>> the FLIP document, including the architecture diagram,
>>>>>> >>> to avoid the misunderstanding.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> # HA storage > Lock and release
>>>>>> >>>
>>>>>> >>> This is a valid concern. Since for Zookeeper ephemeral nodes, it
>>>>>> will be
>>>>>> >>> deleted by the ZK server automatically when
>>>>>> >>> the client is timeout. It could happen in a bad network
>>>>>> environment or
>>>>>> >>> the ZK client crashed exceptionally. For Kubernetes,
>>>>>> >>> we need to implement a similar mechanism. First, when we want to
>>>>>> lock a
>>>>>> >>> specific key in ConfigMap, we will put the owner identify,
>>>>>> >>> lease duration, renew time in the ConfigMap annotation. The
>>>>>> annotation
>>>>>> >>> will be cleaned up when releasing the lock. When
>>>>>> >>> we want to remove a job graph or checkpoints, it should satisfy
>>>>>> the
>>>>>> >>> following conditions. If not, the delete operation could not be
>>>>>> done.
>>>>>> >>> * Current instance is the owner of the key.
>>>>>> >>> * The owner annotation is empty, which means the owner has
>>>>>> released the
>>>>>> >>> lock.
>>>>>> >>> * The owner annotation timed out, which usually indicate the
>>>>>> owner died.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> # HA storage > HA data clean up
>>>>>> >>>
>>>>>> >>> Sorry for that I do not describe how the HA related ConfigMap is
>>>>>> >>> retained clearly. Benefit from the Kubernetes OwnerReference[1],
>>>>>> >>> we set owner of the flink-conf configmap, service and TaskManager
>>>>>> pods
>>>>>> >>> to JobManager Deployment. So when we want to
>>>>>> >>> destroy a Flink cluster, we just need to delete the
>>>>>> deployment[2]. For
>>>>>> >>> the HA related ConfigMaps, we do not set the owner
>>>>>> >>> so that they could be retained even though we delete the whole
>>>>>> Flink
>>>>>> >>> cluster.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> [1].
>>>>>> >>>
>>>>>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>>>>> >>> [2].
>>>>>> >>>
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> Best,
>>>>>> >>> Yang
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
>>>>>> >>>
>>>>>> >>>> This is a very cool feature proposal.
>>>>>> >>>>
>>>>>> >>>> One lesson-learned from the ZooKeeper-based HA is that it is
>>>>>> overly
>>>>>> >>>> complicated to have the Leader RPC address in a different node
>>>>>> than the
>>>>>> >>>> LeaderLock. There is extra code needed to make sure these
>>>>>> converge and the
>>>>>> >>>> can be temporarily out of sync.
>>>>>> >>>>
>>>>>> >>>> A much easier design would be to have the RPC address as payload
>>>>>> in the
>>>>>> >>>> lock entry (ZNode in ZK), the same way that the leader fencing
>>>>>> token is
>>>>>> >>>> stored as payload of the lock.
>>>>>> >>>> I think for the design above it would mean having a single
>>>>>> ConfigMap
>>>>>> >>>> for both leader lock and leader RPC address discovery.
>>>>>> >>>>
>>>>>> >>>> This probably serves as a good design principle in general - not
>>>>>> divide
>>>>>> >>>> information that is updated together over different resources.
>>>>>> >>>>
>>>>>> >>>> Best,
>>>>>> >>>> Stephan
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <
>>>>>> tonysong820@gmail.com>
>>>>>> >>>> wrote:
>>>>>> >>>>
>>>>>> >>>>> Thanks for preparing this FLIP, @Yang.
>>>>>> >>>>>
>>>>>> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>>>>> >>>>> buildtin ConfigMap for Flink's HA services should significantly
>>>>>> reduce the
>>>>>> >>>>> maintenance overhead compared to deploying a ZK cluster. I
>>>>>> think this is an
>>>>>> >>>>> attractive feature for users.
>>>>>> >>>>>
>>>>>> >>>>> Concerning the proposed design, I have some questions. Might
>>>>>> not be
>>>>>> >>>>> problems, just trying to understand.
>>>>>> >>>>>
>>>>>> >>>>> ## Architecture
>>>>>> >>>>>
>>>>>> >>>>> Why does the leader election need two ConfigMaps (`lock for
>>>>>> contending
>>>>>> >>>>> leader`, and `leader RPC address`)? What happens if the two
>>>>>> ConfigMaps are
>>>>>> >>>>> not updated consistently? E.g., a TM learns about a new JM
>>>>>> becoming leader
>>>>>> >>>>> (lock for contending leader updated), but still gets the old
>>>>>> leader's
>>>>>> >>>>> address when trying to read `leader RPC address`?
>>>>>> >>>>>
>>>>>> >>>>> ## HA storage > Lock and release
>>>>>> >>>>>
>>>>>> >>>>> It seems to me that the owner needs to explicitly release the
>>>>>> lock so
>>>>>> >>>>> that other peers can write/remove the stored object. What if
>>>>>> the previous
>>>>>> >>>>> owner failed to release the lock (e.g., dead before releasing)?
>>>>>> Would there
>>>>>> >>>>> be any problem?
>>>>>> >>>>>
>>>>>> >>>>> ## HA storage > HA data clean up
>>>>>> >>>>>
>>>>>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy
>>>>>> <ClusterID>`,
>>>>>> >>>>> how are the HA dada retained?
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>> Thank you~
>>>>>> >>>>>
>>>>>> >>>>> Xintong Song
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <
>>>>>> danrtsey.wy@gmail.com>
>>>>>> >>>>> wrote:
>>>>>> >>>>>
>>>>>> >>>>>> Hi devs and users,
>>>>>> >>>>>>
>>>>>> >>>>>> I would like to start the discussion about FLIP-144[1], which
>>>>>> will
>>>>>> >>>>>> introduce
>>>>>> >>>>>> a new native high availability service for Kubernetes.
>>>>>> >>>>>>
>>>>>> >>>>>> Currently, Flink has provided Zookeeper HA service and been
>>>>>> widely
>>>>>> >>>>>> used
>>>>>> >>>>>> in production environments. It could be integrated in
>>>>>> standalone
>>>>>> >>>>>> cluster,
>>>>>> >>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA
>>>>>> in K8s
>>>>>> >>>>>> will take additional cost since we need to manage a Zookeeper
>>>>>> cluster.
>>>>>> >>>>>> In the meantime, K8s has provided some public API for leader
>>>>>> >>>>>> election[2]
>>>>>> >>>>>> and configuration storage(i.e. ConfigMap[3]). We could
>>>>>> leverage these
>>>>>> >>>>>> features and make running HA configured Flink cluster on K8s
>>>>>> more
>>>>>> >>>>>> convenient.
>>>>>> >>>>>>
>>>>>> >>>>>> Both the standalone on K8s and native K8s could benefit from
>>>>>> the new
>>>>>> >>>>>> introduced KubernetesHaService.
>>>>>> >>>>>>
>>>>>> >>>>>> [1].
>>>>>> >>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>>>> >>>>>> [2].
>>>>>> >>>>>>
>>>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>>>> >>>>>> [3].
>>>>>> https://kubernetes.io/docs/concepts/configuration/configmap/
>>>>>> >>>>>>
>>>>>> >>>>>> Looking forward to your feedback.
>>>>>> >>>>>>
>>>>>> >>>>>> Best,
>>>>>> >>>>>> Yang
>>>>>> >>>>>>
>>>>>> >>>>>
>>>>>>
>>>>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Till Rohrmann <tr...@apache.org>.
3. We could avoid force deletions from within Flink. If the user does it,
then we don't give guarantees.

I am fine with your current proposal. +1 for moving forward with it.

Cheers,
Till

On Thu, Oct 1, 2020 at 2:32 AM Yang Wang <da...@gmail.com> wrote:

> 2. Yes. This is exactly what I mean. Storing the HA information relevant
> to a specific component in a single ConfigMap and ensuring that “Get(check
> the leader)-and-Update(write back to the ConfigMap)” is a transactional
> operation. Since we only store the job graph stateHandler(not the real
> data) in the ConfigMap, I think 1MB is big enough for the dispater-leader
> ConfigMap(the biggest one with multiple jobs). I roughly calculate that
> could we have more than 1000 Flink jobs in a Flink session cluster.
>
> 3. Actually, K8s has a stronger guarantee than YARN. And the StatefulSet
> could provide at most one semantics if no manually force-deletion
> happened[1]. Based on the previous discussion, we have successfully avoided
> the "lock-and-release" in the implementation. So I still insist on using
> the current Deployment.
>
>
> [1].
> https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#force-deletion
>
>
> Best,
> Yang
>
> Till Rohrmann <tr...@apache.org> 于2020年9月30日周三 下午11:57写道:
>
>> Thanks for the clarifications Yang Wang.
>>
>> 2. Keeping the HA information relevant for a component (Dispatcher,
>> JobManager, ResourceManager) in a single ConfigMap sounds good. We should
>> check that we don't exceed the 1 MB size limit with this approach though.
>> The Dispatcher's ConfigMap would then contain the current leader, the
>> running jobs and the pointers to the persisted JobGraphs. The JobManager's
>> ConfigMap would then contain the current leader, the pointers to the
>> checkpoints and the checkpoint ID counter, for example.
>>
>> 3. Ah ok, I somehow thought that K8s would give us stronger
>> guarantees than Yarn in this regard. That's a pity.
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 30, 2020 at 10:03 AM tison <wa...@gmail.com> wrote:
>>
>>> Thanks for your explanation. It would be fine if only checking
>>> leadership & actually write information is atomic.
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> Yang Wang <da...@gmail.com> 于2020年9月30日周三 下午3:57写道:
>>>
>>>> Thanks till and tison for your comments.
>>>>
>>>> @Till Rohrmann <tr...@apache.org>
>>>> 1. I am afraid we could not do this if we are going to use fabric8
>>>> Kubernetes client SDK for the leader election. The official Kubernetes Java
>>>> client[1] also could not support it. Unless we implement a new
>>>> LeaderElector in Flink based on the very basic Kubernetes API. But it seems
>>>> that we could gain too much from this.
>>>>
>>>> 2. Yes, the implementation will be a little complicated if we want to
>>>> completely eliminate the residual job graphs or checkpoints. Inspired by
>>>> your suggestion, another different solution has come into my mind. We could
>>>> use a same ConfigMap storing the JobManager leader, job graph,
>>>> checkpoint-counter, checkpoint. Each job will have a specific ConfigMap for
>>>> the HA meta storage. Then it will be easier to guarantee that only the
>>>> leader could write the ConfigMap in a transactional operation. Since
>>>> “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
>>>> transactional operation.
>>>>
>>>> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution.
>>>> However, we still have the chances that two JobManager are running and
>>>> trying to get/delete a key in the same ConfigMap concurrently. Imagine that
>>>> the kubelet(like NodeManager in YARN) is down, and then the JobManager
>>>> could not be deleted. A new JobManager pod will be launched. We are just in
>>>> the similar situation as Deployment(1) + ConfigMap + HDFS/S3. The only
>>>> benefit is we do not need to implement a leader election/retrieval service.
>>>>
>>>> @tison
>>>> Actually, I do not think we will have such issue in the Kubernetes HA
>>>> service. In the Kubernetes LeaderElector[2], we have the leader information
>>>> stored on the annotation of leader ConfigMap. So it would not happen the
>>>> old leader could wrongly override the leader information. Once a JobManager
>>>> want to write his leader information to the ConfigMap, it will check
>>>> whether it is the leader now. If not, anything will happen. Moreover, the
>>>> Kubernetes Resource Version[3] ensures that no one else has snuck in and
>>>> written a different update while the client was in the process of
>>>> performing its update.
>>>>
>>>>
>>>> [1].
>>>> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
>>>> [2].
>>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
>>>> <https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70>
>>>> [3].
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
>>>>
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>> tison <wa...@gmail.com> 于2020年9月30日周三 下午3:21写道:
>>>>
>>>>> Hi,
>>>>>
>>>>> Generally +1 for a native k8s HA service.
>>>>>
>>>>> For leader election & publish leader information, there was a
>>>>> discussion[1]
>>>>> pointed out that since these two actions is NOT atomic, there will be
>>>>> always
>>>>> edge case where a previous leader overwrite leader information, even
>>>>> with
>>>>> versioned write. Versioned write helps on read again if version
>>>>> mismatches
>>>>> so if we want version write works, information in the kv pair should
>>>>> help the
>>>>> contender reflects whether it is the current leader.
>>>>>
>>>>> The idea of writes leader information on contender node or something
>>>>> equivalent makes sense but the details depends on how it is
>>>>> implemented.
>>>>> General problems are that
>>>>>
>>>>> 1. TM might be a bit late before it updated correct leader information
>>>>> but
>>>>> only if the leader election process is short and leadership is stable
>>>>> at most
>>>>> time, it won't be a serious issue.
>>>>> 2. The process TM extract leader information might be a bit more
>>>>> complex
>>>>> than directly watching a fixed key.
>>>>>
>>>>> Atomic issue can be addressed if one leverages low APIs such as lease
>>>>> & txn
>>>>> but it causes more developing efforts. ConfigMap and encapsulated
>>>>> interface,
>>>>> thought, provides only a self-consistent mechanism which doesn't
>>>>> promise
>>>>> more consistency for extension.
>>>>>
>>>>> Best,
>>>>> tison.
>>>>>
>>>>> [1]
>>>>> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>>>>>
>>>>>
>>>>>
>>>>> Till Rohrmann <tr...@apache.org> 于2020年9月29日周二 下午9:25写道:
>>>>>
>>>>>> For 1. I was wondering whether we can't write the leader connection
>>>>>> information directly when trying to obtain the leadership (trying to
>>>>>> update
>>>>>> the leader key with one's own value)? This might be a little detail,
>>>>>> though.
>>>>>>
>>>>>> 2. Alright, so we are having a similar mechanism as we have in
>>>>>> ZooKeeper
>>>>>> with the ephemeral lock nodes. I guess that this complicates the
>>>>>> implementation a bit, unfortunately.
>>>>>>
>>>>>> 3. Wouldn't the StatefulSet solution also work without a PV? One could
>>>>>> configure a different persistent storage like HDFS or S3 for storing
>>>>>> the
>>>>>> checkpoints and job blobs like in the ZooKeeper case. The current
>>>>>> benefit I
>>>>>> see is that we avoid having to implement this multi locking mechanism
>>>>>> in
>>>>>> the ConfigMaps using the annotations because we can be sure that
>>>>>> there is
>>>>>> only a single leader at a time if I understood the guarantees of K8s
>>>>>> correctly.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <da...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> > Hi Till, thanks for your valuable feedback.
>>>>>> >
>>>>>> > 1. Yes, leader election and storing leader information will use a
>>>>>> same
>>>>>> > ConfigMap. When a contender successfully performs a versioned
>>>>>> annotation
>>>>>> > update operation to the ConfigMap, it means that it has been
>>>>>> elected as the
>>>>>> > leader. And it will write the leader information in the callback of
>>>>>> leader
>>>>>> > elector[1]. The Kubernetes resource version will help us to avoid
>>>>>> the
>>>>>> > leader ConfigMap is wrongly updated.
>>>>>> >
>>>>>> > 2. The lock and release is really a valid concern. Actually in
>>>>>> current
>>>>>> > design, we could not guarantee that the node who tries to write his
>>>>>> > ownership is the real leader. Who writes later, who is the owner. To
>>>>>> > address this issue, we need to store all the owners of the key.
>>>>>> Only when
>>>>>> > the owner is empty, the specific key(means a checkpoint or job
>>>>>> graph) could
>>>>>> > be deleted. However, we may have a residual checkpoint or job graph
>>>>>> when
>>>>>> > the old JobManager crashed exceptionally and do not release the
>>>>>> lock. To
>>>>>> > solve this problem completely, we need a timestamp renew mechanism
>>>>>> > for CompletedCheckpointStore and JobGraphStore, which could help us
>>>>>> to the
>>>>>> > check the JobManager timeout and then clean up the residual keys.
>>>>>> >
>>>>>> > 3. Frankly speaking, I am not against with this solution. However,
>>>>>> in my
>>>>>> > opinion, it is more like a temporary proposal. We could use
>>>>>> StatefulSet to
>>>>>> > avoid leader election and leader retrieval. But I am not sure
>>>>>> whether
>>>>>> > TaskManager could properly handle the situation that same hostname
>>>>>> with
>>>>>> > different IPs, because the JobManager failed and relaunched. Also
>>>>>> we may
>>>>>> > still have two JobManagers running in some corner cases(e.g.
>>>>>> kubelet is
>>>>>> > down but the pod is running). Another concern is we have a strong
>>>>>> > dependency on the PersistentVolume(aka PV) in FileSystemHAService.
>>>>>> But it
>>>>>> > is not always true especially in self-build Kubernetes cluster.
>>>>>> Moreover,
>>>>>> > PV provider should guarantee that each PV could only be mounted
>>>>>> once. Since
>>>>>> > the native HA proposal could cover all the functionality of
>>>>>> StatefulSet
>>>>>> > proposal, that's why I prefer the former.
>>>>>> >
>>>>>> >
>>>>>> > [1].
>>>>>> >
>>>>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>>>>>> >
>>>>>> > Best,
>>>>>> > Yang
>>>>>> >
>>>>>> > Till Rohrmann <tr...@apache.org> 于2020年9月28日周一 下午9:29写道:
>>>>>> >
>>>>>> >> Thanks for creating this FLIP Yang Wang. I believe that many of
>>>>>> our users
>>>>>> >> will like a ZooKeeper-less HA setup.
>>>>>> >>
>>>>>> >> +1 for not separating the leader information and the leader
>>>>>> election if
>>>>>> >> possible. Maybe it is even possible that the contender writes his
>>>>>> leader
>>>>>> >> information directly when trying to obtain the leadership by
>>>>>> performing a
>>>>>> >> versioned write operation.
>>>>>> >>
>>>>>> >> Concerning the lock and release operation I have a question: Can
>>>>>> there be
>>>>>> >> multiple owners for a given key-value pair in a ConfigMap? If not,
>>>>>> how can
>>>>>> >> we ensure that the node which writes his ownership is actually the
>>>>>> leader
>>>>>> >> w/o transactional support from K8s? In ZooKeeper we had the same
>>>>>> problem
>>>>>> >> (we should probably change it at some point to simply use a
>>>>>> >> transaction which checks whether the writer is still the leader)
>>>>>> and
>>>>>> >> therefore introduced the ephemeral lock nodes. What they allow is
>>>>>> that
>>>>>> >> there can be multiple owners of a given ZNode at a time. The last
>>>>>> owner
>>>>>> >> will then be responsible for the cleanup of the node.
>>>>>> >>
>>>>>> >> I see the benefit of your proposal over the stateful set proposal
>>>>>> because
>>>>>> >> it can support multiple standby JMs. Given the problem of locking
>>>>>> key-value
>>>>>> >> pairs it might be simpler to start with this approach where we
>>>>>> only have
>>>>>> >> single JM. This might already add a lot of benefits for our users.
>>>>>> Was
>>>>>> >> there a specific reason why you discarded this proposal (other than
>>>>>> >> generality)?
>>>>>> >>
>>>>>> >> @Uce it would be great to hear your feedback on the proposal since
>>>>>> you
>>>>>> >> already implemented a K8s based HA service.
>>>>>> >>
>>>>>> >> Cheers,
>>>>>> >> Till
>>>>>> >>
>>>>>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com>
>>>>>> wrote:
>>>>>> >>
>>>>>> >>> Hi Xintong and Stephan,
>>>>>> >>>
>>>>>> >>> Thanks a lot for your attention on this FLIP. I will address the
>>>>>> >>> comments inline.
>>>>>> >>>
>>>>>> >>> # Architecture -> One or two ConfigMaps
>>>>>> >>>
>>>>>> >>> Both of you are right. One ConfigMap will make the design and
>>>>>> >>> implementation easier. Actually, in my POC codes,
>>>>>> >>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for
>>>>>> rest
>>>>>> >>> server component) for the leader election
>>>>>> >>> and storage. Once a JobManager win the election, it will update
>>>>>> the
>>>>>> >>> ConfigMap with leader address and periodically
>>>>>> >>> renew the lock annotation to keep as the active leader. I will
>>>>>> update
>>>>>> >>> the FLIP document, including the architecture diagram,
>>>>>> >>> to avoid the misunderstanding.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> # HA storage > Lock and release
>>>>>> >>>
>>>>>> >>> This is a valid concern. Since for Zookeeper ephemeral nodes, it
>>>>>> will be
>>>>>> >>> deleted by the ZK server automatically when
>>>>>> >>> the client is timeout. It could happen in a bad network
>>>>>> environment or
>>>>>> >>> the ZK client crashed exceptionally. For Kubernetes,
>>>>>> >>> we need to implement a similar mechanism. First, when we want to
>>>>>> lock a
>>>>>> >>> specific key in ConfigMap, we will put the owner identify,
>>>>>> >>> lease duration, renew time in the ConfigMap annotation. The
>>>>>> annotation
>>>>>> >>> will be cleaned up when releasing the lock. When
>>>>>> >>> we want to remove a job graph or checkpoints, it should satisfy
>>>>>> the
>>>>>> >>> following conditions. If not, the delete operation could not be
>>>>>> done.
>>>>>> >>> * Current instance is the owner of the key.
>>>>>> >>> * The owner annotation is empty, which means the owner has
>>>>>> released the
>>>>>> >>> lock.
>>>>>> >>> * The owner annotation timed out, which usually indicate the
>>>>>> owner died.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> # HA storage > HA data clean up
>>>>>> >>>
>>>>>> >>> Sorry for that I do not describe how the HA related ConfigMap is
>>>>>> >>> retained clearly. Benefit from the Kubernetes OwnerReference[1],
>>>>>> >>> we set owner of the flink-conf configmap, service and TaskManager
>>>>>> pods
>>>>>> >>> to JobManager Deployment. So when we want to
>>>>>> >>> destroy a Flink cluster, we just need to delete the
>>>>>> deployment[2]. For
>>>>>> >>> the HA related ConfigMaps, we do not set the owner
>>>>>> >>> so that they could be retained even though we delete the whole
>>>>>> Flink
>>>>>> >>> cluster.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> [1].
>>>>>> >>>
>>>>>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>>>>> >>> [2].
>>>>>> >>>
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> Best,
>>>>>> >>> Yang
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
>>>>>> >>>
>>>>>> >>>> This is a very cool feature proposal.
>>>>>> >>>>
>>>>>> >>>> One lesson-learned from the ZooKeeper-based HA is that it is
>>>>>> overly
>>>>>> >>>> complicated to have the Leader RPC address in a different node
>>>>>> than the
>>>>>> >>>> LeaderLock. There is extra code needed to make sure these
>>>>>> converge and the
>>>>>> >>>> can be temporarily out of sync.
>>>>>> >>>>
>>>>>> >>>> A much easier design would be to have the RPC address as payload
>>>>>> in the
>>>>>> >>>> lock entry (ZNode in ZK), the same way that the leader fencing
>>>>>> token is
>>>>>> >>>> stored as payload of the lock.
>>>>>> >>>> I think for the design above it would mean having a single
>>>>>> ConfigMap
>>>>>> >>>> for both leader lock and leader RPC address discovery.
>>>>>> >>>>
>>>>>> >>>> This probably serves as a good design principle in general - not
>>>>>> divide
>>>>>> >>>> information that is updated together over different resources.
>>>>>> >>>>
>>>>>> >>>> Best,
>>>>>> >>>> Stephan
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <
>>>>>> tonysong820@gmail.com>
>>>>>> >>>> wrote:
>>>>>> >>>>
>>>>>> >>>>> Thanks for preparing this FLIP, @Yang.
>>>>>> >>>>>
>>>>>> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>>>>> >>>>> buildtin ConfigMap for Flink's HA services should significantly
>>>>>> reduce the
>>>>>> >>>>> maintenance overhead compared to deploying a ZK cluster. I
>>>>>> think this is an
>>>>>> >>>>> attractive feature for users.
>>>>>> >>>>>
>>>>>> >>>>> Concerning the proposed design, I have some questions. Might
>>>>>> not be
>>>>>> >>>>> problems, just trying to understand.
>>>>>> >>>>>
>>>>>> >>>>> ## Architecture
>>>>>> >>>>>
>>>>>> >>>>> Why does the leader election need two ConfigMaps (`lock for
>>>>>> contending
>>>>>> >>>>> leader`, and `leader RPC address`)? What happens if the two
>>>>>> ConfigMaps are
>>>>>> >>>>> not updated consistently? E.g., a TM learns about a new JM
>>>>>> becoming leader
>>>>>> >>>>> (lock for contending leader updated), but still gets the old
>>>>>> leader's
>>>>>> >>>>> address when trying to read `leader RPC address`?
>>>>>> >>>>>
>>>>>> >>>>> ## HA storage > Lock and release
>>>>>> >>>>>
>>>>>> >>>>> It seems to me that the owner needs to explicitly release the
>>>>>> lock so
>>>>>> >>>>> that other peers can write/remove the stored object. What if
>>>>>> the previous
>>>>>> >>>>> owner failed to release the lock (e.g., dead before releasing)?
>>>>>> Would there
>>>>>> >>>>> be any problem?
>>>>>> >>>>>
>>>>>> >>>>> ## HA storage > HA data clean up
>>>>>> >>>>>
>>>>>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy
>>>>>> <ClusterID>`,
>>>>>> >>>>> how are the HA dada retained?
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>> Thank you~
>>>>>> >>>>>
>>>>>> >>>>> Xintong Song
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <
>>>>>> danrtsey.wy@gmail.com>
>>>>>> >>>>> wrote:
>>>>>> >>>>>
>>>>>> >>>>>> Hi devs and users,
>>>>>> >>>>>>
>>>>>> >>>>>> I would like to start the discussion about FLIP-144[1], which
>>>>>> will
>>>>>> >>>>>> introduce
>>>>>> >>>>>> a new native high availability service for Kubernetes.
>>>>>> >>>>>>
>>>>>> >>>>>> Currently, Flink has provided Zookeeper HA service and been
>>>>>> widely
>>>>>> >>>>>> used
>>>>>> >>>>>> in production environments. It could be integrated in
>>>>>> standalone
>>>>>> >>>>>> cluster,
>>>>>> >>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA
>>>>>> in K8s
>>>>>> >>>>>> will take additional cost since we need to manage a Zookeeper
>>>>>> cluster.
>>>>>> >>>>>> In the meantime, K8s has provided some public API for leader
>>>>>> >>>>>> election[2]
>>>>>> >>>>>> and configuration storage(i.e. ConfigMap[3]). We could
>>>>>> leverage these
>>>>>> >>>>>> features and make running HA configured Flink cluster on K8s
>>>>>> more
>>>>>> >>>>>> convenient.
>>>>>> >>>>>>
>>>>>> >>>>>> Both the standalone on K8s and native K8s could benefit from
>>>>>> the new
>>>>>> >>>>>> introduced KubernetesHaService.
>>>>>> >>>>>>
>>>>>> >>>>>> [1].
>>>>>> >>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>>>> >>>>>> [2].
>>>>>> >>>>>>
>>>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>>>> >>>>>> [3].
>>>>>> https://kubernetes.io/docs/concepts/configuration/configmap/
>>>>>> >>>>>>
>>>>>> >>>>>> Looking forward to your feedback.
>>>>>> >>>>>>
>>>>>> >>>>>> Best,
>>>>>> >>>>>> Yang
>>>>>> >>>>>>
>>>>>> >>>>>
>>>>>>
>>>>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Yang Wang <da...@gmail.com>.
2. Yes. This is exactly what I mean. Storing the HA information relevant to
a specific component in a single ConfigMap and ensuring that “Get(check the
leader)-and-Update(write back to the ConfigMap)” is a transactional
operation. Since we only store the job graph stateHandler(not the real
data) in the ConfigMap, I think 1MB is big enough for the dispater-leader
ConfigMap(the biggest one with multiple jobs). I roughly calculate that
could we have more than 1000 Flink jobs in a Flink session cluster.

3. Actually, K8s has a stronger guarantee than YARN. And the StatefulSet
could provide at most one semantics if no manually force-deletion
happened[1]. Based on the previous discussion, we have successfully avoided
the "lock-and-release" in the implementation. So I still insist on using
the current Deployment.


[1].
https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#force-deletion


Best,
Yang

Till Rohrmann <tr...@apache.org> 于2020年9月30日周三 下午11:57写道:

> Thanks for the clarifications Yang Wang.
>
> 2. Keeping the HA information relevant for a component (Dispatcher,
> JobManager, ResourceManager) in a single ConfigMap sounds good. We should
> check that we don't exceed the 1 MB size limit with this approach though.
> The Dispatcher's ConfigMap would then contain the current leader, the
> running jobs and the pointers to the persisted JobGraphs. The JobManager's
> ConfigMap would then contain the current leader, the pointers to the
> checkpoints and the checkpoint ID counter, for example.
>
> 3. Ah ok, I somehow thought that K8s would give us stronger
> guarantees than Yarn in this regard. That's a pity.
>
> Cheers,
> Till
>
> On Wed, Sep 30, 2020 at 10:03 AM tison <wa...@gmail.com> wrote:
>
>> Thanks for your explanation. It would be fine if only checking leadership
>> & actually write information is atomic.
>>
>> Best,
>> tison.
>>
>>
>> Yang Wang <da...@gmail.com> 于2020年9月30日周三 下午3:57写道:
>>
>>> Thanks till and tison for your comments.
>>>
>>> @Till Rohrmann <tr...@apache.org>
>>> 1. I am afraid we could not do this if we are going to use fabric8
>>> Kubernetes client SDK for the leader election. The official Kubernetes Java
>>> client[1] also could not support it. Unless we implement a new
>>> LeaderElector in Flink based on the very basic Kubernetes API. But it seems
>>> that we could gain too much from this.
>>>
>>> 2. Yes, the implementation will be a little complicated if we want to
>>> completely eliminate the residual job graphs or checkpoints. Inspired by
>>> your suggestion, another different solution has come into my mind. We could
>>> use a same ConfigMap storing the JobManager leader, job graph,
>>> checkpoint-counter, checkpoint. Each job will have a specific ConfigMap for
>>> the HA meta storage. Then it will be easier to guarantee that only the
>>> leader could write the ConfigMap in a transactional operation. Since
>>> “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
>>> transactional operation.
>>>
>>> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution.
>>> However, we still have the chances that two JobManager are running and
>>> trying to get/delete a key in the same ConfigMap concurrently. Imagine that
>>> the kubelet(like NodeManager in YARN) is down, and then the JobManager
>>> could not be deleted. A new JobManager pod will be launched. We are just in
>>> the similar situation as Deployment(1) + ConfigMap + HDFS/S3. The only
>>> benefit is we do not need to implement a leader election/retrieval service.
>>>
>>> @tison
>>> Actually, I do not think we will have such issue in the Kubernetes HA
>>> service. In the Kubernetes LeaderElector[2], we have the leader information
>>> stored on the annotation of leader ConfigMap. So it would not happen the
>>> old leader could wrongly override the leader information. Once a JobManager
>>> want to write his leader information to the ConfigMap, it will check
>>> whether it is the leader now. If not, anything will happen. Moreover, the
>>> Kubernetes Resource Version[3] ensures that no one else has snuck in and
>>> written a different update while the client was in the process of
>>> performing its update.
>>>
>>>
>>> [1].
>>> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
>>> [2].
>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
>>> <https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70>
>>> [3].
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> tison <wa...@gmail.com> 于2020年9月30日周三 下午3:21写道:
>>>
>>>> Hi,
>>>>
>>>> Generally +1 for a native k8s HA service.
>>>>
>>>> For leader election & publish leader information, there was a
>>>> discussion[1]
>>>> pointed out that since these two actions is NOT atomic, there will be
>>>> always
>>>> edge case where a previous leader overwrite leader information, even
>>>> with
>>>> versioned write. Versioned write helps on read again if version
>>>> mismatches
>>>> so if we want version write works, information in the kv pair should
>>>> help the
>>>> contender reflects whether it is the current leader.
>>>>
>>>> The idea of writes leader information on contender node or something
>>>> equivalent makes sense but the details depends on how it is implemented.
>>>> General problems are that
>>>>
>>>> 1. TM might be a bit late before it updated correct leader information
>>>> but
>>>> only if the leader election process is short and leadership is stable
>>>> at most
>>>> time, it won't be a serious issue.
>>>> 2. The process TM extract leader information might be a bit more complex
>>>> than directly watching a fixed key.
>>>>
>>>> Atomic issue can be addressed if one leverages low APIs such as lease &
>>>> txn
>>>> but it causes more developing efforts. ConfigMap and encapsulated
>>>> interface,
>>>> thought, provides only a self-consistent mechanism which doesn't promise
>>>> more consistency for extension.
>>>>
>>>> Best,
>>>> tison.
>>>>
>>>> [1]
>>>> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>>>>
>>>>
>>>>
>>>> Till Rohrmann <tr...@apache.org> 于2020年9月29日周二 下午9:25写道:
>>>>
>>>>> For 1. I was wondering whether we can't write the leader connection
>>>>> information directly when trying to obtain the leadership (trying to
>>>>> update
>>>>> the leader key with one's own value)? This might be a little detail,
>>>>> though.
>>>>>
>>>>> 2. Alright, so we are having a similar mechanism as we have in
>>>>> ZooKeeper
>>>>> with the ephemeral lock nodes. I guess that this complicates the
>>>>> implementation a bit, unfortunately.
>>>>>
>>>>> 3. Wouldn't the StatefulSet solution also work without a PV? One could
>>>>> configure a different persistent storage like HDFS or S3 for storing
>>>>> the
>>>>> checkpoints and job blobs like in the ZooKeeper case. The current
>>>>> benefit I
>>>>> see is that we avoid having to implement this multi locking mechanism
>>>>> in
>>>>> the ConfigMaps using the annotations because we can be sure that there
>>>>> is
>>>>> only a single leader at a time if I understood the guarantees of K8s
>>>>> correctly.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <da...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> > Hi Till, thanks for your valuable feedback.
>>>>> >
>>>>> > 1. Yes, leader election and storing leader information will use a
>>>>> same
>>>>> > ConfigMap. When a contender successfully performs a versioned
>>>>> annotation
>>>>> > update operation to the ConfigMap, it means that it has been elected
>>>>> as the
>>>>> > leader. And it will write the leader information in the callback of
>>>>> leader
>>>>> > elector[1]. The Kubernetes resource version will help us to avoid the
>>>>> > leader ConfigMap is wrongly updated.
>>>>> >
>>>>> > 2. The lock and release is really a valid concern. Actually in
>>>>> current
>>>>> > design, we could not guarantee that the node who tries to write his
>>>>> > ownership is the real leader. Who writes later, who is the owner. To
>>>>> > address this issue, we need to store all the owners of the key. Only
>>>>> when
>>>>> > the owner is empty, the specific key(means a checkpoint or job
>>>>> graph) could
>>>>> > be deleted. However, we may have a residual checkpoint or job graph
>>>>> when
>>>>> > the old JobManager crashed exceptionally and do not release the
>>>>> lock. To
>>>>> > solve this problem completely, we need a timestamp renew mechanism
>>>>> > for CompletedCheckpointStore and JobGraphStore, which could help us
>>>>> to the
>>>>> > check the JobManager timeout and then clean up the residual keys.
>>>>> >
>>>>> > 3. Frankly speaking, I am not against with this solution. However,
>>>>> in my
>>>>> > opinion, it is more like a temporary proposal. We could use
>>>>> StatefulSet to
>>>>> > avoid leader election and leader retrieval. But I am not sure whether
>>>>> > TaskManager could properly handle the situation that same hostname
>>>>> with
>>>>> > different IPs, because the JobManager failed and relaunched. Also we
>>>>> may
>>>>> > still have two JobManagers running in some corner cases(e.g. kubelet
>>>>> is
>>>>> > down but the pod is running). Another concern is we have a strong
>>>>> > dependency on the PersistentVolume(aka PV) in FileSystemHAService.
>>>>> But it
>>>>> > is not always true especially in self-build Kubernetes cluster.
>>>>> Moreover,
>>>>> > PV provider should guarantee that each PV could only be mounted
>>>>> once. Since
>>>>> > the native HA proposal could cover all the functionality of
>>>>> StatefulSet
>>>>> > proposal, that's why I prefer the former.
>>>>> >
>>>>> >
>>>>> > [1].
>>>>> >
>>>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>>>>> >
>>>>> > Best,
>>>>> > Yang
>>>>> >
>>>>> > Till Rohrmann <tr...@apache.org> 于2020年9月28日周一 下午9:29写道:
>>>>> >
>>>>> >> Thanks for creating this FLIP Yang Wang. I believe that many of our
>>>>> users
>>>>> >> will like a ZooKeeper-less HA setup.
>>>>> >>
>>>>> >> +1 for not separating the leader information and the leader
>>>>> election if
>>>>> >> possible. Maybe it is even possible that the contender writes his
>>>>> leader
>>>>> >> information directly when trying to obtain the leadership by
>>>>> performing a
>>>>> >> versioned write operation.
>>>>> >>
>>>>> >> Concerning the lock and release operation I have a question: Can
>>>>> there be
>>>>> >> multiple owners for a given key-value pair in a ConfigMap? If not,
>>>>> how can
>>>>> >> we ensure that the node which writes his ownership is actually the
>>>>> leader
>>>>> >> w/o transactional support from K8s? In ZooKeeper we had the same
>>>>> problem
>>>>> >> (we should probably change it at some point to simply use a
>>>>> >> transaction which checks whether the writer is still the leader) and
>>>>> >> therefore introduced the ephemeral lock nodes. What they allow is
>>>>> that
>>>>> >> there can be multiple owners of a given ZNode at a time. The last
>>>>> owner
>>>>> >> will then be responsible for the cleanup of the node.
>>>>> >>
>>>>> >> I see the benefit of your proposal over the stateful set proposal
>>>>> because
>>>>> >> it can support multiple standby JMs. Given the problem of locking
>>>>> key-value
>>>>> >> pairs it might be simpler to start with this approach where we only
>>>>> have
>>>>> >> single JM. This might already add a lot of benefits for our users.
>>>>> Was
>>>>> >> there a specific reason why you discarded this proposal (other than
>>>>> >> generality)?
>>>>> >>
>>>>> >> @Uce it would be great to hear your feedback on the proposal since
>>>>> you
>>>>> >> already implemented a K8s based HA service.
>>>>> >>
>>>>> >> Cheers,
>>>>> >> Till
>>>>> >>
>>>>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com>
>>>>> wrote:
>>>>> >>
>>>>> >>> Hi Xintong and Stephan,
>>>>> >>>
>>>>> >>> Thanks a lot for your attention on this FLIP. I will address the
>>>>> >>> comments inline.
>>>>> >>>
>>>>> >>> # Architecture -> One or two ConfigMaps
>>>>> >>>
>>>>> >>> Both of you are right. One ConfigMap will make the design and
>>>>> >>> implementation easier. Actually, in my POC codes,
>>>>> >>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for
>>>>> rest
>>>>> >>> server component) for the leader election
>>>>> >>> and storage. Once a JobManager win the election, it will update the
>>>>> >>> ConfigMap with leader address and periodically
>>>>> >>> renew the lock annotation to keep as the active leader. I will
>>>>> update
>>>>> >>> the FLIP document, including the architecture diagram,
>>>>> >>> to avoid the misunderstanding.
>>>>> >>>
>>>>> >>>
>>>>> >>> # HA storage > Lock and release
>>>>> >>>
>>>>> >>> This is a valid concern. Since for Zookeeper ephemeral nodes, it
>>>>> will be
>>>>> >>> deleted by the ZK server automatically when
>>>>> >>> the client is timeout. It could happen in a bad network
>>>>> environment or
>>>>> >>> the ZK client crashed exceptionally. For Kubernetes,
>>>>> >>> we need to implement a similar mechanism. First, when we want to
>>>>> lock a
>>>>> >>> specific key in ConfigMap, we will put the owner identify,
>>>>> >>> lease duration, renew time in the ConfigMap annotation. The
>>>>> annotation
>>>>> >>> will be cleaned up when releasing the lock. When
>>>>> >>> we want to remove a job graph or checkpoints, it should satisfy the
>>>>> >>> following conditions. If not, the delete operation could not be
>>>>> done.
>>>>> >>> * Current instance is the owner of the key.
>>>>> >>> * The owner annotation is empty, which means the owner has
>>>>> released the
>>>>> >>> lock.
>>>>> >>> * The owner annotation timed out, which usually indicate the owner
>>>>> died.
>>>>> >>>
>>>>> >>>
>>>>> >>> # HA storage > HA data clean up
>>>>> >>>
>>>>> >>> Sorry for that I do not describe how the HA related ConfigMap is
>>>>> >>> retained clearly. Benefit from the Kubernetes OwnerReference[1],
>>>>> >>> we set owner of the flink-conf configmap, service and TaskManager
>>>>> pods
>>>>> >>> to JobManager Deployment. So when we want to
>>>>> >>> destroy a Flink cluster, we just need to delete the deployment[2].
>>>>> For
>>>>> >>> the HA related ConfigMaps, we do not set the owner
>>>>> >>> so that they could be retained even though we delete the whole
>>>>> Flink
>>>>> >>> cluster.
>>>>> >>>
>>>>> >>>
>>>>> >>> [1].
>>>>> >>>
>>>>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>>>> >>> [2].
>>>>> >>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>>>> >>>
>>>>> >>>
>>>>> >>> Best,
>>>>> >>> Yang
>>>>> >>>
>>>>> >>>
>>>>> >>> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
>>>>> >>>
>>>>> >>>> This is a very cool feature proposal.
>>>>> >>>>
>>>>> >>>> One lesson-learned from the ZooKeeper-based HA is that it is
>>>>> overly
>>>>> >>>> complicated to have the Leader RPC address in a different node
>>>>> than the
>>>>> >>>> LeaderLock. There is extra code needed to make sure these
>>>>> converge and the
>>>>> >>>> can be temporarily out of sync.
>>>>> >>>>
>>>>> >>>> A much easier design would be to have the RPC address as payload
>>>>> in the
>>>>> >>>> lock entry (ZNode in ZK), the same way that the leader fencing
>>>>> token is
>>>>> >>>> stored as payload of the lock.
>>>>> >>>> I think for the design above it would mean having a single
>>>>> ConfigMap
>>>>> >>>> for both leader lock and leader RPC address discovery.
>>>>> >>>>
>>>>> >>>> This probably serves as a good design principle in general - not
>>>>> divide
>>>>> >>>> information that is updated together over different resources.
>>>>> >>>>
>>>>> >>>> Best,
>>>>> >>>> Stephan
>>>>> >>>>
>>>>> >>>>
>>>>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <
>>>>> tonysong820@gmail.com>
>>>>> >>>> wrote:
>>>>> >>>>
>>>>> >>>>> Thanks for preparing this FLIP, @Yang.
>>>>> >>>>>
>>>>> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>>>> >>>>> buildtin ConfigMap for Flink's HA services should significantly
>>>>> reduce the
>>>>> >>>>> maintenance overhead compared to deploying a ZK cluster. I think
>>>>> this is an
>>>>> >>>>> attractive feature for users.
>>>>> >>>>>
>>>>> >>>>> Concerning the proposed design, I have some questions. Might not
>>>>> be
>>>>> >>>>> problems, just trying to understand.
>>>>> >>>>>
>>>>> >>>>> ## Architecture
>>>>> >>>>>
>>>>> >>>>> Why does the leader election need two ConfigMaps (`lock for
>>>>> contending
>>>>> >>>>> leader`, and `leader RPC address`)? What happens if the two
>>>>> ConfigMaps are
>>>>> >>>>> not updated consistently? E.g., a TM learns about a new JM
>>>>> becoming leader
>>>>> >>>>> (lock for contending leader updated), but still gets the old
>>>>> leader's
>>>>> >>>>> address when trying to read `leader RPC address`?
>>>>> >>>>>
>>>>> >>>>> ## HA storage > Lock and release
>>>>> >>>>>
>>>>> >>>>> It seems to me that the owner needs to explicitly release the
>>>>> lock so
>>>>> >>>>> that other peers can write/remove the stored object. What if the
>>>>> previous
>>>>> >>>>> owner failed to release the lock (e.g., dead before releasing)?
>>>>> Would there
>>>>> >>>>> be any problem?
>>>>> >>>>>
>>>>> >>>>> ## HA storage > HA data clean up
>>>>> >>>>>
>>>>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy
>>>>> <ClusterID>`,
>>>>> >>>>> how are the HA dada retained?
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>> Thank you~
>>>>> >>>>>
>>>>> >>>>> Xintong Song
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <
>>>>> danrtsey.wy@gmail.com>
>>>>> >>>>> wrote:
>>>>> >>>>>
>>>>> >>>>>> Hi devs and users,
>>>>> >>>>>>
>>>>> >>>>>> I would like to start the discussion about FLIP-144[1], which
>>>>> will
>>>>> >>>>>> introduce
>>>>> >>>>>> a new native high availability service for Kubernetes.
>>>>> >>>>>>
>>>>> >>>>>> Currently, Flink has provided Zookeeper HA service and been
>>>>> widely
>>>>> >>>>>> used
>>>>> >>>>>> in production environments. It could be integrated in standalone
>>>>> >>>>>> cluster,
>>>>> >>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA
>>>>> in K8s
>>>>> >>>>>> will take additional cost since we need to manage a Zookeeper
>>>>> cluster.
>>>>> >>>>>> In the meantime, K8s has provided some public API for leader
>>>>> >>>>>> election[2]
>>>>> >>>>>> and configuration storage(i.e. ConfigMap[3]). We could leverage
>>>>> these
>>>>> >>>>>> features and make running HA configured Flink cluster on K8s
>>>>> more
>>>>> >>>>>> convenient.
>>>>> >>>>>>
>>>>> >>>>>> Both the standalone on K8s and native K8s could benefit from
>>>>> the new
>>>>> >>>>>> introduced KubernetesHaService.
>>>>> >>>>>>
>>>>> >>>>>> [1].
>>>>> >>>>>>
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>>> >>>>>> [2].
>>>>> >>>>>>
>>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>>> >>>>>> [3].
>>>>> https://kubernetes.io/docs/concepts/configuration/configmap/
>>>>> >>>>>>
>>>>> >>>>>> Looking forward to your feedback.
>>>>> >>>>>>
>>>>> >>>>>> Best,
>>>>> >>>>>> Yang
>>>>> >>>>>>
>>>>> >>>>>
>>>>>
>>>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Yang Wang <da...@gmail.com>.
2. Yes. This is exactly what I mean. Storing the HA information relevant to
a specific component in a single ConfigMap and ensuring that “Get(check the
leader)-and-Update(write back to the ConfigMap)” is a transactional
operation. Since we only store the job graph stateHandler(not the real
data) in the ConfigMap, I think 1MB is big enough for the dispater-leader
ConfigMap(the biggest one with multiple jobs). I roughly calculate that
could we have more than 1000 Flink jobs in a Flink session cluster.

3. Actually, K8s has a stronger guarantee than YARN. And the StatefulSet
could provide at most one semantics if no manually force-deletion
happened[1]. Based on the previous discussion, we have successfully avoided
the "lock-and-release" in the implementation. So I still insist on using
the current Deployment.


[1].
https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#force-deletion


Best,
Yang

Till Rohrmann <tr...@apache.org> 于2020年9月30日周三 下午11:57写道:

> Thanks for the clarifications Yang Wang.
>
> 2. Keeping the HA information relevant for a component (Dispatcher,
> JobManager, ResourceManager) in a single ConfigMap sounds good. We should
> check that we don't exceed the 1 MB size limit with this approach though.
> The Dispatcher's ConfigMap would then contain the current leader, the
> running jobs and the pointers to the persisted JobGraphs. The JobManager's
> ConfigMap would then contain the current leader, the pointers to the
> checkpoints and the checkpoint ID counter, for example.
>
> 3. Ah ok, I somehow thought that K8s would give us stronger
> guarantees than Yarn in this regard. That's a pity.
>
> Cheers,
> Till
>
> On Wed, Sep 30, 2020 at 10:03 AM tison <wa...@gmail.com> wrote:
>
>> Thanks for your explanation. It would be fine if only checking leadership
>> & actually write information is atomic.
>>
>> Best,
>> tison.
>>
>>
>> Yang Wang <da...@gmail.com> 于2020年9月30日周三 下午3:57写道:
>>
>>> Thanks till and tison for your comments.
>>>
>>> @Till Rohrmann <tr...@apache.org>
>>> 1. I am afraid we could not do this if we are going to use fabric8
>>> Kubernetes client SDK for the leader election. The official Kubernetes Java
>>> client[1] also could not support it. Unless we implement a new
>>> LeaderElector in Flink based on the very basic Kubernetes API. But it seems
>>> that we could gain too much from this.
>>>
>>> 2. Yes, the implementation will be a little complicated if we want to
>>> completely eliminate the residual job graphs or checkpoints. Inspired by
>>> your suggestion, another different solution has come into my mind. We could
>>> use a same ConfigMap storing the JobManager leader, job graph,
>>> checkpoint-counter, checkpoint. Each job will have a specific ConfigMap for
>>> the HA meta storage. Then it will be easier to guarantee that only the
>>> leader could write the ConfigMap in a transactional operation. Since
>>> “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
>>> transactional operation.
>>>
>>> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution.
>>> However, we still have the chances that two JobManager are running and
>>> trying to get/delete a key in the same ConfigMap concurrently. Imagine that
>>> the kubelet(like NodeManager in YARN) is down, and then the JobManager
>>> could not be deleted. A new JobManager pod will be launched. We are just in
>>> the similar situation as Deployment(1) + ConfigMap + HDFS/S3. The only
>>> benefit is we do not need to implement a leader election/retrieval service.
>>>
>>> @tison
>>> Actually, I do not think we will have such issue in the Kubernetes HA
>>> service. In the Kubernetes LeaderElector[2], we have the leader information
>>> stored on the annotation of leader ConfigMap. So it would not happen the
>>> old leader could wrongly override the leader information. Once a JobManager
>>> want to write his leader information to the ConfigMap, it will check
>>> whether it is the leader now. If not, anything will happen. Moreover, the
>>> Kubernetes Resource Version[3] ensures that no one else has snuck in and
>>> written a different update while the client was in the process of
>>> performing its update.
>>>
>>>
>>> [1].
>>> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
>>> [2].
>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
>>> <https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70>
>>> [3].
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> tison <wa...@gmail.com> 于2020年9月30日周三 下午3:21写道:
>>>
>>>> Hi,
>>>>
>>>> Generally +1 for a native k8s HA service.
>>>>
>>>> For leader election & publish leader information, there was a
>>>> discussion[1]
>>>> pointed out that since these two actions is NOT atomic, there will be
>>>> always
>>>> edge case where a previous leader overwrite leader information, even
>>>> with
>>>> versioned write. Versioned write helps on read again if version
>>>> mismatches
>>>> so if we want version write works, information in the kv pair should
>>>> help the
>>>> contender reflects whether it is the current leader.
>>>>
>>>> The idea of writes leader information on contender node or something
>>>> equivalent makes sense but the details depends on how it is implemented.
>>>> General problems are that
>>>>
>>>> 1. TM might be a bit late before it updated correct leader information
>>>> but
>>>> only if the leader election process is short and leadership is stable
>>>> at most
>>>> time, it won't be a serious issue.
>>>> 2. The process TM extract leader information might be a bit more complex
>>>> than directly watching a fixed key.
>>>>
>>>> Atomic issue can be addressed if one leverages low APIs such as lease &
>>>> txn
>>>> but it causes more developing efforts. ConfigMap and encapsulated
>>>> interface,
>>>> thought, provides only a self-consistent mechanism which doesn't promise
>>>> more consistency for extension.
>>>>
>>>> Best,
>>>> tison.
>>>>
>>>> [1]
>>>> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>>>>
>>>>
>>>>
>>>> Till Rohrmann <tr...@apache.org> 于2020年9月29日周二 下午9:25写道:
>>>>
>>>>> For 1. I was wondering whether we can't write the leader connection
>>>>> information directly when trying to obtain the leadership (trying to
>>>>> update
>>>>> the leader key with one's own value)? This might be a little detail,
>>>>> though.
>>>>>
>>>>> 2. Alright, so we are having a similar mechanism as we have in
>>>>> ZooKeeper
>>>>> with the ephemeral lock nodes. I guess that this complicates the
>>>>> implementation a bit, unfortunately.
>>>>>
>>>>> 3. Wouldn't the StatefulSet solution also work without a PV? One could
>>>>> configure a different persistent storage like HDFS or S3 for storing
>>>>> the
>>>>> checkpoints and job blobs like in the ZooKeeper case. The current
>>>>> benefit I
>>>>> see is that we avoid having to implement this multi locking mechanism
>>>>> in
>>>>> the ConfigMaps using the annotations because we can be sure that there
>>>>> is
>>>>> only a single leader at a time if I understood the guarantees of K8s
>>>>> correctly.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <da...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> > Hi Till, thanks for your valuable feedback.
>>>>> >
>>>>> > 1. Yes, leader election and storing leader information will use a
>>>>> same
>>>>> > ConfigMap. When a contender successfully performs a versioned
>>>>> annotation
>>>>> > update operation to the ConfigMap, it means that it has been elected
>>>>> as the
>>>>> > leader. And it will write the leader information in the callback of
>>>>> leader
>>>>> > elector[1]. The Kubernetes resource version will help us to avoid the
>>>>> > leader ConfigMap is wrongly updated.
>>>>> >
>>>>> > 2. The lock and release is really a valid concern. Actually in
>>>>> current
>>>>> > design, we could not guarantee that the node who tries to write his
>>>>> > ownership is the real leader. Who writes later, who is the owner. To
>>>>> > address this issue, we need to store all the owners of the key. Only
>>>>> when
>>>>> > the owner is empty, the specific key(means a checkpoint or job
>>>>> graph) could
>>>>> > be deleted. However, we may have a residual checkpoint or job graph
>>>>> when
>>>>> > the old JobManager crashed exceptionally and do not release the
>>>>> lock. To
>>>>> > solve this problem completely, we need a timestamp renew mechanism
>>>>> > for CompletedCheckpointStore and JobGraphStore, which could help us
>>>>> to the
>>>>> > check the JobManager timeout and then clean up the residual keys.
>>>>> >
>>>>> > 3. Frankly speaking, I am not against with this solution. However,
>>>>> in my
>>>>> > opinion, it is more like a temporary proposal. We could use
>>>>> StatefulSet to
>>>>> > avoid leader election and leader retrieval. But I am not sure whether
>>>>> > TaskManager could properly handle the situation that same hostname
>>>>> with
>>>>> > different IPs, because the JobManager failed and relaunched. Also we
>>>>> may
>>>>> > still have two JobManagers running in some corner cases(e.g. kubelet
>>>>> is
>>>>> > down but the pod is running). Another concern is we have a strong
>>>>> > dependency on the PersistentVolume(aka PV) in FileSystemHAService.
>>>>> But it
>>>>> > is not always true especially in self-build Kubernetes cluster.
>>>>> Moreover,
>>>>> > PV provider should guarantee that each PV could only be mounted
>>>>> once. Since
>>>>> > the native HA proposal could cover all the functionality of
>>>>> StatefulSet
>>>>> > proposal, that's why I prefer the former.
>>>>> >
>>>>> >
>>>>> > [1].
>>>>> >
>>>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>>>>> >
>>>>> > Best,
>>>>> > Yang
>>>>> >
>>>>> > Till Rohrmann <tr...@apache.org> 于2020年9月28日周一 下午9:29写道:
>>>>> >
>>>>> >> Thanks for creating this FLIP Yang Wang. I believe that many of our
>>>>> users
>>>>> >> will like a ZooKeeper-less HA setup.
>>>>> >>
>>>>> >> +1 for not separating the leader information and the leader
>>>>> election if
>>>>> >> possible. Maybe it is even possible that the contender writes his
>>>>> leader
>>>>> >> information directly when trying to obtain the leadership by
>>>>> performing a
>>>>> >> versioned write operation.
>>>>> >>
>>>>> >> Concerning the lock and release operation I have a question: Can
>>>>> there be
>>>>> >> multiple owners for a given key-value pair in a ConfigMap? If not,
>>>>> how can
>>>>> >> we ensure that the node which writes his ownership is actually the
>>>>> leader
>>>>> >> w/o transactional support from K8s? In ZooKeeper we had the same
>>>>> problem
>>>>> >> (we should probably change it at some point to simply use a
>>>>> >> transaction which checks whether the writer is still the leader) and
>>>>> >> therefore introduced the ephemeral lock nodes. What they allow is
>>>>> that
>>>>> >> there can be multiple owners of a given ZNode at a time. The last
>>>>> owner
>>>>> >> will then be responsible for the cleanup of the node.
>>>>> >>
>>>>> >> I see the benefit of your proposal over the stateful set proposal
>>>>> because
>>>>> >> it can support multiple standby JMs. Given the problem of locking
>>>>> key-value
>>>>> >> pairs it might be simpler to start with this approach where we only
>>>>> have
>>>>> >> single JM. This might already add a lot of benefits for our users.
>>>>> Was
>>>>> >> there a specific reason why you discarded this proposal (other than
>>>>> >> generality)?
>>>>> >>
>>>>> >> @Uce it would be great to hear your feedback on the proposal since
>>>>> you
>>>>> >> already implemented a K8s based HA service.
>>>>> >>
>>>>> >> Cheers,
>>>>> >> Till
>>>>> >>
>>>>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com>
>>>>> wrote:
>>>>> >>
>>>>> >>> Hi Xintong and Stephan,
>>>>> >>>
>>>>> >>> Thanks a lot for your attention on this FLIP. I will address the
>>>>> >>> comments inline.
>>>>> >>>
>>>>> >>> # Architecture -> One or two ConfigMaps
>>>>> >>>
>>>>> >>> Both of you are right. One ConfigMap will make the design and
>>>>> >>> implementation easier. Actually, in my POC codes,
>>>>> >>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for
>>>>> rest
>>>>> >>> server component) for the leader election
>>>>> >>> and storage. Once a JobManager win the election, it will update the
>>>>> >>> ConfigMap with leader address and periodically
>>>>> >>> renew the lock annotation to keep as the active leader. I will
>>>>> update
>>>>> >>> the FLIP document, including the architecture diagram,
>>>>> >>> to avoid the misunderstanding.
>>>>> >>>
>>>>> >>>
>>>>> >>> # HA storage > Lock and release
>>>>> >>>
>>>>> >>> This is a valid concern. Since for Zookeeper ephemeral nodes, it
>>>>> will be
>>>>> >>> deleted by the ZK server automatically when
>>>>> >>> the client is timeout. It could happen in a bad network
>>>>> environment or
>>>>> >>> the ZK client crashed exceptionally. For Kubernetes,
>>>>> >>> we need to implement a similar mechanism. First, when we want to
>>>>> lock a
>>>>> >>> specific key in ConfigMap, we will put the owner identify,
>>>>> >>> lease duration, renew time in the ConfigMap annotation. The
>>>>> annotation
>>>>> >>> will be cleaned up when releasing the lock. When
>>>>> >>> we want to remove a job graph or checkpoints, it should satisfy the
>>>>> >>> following conditions. If not, the delete operation could not be
>>>>> done.
>>>>> >>> * Current instance is the owner of the key.
>>>>> >>> * The owner annotation is empty, which means the owner has
>>>>> released the
>>>>> >>> lock.
>>>>> >>> * The owner annotation timed out, which usually indicate the owner
>>>>> died.
>>>>> >>>
>>>>> >>>
>>>>> >>> # HA storage > HA data clean up
>>>>> >>>
>>>>> >>> Sorry for that I do not describe how the HA related ConfigMap is
>>>>> >>> retained clearly. Benefit from the Kubernetes OwnerReference[1],
>>>>> >>> we set owner of the flink-conf configmap, service and TaskManager
>>>>> pods
>>>>> >>> to JobManager Deployment. So when we want to
>>>>> >>> destroy a Flink cluster, we just need to delete the deployment[2].
>>>>> For
>>>>> >>> the HA related ConfigMaps, we do not set the owner
>>>>> >>> so that they could be retained even though we delete the whole
>>>>> Flink
>>>>> >>> cluster.
>>>>> >>>
>>>>> >>>
>>>>> >>> [1].
>>>>> >>>
>>>>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>>>> >>> [2].
>>>>> >>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>>>> >>>
>>>>> >>>
>>>>> >>> Best,
>>>>> >>> Yang
>>>>> >>>
>>>>> >>>
>>>>> >>> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
>>>>> >>>
>>>>> >>>> This is a very cool feature proposal.
>>>>> >>>>
>>>>> >>>> One lesson-learned from the ZooKeeper-based HA is that it is
>>>>> overly
>>>>> >>>> complicated to have the Leader RPC address in a different node
>>>>> than the
>>>>> >>>> LeaderLock. There is extra code needed to make sure these
>>>>> converge and the
>>>>> >>>> can be temporarily out of sync.
>>>>> >>>>
>>>>> >>>> A much easier design would be to have the RPC address as payload
>>>>> in the
>>>>> >>>> lock entry (ZNode in ZK), the same way that the leader fencing
>>>>> token is
>>>>> >>>> stored as payload of the lock.
>>>>> >>>> I think for the design above it would mean having a single
>>>>> ConfigMap
>>>>> >>>> for both leader lock and leader RPC address discovery.
>>>>> >>>>
>>>>> >>>> This probably serves as a good design principle in general - not
>>>>> divide
>>>>> >>>> information that is updated together over different resources.
>>>>> >>>>
>>>>> >>>> Best,
>>>>> >>>> Stephan
>>>>> >>>>
>>>>> >>>>
>>>>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <
>>>>> tonysong820@gmail.com>
>>>>> >>>> wrote:
>>>>> >>>>
>>>>> >>>>> Thanks for preparing this FLIP, @Yang.
>>>>> >>>>>
>>>>> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>>>> >>>>> buildtin ConfigMap for Flink's HA services should significantly
>>>>> reduce the
>>>>> >>>>> maintenance overhead compared to deploying a ZK cluster. I think
>>>>> this is an
>>>>> >>>>> attractive feature for users.
>>>>> >>>>>
>>>>> >>>>> Concerning the proposed design, I have some questions. Might not
>>>>> be
>>>>> >>>>> problems, just trying to understand.
>>>>> >>>>>
>>>>> >>>>> ## Architecture
>>>>> >>>>>
>>>>> >>>>> Why does the leader election need two ConfigMaps (`lock for
>>>>> contending
>>>>> >>>>> leader`, and `leader RPC address`)? What happens if the two
>>>>> ConfigMaps are
>>>>> >>>>> not updated consistently? E.g., a TM learns about a new JM
>>>>> becoming leader
>>>>> >>>>> (lock for contending leader updated), but still gets the old
>>>>> leader's
>>>>> >>>>> address when trying to read `leader RPC address`?
>>>>> >>>>>
>>>>> >>>>> ## HA storage > Lock and release
>>>>> >>>>>
>>>>> >>>>> It seems to me that the owner needs to explicitly release the
>>>>> lock so
>>>>> >>>>> that other peers can write/remove the stored object. What if the
>>>>> previous
>>>>> >>>>> owner failed to release the lock (e.g., dead before releasing)?
>>>>> Would there
>>>>> >>>>> be any problem?
>>>>> >>>>>
>>>>> >>>>> ## HA storage > HA data clean up
>>>>> >>>>>
>>>>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy
>>>>> <ClusterID>`,
>>>>> >>>>> how are the HA dada retained?
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>> Thank you~
>>>>> >>>>>
>>>>> >>>>> Xintong Song
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <
>>>>> danrtsey.wy@gmail.com>
>>>>> >>>>> wrote:
>>>>> >>>>>
>>>>> >>>>>> Hi devs and users,
>>>>> >>>>>>
>>>>> >>>>>> I would like to start the discussion about FLIP-144[1], which
>>>>> will
>>>>> >>>>>> introduce
>>>>> >>>>>> a new native high availability service for Kubernetes.
>>>>> >>>>>>
>>>>> >>>>>> Currently, Flink has provided Zookeeper HA service and been
>>>>> widely
>>>>> >>>>>> used
>>>>> >>>>>> in production environments. It could be integrated in standalone
>>>>> >>>>>> cluster,
>>>>> >>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA
>>>>> in K8s
>>>>> >>>>>> will take additional cost since we need to manage a Zookeeper
>>>>> cluster.
>>>>> >>>>>> In the meantime, K8s has provided some public API for leader
>>>>> >>>>>> election[2]
>>>>> >>>>>> and configuration storage(i.e. ConfigMap[3]). We could leverage
>>>>> these
>>>>> >>>>>> features and make running HA configured Flink cluster on K8s
>>>>> more
>>>>> >>>>>> convenient.
>>>>> >>>>>>
>>>>> >>>>>> Both the standalone on K8s and native K8s could benefit from
>>>>> the new
>>>>> >>>>>> introduced KubernetesHaService.
>>>>> >>>>>>
>>>>> >>>>>> [1].
>>>>> >>>>>>
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>>> >>>>>> [2].
>>>>> >>>>>>
>>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>>> >>>>>> [3].
>>>>> https://kubernetes.io/docs/concepts/configuration/configmap/
>>>>> >>>>>>
>>>>> >>>>>> Looking forward to your feedback.
>>>>> >>>>>>
>>>>> >>>>>> Best,
>>>>> >>>>>> Yang
>>>>> >>>>>>
>>>>> >>>>>
>>>>>
>>>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Till Rohrmann <tr...@apache.org>.
Thanks for the clarifications Yang Wang.

2. Keeping the HA information relevant for a component (Dispatcher,
JobManager, ResourceManager) in a single ConfigMap sounds good. We should
check that we don't exceed the 1 MB size limit with this approach though.
The Dispatcher's ConfigMap would then contain the current leader, the
running jobs and the pointers to the persisted JobGraphs. The JobManager's
ConfigMap would then contain the current leader, the pointers to the
checkpoints and the checkpoint ID counter, for example.

3. Ah ok, I somehow thought that K8s would give us stronger guarantees than
Yarn in this regard. That's a pity.

Cheers,
Till

On Wed, Sep 30, 2020 at 10:03 AM tison <wa...@gmail.com> wrote:

> Thanks for your explanation. It would be fine if only checking leadership
> & actually write information is atomic.
>
> Best,
> tison.
>
>
> Yang Wang <da...@gmail.com> 于2020年9月30日周三 下午3:57写道:
>
>> Thanks till and tison for your comments.
>>
>> @Till Rohrmann <tr...@apache.org>
>> 1. I am afraid we could not do this if we are going to use fabric8
>> Kubernetes client SDK for the leader election. The official Kubernetes Java
>> client[1] also could not support it. Unless we implement a new
>> LeaderElector in Flink based on the very basic Kubernetes API. But it seems
>> that we could gain too much from this.
>>
>> 2. Yes, the implementation will be a little complicated if we want to
>> completely eliminate the residual job graphs or checkpoints. Inspired by
>> your suggestion, another different solution has come into my mind. We could
>> use a same ConfigMap storing the JobManager leader, job graph,
>> checkpoint-counter, checkpoint. Each job will have a specific ConfigMap for
>> the HA meta storage. Then it will be easier to guarantee that only the
>> leader could write the ConfigMap in a transactional operation. Since
>> “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
>> transactional operation.
>>
>> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. However,
>> we still have the chances that two JobManager are running and trying to
>> get/delete a key in the same ConfigMap concurrently. Imagine that the
>> kubelet(like NodeManager in YARN) is down, and then the JobManager could
>> not be deleted. A new JobManager pod will be launched. We are just in the
>> similar situation as Deployment(1) + ConfigMap + HDFS/S3. The only benefit
>> is we do not need to implement a leader election/retrieval service.
>>
>> @tison
>> Actually, I do not think we will have such issue in the Kubernetes HA
>> service. In the Kubernetes LeaderElector[2], we have the leader information
>> stored on the annotation of leader ConfigMap. So it would not happen the
>> old leader could wrongly override the leader information. Once a JobManager
>> want to write his leader information to the ConfigMap, it will check
>> whether it is the leader now. If not, anything will happen. Moreover, the
>> Kubernetes Resource Version[3] ensures that no one else has snuck in and
>> written a different update while the client was in the process of
>> performing its update.
>>
>>
>> [1].
>> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
>> [2].
>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
>> <https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70>
>> [3].
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
>>
>>
>> Best,
>> Yang
>>
>> tison <wa...@gmail.com> 于2020年9月30日周三 下午3:21写道:
>>
>>> Hi,
>>>
>>> Generally +1 for a native k8s HA service.
>>>
>>> For leader election & publish leader information, there was a
>>> discussion[1]
>>> pointed out that since these two actions is NOT atomic, there will be
>>> always
>>> edge case where a previous leader overwrite leader information, even with
>>> versioned write. Versioned write helps on read again if version
>>> mismatches
>>> so if we want version write works, information in the kv pair should
>>> help the
>>> contender reflects whether it is the current leader.
>>>
>>> The idea of writes leader information on contender node or something
>>> equivalent makes sense but the details depends on how it is implemented.
>>> General problems are that
>>>
>>> 1. TM might be a bit late before it updated correct leader information
>>> but
>>> only if the leader election process is short and leadership is stable at
>>> most
>>> time, it won't be a serious issue.
>>> 2. The process TM extract leader information might be a bit more complex
>>> than directly watching a fixed key.
>>>
>>> Atomic issue can be addressed if one leverages low APIs such as lease &
>>> txn
>>> but it causes more developing efforts. ConfigMap and encapsulated
>>> interface,
>>> thought, provides only a self-consistent mechanism which doesn't promise
>>> more consistency for extension.
>>>
>>> Best,
>>> tison.
>>>
>>> [1]
>>> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>>>
>>>
>>>
>>> Till Rohrmann <tr...@apache.org> 于2020年9月29日周二 下午9:25写道:
>>>
>>>> For 1. I was wondering whether we can't write the leader connection
>>>> information directly when trying to obtain the leadership (trying to
>>>> update
>>>> the leader key with one's own value)? This might be a little detail,
>>>> though.
>>>>
>>>> 2. Alright, so we are having a similar mechanism as we have in ZooKeeper
>>>> with the ephemeral lock nodes. I guess that this complicates the
>>>> implementation a bit, unfortunately.
>>>>
>>>> 3. Wouldn't the StatefulSet solution also work without a PV? One could
>>>> configure a different persistent storage like HDFS or S3 for storing the
>>>> checkpoints and job blobs like in the ZooKeeper case. The current
>>>> benefit I
>>>> see is that we avoid having to implement this multi locking mechanism in
>>>> the ConfigMaps using the annotations because we can be sure that there
>>>> is
>>>> only a single leader at a time if I understood the guarantees of K8s
>>>> correctly.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <da...@gmail.com>
>>>> wrote:
>>>>
>>>> > Hi Till, thanks for your valuable feedback.
>>>> >
>>>> > 1. Yes, leader election and storing leader information will use a same
>>>> > ConfigMap. When a contender successfully performs a versioned
>>>> annotation
>>>> > update operation to the ConfigMap, it means that it has been elected
>>>> as the
>>>> > leader. And it will write the leader information in the callback of
>>>> leader
>>>> > elector[1]. The Kubernetes resource version will help us to avoid the
>>>> > leader ConfigMap is wrongly updated.
>>>> >
>>>> > 2. The lock and release is really a valid concern. Actually in current
>>>> > design, we could not guarantee that the node who tries to write his
>>>> > ownership is the real leader. Who writes later, who is the owner. To
>>>> > address this issue, we need to store all the owners of the key. Only
>>>> when
>>>> > the owner is empty, the specific key(means a checkpoint or job graph)
>>>> could
>>>> > be deleted. However, we may have a residual checkpoint or job graph
>>>> when
>>>> > the old JobManager crashed exceptionally and do not release the lock.
>>>> To
>>>> > solve this problem completely, we need a timestamp renew mechanism
>>>> > for CompletedCheckpointStore and JobGraphStore, which could help us
>>>> to the
>>>> > check the JobManager timeout and then clean up the residual keys.
>>>> >
>>>> > 3. Frankly speaking, I am not against with this solution. However, in
>>>> my
>>>> > opinion, it is more like a temporary proposal. We could use
>>>> StatefulSet to
>>>> > avoid leader election and leader retrieval. But I am not sure whether
>>>> > TaskManager could properly handle the situation that same hostname
>>>> with
>>>> > different IPs, because the JobManager failed and relaunched. Also we
>>>> may
>>>> > still have two JobManagers running in some corner cases(e.g. kubelet
>>>> is
>>>> > down but the pod is running). Another concern is we have a strong
>>>> > dependency on the PersistentVolume(aka PV) in FileSystemHAService.
>>>> But it
>>>> > is not always true especially in self-build Kubernetes cluster.
>>>> Moreover,
>>>> > PV provider should guarantee that each PV could only be mounted once.
>>>> Since
>>>> > the native HA proposal could cover all the functionality of
>>>> StatefulSet
>>>> > proposal, that's why I prefer the former.
>>>> >
>>>> >
>>>> > [1].
>>>> >
>>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>>>> >
>>>> > Best,
>>>> > Yang
>>>> >
>>>> > Till Rohrmann <tr...@apache.org> 于2020年9月28日周一 下午9:29写道:
>>>> >
>>>> >> Thanks for creating this FLIP Yang Wang. I believe that many of our
>>>> users
>>>> >> will like a ZooKeeper-less HA setup.
>>>> >>
>>>> >> +1 for not separating the leader information and the leader election
>>>> if
>>>> >> possible. Maybe it is even possible that the contender writes his
>>>> leader
>>>> >> information directly when trying to obtain the leadership by
>>>> performing a
>>>> >> versioned write operation.
>>>> >>
>>>> >> Concerning the lock and release operation I have a question: Can
>>>> there be
>>>> >> multiple owners for a given key-value pair in a ConfigMap? If not,
>>>> how can
>>>> >> we ensure that the node which writes his ownership is actually the
>>>> leader
>>>> >> w/o transactional support from K8s? In ZooKeeper we had the same
>>>> problem
>>>> >> (we should probably change it at some point to simply use a
>>>> >> transaction which checks whether the writer is still the leader) and
>>>> >> therefore introduced the ephemeral lock nodes. What they allow is
>>>> that
>>>> >> there can be multiple owners of a given ZNode at a time. The last
>>>> owner
>>>> >> will then be responsible for the cleanup of the node.
>>>> >>
>>>> >> I see the benefit of your proposal over the stateful set proposal
>>>> because
>>>> >> it can support multiple standby JMs. Given the problem of locking
>>>> key-value
>>>> >> pairs it might be simpler to start with this approach where we only
>>>> have
>>>> >> single JM. This might already add a lot of benefits for our users.
>>>> Was
>>>> >> there a specific reason why you discarded this proposal (other than
>>>> >> generality)?
>>>> >>
>>>> >> @Uce it would be great to hear your feedback on the proposal since
>>>> you
>>>> >> already implemented a K8s based HA service.
>>>> >>
>>>> >> Cheers,
>>>> >> Till
>>>> >>
>>>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com>
>>>> wrote:
>>>> >>
>>>> >>> Hi Xintong and Stephan,
>>>> >>>
>>>> >>> Thanks a lot for your attention on this FLIP. I will address the
>>>> >>> comments inline.
>>>> >>>
>>>> >>> # Architecture -> One or two ConfigMaps
>>>> >>>
>>>> >>> Both of you are right. One ConfigMap will make the design and
>>>> >>> implementation easier. Actually, in my POC codes,
>>>> >>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for rest
>>>> >>> server component) for the leader election
>>>> >>> and storage. Once a JobManager win the election, it will update the
>>>> >>> ConfigMap with leader address and periodically
>>>> >>> renew the lock annotation to keep as the active leader. I will
>>>> update
>>>> >>> the FLIP document, including the architecture diagram,
>>>> >>> to avoid the misunderstanding.
>>>> >>>
>>>> >>>
>>>> >>> # HA storage > Lock and release
>>>> >>>
>>>> >>> This is a valid concern. Since for Zookeeper ephemeral nodes, it
>>>> will be
>>>> >>> deleted by the ZK server automatically when
>>>> >>> the client is timeout. It could happen in a bad network environment
>>>> or
>>>> >>> the ZK client crashed exceptionally. For Kubernetes,
>>>> >>> we need to implement a similar mechanism. First, when we want to
>>>> lock a
>>>> >>> specific key in ConfigMap, we will put the owner identify,
>>>> >>> lease duration, renew time in the ConfigMap annotation. The
>>>> annotation
>>>> >>> will be cleaned up when releasing the lock. When
>>>> >>> we want to remove a job graph or checkpoints, it should satisfy the
>>>> >>> following conditions. If not, the delete operation could not be
>>>> done.
>>>> >>> * Current instance is the owner of the key.
>>>> >>> * The owner annotation is empty, which means the owner has released
>>>> the
>>>> >>> lock.
>>>> >>> * The owner annotation timed out, which usually indicate the owner
>>>> died.
>>>> >>>
>>>> >>>
>>>> >>> # HA storage > HA data clean up
>>>> >>>
>>>> >>> Sorry for that I do not describe how the HA related ConfigMap is
>>>> >>> retained clearly. Benefit from the Kubernetes OwnerReference[1],
>>>> >>> we set owner of the flink-conf configmap, service and TaskManager
>>>> pods
>>>> >>> to JobManager Deployment. So when we want to
>>>> >>> destroy a Flink cluster, we just need to delete the deployment[2].
>>>> For
>>>> >>> the HA related ConfigMaps, we do not set the owner
>>>> >>> so that they could be retained even though we delete the whole Flink
>>>> >>> cluster.
>>>> >>>
>>>> >>>
>>>> >>> [1].
>>>> >>>
>>>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>>> >>> [2].
>>>> >>>
>>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>>> >>>
>>>> >>>
>>>> >>> Best,
>>>> >>> Yang
>>>> >>>
>>>> >>>
>>>> >>> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
>>>> >>>
>>>> >>>> This is a very cool feature proposal.
>>>> >>>>
>>>> >>>> One lesson-learned from the ZooKeeper-based HA is that it is overly
>>>> >>>> complicated to have the Leader RPC address in a different node
>>>> than the
>>>> >>>> LeaderLock. There is extra code needed to make sure these converge
>>>> and the
>>>> >>>> can be temporarily out of sync.
>>>> >>>>
>>>> >>>> A much easier design would be to have the RPC address as payload
>>>> in the
>>>> >>>> lock entry (ZNode in ZK), the same way that the leader fencing
>>>> token is
>>>> >>>> stored as payload of the lock.
>>>> >>>> I think for the design above it would mean having a single
>>>> ConfigMap
>>>> >>>> for both leader lock and leader RPC address discovery.
>>>> >>>>
>>>> >>>> This probably serves as a good design principle in general - not
>>>> divide
>>>> >>>> information that is updated together over different resources.
>>>> >>>>
>>>> >>>> Best,
>>>> >>>> Stephan
>>>> >>>>
>>>> >>>>
>>>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <
>>>> tonysong820@gmail.com>
>>>> >>>> wrote:
>>>> >>>>
>>>> >>>>> Thanks for preparing this FLIP, @Yang.
>>>> >>>>>
>>>> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>>> >>>>> buildtin ConfigMap for Flink's HA services should significantly
>>>> reduce the
>>>> >>>>> maintenance overhead compared to deploying a ZK cluster. I think
>>>> this is an
>>>> >>>>> attractive feature for users.
>>>> >>>>>
>>>> >>>>> Concerning the proposed design, I have some questions. Might not
>>>> be
>>>> >>>>> problems, just trying to understand.
>>>> >>>>>
>>>> >>>>> ## Architecture
>>>> >>>>>
>>>> >>>>> Why does the leader election need two ConfigMaps (`lock for
>>>> contending
>>>> >>>>> leader`, and `leader RPC address`)? What happens if the two
>>>> ConfigMaps are
>>>> >>>>> not updated consistently? E.g., a TM learns about a new JM
>>>> becoming leader
>>>> >>>>> (lock for contending leader updated), but still gets the old
>>>> leader's
>>>> >>>>> address when trying to read `leader RPC address`?
>>>> >>>>>
>>>> >>>>> ## HA storage > Lock and release
>>>> >>>>>
>>>> >>>>> It seems to me that the owner needs to explicitly release the
>>>> lock so
>>>> >>>>> that other peers can write/remove the stored object. What if the
>>>> previous
>>>> >>>>> owner failed to release the lock (e.g., dead before releasing)?
>>>> Would there
>>>> >>>>> be any problem?
>>>> >>>>>
>>>> >>>>> ## HA storage > HA data clean up
>>>> >>>>>
>>>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy
>>>> <ClusterID>`,
>>>> >>>>> how are the HA dada retained?
>>>> >>>>>
>>>> >>>>>
>>>> >>>>> Thank you~
>>>> >>>>>
>>>> >>>>> Xintong Song
>>>> >>>>>
>>>> >>>>>
>>>> >>>>>
>>>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <danrtsey.wy@gmail.com
>>>> >
>>>> >>>>> wrote:
>>>> >>>>>
>>>> >>>>>> Hi devs and users,
>>>> >>>>>>
>>>> >>>>>> I would like to start the discussion about FLIP-144[1], which
>>>> will
>>>> >>>>>> introduce
>>>> >>>>>> a new native high availability service for Kubernetes.
>>>> >>>>>>
>>>> >>>>>> Currently, Flink has provided Zookeeper HA service and been
>>>> widely
>>>> >>>>>> used
>>>> >>>>>> in production environments. It could be integrated in standalone
>>>> >>>>>> cluster,
>>>> >>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA in
>>>> K8s
>>>> >>>>>> will take additional cost since we need to manage a Zookeeper
>>>> cluster.
>>>> >>>>>> In the meantime, K8s has provided some public API for leader
>>>> >>>>>> election[2]
>>>> >>>>>> and configuration storage(i.e. ConfigMap[3]). We could leverage
>>>> these
>>>> >>>>>> features and make running HA configured Flink cluster on K8s more
>>>> >>>>>> convenient.
>>>> >>>>>>
>>>> >>>>>> Both the standalone on K8s and native K8s could benefit from the
>>>> new
>>>> >>>>>> introduced KubernetesHaService.
>>>> >>>>>>
>>>> >>>>>> [1].
>>>> >>>>>>
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>> >>>>>> [2].
>>>> >>>>>>
>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>> >>>>>> [3].
>>>> https://kubernetes.io/docs/concepts/configuration/configmap/
>>>> >>>>>>
>>>> >>>>>> Looking forward to your feedback.
>>>> >>>>>>
>>>> >>>>>> Best,
>>>> >>>>>> Yang
>>>> >>>>>>
>>>> >>>>>
>>>>
>>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Till Rohrmann <tr...@apache.org>.
Thanks for the clarifications Yang Wang.

2. Keeping the HA information relevant for a component (Dispatcher,
JobManager, ResourceManager) in a single ConfigMap sounds good. We should
check that we don't exceed the 1 MB size limit with this approach though.
The Dispatcher's ConfigMap would then contain the current leader, the
running jobs and the pointers to the persisted JobGraphs. The JobManager's
ConfigMap would then contain the current leader, the pointers to the
checkpoints and the checkpoint ID counter, for example.

3. Ah ok, I somehow thought that K8s would give us stronger guarantees than
Yarn in this regard. That's a pity.

Cheers,
Till

On Wed, Sep 30, 2020 at 10:03 AM tison <wa...@gmail.com> wrote:

> Thanks for your explanation. It would be fine if only checking leadership
> & actually write information is atomic.
>
> Best,
> tison.
>
>
> Yang Wang <da...@gmail.com> 于2020年9月30日周三 下午3:57写道:
>
>> Thanks till and tison for your comments.
>>
>> @Till Rohrmann <tr...@apache.org>
>> 1. I am afraid we could not do this if we are going to use fabric8
>> Kubernetes client SDK for the leader election. The official Kubernetes Java
>> client[1] also could not support it. Unless we implement a new
>> LeaderElector in Flink based on the very basic Kubernetes API. But it seems
>> that we could gain too much from this.
>>
>> 2. Yes, the implementation will be a little complicated if we want to
>> completely eliminate the residual job graphs or checkpoints. Inspired by
>> your suggestion, another different solution has come into my mind. We could
>> use a same ConfigMap storing the JobManager leader, job graph,
>> checkpoint-counter, checkpoint. Each job will have a specific ConfigMap for
>> the HA meta storage. Then it will be easier to guarantee that only the
>> leader could write the ConfigMap in a transactional operation. Since
>> “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
>> transactional operation.
>>
>> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. However,
>> we still have the chances that two JobManager are running and trying to
>> get/delete a key in the same ConfigMap concurrently. Imagine that the
>> kubelet(like NodeManager in YARN) is down, and then the JobManager could
>> not be deleted. A new JobManager pod will be launched. We are just in the
>> similar situation as Deployment(1) + ConfigMap + HDFS/S3. The only benefit
>> is we do not need to implement a leader election/retrieval service.
>>
>> @tison
>> Actually, I do not think we will have such issue in the Kubernetes HA
>> service. In the Kubernetes LeaderElector[2], we have the leader information
>> stored on the annotation of leader ConfigMap. So it would not happen the
>> old leader could wrongly override the leader information. Once a JobManager
>> want to write his leader information to the ConfigMap, it will check
>> whether it is the leader now. If not, anything will happen. Moreover, the
>> Kubernetes Resource Version[3] ensures that no one else has snuck in and
>> written a different update while the client was in the process of
>> performing its update.
>>
>>
>> [1].
>> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
>> [2].
>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
>> <https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70>
>> [3].
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
>>
>>
>> Best,
>> Yang
>>
>> tison <wa...@gmail.com> 于2020年9月30日周三 下午3:21写道:
>>
>>> Hi,
>>>
>>> Generally +1 for a native k8s HA service.
>>>
>>> For leader election & publish leader information, there was a
>>> discussion[1]
>>> pointed out that since these two actions is NOT atomic, there will be
>>> always
>>> edge case where a previous leader overwrite leader information, even with
>>> versioned write. Versioned write helps on read again if version
>>> mismatches
>>> so if we want version write works, information in the kv pair should
>>> help the
>>> contender reflects whether it is the current leader.
>>>
>>> The idea of writes leader information on contender node or something
>>> equivalent makes sense but the details depends on how it is implemented.
>>> General problems are that
>>>
>>> 1. TM might be a bit late before it updated correct leader information
>>> but
>>> only if the leader election process is short and leadership is stable at
>>> most
>>> time, it won't be a serious issue.
>>> 2. The process TM extract leader information might be a bit more complex
>>> than directly watching a fixed key.
>>>
>>> Atomic issue can be addressed if one leverages low APIs such as lease &
>>> txn
>>> but it causes more developing efforts. ConfigMap and encapsulated
>>> interface,
>>> thought, provides only a self-consistent mechanism which doesn't promise
>>> more consistency for extension.
>>>
>>> Best,
>>> tison.
>>>
>>> [1]
>>> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>>>
>>>
>>>
>>> Till Rohrmann <tr...@apache.org> 于2020年9月29日周二 下午9:25写道:
>>>
>>>> For 1. I was wondering whether we can't write the leader connection
>>>> information directly when trying to obtain the leadership (trying to
>>>> update
>>>> the leader key with one's own value)? This might be a little detail,
>>>> though.
>>>>
>>>> 2. Alright, so we are having a similar mechanism as we have in ZooKeeper
>>>> with the ephemeral lock nodes. I guess that this complicates the
>>>> implementation a bit, unfortunately.
>>>>
>>>> 3. Wouldn't the StatefulSet solution also work without a PV? One could
>>>> configure a different persistent storage like HDFS or S3 for storing the
>>>> checkpoints and job blobs like in the ZooKeeper case. The current
>>>> benefit I
>>>> see is that we avoid having to implement this multi locking mechanism in
>>>> the ConfigMaps using the annotations because we can be sure that there
>>>> is
>>>> only a single leader at a time if I understood the guarantees of K8s
>>>> correctly.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <da...@gmail.com>
>>>> wrote:
>>>>
>>>> > Hi Till, thanks for your valuable feedback.
>>>> >
>>>> > 1. Yes, leader election and storing leader information will use a same
>>>> > ConfigMap. When a contender successfully performs a versioned
>>>> annotation
>>>> > update operation to the ConfigMap, it means that it has been elected
>>>> as the
>>>> > leader. And it will write the leader information in the callback of
>>>> leader
>>>> > elector[1]. The Kubernetes resource version will help us to avoid the
>>>> > leader ConfigMap is wrongly updated.
>>>> >
>>>> > 2. The lock and release is really a valid concern. Actually in current
>>>> > design, we could not guarantee that the node who tries to write his
>>>> > ownership is the real leader. Who writes later, who is the owner. To
>>>> > address this issue, we need to store all the owners of the key. Only
>>>> when
>>>> > the owner is empty, the specific key(means a checkpoint or job graph)
>>>> could
>>>> > be deleted. However, we may have a residual checkpoint or job graph
>>>> when
>>>> > the old JobManager crashed exceptionally and do not release the lock.
>>>> To
>>>> > solve this problem completely, we need a timestamp renew mechanism
>>>> > for CompletedCheckpointStore and JobGraphStore, which could help us
>>>> to the
>>>> > check the JobManager timeout and then clean up the residual keys.
>>>> >
>>>> > 3. Frankly speaking, I am not against with this solution. However, in
>>>> my
>>>> > opinion, it is more like a temporary proposal. We could use
>>>> StatefulSet to
>>>> > avoid leader election and leader retrieval. But I am not sure whether
>>>> > TaskManager could properly handle the situation that same hostname
>>>> with
>>>> > different IPs, because the JobManager failed and relaunched. Also we
>>>> may
>>>> > still have two JobManagers running in some corner cases(e.g. kubelet
>>>> is
>>>> > down but the pod is running). Another concern is we have a strong
>>>> > dependency on the PersistentVolume(aka PV) in FileSystemHAService.
>>>> But it
>>>> > is not always true especially in self-build Kubernetes cluster.
>>>> Moreover,
>>>> > PV provider should guarantee that each PV could only be mounted once.
>>>> Since
>>>> > the native HA proposal could cover all the functionality of
>>>> StatefulSet
>>>> > proposal, that's why I prefer the former.
>>>> >
>>>> >
>>>> > [1].
>>>> >
>>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>>>> >
>>>> > Best,
>>>> > Yang
>>>> >
>>>> > Till Rohrmann <tr...@apache.org> 于2020年9月28日周一 下午9:29写道:
>>>> >
>>>> >> Thanks for creating this FLIP Yang Wang. I believe that many of our
>>>> users
>>>> >> will like a ZooKeeper-less HA setup.
>>>> >>
>>>> >> +1 for not separating the leader information and the leader election
>>>> if
>>>> >> possible. Maybe it is even possible that the contender writes his
>>>> leader
>>>> >> information directly when trying to obtain the leadership by
>>>> performing a
>>>> >> versioned write operation.
>>>> >>
>>>> >> Concerning the lock and release operation I have a question: Can
>>>> there be
>>>> >> multiple owners for a given key-value pair in a ConfigMap? If not,
>>>> how can
>>>> >> we ensure that the node which writes his ownership is actually the
>>>> leader
>>>> >> w/o transactional support from K8s? In ZooKeeper we had the same
>>>> problem
>>>> >> (we should probably change it at some point to simply use a
>>>> >> transaction which checks whether the writer is still the leader) and
>>>> >> therefore introduced the ephemeral lock nodes. What they allow is
>>>> that
>>>> >> there can be multiple owners of a given ZNode at a time. The last
>>>> owner
>>>> >> will then be responsible for the cleanup of the node.
>>>> >>
>>>> >> I see the benefit of your proposal over the stateful set proposal
>>>> because
>>>> >> it can support multiple standby JMs. Given the problem of locking
>>>> key-value
>>>> >> pairs it might be simpler to start with this approach where we only
>>>> have
>>>> >> single JM. This might already add a lot of benefits for our users.
>>>> Was
>>>> >> there a specific reason why you discarded this proposal (other than
>>>> >> generality)?
>>>> >>
>>>> >> @Uce it would be great to hear your feedback on the proposal since
>>>> you
>>>> >> already implemented a K8s based HA service.
>>>> >>
>>>> >> Cheers,
>>>> >> Till
>>>> >>
>>>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com>
>>>> wrote:
>>>> >>
>>>> >>> Hi Xintong and Stephan,
>>>> >>>
>>>> >>> Thanks a lot for your attention on this FLIP. I will address the
>>>> >>> comments inline.
>>>> >>>
>>>> >>> # Architecture -> One or two ConfigMaps
>>>> >>>
>>>> >>> Both of you are right. One ConfigMap will make the design and
>>>> >>> implementation easier. Actually, in my POC codes,
>>>> >>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for rest
>>>> >>> server component) for the leader election
>>>> >>> and storage. Once a JobManager win the election, it will update the
>>>> >>> ConfigMap with leader address and periodically
>>>> >>> renew the lock annotation to keep as the active leader. I will
>>>> update
>>>> >>> the FLIP document, including the architecture diagram,
>>>> >>> to avoid the misunderstanding.
>>>> >>>
>>>> >>>
>>>> >>> # HA storage > Lock and release
>>>> >>>
>>>> >>> This is a valid concern. Since for Zookeeper ephemeral nodes, it
>>>> will be
>>>> >>> deleted by the ZK server automatically when
>>>> >>> the client is timeout. It could happen in a bad network environment
>>>> or
>>>> >>> the ZK client crashed exceptionally. For Kubernetes,
>>>> >>> we need to implement a similar mechanism. First, when we want to
>>>> lock a
>>>> >>> specific key in ConfigMap, we will put the owner identify,
>>>> >>> lease duration, renew time in the ConfigMap annotation. The
>>>> annotation
>>>> >>> will be cleaned up when releasing the lock. When
>>>> >>> we want to remove a job graph or checkpoints, it should satisfy the
>>>> >>> following conditions. If not, the delete operation could not be
>>>> done.
>>>> >>> * Current instance is the owner of the key.
>>>> >>> * The owner annotation is empty, which means the owner has released
>>>> the
>>>> >>> lock.
>>>> >>> * The owner annotation timed out, which usually indicate the owner
>>>> died.
>>>> >>>
>>>> >>>
>>>> >>> # HA storage > HA data clean up
>>>> >>>
>>>> >>> Sorry for that I do not describe how the HA related ConfigMap is
>>>> >>> retained clearly. Benefit from the Kubernetes OwnerReference[1],
>>>> >>> we set owner of the flink-conf configmap, service and TaskManager
>>>> pods
>>>> >>> to JobManager Deployment. So when we want to
>>>> >>> destroy a Flink cluster, we just need to delete the deployment[2].
>>>> For
>>>> >>> the HA related ConfigMaps, we do not set the owner
>>>> >>> so that they could be retained even though we delete the whole Flink
>>>> >>> cluster.
>>>> >>>
>>>> >>>
>>>> >>> [1].
>>>> >>>
>>>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>>> >>> [2].
>>>> >>>
>>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>>> >>>
>>>> >>>
>>>> >>> Best,
>>>> >>> Yang
>>>> >>>
>>>> >>>
>>>> >>> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
>>>> >>>
>>>> >>>> This is a very cool feature proposal.
>>>> >>>>
>>>> >>>> One lesson-learned from the ZooKeeper-based HA is that it is overly
>>>> >>>> complicated to have the Leader RPC address in a different node
>>>> than the
>>>> >>>> LeaderLock. There is extra code needed to make sure these converge
>>>> and the
>>>> >>>> can be temporarily out of sync.
>>>> >>>>
>>>> >>>> A much easier design would be to have the RPC address as payload
>>>> in the
>>>> >>>> lock entry (ZNode in ZK), the same way that the leader fencing
>>>> token is
>>>> >>>> stored as payload of the lock.
>>>> >>>> I think for the design above it would mean having a single
>>>> ConfigMap
>>>> >>>> for both leader lock and leader RPC address discovery.
>>>> >>>>
>>>> >>>> This probably serves as a good design principle in general - not
>>>> divide
>>>> >>>> information that is updated together over different resources.
>>>> >>>>
>>>> >>>> Best,
>>>> >>>> Stephan
>>>> >>>>
>>>> >>>>
>>>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <
>>>> tonysong820@gmail.com>
>>>> >>>> wrote:
>>>> >>>>
>>>> >>>>> Thanks for preparing this FLIP, @Yang.
>>>> >>>>>
>>>> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>>> >>>>> buildtin ConfigMap for Flink's HA services should significantly
>>>> reduce the
>>>> >>>>> maintenance overhead compared to deploying a ZK cluster. I think
>>>> this is an
>>>> >>>>> attractive feature for users.
>>>> >>>>>
>>>> >>>>> Concerning the proposed design, I have some questions. Might not
>>>> be
>>>> >>>>> problems, just trying to understand.
>>>> >>>>>
>>>> >>>>> ## Architecture
>>>> >>>>>
>>>> >>>>> Why does the leader election need two ConfigMaps (`lock for
>>>> contending
>>>> >>>>> leader`, and `leader RPC address`)? What happens if the two
>>>> ConfigMaps are
>>>> >>>>> not updated consistently? E.g., a TM learns about a new JM
>>>> becoming leader
>>>> >>>>> (lock for contending leader updated), but still gets the old
>>>> leader's
>>>> >>>>> address when trying to read `leader RPC address`?
>>>> >>>>>
>>>> >>>>> ## HA storage > Lock and release
>>>> >>>>>
>>>> >>>>> It seems to me that the owner needs to explicitly release the
>>>> lock so
>>>> >>>>> that other peers can write/remove the stored object. What if the
>>>> previous
>>>> >>>>> owner failed to release the lock (e.g., dead before releasing)?
>>>> Would there
>>>> >>>>> be any problem?
>>>> >>>>>
>>>> >>>>> ## HA storage > HA data clean up
>>>> >>>>>
>>>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy
>>>> <ClusterID>`,
>>>> >>>>> how are the HA dada retained?
>>>> >>>>>
>>>> >>>>>
>>>> >>>>> Thank you~
>>>> >>>>>
>>>> >>>>> Xintong Song
>>>> >>>>>
>>>> >>>>>
>>>> >>>>>
>>>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <danrtsey.wy@gmail.com
>>>> >
>>>> >>>>> wrote:
>>>> >>>>>
>>>> >>>>>> Hi devs and users,
>>>> >>>>>>
>>>> >>>>>> I would like to start the discussion about FLIP-144[1], which
>>>> will
>>>> >>>>>> introduce
>>>> >>>>>> a new native high availability service for Kubernetes.
>>>> >>>>>>
>>>> >>>>>> Currently, Flink has provided Zookeeper HA service and been
>>>> widely
>>>> >>>>>> used
>>>> >>>>>> in production environments. It could be integrated in standalone
>>>> >>>>>> cluster,
>>>> >>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA in
>>>> K8s
>>>> >>>>>> will take additional cost since we need to manage a Zookeeper
>>>> cluster.
>>>> >>>>>> In the meantime, K8s has provided some public API for leader
>>>> >>>>>> election[2]
>>>> >>>>>> and configuration storage(i.e. ConfigMap[3]). We could leverage
>>>> these
>>>> >>>>>> features and make running HA configured Flink cluster on K8s more
>>>> >>>>>> convenient.
>>>> >>>>>>
>>>> >>>>>> Both the standalone on K8s and native K8s could benefit from the
>>>> new
>>>> >>>>>> introduced KubernetesHaService.
>>>> >>>>>>
>>>> >>>>>> [1].
>>>> >>>>>>
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>> >>>>>> [2].
>>>> >>>>>>
>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>> >>>>>> [3].
>>>> https://kubernetes.io/docs/concepts/configuration/configmap/
>>>> >>>>>>
>>>> >>>>>> Looking forward to your feedback.
>>>> >>>>>>
>>>> >>>>>> Best,
>>>> >>>>>> Yang
>>>> >>>>>>
>>>> >>>>>
>>>>
>>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by tison <wa...@gmail.com>.
Thanks for your explanation. It would be fine if only checking leadership &
actually write information is atomic.

Best,
tison.


Yang Wang <da...@gmail.com> 于2020年9月30日周三 下午3:57写道:

> Thanks till and tison for your comments.
>
> @Till Rohrmann <tr...@apache.org>
> 1. I am afraid we could not do this if we are going to use fabric8
> Kubernetes client SDK for the leader election. The official Kubernetes Java
> client[1] also could not support it. Unless we implement a new
> LeaderElector in Flink based on the very basic Kubernetes API. But it seems
> that we could gain too much from this.
>
> 2. Yes, the implementation will be a little complicated if we want to
> completely eliminate the residual job graphs or checkpoints. Inspired by
> your suggestion, another different solution has come into my mind. We could
> use a same ConfigMap storing the JobManager leader, job graph,
> checkpoint-counter, checkpoint. Each job will have a specific ConfigMap for
> the HA meta storage. Then it will be easier to guarantee that only the
> leader could write the ConfigMap in a transactional operation. Since
> “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
> transactional operation.
>
> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. However,
> we still have the chances that two JobManager are running and trying to
> get/delete a key in the same ConfigMap concurrently. Imagine that the
> kubelet(like NodeManager in YARN) is down, and then the JobManager could
> not be deleted. A new JobManager pod will be launched. We are just in the
> similar situation as Deployment(1) + ConfigMap + HDFS/S3. The only benefit
> is we do not need to implement a leader election/retrieval service.
>
> @tison
> Actually, I do not think we will have such issue in the Kubernetes HA
> service. In the Kubernetes LeaderElector[2], we have the leader information
> stored on the annotation of leader ConfigMap. So it would not happen the
> old leader could wrongly override the leader information. Once a JobManager
> want to write his leader information to the ConfigMap, it will check
> whether it is the leader now. If not, anything will happen. Moreover, the
> Kubernetes Resource Version[3] ensures that no one else has snuck in and
> written a different update while the client was in the process of
> performing its update.
>
>
> [1].
> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
> [2].
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
> <https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70>
> [3].
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
>
>
> Best,
> Yang
>
> tison <wa...@gmail.com> 于2020年9月30日周三 下午3:21写道:
>
>> Hi,
>>
>> Generally +1 for a native k8s HA service.
>>
>> For leader election & publish leader information, there was a
>> discussion[1]
>> pointed out that since these two actions is NOT atomic, there will be
>> always
>> edge case where a previous leader overwrite leader information, even with
>> versioned write. Versioned write helps on read again if version mismatches
>> so if we want version write works, information in the kv pair should help
>> the
>> contender reflects whether it is the current leader.
>>
>> The idea of writes leader information on contender node or something
>> equivalent makes sense but the details depends on how it is implemented.
>> General problems are that
>>
>> 1. TM might be a bit late before it updated correct leader information
>> but
>> only if the leader election process is short and leadership is stable at
>> most
>> time, it won't be a serious issue.
>> 2. The process TM extract leader information might be a bit more complex
>> than directly watching a fixed key.
>>
>> Atomic issue can be addressed if one leverages low APIs such as lease &
>> txn
>> but it causes more developing efforts. ConfigMap and encapsulated
>> interface,
>> thought, provides only a self-consistent mechanism which doesn't promise
>> more consistency for extension.
>>
>> Best,
>> tison.
>>
>> [1]
>> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>>
>>
>>
>> Till Rohrmann <tr...@apache.org> 于2020年9月29日周二 下午9:25写道:
>>
>>> For 1. I was wondering whether we can't write the leader connection
>>> information directly when trying to obtain the leadership (trying to
>>> update
>>> the leader key with one's own value)? This might be a little detail,
>>> though.
>>>
>>> 2. Alright, so we are having a similar mechanism as we have in ZooKeeper
>>> with the ephemeral lock nodes. I guess that this complicates the
>>> implementation a bit, unfortunately.
>>>
>>> 3. Wouldn't the StatefulSet solution also work without a PV? One could
>>> configure a different persistent storage like HDFS or S3 for storing the
>>> checkpoints and job blobs like in the ZooKeeper case. The current
>>> benefit I
>>> see is that we avoid having to implement this multi locking mechanism in
>>> the ConfigMaps using the annotations because we can be sure that there is
>>> only a single leader at a time if I understood the guarantees of K8s
>>> correctly.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <da...@gmail.com> wrote:
>>>
>>> > Hi Till, thanks for your valuable feedback.
>>> >
>>> > 1. Yes, leader election and storing leader information will use a same
>>> > ConfigMap. When a contender successfully performs a versioned
>>> annotation
>>> > update operation to the ConfigMap, it means that it has been elected
>>> as the
>>> > leader. And it will write the leader information in the callback of
>>> leader
>>> > elector[1]. The Kubernetes resource version will help us to avoid the
>>> > leader ConfigMap is wrongly updated.
>>> >
>>> > 2. The lock and release is really a valid concern. Actually in current
>>> > design, we could not guarantee that the node who tries to write his
>>> > ownership is the real leader. Who writes later, who is the owner. To
>>> > address this issue, we need to store all the owners of the key. Only
>>> when
>>> > the owner is empty, the specific key(means a checkpoint or job graph)
>>> could
>>> > be deleted. However, we may have a residual checkpoint or job graph
>>> when
>>> > the old JobManager crashed exceptionally and do not release the lock.
>>> To
>>> > solve this problem completely, we need a timestamp renew mechanism
>>> > for CompletedCheckpointStore and JobGraphStore, which could help us to
>>> the
>>> > check the JobManager timeout and then clean up the residual keys.
>>> >
>>> > 3. Frankly speaking, I am not against with this solution. However, in
>>> my
>>> > opinion, it is more like a temporary proposal. We could use
>>> StatefulSet to
>>> > avoid leader election and leader retrieval. But I am not sure whether
>>> > TaskManager could properly handle the situation that same hostname with
>>> > different IPs, because the JobManager failed and relaunched. Also we
>>> may
>>> > still have two JobManagers running in some corner cases(e.g. kubelet is
>>> > down but the pod is running). Another concern is we have a strong
>>> > dependency on the PersistentVolume(aka PV) in FileSystemHAService. But
>>> it
>>> > is not always true especially in self-build Kubernetes cluster.
>>> Moreover,
>>> > PV provider should guarantee that each PV could only be mounted once.
>>> Since
>>> > the native HA proposal could cover all the functionality of StatefulSet
>>> > proposal, that's why I prefer the former.
>>> >
>>> >
>>> > [1].
>>> >
>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>>> >
>>> > Best,
>>> > Yang
>>> >
>>> > Till Rohrmann <tr...@apache.org> 于2020年9月28日周一 下午9:29写道:
>>> >
>>> >> Thanks for creating this FLIP Yang Wang. I believe that many of our
>>> users
>>> >> will like a ZooKeeper-less HA setup.
>>> >>
>>> >> +1 for not separating the leader information and the leader election
>>> if
>>> >> possible. Maybe it is even possible that the contender writes his
>>> leader
>>> >> information directly when trying to obtain the leadership by
>>> performing a
>>> >> versioned write operation.
>>> >>
>>> >> Concerning the lock and release operation I have a question: Can
>>> there be
>>> >> multiple owners for a given key-value pair in a ConfigMap? If not,
>>> how can
>>> >> we ensure that the node which writes his ownership is actually the
>>> leader
>>> >> w/o transactional support from K8s? In ZooKeeper we had the same
>>> problem
>>> >> (we should probably change it at some point to simply use a
>>> >> transaction which checks whether the writer is still the leader) and
>>> >> therefore introduced the ephemeral lock nodes. What they allow is that
>>> >> there can be multiple owners of a given ZNode at a time. The last
>>> owner
>>> >> will then be responsible for the cleanup of the node.
>>> >>
>>> >> I see the benefit of your proposal over the stateful set proposal
>>> because
>>> >> it can support multiple standby JMs. Given the problem of locking
>>> key-value
>>> >> pairs it might be simpler to start with this approach where we only
>>> have
>>> >> single JM. This might already add a lot of benefits for our users. Was
>>> >> there a specific reason why you discarded this proposal (other than
>>> >> generality)?
>>> >>
>>> >> @Uce it would be great to hear your feedback on the proposal since you
>>> >> already implemented a K8s based HA service.
>>> >>
>>> >> Cheers,
>>> >> Till
>>> >>
>>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com>
>>> wrote:
>>> >>
>>> >>> Hi Xintong and Stephan,
>>> >>>
>>> >>> Thanks a lot for your attention on this FLIP. I will address the
>>> >>> comments inline.
>>> >>>
>>> >>> # Architecture -> One or two ConfigMaps
>>> >>>
>>> >>> Both of you are right. One ConfigMap will make the design and
>>> >>> implementation easier. Actually, in my POC codes,
>>> >>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for rest
>>> >>> server component) for the leader election
>>> >>> and storage. Once a JobManager win the election, it will update the
>>> >>> ConfigMap with leader address and periodically
>>> >>> renew the lock annotation to keep as the active leader. I will update
>>> >>> the FLIP document, including the architecture diagram,
>>> >>> to avoid the misunderstanding.
>>> >>>
>>> >>>
>>> >>> # HA storage > Lock and release
>>> >>>
>>> >>> This is a valid concern. Since for Zookeeper ephemeral nodes, it
>>> will be
>>> >>> deleted by the ZK server automatically when
>>> >>> the client is timeout. It could happen in a bad network environment
>>> or
>>> >>> the ZK client crashed exceptionally. For Kubernetes,
>>> >>> we need to implement a similar mechanism. First, when we want to
>>> lock a
>>> >>> specific key in ConfigMap, we will put the owner identify,
>>> >>> lease duration, renew time in the ConfigMap annotation. The
>>> annotation
>>> >>> will be cleaned up when releasing the lock. When
>>> >>> we want to remove a job graph or checkpoints, it should satisfy the
>>> >>> following conditions. If not, the delete operation could not be done.
>>> >>> * Current instance is the owner of the key.
>>> >>> * The owner annotation is empty, which means the owner has released
>>> the
>>> >>> lock.
>>> >>> * The owner annotation timed out, which usually indicate the owner
>>> died.
>>> >>>
>>> >>>
>>> >>> # HA storage > HA data clean up
>>> >>>
>>> >>> Sorry for that I do not describe how the HA related ConfigMap is
>>> >>> retained clearly. Benefit from the Kubernetes OwnerReference[1],
>>> >>> we set owner of the flink-conf configmap, service and TaskManager
>>> pods
>>> >>> to JobManager Deployment. So when we want to
>>> >>> destroy a Flink cluster, we just need to delete the deployment[2].
>>> For
>>> >>> the HA related ConfigMaps, we do not set the owner
>>> >>> so that they could be retained even though we delete the whole Flink
>>> >>> cluster.
>>> >>>
>>> >>>
>>> >>> [1].
>>> >>>
>>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>> >>> [2].
>>> >>>
>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>> >>>
>>> >>>
>>> >>> Best,
>>> >>> Yang
>>> >>>
>>> >>>
>>> >>> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
>>> >>>
>>> >>>> This is a very cool feature proposal.
>>> >>>>
>>> >>>> One lesson-learned from the ZooKeeper-based HA is that it is overly
>>> >>>> complicated to have the Leader RPC address in a different node than
>>> the
>>> >>>> LeaderLock. There is extra code needed to make sure these converge
>>> and the
>>> >>>> can be temporarily out of sync.
>>> >>>>
>>> >>>> A much easier design would be to have the RPC address as payload in
>>> the
>>> >>>> lock entry (ZNode in ZK), the same way that the leader fencing
>>> token is
>>> >>>> stored as payload of the lock.
>>> >>>> I think for the design above it would mean having a single ConfigMap
>>> >>>> for both leader lock and leader RPC address discovery.
>>> >>>>
>>> >>>> This probably serves as a good design principle in general - not
>>> divide
>>> >>>> information that is updated together over different resources.
>>> >>>>
>>> >>>> Best,
>>> >>>> Stephan
>>> >>>>
>>> >>>>
>>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <
>>> tonysong820@gmail.com>
>>> >>>> wrote:
>>> >>>>
>>> >>>>> Thanks for preparing this FLIP, @Yang.
>>> >>>>>
>>> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>> >>>>> buildtin ConfigMap for Flink's HA services should significantly
>>> reduce the
>>> >>>>> maintenance overhead compared to deploying a ZK cluster. I think
>>> this is an
>>> >>>>> attractive feature for users.
>>> >>>>>
>>> >>>>> Concerning the proposed design, I have some questions. Might not be
>>> >>>>> problems, just trying to understand.
>>> >>>>>
>>> >>>>> ## Architecture
>>> >>>>>
>>> >>>>> Why does the leader election need two ConfigMaps (`lock for
>>> contending
>>> >>>>> leader`, and `leader RPC address`)? What happens if the two
>>> ConfigMaps are
>>> >>>>> not updated consistently? E.g., a TM learns about a new JM
>>> becoming leader
>>> >>>>> (lock for contending leader updated), but still gets the old
>>> leader's
>>> >>>>> address when trying to read `leader RPC address`?
>>> >>>>>
>>> >>>>> ## HA storage > Lock and release
>>> >>>>>
>>> >>>>> It seems to me that the owner needs to explicitly release the lock
>>> so
>>> >>>>> that other peers can write/remove the stored object. What if the
>>> previous
>>> >>>>> owner failed to release the lock (e.g., dead before releasing)?
>>> Would there
>>> >>>>> be any problem?
>>> >>>>>
>>> >>>>> ## HA storage > HA data clean up
>>> >>>>>
>>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy
>>> <ClusterID>`,
>>> >>>>> how are the HA dada retained?
>>> >>>>>
>>> >>>>>
>>> >>>>> Thank you~
>>> >>>>>
>>> >>>>> Xintong Song
>>> >>>>>
>>> >>>>>
>>> >>>>>
>>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <da...@gmail.com>
>>> >>>>> wrote:
>>> >>>>>
>>> >>>>>> Hi devs and users,
>>> >>>>>>
>>> >>>>>> I would like to start the discussion about FLIP-144[1], which will
>>> >>>>>> introduce
>>> >>>>>> a new native high availability service for Kubernetes.
>>> >>>>>>
>>> >>>>>> Currently, Flink has provided Zookeeper HA service and been widely
>>> >>>>>> used
>>> >>>>>> in production environments. It could be integrated in standalone
>>> >>>>>> cluster,
>>> >>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA in
>>> K8s
>>> >>>>>> will take additional cost since we need to manage a Zookeeper
>>> cluster.
>>> >>>>>> In the meantime, K8s has provided some public API for leader
>>> >>>>>> election[2]
>>> >>>>>> and configuration storage(i.e. ConfigMap[3]). We could leverage
>>> these
>>> >>>>>> features and make running HA configured Flink cluster on K8s more
>>> >>>>>> convenient.
>>> >>>>>>
>>> >>>>>> Both the standalone on K8s and native K8s could benefit from the
>>> new
>>> >>>>>> introduced KubernetesHaService.
>>> >>>>>>
>>> >>>>>> [1].
>>> >>>>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>> >>>>>> [2].
>>> >>>>>>
>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>> >>>>>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>>> >>>>>>
>>> >>>>>> Looking forward to your feedback.
>>> >>>>>>
>>> >>>>>> Best,
>>> >>>>>> Yang
>>> >>>>>>
>>> >>>>>
>>>
>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by tison <wa...@gmail.com>.
Thanks for your explanation. It would be fine if only checking leadership &
actually write information is atomic.

Best,
tison.


Yang Wang <da...@gmail.com> 于2020年9月30日周三 下午3:57写道:

> Thanks till and tison for your comments.
>
> @Till Rohrmann <tr...@apache.org>
> 1. I am afraid we could not do this if we are going to use fabric8
> Kubernetes client SDK for the leader election. The official Kubernetes Java
> client[1] also could not support it. Unless we implement a new
> LeaderElector in Flink based on the very basic Kubernetes API. But it seems
> that we could gain too much from this.
>
> 2. Yes, the implementation will be a little complicated if we want to
> completely eliminate the residual job graphs or checkpoints. Inspired by
> your suggestion, another different solution has come into my mind. We could
> use a same ConfigMap storing the JobManager leader, job graph,
> checkpoint-counter, checkpoint. Each job will have a specific ConfigMap for
> the HA meta storage. Then it will be easier to guarantee that only the
> leader could write the ConfigMap in a transactional operation. Since
> “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
> transactional operation.
>
> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. However,
> we still have the chances that two JobManager are running and trying to
> get/delete a key in the same ConfigMap concurrently. Imagine that the
> kubelet(like NodeManager in YARN) is down, and then the JobManager could
> not be deleted. A new JobManager pod will be launched. We are just in the
> similar situation as Deployment(1) + ConfigMap + HDFS/S3. The only benefit
> is we do not need to implement a leader election/retrieval service.
>
> @tison
> Actually, I do not think we will have such issue in the Kubernetes HA
> service. In the Kubernetes LeaderElector[2], we have the leader information
> stored on the annotation of leader ConfigMap. So it would not happen the
> old leader could wrongly override the leader information. Once a JobManager
> want to write his leader information to the ConfigMap, it will check
> whether it is the leader now. If not, anything will happen. Moreover, the
> Kubernetes Resource Version[3] ensures that no one else has snuck in and
> written a different update while the client was in the process of
> performing its update.
>
>
> [1].
> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
> [2].
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
> <https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70>
> [3].
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
>
>
> Best,
> Yang
>
> tison <wa...@gmail.com> 于2020年9月30日周三 下午3:21写道:
>
>> Hi,
>>
>> Generally +1 for a native k8s HA service.
>>
>> For leader election & publish leader information, there was a
>> discussion[1]
>> pointed out that since these two actions is NOT atomic, there will be
>> always
>> edge case where a previous leader overwrite leader information, even with
>> versioned write. Versioned write helps on read again if version mismatches
>> so if we want version write works, information in the kv pair should help
>> the
>> contender reflects whether it is the current leader.
>>
>> The idea of writes leader information on contender node or something
>> equivalent makes sense but the details depends on how it is implemented.
>> General problems are that
>>
>> 1. TM might be a bit late before it updated correct leader information
>> but
>> only if the leader election process is short and leadership is stable at
>> most
>> time, it won't be a serious issue.
>> 2. The process TM extract leader information might be a bit more complex
>> than directly watching a fixed key.
>>
>> Atomic issue can be addressed if one leverages low APIs such as lease &
>> txn
>> but it causes more developing efforts. ConfigMap and encapsulated
>> interface,
>> thought, provides only a self-consistent mechanism which doesn't promise
>> more consistency for extension.
>>
>> Best,
>> tison.
>>
>> [1]
>> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>>
>>
>>
>> Till Rohrmann <tr...@apache.org> 于2020年9月29日周二 下午9:25写道:
>>
>>> For 1. I was wondering whether we can't write the leader connection
>>> information directly when trying to obtain the leadership (trying to
>>> update
>>> the leader key with one's own value)? This might be a little detail,
>>> though.
>>>
>>> 2. Alright, so we are having a similar mechanism as we have in ZooKeeper
>>> with the ephemeral lock nodes. I guess that this complicates the
>>> implementation a bit, unfortunately.
>>>
>>> 3. Wouldn't the StatefulSet solution also work without a PV? One could
>>> configure a different persistent storage like HDFS or S3 for storing the
>>> checkpoints and job blobs like in the ZooKeeper case. The current
>>> benefit I
>>> see is that we avoid having to implement this multi locking mechanism in
>>> the ConfigMaps using the annotations because we can be sure that there is
>>> only a single leader at a time if I understood the guarantees of K8s
>>> correctly.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <da...@gmail.com> wrote:
>>>
>>> > Hi Till, thanks for your valuable feedback.
>>> >
>>> > 1. Yes, leader election and storing leader information will use a same
>>> > ConfigMap. When a contender successfully performs a versioned
>>> annotation
>>> > update operation to the ConfigMap, it means that it has been elected
>>> as the
>>> > leader. And it will write the leader information in the callback of
>>> leader
>>> > elector[1]. The Kubernetes resource version will help us to avoid the
>>> > leader ConfigMap is wrongly updated.
>>> >
>>> > 2. The lock and release is really a valid concern. Actually in current
>>> > design, we could not guarantee that the node who tries to write his
>>> > ownership is the real leader. Who writes later, who is the owner. To
>>> > address this issue, we need to store all the owners of the key. Only
>>> when
>>> > the owner is empty, the specific key(means a checkpoint or job graph)
>>> could
>>> > be deleted. However, we may have a residual checkpoint or job graph
>>> when
>>> > the old JobManager crashed exceptionally and do not release the lock.
>>> To
>>> > solve this problem completely, we need a timestamp renew mechanism
>>> > for CompletedCheckpointStore and JobGraphStore, which could help us to
>>> the
>>> > check the JobManager timeout and then clean up the residual keys.
>>> >
>>> > 3. Frankly speaking, I am not against with this solution. However, in
>>> my
>>> > opinion, it is more like a temporary proposal. We could use
>>> StatefulSet to
>>> > avoid leader election and leader retrieval. But I am not sure whether
>>> > TaskManager could properly handle the situation that same hostname with
>>> > different IPs, because the JobManager failed and relaunched. Also we
>>> may
>>> > still have two JobManagers running in some corner cases(e.g. kubelet is
>>> > down but the pod is running). Another concern is we have a strong
>>> > dependency on the PersistentVolume(aka PV) in FileSystemHAService. But
>>> it
>>> > is not always true especially in self-build Kubernetes cluster.
>>> Moreover,
>>> > PV provider should guarantee that each PV could only be mounted once.
>>> Since
>>> > the native HA proposal could cover all the functionality of StatefulSet
>>> > proposal, that's why I prefer the former.
>>> >
>>> >
>>> > [1].
>>> >
>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>>> >
>>> > Best,
>>> > Yang
>>> >
>>> > Till Rohrmann <tr...@apache.org> 于2020年9月28日周一 下午9:29写道:
>>> >
>>> >> Thanks for creating this FLIP Yang Wang. I believe that many of our
>>> users
>>> >> will like a ZooKeeper-less HA setup.
>>> >>
>>> >> +1 for not separating the leader information and the leader election
>>> if
>>> >> possible. Maybe it is even possible that the contender writes his
>>> leader
>>> >> information directly when trying to obtain the leadership by
>>> performing a
>>> >> versioned write operation.
>>> >>
>>> >> Concerning the lock and release operation I have a question: Can
>>> there be
>>> >> multiple owners for a given key-value pair in a ConfigMap? If not,
>>> how can
>>> >> we ensure that the node which writes his ownership is actually the
>>> leader
>>> >> w/o transactional support from K8s? In ZooKeeper we had the same
>>> problem
>>> >> (we should probably change it at some point to simply use a
>>> >> transaction which checks whether the writer is still the leader) and
>>> >> therefore introduced the ephemeral lock nodes. What they allow is that
>>> >> there can be multiple owners of a given ZNode at a time. The last
>>> owner
>>> >> will then be responsible for the cleanup of the node.
>>> >>
>>> >> I see the benefit of your proposal over the stateful set proposal
>>> because
>>> >> it can support multiple standby JMs. Given the problem of locking
>>> key-value
>>> >> pairs it might be simpler to start with this approach where we only
>>> have
>>> >> single JM. This might already add a lot of benefits for our users. Was
>>> >> there a specific reason why you discarded this proposal (other than
>>> >> generality)?
>>> >>
>>> >> @Uce it would be great to hear your feedback on the proposal since you
>>> >> already implemented a K8s based HA service.
>>> >>
>>> >> Cheers,
>>> >> Till
>>> >>
>>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com>
>>> wrote:
>>> >>
>>> >>> Hi Xintong and Stephan,
>>> >>>
>>> >>> Thanks a lot for your attention on this FLIP. I will address the
>>> >>> comments inline.
>>> >>>
>>> >>> # Architecture -> One or two ConfigMaps
>>> >>>
>>> >>> Both of you are right. One ConfigMap will make the design and
>>> >>> implementation easier. Actually, in my POC codes,
>>> >>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for rest
>>> >>> server component) for the leader election
>>> >>> and storage. Once a JobManager win the election, it will update the
>>> >>> ConfigMap with leader address and periodically
>>> >>> renew the lock annotation to keep as the active leader. I will update
>>> >>> the FLIP document, including the architecture diagram,
>>> >>> to avoid the misunderstanding.
>>> >>>
>>> >>>
>>> >>> # HA storage > Lock and release
>>> >>>
>>> >>> This is a valid concern. Since for Zookeeper ephemeral nodes, it
>>> will be
>>> >>> deleted by the ZK server automatically when
>>> >>> the client is timeout. It could happen in a bad network environment
>>> or
>>> >>> the ZK client crashed exceptionally. For Kubernetes,
>>> >>> we need to implement a similar mechanism. First, when we want to
>>> lock a
>>> >>> specific key in ConfigMap, we will put the owner identify,
>>> >>> lease duration, renew time in the ConfigMap annotation. The
>>> annotation
>>> >>> will be cleaned up when releasing the lock. When
>>> >>> we want to remove a job graph or checkpoints, it should satisfy the
>>> >>> following conditions. If not, the delete operation could not be done.
>>> >>> * Current instance is the owner of the key.
>>> >>> * The owner annotation is empty, which means the owner has released
>>> the
>>> >>> lock.
>>> >>> * The owner annotation timed out, which usually indicate the owner
>>> died.
>>> >>>
>>> >>>
>>> >>> # HA storage > HA data clean up
>>> >>>
>>> >>> Sorry for that I do not describe how the HA related ConfigMap is
>>> >>> retained clearly. Benefit from the Kubernetes OwnerReference[1],
>>> >>> we set owner of the flink-conf configmap, service and TaskManager
>>> pods
>>> >>> to JobManager Deployment. So when we want to
>>> >>> destroy a Flink cluster, we just need to delete the deployment[2].
>>> For
>>> >>> the HA related ConfigMaps, we do not set the owner
>>> >>> so that they could be retained even though we delete the whole Flink
>>> >>> cluster.
>>> >>>
>>> >>>
>>> >>> [1].
>>> >>>
>>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>> >>> [2].
>>> >>>
>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>> >>>
>>> >>>
>>> >>> Best,
>>> >>> Yang
>>> >>>
>>> >>>
>>> >>> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
>>> >>>
>>> >>>> This is a very cool feature proposal.
>>> >>>>
>>> >>>> One lesson-learned from the ZooKeeper-based HA is that it is overly
>>> >>>> complicated to have the Leader RPC address in a different node than
>>> the
>>> >>>> LeaderLock. There is extra code needed to make sure these converge
>>> and the
>>> >>>> can be temporarily out of sync.
>>> >>>>
>>> >>>> A much easier design would be to have the RPC address as payload in
>>> the
>>> >>>> lock entry (ZNode in ZK), the same way that the leader fencing
>>> token is
>>> >>>> stored as payload of the lock.
>>> >>>> I think for the design above it would mean having a single ConfigMap
>>> >>>> for both leader lock and leader RPC address discovery.
>>> >>>>
>>> >>>> This probably serves as a good design principle in general - not
>>> divide
>>> >>>> information that is updated together over different resources.
>>> >>>>
>>> >>>> Best,
>>> >>>> Stephan
>>> >>>>
>>> >>>>
>>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <
>>> tonysong820@gmail.com>
>>> >>>> wrote:
>>> >>>>
>>> >>>>> Thanks for preparing this FLIP, @Yang.
>>> >>>>>
>>> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>> >>>>> buildtin ConfigMap for Flink's HA services should significantly
>>> reduce the
>>> >>>>> maintenance overhead compared to deploying a ZK cluster. I think
>>> this is an
>>> >>>>> attractive feature for users.
>>> >>>>>
>>> >>>>> Concerning the proposed design, I have some questions. Might not be
>>> >>>>> problems, just trying to understand.
>>> >>>>>
>>> >>>>> ## Architecture
>>> >>>>>
>>> >>>>> Why does the leader election need two ConfigMaps (`lock for
>>> contending
>>> >>>>> leader`, and `leader RPC address`)? What happens if the two
>>> ConfigMaps are
>>> >>>>> not updated consistently? E.g., a TM learns about a new JM
>>> becoming leader
>>> >>>>> (lock for contending leader updated), but still gets the old
>>> leader's
>>> >>>>> address when trying to read `leader RPC address`?
>>> >>>>>
>>> >>>>> ## HA storage > Lock and release
>>> >>>>>
>>> >>>>> It seems to me that the owner needs to explicitly release the lock
>>> so
>>> >>>>> that other peers can write/remove the stored object. What if the
>>> previous
>>> >>>>> owner failed to release the lock (e.g., dead before releasing)?
>>> Would there
>>> >>>>> be any problem?
>>> >>>>>
>>> >>>>> ## HA storage > HA data clean up
>>> >>>>>
>>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy
>>> <ClusterID>`,
>>> >>>>> how are the HA dada retained?
>>> >>>>>
>>> >>>>>
>>> >>>>> Thank you~
>>> >>>>>
>>> >>>>> Xintong Song
>>> >>>>>
>>> >>>>>
>>> >>>>>
>>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <da...@gmail.com>
>>> >>>>> wrote:
>>> >>>>>
>>> >>>>>> Hi devs and users,
>>> >>>>>>
>>> >>>>>> I would like to start the discussion about FLIP-144[1], which will
>>> >>>>>> introduce
>>> >>>>>> a new native high availability service for Kubernetes.
>>> >>>>>>
>>> >>>>>> Currently, Flink has provided Zookeeper HA service and been widely
>>> >>>>>> used
>>> >>>>>> in production environments. It could be integrated in standalone
>>> >>>>>> cluster,
>>> >>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA in
>>> K8s
>>> >>>>>> will take additional cost since we need to manage a Zookeeper
>>> cluster.
>>> >>>>>> In the meantime, K8s has provided some public API for leader
>>> >>>>>> election[2]
>>> >>>>>> and configuration storage(i.e. ConfigMap[3]). We could leverage
>>> these
>>> >>>>>> features and make running HA configured Flink cluster on K8s more
>>> >>>>>> convenient.
>>> >>>>>>
>>> >>>>>> Both the standalone on K8s and native K8s could benefit from the
>>> new
>>> >>>>>> introduced KubernetesHaService.
>>> >>>>>>
>>> >>>>>> [1].
>>> >>>>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>> >>>>>> [2].
>>> >>>>>>
>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>> >>>>>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>>> >>>>>>
>>> >>>>>> Looking forward to your feedback.
>>> >>>>>>
>>> >>>>>> Best,
>>> >>>>>> Yang
>>> >>>>>>
>>> >>>>>
>>>
>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Yang Wang <da...@gmail.com>.
Thanks till and tison for your comments.

@Till Rohrmann <tr...@apache.org>
1. I am afraid we could not do this if we are going to use fabric8
Kubernetes client SDK for the leader election. The official Kubernetes Java
client[1] also could not support it. Unless we implement a new
LeaderElector in Flink based on the very basic Kubernetes API. But it seems
that we could gain too much from this.

2. Yes, the implementation will be a little complicated if we want to
completely eliminate the residual job graphs or checkpoints. Inspired by
your suggestion, another different solution has come into my mind. We could
use a same ConfigMap storing the JobManager leader, job graph,
checkpoint-counter, checkpoint. Each job will have a specific ConfigMap for
the HA meta storage. Then it will be easier to guarantee that only the
leader could write the ConfigMap in a transactional operation. Since
“Get(check the leader)-and-Update(write back to the ConfigMap)” is a
transactional operation.

3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. However,
we still have the chances that two JobManager are running and trying to
get/delete a key in the same ConfigMap concurrently. Imagine that the
kubelet(like NodeManager in YARN) is down, and then the JobManager could
not be deleted. A new JobManager pod will be launched. We are just in the
similar situation as Deployment(1) + ConfigMap + HDFS/S3. The only benefit
is we do not need to implement a leader election/retrieval service.

@tison
Actually, I do not think we will have such issue in the Kubernetes HA
service. In the Kubernetes LeaderElector[2], we have the leader information
stored on the annotation of leader ConfigMap. So it would not happen the
old leader could wrongly override the leader information. Once a JobManager
want to write his leader information to the ConfigMap, it will check
whether it is the leader now. If not, anything will happen. Moreover, the
Kubernetes Resource Version[3] ensures that no one else has snuck in and
written a different update while the client was in the process of
performing its update.


[1].
https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
[2].
https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
<https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70>
[3].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion


Best,
Yang

tison <wa...@gmail.com> 于2020年9月30日周三 下午3:21写道:

> Hi,
>
> Generally +1 for a native k8s HA service.
>
> For leader election & publish leader information, there was a discussion[1]
> pointed out that since these two actions is NOT atomic, there will be
> always
> edge case where a previous leader overwrite leader information, even with
> versioned write. Versioned write helps on read again if version mismatches
> so if we want version write works, information in the kv pair should help
> the
> contender reflects whether it is the current leader.
>
> The idea of writes leader information on contender node or something
> equivalent makes sense but the details depends on how it is implemented.
> General problems are that
>
> 1. TM might be a bit late before it updated correct leader information but
> only if the leader election process is short and leadership is stable at
> most
> time, it won't be a serious issue.
> 2. The process TM extract leader information might be a bit more complex
> than directly watching a fixed key.
>
> Atomic issue can be addressed if one leverages low APIs such as lease & txn
> but it causes more developing efforts. ConfigMap and encapsulated
> interface,
> thought, provides only a self-consistent mechanism which doesn't promise
> more consistency for extension.
>
> Best,
> tison.
>
> [1]
> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>
>
>
> Till Rohrmann <tr...@apache.org> 于2020年9月29日周二 下午9:25写道:
>
>> For 1. I was wondering whether we can't write the leader connection
>> information directly when trying to obtain the leadership (trying to
>> update
>> the leader key with one's own value)? This might be a little detail,
>> though.
>>
>> 2. Alright, so we are having a similar mechanism as we have in ZooKeeper
>> with the ephemeral lock nodes. I guess that this complicates the
>> implementation a bit, unfortunately.
>>
>> 3. Wouldn't the StatefulSet solution also work without a PV? One could
>> configure a different persistent storage like HDFS or S3 for storing the
>> checkpoints and job blobs like in the ZooKeeper case. The current benefit
>> I
>> see is that we avoid having to implement this multi locking mechanism in
>> the ConfigMaps using the annotations because we can be sure that there is
>> only a single leader at a time if I understood the guarantees of K8s
>> correctly.
>>
>> Cheers,
>> Till
>>
>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <da...@gmail.com> wrote:
>>
>> > Hi Till, thanks for your valuable feedback.
>> >
>> > 1. Yes, leader election and storing leader information will use a same
>> > ConfigMap. When a contender successfully performs a versioned annotation
>> > update operation to the ConfigMap, it means that it has been elected as
>> the
>> > leader. And it will write the leader information in the callback of
>> leader
>> > elector[1]. The Kubernetes resource version will help us to avoid the
>> > leader ConfigMap is wrongly updated.
>> >
>> > 2. The lock and release is really a valid concern. Actually in current
>> > design, we could not guarantee that the node who tries to write his
>> > ownership is the real leader. Who writes later, who is the owner. To
>> > address this issue, we need to store all the owners of the key. Only
>> when
>> > the owner is empty, the specific key(means a checkpoint or job graph)
>> could
>> > be deleted. However, we may have a residual checkpoint or job graph when
>> > the old JobManager crashed exceptionally and do not release the lock. To
>> > solve this problem completely, we need a timestamp renew mechanism
>> > for CompletedCheckpointStore and JobGraphStore, which could help us to
>> the
>> > check the JobManager timeout and then clean up the residual keys.
>> >
>> > 3. Frankly speaking, I am not against with this solution. However, in my
>> > opinion, it is more like a temporary proposal. We could use StatefulSet
>> to
>> > avoid leader election and leader retrieval. But I am not sure whether
>> > TaskManager could properly handle the situation that same hostname with
>> > different IPs, because the JobManager failed and relaunched. Also we may
>> > still have two JobManagers running in some corner cases(e.g. kubelet is
>> > down but the pod is running). Another concern is we have a strong
>> > dependency on the PersistentVolume(aka PV) in FileSystemHAService. But
>> it
>> > is not always true especially in self-build Kubernetes cluster.
>> Moreover,
>> > PV provider should guarantee that each PV could only be mounted once.
>> Since
>> > the native HA proposal could cover all the functionality of StatefulSet
>> > proposal, that's why I prefer the former.
>> >
>> >
>> > [1].
>> >
>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>> >
>> > Best,
>> > Yang
>> >
>> > Till Rohrmann <tr...@apache.org> 于2020年9月28日周一 下午9:29写道:
>> >
>> >> Thanks for creating this FLIP Yang Wang. I believe that many of our
>> users
>> >> will like a ZooKeeper-less HA setup.
>> >>
>> >> +1 for not separating the leader information and the leader election if
>> >> possible. Maybe it is even possible that the contender writes his
>> leader
>> >> information directly when trying to obtain the leadership by
>> performing a
>> >> versioned write operation.
>> >>
>> >> Concerning the lock and release operation I have a question: Can there
>> be
>> >> multiple owners for a given key-value pair in a ConfigMap? If not, how
>> can
>> >> we ensure that the node which writes his ownership is actually the
>> leader
>> >> w/o transactional support from K8s? In ZooKeeper we had the same
>> problem
>> >> (we should probably change it at some point to simply use a
>> >> transaction which checks whether the writer is still the leader) and
>> >> therefore introduced the ephemeral lock nodes. What they allow is that
>> >> there can be multiple owners of a given ZNode at a time. The last owner
>> >> will then be responsible for the cleanup of the node.
>> >>
>> >> I see the benefit of your proposal over the stateful set proposal
>> because
>> >> it can support multiple standby JMs. Given the problem of locking
>> key-value
>> >> pairs it might be simpler to start with this approach where we only
>> have
>> >> single JM. This might already add a lot of benefits for our users. Was
>> >> there a specific reason why you discarded this proposal (other than
>> >> generality)?
>> >>
>> >> @Uce it would be great to hear your feedback on the proposal since you
>> >> already implemented a K8s based HA service.
>> >>
>> >> Cheers,
>> >> Till
>> >>
>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com>
>> wrote:
>> >>
>> >>> Hi Xintong and Stephan,
>> >>>
>> >>> Thanks a lot for your attention on this FLIP. I will address the
>> >>> comments inline.
>> >>>
>> >>> # Architecture -> One or two ConfigMaps
>> >>>
>> >>> Both of you are right. One ConfigMap will make the design and
>> >>> implementation easier. Actually, in my POC codes,
>> >>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for rest
>> >>> server component) for the leader election
>> >>> and storage. Once a JobManager win the election, it will update the
>> >>> ConfigMap with leader address and periodically
>> >>> renew the lock annotation to keep as the active leader. I will update
>> >>> the FLIP document, including the architecture diagram,
>> >>> to avoid the misunderstanding.
>> >>>
>> >>>
>> >>> # HA storage > Lock and release
>> >>>
>> >>> This is a valid concern. Since for Zookeeper ephemeral nodes, it will
>> be
>> >>> deleted by the ZK server automatically when
>> >>> the client is timeout. It could happen in a bad network environment or
>> >>> the ZK client crashed exceptionally. For Kubernetes,
>> >>> we need to implement a similar mechanism. First, when we want to lock
>> a
>> >>> specific key in ConfigMap, we will put the owner identify,
>> >>> lease duration, renew time in the ConfigMap annotation. The annotation
>> >>> will be cleaned up when releasing the lock. When
>> >>> we want to remove a job graph or checkpoints, it should satisfy the
>> >>> following conditions. If not, the delete operation could not be done.
>> >>> * Current instance is the owner of the key.
>> >>> * The owner annotation is empty, which means the owner has released
>> the
>> >>> lock.
>> >>> * The owner annotation timed out, which usually indicate the owner
>> died.
>> >>>
>> >>>
>> >>> # HA storage > HA data clean up
>> >>>
>> >>> Sorry for that I do not describe how the HA related ConfigMap is
>> >>> retained clearly. Benefit from the Kubernetes OwnerReference[1],
>> >>> we set owner of the flink-conf configmap, service and TaskManager pods
>> >>> to JobManager Deployment. So when we want to
>> >>> destroy a Flink cluster, we just need to delete the deployment[2]. For
>> >>> the HA related ConfigMaps, we do not set the owner
>> >>> so that they could be retained even though we delete the whole Flink
>> >>> cluster.
>> >>>
>> >>>
>> >>> [1].
>> >>>
>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>> >>> [2].
>> >>>
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>> >>>
>> >>>
>> >>> Best,
>> >>> Yang
>> >>>
>> >>>
>> >>> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
>> >>>
>> >>>> This is a very cool feature proposal.
>> >>>>
>> >>>> One lesson-learned from the ZooKeeper-based HA is that it is overly
>> >>>> complicated to have the Leader RPC address in a different node than
>> the
>> >>>> LeaderLock. There is extra code needed to make sure these converge
>> and the
>> >>>> can be temporarily out of sync.
>> >>>>
>> >>>> A much easier design would be to have the RPC address as payload in
>> the
>> >>>> lock entry (ZNode in ZK), the same way that the leader fencing token
>> is
>> >>>> stored as payload of the lock.
>> >>>> I think for the design above it would mean having a single ConfigMap
>> >>>> for both leader lock and leader RPC address discovery.
>> >>>>
>> >>>> This probably serves as a good design principle in general - not
>> divide
>> >>>> information that is updated together over different resources.
>> >>>>
>> >>>> Best,
>> >>>> Stephan
>> >>>>
>> >>>>
>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <tonysong820@gmail.com
>> >
>> >>>> wrote:
>> >>>>
>> >>>>> Thanks for preparing this FLIP, @Yang.
>> >>>>>
>> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>> >>>>> buildtin ConfigMap for Flink's HA services should significantly
>> reduce the
>> >>>>> maintenance overhead compared to deploying a ZK cluster. I think
>> this is an
>> >>>>> attractive feature for users.
>> >>>>>
>> >>>>> Concerning the proposed design, I have some questions. Might not be
>> >>>>> problems, just trying to understand.
>> >>>>>
>> >>>>> ## Architecture
>> >>>>>
>> >>>>> Why does the leader election need two ConfigMaps (`lock for
>> contending
>> >>>>> leader`, and `leader RPC address`)? What happens if the two
>> ConfigMaps are
>> >>>>> not updated consistently? E.g., a TM learns about a new JM becoming
>> leader
>> >>>>> (lock for contending leader updated), but still gets the old
>> leader's
>> >>>>> address when trying to read `leader RPC address`?
>> >>>>>
>> >>>>> ## HA storage > Lock and release
>> >>>>>
>> >>>>> It seems to me that the owner needs to explicitly release the lock
>> so
>> >>>>> that other peers can write/remove the stored object. What if the
>> previous
>> >>>>> owner failed to release the lock (e.g., dead before releasing)?
>> Would there
>> >>>>> be any problem?
>> >>>>>
>> >>>>> ## HA storage > HA data clean up
>> >>>>>
>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy
>> <ClusterID>`,
>> >>>>> how are the HA dada retained?
>> >>>>>
>> >>>>>
>> >>>>> Thank you~
>> >>>>>
>> >>>>> Xintong Song
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <da...@gmail.com>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> Hi devs and users,
>> >>>>>>
>> >>>>>> I would like to start the discussion about FLIP-144[1], which will
>> >>>>>> introduce
>> >>>>>> a new native high availability service for Kubernetes.
>> >>>>>>
>> >>>>>> Currently, Flink has provided Zookeeper HA service and been widely
>> >>>>>> used
>> >>>>>> in production environments. It could be integrated in standalone
>> >>>>>> cluster,
>> >>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA in
>> K8s
>> >>>>>> will take additional cost since we need to manage a Zookeeper
>> cluster.
>> >>>>>> In the meantime, K8s has provided some public API for leader
>> >>>>>> election[2]
>> >>>>>> and configuration storage(i.e. ConfigMap[3]). We could leverage
>> these
>> >>>>>> features and make running HA configured Flink cluster on K8s more
>> >>>>>> convenient.
>> >>>>>>
>> >>>>>> Both the standalone on K8s and native K8s could benefit from the
>> new
>> >>>>>> introduced KubernetesHaService.
>> >>>>>>
>> >>>>>> [1].
>> >>>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>> >>>>>> [2].
>> >>>>>>
>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>> >>>>>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>> >>>>>>
>> >>>>>> Looking forward to your feedback.
>> >>>>>>
>> >>>>>> Best,
>> >>>>>> Yang
>> >>>>>>
>> >>>>>
>>
>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Yang Wang <da...@gmail.com>.
Thanks till and tison for your comments.

@Till Rohrmann <tr...@apache.org>
1. I am afraid we could not do this if we are going to use fabric8
Kubernetes client SDK for the leader election. The official Kubernetes Java
client[1] also could not support it. Unless we implement a new
LeaderElector in Flink based on the very basic Kubernetes API. But it seems
that we could gain too much from this.

2. Yes, the implementation will be a little complicated if we want to
completely eliminate the residual job graphs or checkpoints. Inspired by
your suggestion, another different solution has come into my mind. We could
use a same ConfigMap storing the JobManager leader, job graph,
checkpoint-counter, checkpoint. Each job will have a specific ConfigMap for
the HA meta storage. Then it will be easier to guarantee that only the
leader could write the ConfigMap in a transactional operation. Since
“Get(check the leader)-and-Update(write back to the ConfigMap)” is a
transactional operation.

3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. However,
we still have the chances that two JobManager are running and trying to
get/delete a key in the same ConfigMap concurrently. Imagine that the
kubelet(like NodeManager in YARN) is down, and then the JobManager could
not be deleted. A new JobManager pod will be launched. We are just in the
similar situation as Deployment(1) + ConfigMap + HDFS/S3. The only benefit
is we do not need to implement a leader election/retrieval service.

@tison
Actually, I do not think we will have such issue in the Kubernetes HA
service. In the Kubernetes LeaderElector[2], we have the leader information
stored on the annotation of leader ConfigMap. So it would not happen the
old leader could wrongly override the leader information. Once a JobManager
want to write his leader information to the ConfigMap, it will check
whether it is the leader now. If not, anything will happen. Moreover, the
Kubernetes Resource Version[3] ensures that no one else has snuck in and
written a different update while the client was in the process of
performing its update.


[1].
https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
[2].
https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
<https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70>
[3].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion


Best,
Yang

tison <wa...@gmail.com> 于2020年9月30日周三 下午3:21写道:

> Hi,
>
> Generally +1 for a native k8s HA service.
>
> For leader election & publish leader information, there was a discussion[1]
> pointed out that since these two actions is NOT atomic, there will be
> always
> edge case where a previous leader overwrite leader information, even with
> versioned write. Versioned write helps on read again if version mismatches
> so if we want version write works, information in the kv pair should help
> the
> contender reflects whether it is the current leader.
>
> The idea of writes leader information on contender node or something
> equivalent makes sense but the details depends on how it is implemented.
> General problems are that
>
> 1. TM might be a bit late before it updated correct leader information but
> only if the leader election process is short and leadership is stable at
> most
> time, it won't be a serious issue.
> 2. The process TM extract leader information might be a bit more complex
> than directly watching a fixed key.
>
> Atomic issue can be addressed if one leverages low APIs such as lease & txn
> but it causes more developing efforts. ConfigMap and encapsulated
> interface,
> thought, provides only a self-consistent mechanism which doesn't promise
> more consistency for extension.
>
> Best,
> tison.
>
> [1]
> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>
>
>
> Till Rohrmann <tr...@apache.org> 于2020年9月29日周二 下午9:25写道:
>
>> For 1. I was wondering whether we can't write the leader connection
>> information directly when trying to obtain the leadership (trying to
>> update
>> the leader key with one's own value)? This might be a little detail,
>> though.
>>
>> 2. Alright, so we are having a similar mechanism as we have in ZooKeeper
>> with the ephemeral lock nodes. I guess that this complicates the
>> implementation a bit, unfortunately.
>>
>> 3. Wouldn't the StatefulSet solution also work without a PV? One could
>> configure a different persistent storage like HDFS or S3 for storing the
>> checkpoints and job blobs like in the ZooKeeper case. The current benefit
>> I
>> see is that we avoid having to implement this multi locking mechanism in
>> the ConfigMaps using the annotations because we can be sure that there is
>> only a single leader at a time if I understood the guarantees of K8s
>> correctly.
>>
>> Cheers,
>> Till
>>
>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <da...@gmail.com> wrote:
>>
>> > Hi Till, thanks for your valuable feedback.
>> >
>> > 1. Yes, leader election and storing leader information will use a same
>> > ConfigMap. When a contender successfully performs a versioned annotation
>> > update operation to the ConfigMap, it means that it has been elected as
>> the
>> > leader. And it will write the leader information in the callback of
>> leader
>> > elector[1]. The Kubernetes resource version will help us to avoid the
>> > leader ConfigMap is wrongly updated.
>> >
>> > 2. The lock and release is really a valid concern. Actually in current
>> > design, we could not guarantee that the node who tries to write his
>> > ownership is the real leader. Who writes later, who is the owner. To
>> > address this issue, we need to store all the owners of the key. Only
>> when
>> > the owner is empty, the specific key(means a checkpoint or job graph)
>> could
>> > be deleted. However, we may have a residual checkpoint or job graph when
>> > the old JobManager crashed exceptionally and do not release the lock. To
>> > solve this problem completely, we need a timestamp renew mechanism
>> > for CompletedCheckpointStore and JobGraphStore, which could help us to
>> the
>> > check the JobManager timeout and then clean up the residual keys.
>> >
>> > 3. Frankly speaking, I am not against with this solution. However, in my
>> > opinion, it is more like a temporary proposal. We could use StatefulSet
>> to
>> > avoid leader election and leader retrieval. But I am not sure whether
>> > TaskManager could properly handle the situation that same hostname with
>> > different IPs, because the JobManager failed and relaunched. Also we may
>> > still have two JobManagers running in some corner cases(e.g. kubelet is
>> > down but the pod is running). Another concern is we have a strong
>> > dependency on the PersistentVolume(aka PV) in FileSystemHAService. But
>> it
>> > is not always true especially in self-build Kubernetes cluster.
>> Moreover,
>> > PV provider should guarantee that each PV could only be mounted once.
>> Since
>> > the native HA proposal could cover all the functionality of StatefulSet
>> > proposal, that's why I prefer the former.
>> >
>> >
>> > [1].
>> >
>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>> >
>> > Best,
>> > Yang
>> >
>> > Till Rohrmann <tr...@apache.org> 于2020年9月28日周一 下午9:29写道:
>> >
>> >> Thanks for creating this FLIP Yang Wang. I believe that many of our
>> users
>> >> will like a ZooKeeper-less HA setup.
>> >>
>> >> +1 for not separating the leader information and the leader election if
>> >> possible. Maybe it is even possible that the contender writes his
>> leader
>> >> information directly when trying to obtain the leadership by
>> performing a
>> >> versioned write operation.
>> >>
>> >> Concerning the lock and release operation I have a question: Can there
>> be
>> >> multiple owners for a given key-value pair in a ConfigMap? If not, how
>> can
>> >> we ensure that the node which writes his ownership is actually the
>> leader
>> >> w/o transactional support from K8s? In ZooKeeper we had the same
>> problem
>> >> (we should probably change it at some point to simply use a
>> >> transaction which checks whether the writer is still the leader) and
>> >> therefore introduced the ephemeral lock nodes. What they allow is that
>> >> there can be multiple owners of a given ZNode at a time. The last owner
>> >> will then be responsible for the cleanup of the node.
>> >>
>> >> I see the benefit of your proposal over the stateful set proposal
>> because
>> >> it can support multiple standby JMs. Given the problem of locking
>> key-value
>> >> pairs it might be simpler to start with this approach where we only
>> have
>> >> single JM. This might already add a lot of benefits for our users. Was
>> >> there a specific reason why you discarded this proposal (other than
>> >> generality)?
>> >>
>> >> @Uce it would be great to hear your feedback on the proposal since you
>> >> already implemented a K8s based HA service.
>> >>
>> >> Cheers,
>> >> Till
>> >>
>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com>
>> wrote:
>> >>
>> >>> Hi Xintong and Stephan,
>> >>>
>> >>> Thanks a lot for your attention on this FLIP. I will address the
>> >>> comments inline.
>> >>>
>> >>> # Architecture -> One or two ConfigMaps
>> >>>
>> >>> Both of you are right. One ConfigMap will make the design and
>> >>> implementation easier. Actually, in my POC codes,
>> >>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for rest
>> >>> server component) for the leader election
>> >>> and storage. Once a JobManager win the election, it will update the
>> >>> ConfigMap with leader address and periodically
>> >>> renew the lock annotation to keep as the active leader. I will update
>> >>> the FLIP document, including the architecture diagram,
>> >>> to avoid the misunderstanding.
>> >>>
>> >>>
>> >>> # HA storage > Lock and release
>> >>>
>> >>> This is a valid concern. Since for Zookeeper ephemeral nodes, it will
>> be
>> >>> deleted by the ZK server automatically when
>> >>> the client is timeout. It could happen in a bad network environment or
>> >>> the ZK client crashed exceptionally. For Kubernetes,
>> >>> we need to implement a similar mechanism. First, when we want to lock
>> a
>> >>> specific key in ConfigMap, we will put the owner identify,
>> >>> lease duration, renew time in the ConfigMap annotation. The annotation
>> >>> will be cleaned up when releasing the lock. When
>> >>> we want to remove a job graph or checkpoints, it should satisfy the
>> >>> following conditions. If not, the delete operation could not be done.
>> >>> * Current instance is the owner of the key.
>> >>> * The owner annotation is empty, which means the owner has released
>> the
>> >>> lock.
>> >>> * The owner annotation timed out, which usually indicate the owner
>> died.
>> >>>
>> >>>
>> >>> # HA storage > HA data clean up
>> >>>
>> >>> Sorry for that I do not describe how the HA related ConfigMap is
>> >>> retained clearly. Benefit from the Kubernetes OwnerReference[1],
>> >>> we set owner of the flink-conf configmap, service and TaskManager pods
>> >>> to JobManager Deployment. So when we want to
>> >>> destroy a Flink cluster, we just need to delete the deployment[2]. For
>> >>> the HA related ConfigMaps, we do not set the owner
>> >>> so that they could be retained even though we delete the whole Flink
>> >>> cluster.
>> >>>
>> >>>
>> >>> [1].
>> >>>
>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>> >>> [2].
>> >>>
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>> >>>
>> >>>
>> >>> Best,
>> >>> Yang
>> >>>
>> >>>
>> >>> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
>> >>>
>> >>>> This is a very cool feature proposal.
>> >>>>
>> >>>> One lesson-learned from the ZooKeeper-based HA is that it is overly
>> >>>> complicated to have the Leader RPC address in a different node than
>> the
>> >>>> LeaderLock. There is extra code needed to make sure these converge
>> and the
>> >>>> can be temporarily out of sync.
>> >>>>
>> >>>> A much easier design would be to have the RPC address as payload in
>> the
>> >>>> lock entry (ZNode in ZK), the same way that the leader fencing token
>> is
>> >>>> stored as payload of the lock.
>> >>>> I think for the design above it would mean having a single ConfigMap
>> >>>> for both leader lock and leader RPC address discovery.
>> >>>>
>> >>>> This probably serves as a good design principle in general - not
>> divide
>> >>>> information that is updated together over different resources.
>> >>>>
>> >>>> Best,
>> >>>> Stephan
>> >>>>
>> >>>>
>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <tonysong820@gmail.com
>> >
>> >>>> wrote:
>> >>>>
>> >>>>> Thanks for preparing this FLIP, @Yang.
>> >>>>>
>> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>> >>>>> buildtin ConfigMap for Flink's HA services should significantly
>> reduce the
>> >>>>> maintenance overhead compared to deploying a ZK cluster. I think
>> this is an
>> >>>>> attractive feature for users.
>> >>>>>
>> >>>>> Concerning the proposed design, I have some questions. Might not be
>> >>>>> problems, just trying to understand.
>> >>>>>
>> >>>>> ## Architecture
>> >>>>>
>> >>>>> Why does the leader election need two ConfigMaps (`lock for
>> contending
>> >>>>> leader`, and `leader RPC address`)? What happens if the two
>> ConfigMaps are
>> >>>>> not updated consistently? E.g., a TM learns about a new JM becoming
>> leader
>> >>>>> (lock for contending leader updated), but still gets the old
>> leader's
>> >>>>> address when trying to read `leader RPC address`?
>> >>>>>
>> >>>>> ## HA storage > Lock and release
>> >>>>>
>> >>>>> It seems to me that the owner needs to explicitly release the lock
>> so
>> >>>>> that other peers can write/remove the stored object. What if the
>> previous
>> >>>>> owner failed to release the lock (e.g., dead before releasing)?
>> Would there
>> >>>>> be any problem?
>> >>>>>
>> >>>>> ## HA storage > HA data clean up
>> >>>>>
>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy
>> <ClusterID>`,
>> >>>>> how are the HA dada retained?
>> >>>>>
>> >>>>>
>> >>>>> Thank you~
>> >>>>>
>> >>>>> Xintong Song
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <da...@gmail.com>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> Hi devs and users,
>> >>>>>>
>> >>>>>> I would like to start the discussion about FLIP-144[1], which will
>> >>>>>> introduce
>> >>>>>> a new native high availability service for Kubernetes.
>> >>>>>>
>> >>>>>> Currently, Flink has provided Zookeeper HA service and been widely
>> >>>>>> used
>> >>>>>> in production environments. It could be integrated in standalone
>> >>>>>> cluster,
>> >>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA in
>> K8s
>> >>>>>> will take additional cost since we need to manage a Zookeeper
>> cluster.
>> >>>>>> In the meantime, K8s has provided some public API for leader
>> >>>>>> election[2]
>> >>>>>> and configuration storage(i.e. ConfigMap[3]). We could leverage
>> these
>> >>>>>> features and make running HA configured Flink cluster on K8s more
>> >>>>>> convenient.
>> >>>>>>
>> >>>>>> Both the standalone on K8s and native K8s could benefit from the
>> new
>> >>>>>> introduced KubernetesHaService.
>> >>>>>>
>> >>>>>> [1].
>> >>>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>> >>>>>> [2].
>> >>>>>>
>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>> >>>>>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>> >>>>>>
>> >>>>>> Looking forward to your feedback.
>> >>>>>>
>> >>>>>> Best,
>> >>>>>> Yang
>> >>>>>>
>> >>>>>
>>
>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by tison <wa...@gmail.com>.
Hi,

Generally +1 for a native k8s HA service.

For leader election & publish leader information, there was a discussion[1]
pointed out that since these two actions is NOT atomic, there will be always
edge case where a previous leader overwrite leader information, even with
versioned write. Versioned write helps on read again if version mismatches
so if we want version write works, information in the kv pair should help
the
contender reflects whether it is the current leader.

The idea of writes leader information on contender node or something
equivalent makes sense but the details depends on how it is implemented.
General problems are that

1. TM might be a bit late before it updated correct leader information but
only if the leader election process is short and leadership is stable at
most
time, it won't be a serious issue.
2. The process TM extract leader information might be a bit more complex
than directly watching a fixed key.

Atomic issue can be addressed if one leverages low APIs such as lease & txn
but it causes more developing efforts. ConfigMap and encapsulated interface,
thought, provides only a self-consistent mechanism which doesn't promise
more consistency for extension.

Best,
tison.

[1]
https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E



Till Rohrmann <tr...@apache.org> 于2020年9月29日周二 下午9:25写道:

> For 1. I was wondering whether we can't write the leader connection
> information directly when trying to obtain the leadership (trying to update
> the leader key with one's own value)? This might be a little detail,
> though.
>
> 2. Alright, so we are having a similar mechanism as we have in ZooKeeper
> with the ephemeral lock nodes. I guess that this complicates the
> implementation a bit, unfortunately.
>
> 3. Wouldn't the StatefulSet solution also work without a PV? One could
> configure a different persistent storage like HDFS or S3 for storing the
> checkpoints and job blobs like in the ZooKeeper case. The current benefit I
> see is that we avoid having to implement this multi locking mechanism in
> the ConfigMaps using the annotations because we can be sure that there is
> only a single leader at a time if I understood the guarantees of K8s
> correctly.
>
> Cheers,
> Till
>
> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <da...@gmail.com> wrote:
>
> > Hi Till, thanks for your valuable feedback.
> >
> > 1. Yes, leader election and storing leader information will use a same
> > ConfigMap. When a contender successfully performs a versioned annotation
> > update operation to the ConfigMap, it means that it has been elected as
> the
> > leader. And it will write the leader information in the callback of
> leader
> > elector[1]. The Kubernetes resource version will help us to avoid the
> > leader ConfigMap is wrongly updated.
> >
> > 2. The lock and release is really a valid concern. Actually in current
> > design, we could not guarantee that the node who tries to write his
> > ownership is the real leader. Who writes later, who is the owner. To
> > address this issue, we need to store all the owners of the key. Only when
> > the owner is empty, the specific key(means a checkpoint or job graph)
> could
> > be deleted. However, we may have a residual checkpoint or job graph when
> > the old JobManager crashed exceptionally and do not release the lock. To
> > solve this problem completely, we need a timestamp renew mechanism
> > for CompletedCheckpointStore and JobGraphStore, which could help us to
> the
> > check the JobManager timeout and then clean up the residual keys.
> >
> > 3. Frankly speaking, I am not against with this solution. However, in my
> > opinion, it is more like a temporary proposal. We could use StatefulSet
> to
> > avoid leader election and leader retrieval. But I am not sure whether
> > TaskManager could properly handle the situation that same hostname with
> > different IPs, because the JobManager failed and relaunched. Also we may
> > still have two JobManagers running in some corner cases(e.g. kubelet is
> > down but the pod is running). Another concern is we have a strong
> > dependency on the PersistentVolume(aka PV) in FileSystemHAService. But it
> > is not always true especially in self-build Kubernetes cluster. Moreover,
> > PV provider should guarantee that each PV could only be mounted once.
> Since
> > the native HA proposal could cover all the functionality of StatefulSet
> > proposal, that's why I prefer the former.
> >
> >
> > [1].
> >
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
> >
> > Best,
> > Yang
> >
> > Till Rohrmann <tr...@apache.org> 于2020年9月28日周一 下午9:29写道:
> >
> >> Thanks for creating this FLIP Yang Wang. I believe that many of our
> users
> >> will like a ZooKeeper-less HA setup.
> >>
> >> +1 for not separating the leader information and the leader election if
> >> possible. Maybe it is even possible that the contender writes his leader
> >> information directly when trying to obtain the leadership by performing
> a
> >> versioned write operation.
> >>
> >> Concerning the lock and release operation I have a question: Can there
> be
> >> multiple owners for a given key-value pair in a ConfigMap? If not, how
> can
> >> we ensure that the node which writes his ownership is actually the
> leader
> >> w/o transactional support from K8s? In ZooKeeper we had the same problem
> >> (we should probably change it at some point to simply use a
> >> transaction which checks whether the writer is still the leader) and
> >> therefore introduced the ephemeral lock nodes. What they allow is that
> >> there can be multiple owners of a given ZNode at a time. The last owner
> >> will then be responsible for the cleanup of the node.
> >>
> >> I see the benefit of your proposal over the stateful set proposal
> because
> >> it can support multiple standby JMs. Given the problem of locking
> key-value
> >> pairs it might be simpler to start with this approach where we only have
> >> single JM. This might already add a lot of benefits for our users. Was
> >> there a specific reason why you discarded this proposal (other than
> >> generality)?
> >>
> >> @Uce it would be great to hear your feedback on the proposal since you
> >> already implemented a K8s based HA service.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com>
> wrote:
> >>
> >>> Hi Xintong and Stephan,
> >>>
> >>> Thanks a lot for your attention on this FLIP. I will address the
> >>> comments inline.
> >>>
> >>> # Architecture -> One or two ConfigMaps
> >>>
> >>> Both of you are right. One ConfigMap will make the design and
> >>> implementation easier. Actually, in my POC codes,
> >>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for rest
> >>> server component) for the leader election
> >>> and storage. Once a JobManager win the election, it will update the
> >>> ConfigMap with leader address and periodically
> >>> renew the lock annotation to keep as the active leader. I will update
> >>> the FLIP document, including the architecture diagram,
> >>> to avoid the misunderstanding.
> >>>
> >>>
> >>> # HA storage > Lock and release
> >>>
> >>> This is a valid concern. Since for Zookeeper ephemeral nodes, it will
> be
> >>> deleted by the ZK server automatically when
> >>> the client is timeout. It could happen in a bad network environment or
> >>> the ZK client crashed exceptionally. For Kubernetes,
> >>> we need to implement a similar mechanism. First, when we want to lock a
> >>> specific key in ConfigMap, we will put the owner identify,
> >>> lease duration, renew time in the ConfigMap annotation. The annotation
> >>> will be cleaned up when releasing the lock. When
> >>> we want to remove a job graph or checkpoints, it should satisfy the
> >>> following conditions. If not, the delete operation could not be done.
> >>> * Current instance is the owner of the key.
> >>> * The owner annotation is empty, which means the owner has released the
> >>> lock.
> >>> * The owner annotation timed out, which usually indicate the owner
> died.
> >>>
> >>>
> >>> # HA storage > HA data clean up
> >>>
> >>> Sorry for that I do not describe how the HA related ConfigMap is
> >>> retained clearly. Benefit from the Kubernetes OwnerReference[1],
> >>> we set owner of the flink-conf configmap, service and TaskManager pods
> >>> to JobManager Deployment. So when we want to
> >>> destroy a Flink cluster, we just need to delete the deployment[2]. For
> >>> the HA related ConfigMaps, we do not set the owner
> >>> so that they could be retained even though we delete the whole Flink
> >>> cluster.
> >>>
> >>>
> >>> [1].
> >>>
> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
> >>> [2].
> >>>
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
> >>>
> >>>
> >>> Best,
> >>> Yang
> >>>
> >>>
> >>> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
> >>>
> >>>> This is a very cool feature proposal.
> >>>>
> >>>> One lesson-learned from the ZooKeeper-based HA is that it is overly
> >>>> complicated to have the Leader RPC address in a different node than
> the
> >>>> LeaderLock. There is extra code needed to make sure these converge
> and the
> >>>> can be temporarily out of sync.
> >>>>
> >>>> A much easier design would be to have the RPC address as payload in
> the
> >>>> lock entry (ZNode in ZK), the same way that the leader fencing token
> is
> >>>> stored as payload of the lock.
> >>>> I think for the design above it would mean having a single ConfigMap
> >>>> for both leader lock and leader RPC address discovery.
> >>>>
> >>>> This probably serves as a good design principle in general - not
> divide
> >>>> information that is updated together over different resources.
> >>>>
> >>>> Best,
> >>>> Stephan
> >>>>
> >>>>
> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <to...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Thanks for preparing this FLIP, @Yang.
> >>>>>
> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
> >>>>> buildtin ConfigMap for Flink's HA services should significantly
> reduce the
> >>>>> maintenance overhead compared to deploying a ZK cluster. I think
> this is an
> >>>>> attractive feature for users.
> >>>>>
> >>>>> Concerning the proposed design, I have some questions. Might not be
> >>>>> problems, just trying to understand.
> >>>>>
> >>>>> ## Architecture
> >>>>>
> >>>>> Why does the leader election need two ConfigMaps (`lock for
> contending
> >>>>> leader`, and `leader RPC address`)? What happens if the two
> ConfigMaps are
> >>>>> not updated consistently? E.g., a TM learns about a new JM becoming
> leader
> >>>>> (lock for contending leader updated), but still gets the old leader's
> >>>>> address when trying to read `leader RPC address`?
> >>>>>
> >>>>> ## HA storage > Lock and release
> >>>>>
> >>>>> It seems to me that the owner needs to explicitly release the lock so
> >>>>> that other peers can write/remove the stored object. What if the
> previous
> >>>>> owner failed to release the lock (e.g., dead before releasing)?
> Would there
> >>>>> be any problem?
> >>>>>
> >>>>> ## HA storage > HA data clean up
> >>>>>
> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`,
> >>>>> how are the HA dada retained?
> >>>>>
> >>>>>
> >>>>> Thank you~
> >>>>>
> >>>>> Xintong Song
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <da...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi devs and users,
> >>>>>>
> >>>>>> I would like to start the discussion about FLIP-144[1], which will
> >>>>>> introduce
> >>>>>> a new native high availability service for Kubernetes.
> >>>>>>
> >>>>>> Currently, Flink has provided Zookeeper HA service and been widely
> >>>>>> used
> >>>>>> in production environments. It could be integrated in standalone
> >>>>>> cluster,
> >>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA in K8s
> >>>>>> will take additional cost since we need to manage a Zookeeper
> cluster.
> >>>>>> In the meantime, K8s has provided some public API for leader
> >>>>>> election[2]
> >>>>>> and configuration storage(i.e. ConfigMap[3]). We could leverage
> these
> >>>>>> features and make running HA configured Flink cluster on K8s more
> >>>>>> convenient.
> >>>>>>
> >>>>>> Both the standalone on K8s and native K8s could benefit from the new
> >>>>>> introduced KubernetesHaService.
> >>>>>>
> >>>>>> [1].
> >>>>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
> >>>>>> [2].
> >>>>>>
> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
> >>>>>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
> >>>>>>
> >>>>>> Looking forward to your feedback.
> >>>>>>
> >>>>>> Best,
> >>>>>> Yang
> >>>>>>
> >>>>>
>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by tison <wa...@gmail.com>.
Hi,

Generally +1 for a native k8s HA service.

For leader election & publish leader information, there was a discussion[1]
pointed out that since these two actions is NOT atomic, there will be always
edge case where a previous leader overwrite leader information, even with
versioned write. Versioned write helps on read again if version mismatches
so if we want version write works, information in the kv pair should help
the
contender reflects whether it is the current leader.

The idea of writes leader information on contender node or something
equivalent makes sense but the details depends on how it is implemented.
General problems are that

1. TM might be a bit late before it updated correct leader information but
only if the leader election process is short and leadership is stable at
most
time, it won't be a serious issue.
2. The process TM extract leader information might be a bit more complex
than directly watching a fixed key.

Atomic issue can be addressed if one leverages low APIs such as lease & txn
but it causes more developing efforts. ConfigMap and encapsulated interface,
thought, provides only a self-consistent mechanism which doesn't promise
more consistency for extension.

Best,
tison.

[1]
https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E



Till Rohrmann <tr...@apache.org> 于2020年9月29日周二 下午9:25写道:

> For 1. I was wondering whether we can't write the leader connection
> information directly when trying to obtain the leadership (trying to update
> the leader key with one's own value)? This might be a little detail,
> though.
>
> 2. Alright, so we are having a similar mechanism as we have in ZooKeeper
> with the ephemeral lock nodes. I guess that this complicates the
> implementation a bit, unfortunately.
>
> 3. Wouldn't the StatefulSet solution also work without a PV? One could
> configure a different persistent storage like HDFS or S3 for storing the
> checkpoints and job blobs like in the ZooKeeper case. The current benefit I
> see is that we avoid having to implement this multi locking mechanism in
> the ConfigMaps using the annotations because we can be sure that there is
> only a single leader at a time if I understood the guarantees of K8s
> correctly.
>
> Cheers,
> Till
>
> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <da...@gmail.com> wrote:
>
> > Hi Till, thanks for your valuable feedback.
> >
> > 1. Yes, leader election and storing leader information will use a same
> > ConfigMap. When a contender successfully performs a versioned annotation
> > update operation to the ConfigMap, it means that it has been elected as
> the
> > leader. And it will write the leader information in the callback of
> leader
> > elector[1]. The Kubernetes resource version will help us to avoid the
> > leader ConfigMap is wrongly updated.
> >
> > 2. The lock and release is really a valid concern. Actually in current
> > design, we could not guarantee that the node who tries to write his
> > ownership is the real leader. Who writes later, who is the owner. To
> > address this issue, we need to store all the owners of the key. Only when
> > the owner is empty, the specific key(means a checkpoint or job graph)
> could
> > be deleted. However, we may have a residual checkpoint or job graph when
> > the old JobManager crashed exceptionally and do not release the lock. To
> > solve this problem completely, we need a timestamp renew mechanism
> > for CompletedCheckpointStore and JobGraphStore, which could help us to
> the
> > check the JobManager timeout and then clean up the residual keys.
> >
> > 3. Frankly speaking, I am not against with this solution. However, in my
> > opinion, it is more like a temporary proposal. We could use StatefulSet
> to
> > avoid leader election and leader retrieval. But I am not sure whether
> > TaskManager could properly handle the situation that same hostname with
> > different IPs, because the JobManager failed and relaunched. Also we may
> > still have two JobManagers running in some corner cases(e.g. kubelet is
> > down but the pod is running). Another concern is we have a strong
> > dependency on the PersistentVolume(aka PV) in FileSystemHAService. But it
> > is not always true especially in self-build Kubernetes cluster. Moreover,
> > PV provider should guarantee that each PV could only be mounted once.
> Since
> > the native HA proposal could cover all the functionality of StatefulSet
> > proposal, that's why I prefer the former.
> >
> >
> > [1].
> >
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
> >
> > Best,
> > Yang
> >
> > Till Rohrmann <tr...@apache.org> 于2020年9月28日周一 下午9:29写道:
> >
> >> Thanks for creating this FLIP Yang Wang. I believe that many of our
> users
> >> will like a ZooKeeper-less HA setup.
> >>
> >> +1 for not separating the leader information and the leader election if
> >> possible. Maybe it is even possible that the contender writes his leader
> >> information directly when trying to obtain the leadership by performing
> a
> >> versioned write operation.
> >>
> >> Concerning the lock and release operation I have a question: Can there
> be
> >> multiple owners for a given key-value pair in a ConfigMap? If not, how
> can
> >> we ensure that the node which writes his ownership is actually the
> leader
> >> w/o transactional support from K8s? In ZooKeeper we had the same problem
> >> (we should probably change it at some point to simply use a
> >> transaction which checks whether the writer is still the leader) and
> >> therefore introduced the ephemeral lock nodes. What they allow is that
> >> there can be multiple owners of a given ZNode at a time. The last owner
> >> will then be responsible for the cleanup of the node.
> >>
> >> I see the benefit of your proposal over the stateful set proposal
> because
> >> it can support multiple standby JMs. Given the problem of locking
> key-value
> >> pairs it might be simpler to start with this approach where we only have
> >> single JM. This might already add a lot of benefits for our users. Was
> >> there a specific reason why you discarded this proposal (other than
> >> generality)?
> >>
> >> @Uce it would be great to hear your feedback on the proposal since you
> >> already implemented a K8s based HA service.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com>
> wrote:
> >>
> >>> Hi Xintong and Stephan,
> >>>
> >>> Thanks a lot for your attention on this FLIP. I will address the
> >>> comments inline.
> >>>
> >>> # Architecture -> One or two ConfigMaps
> >>>
> >>> Both of you are right. One ConfigMap will make the design and
> >>> implementation easier. Actually, in my POC codes,
> >>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for rest
> >>> server component) for the leader election
> >>> and storage. Once a JobManager win the election, it will update the
> >>> ConfigMap with leader address and periodically
> >>> renew the lock annotation to keep as the active leader. I will update
> >>> the FLIP document, including the architecture diagram,
> >>> to avoid the misunderstanding.
> >>>
> >>>
> >>> # HA storage > Lock and release
> >>>
> >>> This is a valid concern. Since for Zookeeper ephemeral nodes, it will
> be
> >>> deleted by the ZK server automatically when
> >>> the client is timeout. It could happen in a bad network environment or
> >>> the ZK client crashed exceptionally. For Kubernetes,
> >>> we need to implement a similar mechanism. First, when we want to lock a
> >>> specific key in ConfigMap, we will put the owner identify,
> >>> lease duration, renew time in the ConfigMap annotation. The annotation
> >>> will be cleaned up when releasing the lock. When
> >>> we want to remove a job graph or checkpoints, it should satisfy the
> >>> following conditions. If not, the delete operation could not be done.
> >>> * Current instance is the owner of the key.
> >>> * The owner annotation is empty, which means the owner has released the
> >>> lock.
> >>> * The owner annotation timed out, which usually indicate the owner
> died.
> >>>
> >>>
> >>> # HA storage > HA data clean up
> >>>
> >>> Sorry for that I do not describe how the HA related ConfigMap is
> >>> retained clearly. Benefit from the Kubernetes OwnerReference[1],
> >>> we set owner of the flink-conf configmap, service and TaskManager pods
> >>> to JobManager Deployment. So when we want to
> >>> destroy a Flink cluster, we just need to delete the deployment[2]. For
> >>> the HA related ConfigMaps, we do not set the owner
> >>> so that they could be retained even though we delete the whole Flink
> >>> cluster.
> >>>
> >>>
> >>> [1].
> >>>
> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
> >>> [2].
> >>>
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
> >>>
> >>>
> >>> Best,
> >>> Yang
> >>>
> >>>
> >>> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
> >>>
> >>>> This is a very cool feature proposal.
> >>>>
> >>>> One lesson-learned from the ZooKeeper-based HA is that it is overly
> >>>> complicated to have the Leader RPC address in a different node than
> the
> >>>> LeaderLock. There is extra code needed to make sure these converge
> and the
> >>>> can be temporarily out of sync.
> >>>>
> >>>> A much easier design would be to have the RPC address as payload in
> the
> >>>> lock entry (ZNode in ZK), the same way that the leader fencing token
> is
> >>>> stored as payload of the lock.
> >>>> I think for the design above it would mean having a single ConfigMap
> >>>> for both leader lock and leader RPC address discovery.
> >>>>
> >>>> This probably serves as a good design principle in general - not
> divide
> >>>> information that is updated together over different resources.
> >>>>
> >>>> Best,
> >>>> Stephan
> >>>>
> >>>>
> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <to...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Thanks for preparing this FLIP, @Yang.
> >>>>>
> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
> >>>>> buildtin ConfigMap for Flink's HA services should significantly
> reduce the
> >>>>> maintenance overhead compared to deploying a ZK cluster. I think
> this is an
> >>>>> attractive feature for users.
> >>>>>
> >>>>> Concerning the proposed design, I have some questions. Might not be
> >>>>> problems, just trying to understand.
> >>>>>
> >>>>> ## Architecture
> >>>>>
> >>>>> Why does the leader election need two ConfigMaps (`lock for
> contending
> >>>>> leader`, and `leader RPC address`)? What happens if the two
> ConfigMaps are
> >>>>> not updated consistently? E.g., a TM learns about a new JM becoming
> leader
> >>>>> (lock for contending leader updated), but still gets the old leader's
> >>>>> address when trying to read `leader RPC address`?
> >>>>>
> >>>>> ## HA storage > Lock and release
> >>>>>
> >>>>> It seems to me that the owner needs to explicitly release the lock so
> >>>>> that other peers can write/remove the stored object. What if the
> previous
> >>>>> owner failed to release the lock (e.g., dead before releasing)?
> Would there
> >>>>> be any problem?
> >>>>>
> >>>>> ## HA storage > HA data clean up
> >>>>>
> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`,
> >>>>> how are the HA dada retained?
> >>>>>
> >>>>>
> >>>>> Thank you~
> >>>>>
> >>>>> Xintong Song
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <da...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi devs and users,
> >>>>>>
> >>>>>> I would like to start the discussion about FLIP-144[1], which will
> >>>>>> introduce
> >>>>>> a new native high availability service for Kubernetes.
> >>>>>>
> >>>>>> Currently, Flink has provided Zookeeper HA service and been widely
> >>>>>> used
> >>>>>> in production environments. It could be integrated in standalone
> >>>>>> cluster,
> >>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA in K8s
> >>>>>> will take additional cost since we need to manage a Zookeeper
> cluster.
> >>>>>> In the meantime, K8s has provided some public API for leader
> >>>>>> election[2]
> >>>>>> and configuration storage(i.e. ConfigMap[3]). We could leverage
> these
> >>>>>> features and make running HA configured Flink cluster on K8s more
> >>>>>> convenient.
> >>>>>>
> >>>>>> Both the standalone on K8s and native K8s could benefit from the new
> >>>>>> introduced KubernetesHaService.
> >>>>>>
> >>>>>> [1].
> >>>>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
> >>>>>> [2].
> >>>>>>
> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
> >>>>>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
> >>>>>>
> >>>>>> Looking forward to your feedback.
> >>>>>>
> >>>>>> Best,
> >>>>>> Yang
> >>>>>>
> >>>>>
>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Till Rohrmann <tr...@apache.org>.
For 1. I was wondering whether we can't write the leader connection
information directly when trying to obtain the leadership (trying to update
the leader key with one's own value)? This might be a little detail, though.

2. Alright, so we are having a similar mechanism as we have in ZooKeeper
with the ephemeral lock nodes. I guess that this complicates the
implementation a bit, unfortunately.

3. Wouldn't the StatefulSet solution also work without a PV? One could
configure a different persistent storage like HDFS or S3 for storing the
checkpoints and job blobs like in the ZooKeeper case. The current benefit I
see is that we avoid having to implement this multi locking mechanism in
the ConfigMaps using the annotations because we can be sure that there is
only a single leader at a time if I understood the guarantees of K8s
correctly.

Cheers,
Till

On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <da...@gmail.com> wrote:

> Hi Till, thanks for your valuable feedback.
>
> 1. Yes, leader election and storing leader information will use a same
> ConfigMap. When a contender successfully performs a versioned annotation
> update operation to the ConfigMap, it means that it has been elected as the
> leader. And it will write the leader information in the callback of leader
> elector[1]. The Kubernetes resource version will help us to avoid the
> leader ConfigMap is wrongly updated.
>
> 2. The lock and release is really a valid concern. Actually in current
> design, we could not guarantee that the node who tries to write his
> ownership is the real leader. Who writes later, who is the owner. To
> address this issue, we need to store all the owners of the key. Only when
> the owner is empty, the specific key(means a checkpoint or job graph) could
> be deleted. However, we may have a residual checkpoint or job graph when
> the old JobManager crashed exceptionally and do not release the lock. To
> solve this problem completely, we need a timestamp renew mechanism
> for CompletedCheckpointStore and JobGraphStore, which could help us to the
> check the JobManager timeout and then clean up the residual keys.
>
> 3. Frankly speaking, I am not against with this solution. However, in my
> opinion, it is more like a temporary proposal. We could use StatefulSet to
> avoid leader election and leader retrieval. But I am not sure whether
> TaskManager could properly handle the situation that same hostname with
> different IPs, because the JobManager failed and relaunched. Also we may
> still have two JobManagers running in some corner cases(e.g. kubelet is
> down but the pod is running). Another concern is we have a strong
> dependency on the PersistentVolume(aka PV) in FileSystemHAService. But it
> is not always true especially in self-build Kubernetes cluster. Moreover,
> PV provider should guarantee that each PV could only be mounted once. Since
> the native HA proposal could cover all the functionality of StatefulSet
> proposal, that's why I prefer the former.
>
>
> [1].
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>
> Best,
> Yang
>
> Till Rohrmann <tr...@apache.org> 于2020年9月28日周一 下午9:29写道:
>
>> Thanks for creating this FLIP Yang Wang. I believe that many of our users
>> will like a ZooKeeper-less HA setup.
>>
>> +1 for not separating the leader information and the leader election if
>> possible. Maybe it is even possible that the contender writes his leader
>> information directly when trying to obtain the leadership by performing a
>> versioned write operation.
>>
>> Concerning the lock and release operation I have a question: Can there be
>> multiple owners for a given key-value pair in a ConfigMap? If not, how can
>> we ensure that the node which writes his ownership is actually the leader
>> w/o transactional support from K8s? In ZooKeeper we had the same problem
>> (we should probably change it at some point to simply use a
>> transaction which checks whether the writer is still the leader) and
>> therefore introduced the ephemeral lock nodes. What they allow is that
>> there can be multiple owners of a given ZNode at a time. The last owner
>> will then be responsible for the cleanup of the node.
>>
>> I see the benefit of your proposal over the stateful set proposal because
>> it can support multiple standby JMs. Given the problem of locking key-value
>> pairs it might be simpler to start with this approach where we only have
>> single JM. This might already add a lot of benefits for our users. Was
>> there a specific reason why you discarded this proposal (other than
>> generality)?
>>
>> @Uce it would be great to hear your feedback on the proposal since you
>> already implemented a K8s based HA service.
>>
>> Cheers,
>> Till
>>
>> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com> wrote:
>>
>>> Hi Xintong and Stephan,
>>>
>>> Thanks a lot for your attention on this FLIP. I will address the
>>> comments inline.
>>>
>>> # Architecture -> One or two ConfigMaps
>>>
>>> Both of you are right. One ConfigMap will make the design and
>>> implementation easier. Actually, in my POC codes,
>>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for rest
>>> server component) for the leader election
>>> and storage. Once a JobManager win the election, it will update the
>>> ConfigMap with leader address and periodically
>>> renew the lock annotation to keep as the active leader. I will update
>>> the FLIP document, including the architecture diagram,
>>> to avoid the misunderstanding.
>>>
>>>
>>> # HA storage > Lock and release
>>>
>>> This is a valid concern. Since for Zookeeper ephemeral nodes, it will be
>>> deleted by the ZK server automatically when
>>> the client is timeout. It could happen in a bad network environment or
>>> the ZK client crashed exceptionally. For Kubernetes,
>>> we need to implement a similar mechanism. First, when we want to lock a
>>> specific key in ConfigMap, we will put the owner identify,
>>> lease duration, renew time in the ConfigMap annotation. The annotation
>>> will be cleaned up when releasing the lock. When
>>> we want to remove a job graph or checkpoints, it should satisfy the
>>> following conditions. If not, the delete operation could not be done.
>>> * Current instance is the owner of the key.
>>> * The owner annotation is empty, which means the owner has released the
>>> lock.
>>> * The owner annotation timed out, which usually indicate the owner died.
>>>
>>>
>>> # HA storage > HA data clean up
>>>
>>> Sorry for that I do not describe how the HA related ConfigMap is
>>> retained clearly. Benefit from the Kubernetes OwnerReference[1],
>>> we set owner of the flink-conf configmap, service and TaskManager pods
>>> to JobManager Deployment. So when we want to
>>> destroy a Flink cluster, we just need to delete the deployment[2]. For
>>> the HA related ConfigMaps, we do not set the owner
>>> so that they could be retained even though we delete the whole Flink
>>> cluster.
>>>
>>>
>>> [1].
>>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>> [2].
>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>>
>>>
>>> Best,
>>> Yang
>>>
>>>
>>> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
>>>
>>>> This is a very cool feature proposal.
>>>>
>>>> One lesson-learned from the ZooKeeper-based HA is that it is overly
>>>> complicated to have the Leader RPC address in a different node than the
>>>> LeaderLock. There is extra code needed to make sure these converge and the
>>>> can be temporarily out of sync.
>>>>
>>>> A much easier design would be to have the RPC address as payload in the
>>>> lock entry (ZNode in ZK), the same way that the leader fencing token is
>>>> stored as payload of the lock.
>>>> I think for the design above it would mean having a single ConfigMap
>>>> for both leader lock and leader RPC address discovery.
>>>>
>>>> This probably serves as a good design principle in general - not divide
>>>> information that is updated together over different resources.
>>>>
>>>> Best,
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <to...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks for preparing this FLIP, @Yang.
>>>>>
>>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>>>> buildtin ConfigMap for Flink's HA services should significantly reduce the
>>>>> maintenance overhead compared to deploying a ZK cluster. I think this is an
>>>>> attractive feature for users.
>>>>>
>>>>> Concerning the proposed design, I have some questions. Might not be
>>>>> problems, just trying to understand.
>>>>>
>>>>> ## Architecture
>>>>>
>>>>> Why does the leader election need two ConfigMaps (`lock for contending
>>>>> leader`, and `leader RPC address`)? What happens if the two ConfigMaps are
>>>>> not updated consistently? E.g., a TM learns about a new JM becoming leader
>>>>> (lock for contending leader updated), but still gets the old leader's
>>>>> address when trying to read `leader RPC address`?
>>>>>
>>>>> ## HA storage > Lock and release
>>>>>
>>>>> It seems to me that the owner needs to explicitly release the lock so
>>>>> that other peers can write/remove the stored object. What if the previous
>>>>> owner failed to release the lock (e.g., dead before releasing)? Would there
>>>>> be any problem?
>>>>>
>>>>> ## HA storage > HA data clean up
>>>>>
>>>>> If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`,
>>>>> how are the HA dada retained?
>>>>>
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <da...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi devs and users,
>>>>>>
>>>>>> I would like to start the discussion about FLIP-144[1], which will
>>>>>> introduce
>>>>>> a new native high availability service for Kubernetes.
>>>>>>
>>>>>> Currently, Flink has provided Zookeeper HA service and been widely
>>>>>> used
>>>>>> in production environments. It could be integrated in standalone
>>>>>> cluster,
>>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA in K8s
>>>>>> will take additional cost since we need to manage a Zookeeper cluster.
>>>>>> In the meantime, K8s has provided some public API for leader
>>>>>> election[2]
>>>>>> and configuration storage(i.e. ConfigMap[3]). We could leverage these
>>>>>> features and make running HA configured Flink cluster on K8s more
>>>>>> convenient.
>>>>>>
>>>>>> Both the standalone on K8s and native K8s could benefit from the new
>>>>>> introduced KubernetesHaService.
>>>>>>
>>>>>> [1].
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>>>> [2].
>>>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>>>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>>>>>>
>>>>>> Looking forward to your feedback.
>>>>>>
>>>>>> Best,
>>>>>> Yang
>>>>>>
>>>>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Till Rohrmann <tr...@apache.org>.
For 1. I was wondering whether we can't write the leader connection
information directly when trying to obtain the leadership (trying to update
the leader key with one's own value)? This might be a little detail, though.

2. Alright, so we are having a similar mechanism as we have in ZooKeeper
with the ephemeral lock nodes. I guess that this complicates the
implementation a bit, unfortunately.

3. Wouldn't the StatefulSet solution also work without a PV? One could
configure a different persistent storage like HDFS or S3 for storing the
checkpoints and job blobs like in the ZooKeeper case. The current benefit I
see is that we avoid having to implement this multi locking mechanism in
the ConfigMaps using the annotations because we can be sure that there is
only a single leader at a time if I understood the guarantees of K8s
correctly.

Cheers,
Till

On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <da...@gmail.com> wrote:

> Hi Till, thanks for your valuable feedback.
>
> 1. Yes, leader election and storing leader information will use a same
> ConfigMap. When a contender successfully performs a versioned annotation
> update operation to the ConfigMap, it means that it has been elected as the
> leader. And it will write the leader information in the callback of leader
> elector[1]. The Kubernetes resource version will help us to avoid the
> leader ConfigMap is wrongly updated.
>
> 2. The lock and release is really a valid concern. Actually in current
> design, we could not guarantee that the node who tries to write his
> ownership is the real leader. Who writes later, who is the owner. To
> address this issue, we need to store all the owners of the key. Only when
> the owner is empty, the specific key(means a checkpoint or job graph) could
> be deleted. However, we may have a residual checkpoint or job graph when
> the old JobManager crashed exceptionally and do not release the lock. To
> solve this problem completely, we need a timestamp renew mechanism
> for CompletedCheckpointStore and JobGraphStore, which could help us to the
> check the JobManager timeout and then clean up the residual keys.
>
> 3. Frankly speaking, I am not against with this solution. However, in my
> opinion, it is more like a temporary proposal. We could use StatefulSet to
> avoid leader election and leader retrieval. But I am not sure whether
> TaskManager could properly handle the situation that same hostname with
> different IPs, because the JobManager failed and relaunched. Also we may
> still have two JobManagers running in some corner cases(e.g. kubelet is
> down but the pod is running). Another concern is we have a strong
> dependency on the PersistentVolume(aka PV) in FileSystemHAService. But it
> is not always true especially in self-build Kubernetes cluster. Moreover,
> PV provider should guarantee that each PV could only be mounted once. Since
> the native HA proposal could cover all the functionality of StatefulSet
> proposal, that's why I prefer the former.
>
>
> [1].
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>
> Best,
> Yang
>
> Till Rohrmann <tr...@apache.org> 于2020年9月28日周一 下午9:29写道:
>
>> Thanks for creating this FLIP Yang Wang. I believe that many of our users
>> will like a ZooKeeper-less HA setup.
>>
>> +1 for not separating the leader information and the leader election if
>> possible. Maybe it is even possible that the contender writes his leader
>> information directly when trying to obtain the leadership by performing a
>> versioned write operation.
>>
>> Concerning the lock and release operation I have a question: Can there be
>> multiple owners for a given key-value pair in a ConfigMap? If not, how can
>> we ensure that the node which writes his ownership is actually the leader
>> w/o transactional support from K8s? In ZooKeeper we had the same problem
>> (we should probably change it at some point to simply use a
>> transaction which checks whether the writer is still the leader) and
>> therefore introduced the ephemeral lock nodes. What they allow is that
>> there can be multiple owners of a given ZNode at a time. The last owner
>> will then be responsible for the cleanup of the node.
>>
>> I see the benefit of your proposal over the stateful set proposal because
>> it can support multiple standby JMs. Given the problem of locking key-value
>> pairs it might be simpler to start with this approach where we only have
>> single JM. This might already add a lot of benefits for our users. Was
>> there a specific reason why you discarded this proposal (other than
>> generality)?
>>
>> @Uce it would be great to hear your feedback on the proposal since you
>> already implemented a K8s based HA service.
>>
>> Cheers,
>> Till
>>
>> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com> wrote:
>>
>>> Hi Xintong and Stephan,
>>>
>>> Thanks a lot for your attention on this FLIP. I will address the
>>> comments inline.
>>>
>>> # Architecture -> One or two ConfigMaps
>>>
>>> Both of you are right. One ConfigMap will make the design and
>>> implementation easier. Actually, in my POC codes,
>>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for rest
>>> server component) for the leader election
>>> and storage. Once a JobManager win the election, it will update the
>>> ConfigMap with leader address and periodically
>>> renew the lock annotation to keep as the active leader. I will update
>>> the FLIP document, including the architecture diagram,
>>> to avoid the misunderstanding.
>>>
>>>
>>> # HA storage > Lock and release
>>>
>>> This is a valid concern. Since for Zookeeper ephemeral nodes, it will be
>>> deleted by the ZK server automatically when
>>> the client is timeout. It could happen in a bad network environment or
>>> the ZK client crashed exceptionally. For Kubernetes,
>>> we need to implement a similar mechanism. First, when we want to lock a
>>> specific key in ConfigMap, we will put the owner identify,
>>> lease duration, renew time in the ConfigMap annotation. The annotation
>>> will be cleaned up when releasing the lock. When
>>> we want to remove a job graph or checkpoints, it should satisfy the
>>> following conditions. If not, the delete operation could not be done.
>>> * Current instance is the owner of the key.
>>> * The owner annotation is empty, which means the owner has released the
>>> lock.
>>> * The owner annotation timed out, which usually indicate the owner died.
>>>
>>>
>>> # HA storage > HA data clean up
>>>
>>> Sorry for that I do not describe how the HA related ConfigMap is
>>> retained clearly. Benefit from the Kubernetes OwnerReference[1],
>>> we set owner of the flink-conf configmap, service and TaskManager pods
>>> to JobManager Deployment. So when we want to
>>> destroy a Flink cluster, we just need to delete the deployment[2]. For
>>> the HA related ConfigMaps, we do not set the owner
>>> so that they could be retained even though we delete the whole Flink
>>> cluster.
>>>
>>>
>>> [1].
>>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>> [2].
>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>>
>>>
>>> Best,
>>> Yang
>>>
>>>
>>> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
>>>
>>>> This is a very cool feature proposal.
>>>>
>>>> One lesson-learned from the ZooKeeper-based HA is that it is overly
>>>> complicated to have the Leader RPC address in a different node than the
>>>> LeaderLock. There is extra code needed to make sure these converge and the
>>>> can be temporarily out of sync.
>>>>
>>>> A much easier design would be to have the RPC address as payload in the
>>>> lock entry (ZNode in ZK), the same way that the leader fencing token is
>>>> stored as payload of the lock.
>>>> I think for the design above it would mean having a single ConfigMap
>>>> for both leader lock and leader RPC address discovery.
>>>>
>>>> This probably serves as a good design principle in general - not divide
>>>> information that is updated together over different resources.
>>>>
>>>> Best,
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <to...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks for preparing this FLIP, @Yang.
>>>>>
>>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>>>> buildtin ConfigMap for Flink's HA services should significantly reduce the
>>>>> maintenance overhead compared to deploying a ZK cluster. I think this is an
>>>>> attractive feature for users.
>>>>>
>>>>> Concerning the proposed design, I have some questions. Might not be
>>>>> problems, just trying to understand.
>>>>>
>>>>> ## Architecture
>>>>>
>>>>> Why does the leader election need two ConfigMaps (`lock for contending
>>>>> leader`, and `leader RPC address`)? What happens if the two ConfigMaps are
>>>>> not updated consistently? E.g., a TM learns about a new JM becoming leader
>>>>> (lock for contending leader updated), but still gets the old leader's
>>>>> address when trying to read `leader RPC address`?
>>>>>
>>>>> ## HA storage > Lock and release
>>>>>
>>>>> It seems to me that the owner needs to explicitly release the lock so
>>>>> that other peers can write/remove the stored object. What if the previous
>>>>> owner failed to release the lock (e.g., dead before releasing)? Would there
>>>>> be any problem?
>>>>>
>>>>> ## HA storage > HA data clean up
>>>>>
>>>>> If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`,
>>>>> how are the HA dada retained?
>>>>>
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <da...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi devs and users,
>>>>>>
>>>>>> I would like to start the discussion about FLIP-144[1], which will
>>>>>> introduce
>>>>>> a new native high availability service for Kubernetes.
>>>>>>
>>>>>> Currently, Flink has provided Zookeeper HA service and been widely
>>>>>> used
>>>>>> in production environments. It could be integrated in standalone
>>>>>> cluster,
>>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA in K8s
>>>>>> will take additional cost since we need to manage a Zookeeper cluster.
>>>>>> In the meantime, K8s has provided some public API for leader
>>>>>> election[2]
>>>>>> and configuration storage(i.e. ConfigMap[3]). We could leverage these
>>>>>> features and make running HA configured Flink cluster on K8s more
>>>>>> convenient.
>>>>>>
>>>>>> Both the standalone on K8s and native K8s could benefit from the new
>>>>>> introduced KubernetesHaService.
>>>>>>
>>>>>> [1].
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>>>> [2].
>>>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>>>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>>>>>>
>>>>>> Looking forward to your feedback.
>>>>>>
>>>>>> Best,
>>>>>> Yang
>>>>>>
>>>>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Yang Wang <da...@gmail.com>.
Hi Till, thanks for your valuable feedback.

1. Yes, leader election and storing leader information will use a same
ConfigMap. When a contender successfully performs a versioned annotation
update operation to the ConfigMap, it means that it has been elected as the
leader. And it will write the leader information in the callback of leader
elector[1]. The Kubernetes resource version will help us to avoid the
leader ConfigMap is wrongly updated.

2. The lock and release is really a valid concern. Actually in current
design, we could not guarantee that the node who tries to write his
ownership is the real leader. Who writes later, who is the owner. To
address this issue, we need to store all the owners of the key. Only when
the owner is empty, the specific key(means a checkpoint or job graph) could
be deleted. However, we may have a residual checkpoint or job graph when
the old JobManager crashed exceptionally and do not release the lock. To
solve this problem completely, we need a timestamp renew mechanism
for CompletedCheckpointStore and JobGraphStore, which could help us to the
check the JobManager timeout and then clean up the residual keys.

3. Frankly speaking, I am not against with this solution. However, in my
opinion, it is more like a temporary proposal. We could use StatefulSet to
avoid leader election and leader retrieval. But I am not sure whether
TaskManager could properly handle the situation that same hostname with
different IPs, because the JobManager failed and relaunched. Also we may
still have two JobManagers running in some corner cases(e.g. kubelet is
down but the pod is running). Another concern is we have a strong
dependency on the PersistentVolume(aka PV) in FileSystemHAService. But it
is not always true especially in self-build Kubernetes cluster. Moreover,
PV provider should guarantee that each PV could only be mounted once. Since
the native HA proposal could cover all the functionality of StatefulSet
proposal, that's why I prefer the former.


[1].
https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70

Best,
Yang

Till Rohrmann <tr...@apache.org> 于2020年9月28日周一 下午9:29写道:

> Thanks for creating this FLIP Yang Wang. I believe that many of our users
> will like a ZooKeeper-less HA setup.
>
> +1 for not separating the leader information and the leader election if
> possible. Maybe it is even possible that the contender writes his leader
> information directly when trying to obtain the leadership by performing a
> versioned write operation.
>
> Concerning the lock and release operation I have a question: Can there be
> multiple owners for a given key-value pair in a ConfigMap? If not, how can
> we ensure that the node which writes his ownership is actually the leader
> w/o transactional support from K8s? In ZooKeeper we had the same problem
> (we should probably change it at some point to simply use a
> transaction which checks whether the writer is still the leader) and
> therefore introduced the ephemeral lock nodes. What they allow is that
> there can be multiple owners of a given ZNode at a time. The last owner
> will then be responsible for the cleanup of the node.
>
> I see the benefit of your proposal over the stateful set proposal because
> it can support multiple standby JMs. Given the problem of locking key-value
> pairs it might be simpler to start with this approach where we only have
> single JM. This might already add a lot of benefits for our users. Was
> there a specific reason why you discarded this proposal (other than
> generality)?
>
> @Uce it would be great to hear your feedback on the proposal since you
> already implemented a K8s based HA service.
>
> Cheers,
> Till
>
> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com> wrote:
>
>> Hi Xintong and Stephan,
>>
>> Thanks a lot for your attention on this FLIP. I will address the comments
>> inline.
>>
>> # Architecture -> One or two ConfigMaps
>>
>> Both of you are right. One ConfigMap will make the design and
>> implementation easier. Actually, in my POC codes,
>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for rest
>> server component) for the leader election
>> and storage. Once a JobManager win the election, it will update the
>> ConfigMap with leader address and periodically
>> renew the lock annotation to keep as the active leader. I will update the
>> FLIP document, including the architecture diagram,
>> to avoid the misunderstanding.
>>
>>
>> # HA storage > Lock and release
>>
>> This is a valid concern. Since for Zookeeper ephemeral nodes, it will be
>> deleted by the ZK server automatically when
>> the client is timeout. It could happen in a bad network environment or
>> the ZK client crashed exceptionally. For Kubernetes,
>> we need to implement a similar mechanism. First, when we want to lock a
>> specific key in ConfigMap, we will put the owner identify,
>> lease duration, renew time in the ConfigMap annotation. The annotation
>> will be cleaned up when releasing the lock. When
>> we want to remove a job graph or checkpoints, it should satisfy the
>> following conditions. If not, the delete operation could not be done.
>> * Current instance is the owner of the key.
>> * The owner annotation is empty, which means the owner has released the
>> lock.
>> * The owner annotation timed out, which usually indicate the owner died.
>>
>>
>> # HA storage > HA data clean up
>>
>> Sorry for that I do not describe how the HA related ConfigMap is retained
>> clearly. Benefit from the Kubernetes OwnerReference[1],
>> we set owner of the flink-conf configmap, service and TaskManager pods to
>> JobManager Deployment. So when we want to
>> destroy a Flink cluster, we just need to delete the deployment[2]. For
>> the HA related ConfigMaps, we do not set the owner
>> so that they could be retained even though we delete the whole Flink
>> cluster.
>>
>>
>> [1].
>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>> [2].
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>
>>
>> Best,
>> Yang
>>
>>
>> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
>>
>>> This is a very cool feature proposal.
>>>
>>> One lesson-learned from the ZooKeeper-based HA is that it is overly
>>> complicated to have the Leader RPC address in a different node than the
>>> LeaderLock. There is extra code needed to make sure these converge and the
>>> can be temporarily out of sync.
>>>
>>> A much easier design would be to have the RPC address as payload in the
>>> lock entry (ZNode in ZK), the same way that the leader fencing token is
>>> stored as payload of the lock.
>>> I think for the design above it would mean having a single ConfigMap for
>>> both leader lock and leader RPC address discovery.
>>>
>>> This probably serves as a good design principle in general - not divide
>>> information that is updated together over different resources.
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <to...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for preparing this FLIP, @Yang.
>>>>
>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>>> buildtin ConfigMap for Flink's HA services should significantly reduce the
>>>> maintenance overhead compared to deploying a ZK cluster. I think this is an
>>>> attractive feature for users.
>>>>
>>>> Concerning the proposed design, I have some questions. Might not be
>>>> problems, just trying to understand.
>>>>
>>>> ## Architecture
>>>>
>>>> Why does the leader election need two ConfigMaps (`lock for contending
>>>> leader`, and `leader RPC address`)? What happens if the two ConfigMaps are
>>>> not updated consistently? E.g., a TM learns about a new JM becoming leader
>>>> (lock for contending leader updated), but still gets the old leader's
>>>> address when trying to read `leader RPC address`?
>>>>
>>>> ## HA storage > Lock and release
>>>>
>>>> It seems to me that the owner needs to explicitly release the lock so
>>>> that other peers can write/remove the stored object. What if the previous
>>>> owner failed to release the lock (e.g., dead before releasing)? Would there
>>>> be any problem?
>>>>
>>>> ## HA storage > HA data clean up
>>>>
>>>> If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`,
>>>> how are the HA dada retained?
>>>>
>>>>
>>>> Thank you~
>>>>
>>>> Xintong Song
>>>>
>>>>
>>>>
>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <da...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi devs and users,
>>>>>
>>>>> I would like to start the discussion about FLIP-144[1], which will
>>>>> introduce
>>>>> a new native high availability service for Kubernetes.
>>>>>
>>>>> Currently, Flink has provided Zookeeper HA service and been widely used
>>>>> in production environments. It could be integrated in standalone
>>>>> cluster,
>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA in K8s
>>>>> will take additional cost since we need to manage a Zookeeper cluster.
>>>>> In the meantime, K8s has provided some public API for leader
>>>>> election[2]
>>>>> and configuration storage(i.e. ConfigMap[3]). We could leverage these
>>>>> features and make running HA configured Flink cluster on K8s more
>>>>> convenient.
>>>>>
>>>>> Both the standalone on K8s and native K8s could benefit from the new
>>>>> introduced KubernetesHaService.
>>>>>
>>>>> [1].
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>>> [2].
>>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>>>>>
>>>>> Looking forward to your feedback.
>>>>>
>>>>> Best,
>>>>> Yang
>>>>>
>>>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Yang Wang <da...@gmail.com>.
Hi Till, thanks for your valuable feedback.

1. Yes, leader election and storing leader information will use a same
ConfigMap. When a contender successfully performs a versioned annotation
update operation to the ConfigMap, it means that it has been elected as the
leader. And it will write the leader information in the callback of leader
elector[1]. The Kubernetes resource version will help us to avoid the
leader ConfigMap is wrongly updated.

2. The lock and release is really a valid concern. Actually in current
design, we could not guarantee that the node who tries to write his
ownership is the real leader. Who writes later, who is the owner. To
address this issue, we need to store all the owners of the key. Only when
the owner is empty, the specific key(means a checkpoint or job graph) could
be deleted. However, we may have a residual checkpoint or job graph when
the old JobManager crashed exceptionally and do not release the lock. To
solve this problem completely, we need a timestamp renew mechanism
for CompletedCheckpointStore and JobGraphStore, which could help us to the
check the JobManager timeout and then clean up the residual keys.

3. Frankly speaking, I am not against with this solution. However, in my
opinion, it is more like a temporary proposal. We could use StatefulSet to
avoid leader election and leader retrieval. But I am not sure whether
TaskManager could properly handle the situation that same hostname with
different IPs, because the JobManager failed and relaunched. Also we may
still have two JobManagers running in some corner cases(e.g. kubelet is
down but the pod is running). Another concern is we have a strong
dependency on the PersistentVolume(aka PV) in FileSystemHAService. But it
is not always true especially in self-build Kubernetes cluster. Moreover,
PV provider should guarantee that each PV could only be mounted once. Since
the native HA proposal could cover all the functionality of StatefulSet
proposal, that's why I prefer the former.


[1].
https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70

Best,
Yang

Till Rohrmann <tr...@apache.org> 于2020年9月28日周一 下午9:29写道:

> Thanks for creating this FLIP Yang Wang. I believe that many of our users
> will like a ZooKeeper-less HA setup.
>
> +1 for not separating the leader information and the leader election if
> possible. Maybe it is even possible that the contender writes his leader
> information directly when trying to obtain the leadership by performing a
> versioned write operation.
>
> Concerning the lock and release operation I have a question: Can there be
> multiple owners for a given key-value pair in a ConfigMap? If not, how can
> we ensure that the node which writes his ownership is actually the leader
> w/o transactional support from K8s? In ZooKeeper we had the same problem
> (we should probably change it at some point to simply use a
> transaction which checks whether the writer is still the leader) and
> therefore introduced the ephemeral lock nodes. What they allow is that
> there can be multiple owners of a given ZNode at a time. The last owner
> will then be responsible for the cleanup of the node.
>
> I see the benefit of your proposal over the stateful set proposal because
> it can support multiple standby JMs. Given the problem of locking key-value
> pairs it might be simpler to start with this approach where we only have
> single JM. This might already add a lot of benefits for our users. Was
> there a specific reason why you discarded this proposal (other than
> generality)?
>
> @Uce it would be great to hear your feedback on the proposal since you
> already implemented a K8s based HA service.
>
> Cheers,
> Till
>
> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com> wrote:
>
>> Hi Xintong and Stephan,
>>
>> Thanks a lot for your attention on this FLIP. I will address the comments
>> inline.
>>
>> # Architecture -> One or two ConfigMaps
>>
>> Both of you are right. One ConfigMap will make the design and
>> implementation easier. Actually, in my POC codes,
>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for rest
>> server component) for the leader election
>> and storage. Once a JobManager win the election, it will update the
>> ConfigMap with leader address and periodically
>> renew the lock annotation to keep as the active leader. I will update the
>> FLIP document, including the architecture diagram,
>> to avoid the misunderstanding.
>>
>>
>> # HA storage > Lock and release
>>
>> This is a valid concern. Since for Zookeeper ephemeral nodes, it will be
>> deleted by the ZK server automatically when
>> the client is timeout. It could happen in a bad network environment or
>> the ZK client crashed exceptionally. For Kubernetes,
>> we need to implement a similar mechanism. First, when we want to lock a
>> specific key in ConfigMap, we will put the owner identify,
>> lease duration, renew time in the ConfigMap annotation. The annotation
>> will be cleaned up when releasing the lock. When
>> we want to remove a job graph or checkpoints, it should satisfy the
>> following conditions. If not, the delete operation could not be done.
>> * Current instance is the owner of the key.
>> * The owner annotation is empty, which means the owner has released the
>> lock.
>> * The owner annotation timed out, which usually indicate the owner died.
>>
>>
>> # HA storage > HA data clean up
>>
>> Sorry for that I do not describe how the HA related ConfigMap is retained
>> clearly. Benefit from the Kubernetes OwnerReference[1],
>> we set owner of the flink-conf configmap, service and TaskManager pods to
>> JobManager Deployment. So when we want to
>> destroy a Flink cluster, we just need to delete the deployment[2]. For
>> the HA related ConfigMaps, we do not set the owner
>> so that they could be retained even though we delete the whole Flink
>> cluster.
>>
>>
>> [1].
>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>> [2].
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>
>>
>> Best,
>> Yang
>>
>>
>> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
>>
>>> This is a very cool feature proposal.
>>>
>>> One lesson-learned from the ZooKeeper-based HA is that it is overly
>>> complicated to have the Leader RPC address in a different node than the
>>> LeaderLock. There is extra code needed to make sure these converge and the
>>> can be temporarily out of sync.
>>>
>>> A much easier design would be to have the RPC address as payload in the
>>> lock entry (ZNode in ZK), the same way that the leader fencing token is
>>> stored as payload of the lock.
>>> I think for the design above it would mean having a single ConfigMap for
>>> both leader lock and leader RPC address discovery.
>>>
>>> This probably serves as a good design principle in general - not divide
>>> information that is updated together over different resources.
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <to...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for preparing this FLIP, @Yang.
>>>>
>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>>> buildtin ConfigMap for Flink's HA services should significantly reduce the
>>>> maintenance overhead compared to deploying a ZK cluster. I think this is an
>>>> attractive feature for users.
>>>>
>>>> Concerning the proposed design, I have some questions. Might not be
>>>> problems, just trying to understand.
>>>>
>>>> ## Architecture
>>>>
>>>> Why does the leader election need two ConfigMaps (`lock for contending
>>>> leader`, and `leader RPC address`)? What happens if the two ConfigMaps are
>>>> not updated consistently? E.g., a TM learns about a new JM becoming leader
>>>> (lock for contending leader updated), but still gets the old leader's
>>>> address when trying to read `leader RPC address`?
>>>>
>>>> ## HA storage > Lock and release
>>>>
>>>> It seems to me that the owner needs to explicitly release the lock so
>>>> that other peers can write/remove the stored object. What if the previous
>>>> owner failed to release the lock (e.g., dead before releasing)? Would there
>>>> be any problem?
>>>>
>>>> ## HA storage > HA data clean up
>>>>
>>>> If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`,
>>>> how are the HA dada retained?
>>>>
>>>>
>>>> Thank you~
>>>>
>>>> Xintong Song
>>>>
>>>>
>>>>
>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <da...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi devs and users,
>>>>>
>>>>> I would like to start the discussion about FLIP-144[1], which will
>>>>> introduce
>>>>> a new native high availability service for Kubernetes.
>>>>>
>>>>> Currently, Flink has provided Zookeeper HA service and been widely used
>>>>> in production environments. It could be integrated in standalone
>>>>> cluster,
>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA in K8s
>>>>> will take additional cost since we need to manage a Zookeeper cluster.
>>>>> In the meantime, K8s has provided some public API for leader
>>>>> election[2]
>>>>> and configuration storage(i.e. ConfigMap[3]). We could leverage these
>>>>> features and make running HA configured Flink cluster on K8s more
>>>>> convenient.
>>>>>
>>>>> Both the standalone on K8s and native K8s could benefit from the new
>>>>> introduced KubernetesHaService.
>>>>>
>>>>> [1].
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>>> [2].
>>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>>>>>
>>>>> Looking forward to your feedback.
>>>>>
>>>>> Best,
>>>>> Yang
>>>>>
>>>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Till Rohrmann <tr...@apache.org>.
Thanks for creating this FLIP Yang Wang. I believe that many of our users
will like a ZooKeeper-less HA setup.

+1 for not separating the leader information and the leader election if
possible. Maybe it is even possible that the contender writes his leader
information directly when trying to obtain the leadership by performing a
versioned write operation.

Concerning the lock and release operation I have a question: Can there be
multiple owners for a given key-value pair in a ConfigMap? If not, how can
we ensure that the node which writes his ownership is actually the leader
w/o transactional support from K8s? In ZooKeeper we had the same problem
(we should probably change it at some point to simply use a
transaction which checks whether the writer is still the leader) and
therefore introduced the ephemeral lock nodes. What they allow is that
there can be multiple owners of a given ZNode at a time. The last owner
will then be responsible for the cleanup of the node.

I see the benefit of your proposal over the stateful set proposal because
it can support multiple standby JMs. Given the problem of locking key-value
pairs it might be simpler to start with this approach where we only have
single JM. This might already add a lot of benefits for our users. Was
there a specific reason why you discarded this proposal (other than
generality)?

@Uce it would be great to hear your feedback on the proposal since you
already implemented a K8s based HA service.

Cheers,
Till

On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com> wrote:

> Hi Xintong and Stephan,
>
> Thanks a lot for your attention on this FLIP. I will address the comments
> inline.
>
> # Architecture -> One or two ConfigMaps
>
> Both of you are right. One ConfigMap will make the design and
> implementation easier. Actually, in my POC codes,
> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for rest
> server component) for the leader election
> and storage. Once a JobManager win the election, it will update the
> ConfigMap with leader address and periodically
> renew the lock annotation to keep as the active leader. I will update the
> FLIP document, including the architecture diagram,
> to avoid the misunderstanding.
>
>
> # HA storage > Lock and release
>
> This is a valid concern. Since for Zookeeper ephemeral nodes, it will be
> deleted by the ZK server automatically when
> the client is timeout. It could happen in a bad network environment or the
> ZK client crashed exceptionally. For Kubernetes,
> we need to implement a similar mechanism. First, when we want to lock a
> specific key in ConfigMap, we will put the owner identify,
> lease duration, renew time in the ConfigMap annotation. The annotation
> will be cleaned up when releasing the lock. When
> we want to remove a job graph or checkpoints, it should satisfy the
> following conditions. If not, the delete operation could not be done.
> * Current instance is the owner of the key.
> * The owner annotation is empty, which means the owner has released the
> lock.
> * The owner annotation timed out, which usually indicate the owner died.
>
>
> # HA storage > HA data clean up
>
> Sorry for that I do not describe how the HA related ConfigMap is retained
> clearly. Benefit from the Kubernetes OwnerReference[1],
> we set owner of the flink-conf configmap, service and TaskManager pods to
> JobManager Deployment. So when we want to
> destroy a Flink cluster, we just need to delete the deployment[2]. For the
> HA related ConfigMaps, we do not set the owner
> so that they could be retained even though we delete the whole Flink
> cluster.
>
>
> [1].
> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
> [2].
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>
>
> Best,
> Yang
>
>
> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
>
>> This is a very cool feature proposal.
>>
>> One lesson-learned from the ZooKeeper-based HA is that it is overly
>> complicated to have the Leader RPC address in a different node than the
>> LeaderLock. There is extra code needed to make sure these converge and the
>> can be temporarily out of sync.
>>
>> A much easier design would be to have the RPC address as payload in the
>> lock entry (ZNode in ZK), the same way that the leader fencing token is
>> stored as payload of the lock.
>> I think for the design above it would mean having a single ConfigMap for
>> both leader lock and leader RPC address discovery.
>>
>> This probably serves as a good design principle in general - not divide
>> information that is updated together over different resources.
>>
>> Best,
>> Stephan
>>
>>
>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <to...@gmail.com>
>> wrote:
>>
>>> Thanks for preparing this FLIP, @Yang.
>>>
>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>> buildtin ConfigMap for Flink's HA services should significantly reduce the
>>> maintenance overhead compared to deploying a ZK cluster. I think this is an
>>> attractive feature for users.
>>>
>>> Concerning the proposed design, I have some questions. Might not be
>>> problems, just trying to understand.
>>>
>>> ## Architecture
>>>
>>> Why does the leader election need two ConfigMaps (`lock for contending
>>> leader`, and `leader RPC address`)? What happens if the two ConfigMaps are
>>> not updated consistently? E.g., a TM learns about a new JM becoming leader
>>> (lock for contending leader updated), but still gets the old leader's
>>> address when trying to read `leader RPC address`?
>>>
>>> ## HA storage > Lock and release
>>>
>>> It seems to me that the owner needs to explicitly release the lock so
>>> that other peers can write/remove the stored object. What if the previous
>>> owner failed to release the lock (e.g., dead before releasing)? Would there
>>> be any problem?
>>>
>>> ## HA storage > HA data clean up
>>>
>>> If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`,
>>> how are the HA dada retained?
>>>
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <da...@gmail.com>
>>> wrote:
>>>
>>>> Hi devs and users,
>>>>
>>>> I would like to start the discussion about FLIP-144[1], which will
>>>> introduce
>>>> a new native high availability service for Kubernetes.
>>>>
>>>> Currently, Flink has provided Zookeeper HA service and been widely used
>>>> in production environments. It could be integrated in standalone
>>>> cluster,
>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA in K8s
>>>> will take additional cost since we need to manage a Zookeeper cluster.
>>>> In the meantime, K8s has provided some public API for leader election[2]
>>>> and configuration storage(i.e. ConfigMap[3]). We could leverage these
>>>> features and make running HA configured Flink cluster on K8s more
>>>> convenient.
>>>>
>>>> Both the standalone on K8s and native K8s could benefit from the new
>>>> introduced KubernetesHaService.
>>>>
>>>> [1].
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>> [2].
>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>>>>
>>>> Looking forward to your feedback.
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Till Rohrmann <tr...@apache.org>.
Thanks for creating this FLIP Yang Wang. I believe that many of our users
will like a ZooKeeper-less HA setup.

+1 for not separating the leader information and the leader election if
possible. Maybe it is even possible that the contender writes his leader
information directly when trying to obtain the leadership by performing a
versioned write operation.

Concerning the lock and release operation I have a question: Can there be
multiple owners for a given key-value pair in a ConfigMap? If not, how can
we ensure that the node which writes his ownership is actually the leader
w/o transactional support from K8s? In ZooKeeper we had the same problem
(we should probably change it at some point to simply use a
transaction which checks whether the writer is still the leader) and
therefore introduced the ephemeral lock nodes. What they allow is that
there can be multiple owners of a given ZNode at a time. The last owner
will then be responsible for the cleanup of the node.

I see the benefit of your proposal over the stateful set proposal because
it can support multiple standby JMs. Given the problem of locking key-value
pairs it might be simpler to start with this approach where we only have
single JM. This might already add a lot of benefits for our users. Was
there a specific reason why you discarded this proposal (other than
generality)?

@Uce it would be great to hear your feedback on the proposal since you
already implemented a K8s based HA service.

Cheers,
Till

On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <da...@gmail.com> wrote:

> Hi Xintong and Stephan,
>
> Thanks a lot for your attention on this FLIP. I will address the comments
> inline.
>
> # Architecture -> One or two ConfigMaps
>
> Both of you are right. One ConfigMap will make the design and
> implementation easier. Actually, in my POC codes,
> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for rest
> server component) for the leader election
> and storage. Once a JobManager win the election, it will update the
> ConfigMap with leader address and periodically
> renew the lock annotation to keep as the active leader. I will update the
> FLIP document, including the architecture diagram,
> to avoid the misunderstanding.
>
>
> # HA storage > Lock and release
>
> This is a valid concern. Since for Zookeeper ephemeral nodes, it will be
> deleted by the ZK server automatically when
> the client is timeout. It could happen in a bad network environment or the
> ZK client crashed exceptionally. For Kubernetes,
> we need to implement a similar mechanism. First, when we want to lock a
> specific key in ConfigMap, we will put the owner identify,
> lease duration, renew time in the ConfigMap annotation. The annotation
> will be cleaned up when releasing the lock. When
> we want to remove a job graph or checkpoints, it should satisfy the
> following conditions. If not, the delete operation could not be done.
> * Current instance is the owner of the key.
> * The owner annotation is empty, which means the owner has released the
> lock.
> * The owner annotation timed out, which usually indicate the owner died.
>
>
> # HA storage > HA data clean up
>
> Sorry for that I do not describe how the HA related ConfigMap is retained
> clearly. Benefit from the Kubernetes OwnerReference[1],
> we set owner of the flink-conf configmap, service and TaskManager pods to
> JobManager Deployment. So when we want to
> destroy a Flink cluster, we just need to delete the deployment[2]. For the
> HA related ConfigMaps, we do not set the owner
> so that they could be retained even though we delete the whole Flink
> cluster.
>
>
> [1].
> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
> [2].
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>
>
> Best,
> Yang
>
>
> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
>
>> This is a very cool feature proposal.
>>
>> One lesson-learned from the ZooKeeper-based HA is that it is overly
>> complicated to have the Leader RPC address in a different node than the
>> LeaderLock. There is extra code needed to make sure these converge and the
>> can be temporarily out of sync.
>>
>> A much easier design would be to have the RPC address as payload in the
>> lock entry (ZNode in ZK), the same way that the leader fencing token is
>> stored as payload of the lock.
>> I think for the design above it would mean having a single ConfigMap for
>> both leader lock and leader RPC address discovery.
>>
>> This probably serves as a good design principle in general - not divide
>> information that is updated together over different resources.
>>
>> Best,
>> Stephan
>>
>>
>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <to...@gmail.com>
>> wrote:
>>
>>> Thanks for preparing this FLIP, @Yang.
>>>
>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>> buildtin ConfigMap for Flink's HA services should significantly reduce the
>>> maintenance overhead compared to deploying a ZK cluster. I think this is an
>>> attractive feature for users.
>>>
>>> Concerning the proposed design, I have some questions. Might not be
>>> problems, just trying to understand.
>>>
>>> ## Architecture
>>>
>>> Why does the leader election need two ConfigMaps (`lock for contending
>>> leader`, and `leader RPC address`)? What happens if the two ConfigMaps are
>>> not updated consistently? E.g., a TM learns about a new JM becoming leader
>>> (lock for contending leader updated), but still gets the old leader's
>>> address when trying to read `leader RPC address`?
>>>
>>> ## HA storage > Lock and release
>>>
>>> It seems to me that the owner needs to explicitly release the lock so
>>> that other peers can write/remove the stored object. What if the previous
>>> owner failed to release the lock (e.g., dead before releasing)? Would there
>>> be any problem?
>>>
>>> ## HA storage > HA data clean up
>>>
>>> If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`,
>>> how are the HA dada retained?
>>>
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <da...@gmail.com>
>>> wrote:
>>>
>>>> Hi devs and users,
>>>>
>>>> I would like to start the discussion about FLIP-144[1], which will
>>>> introduce
>>>> a new native high availability service for Kubernetes.
>>>>
>>>> Currently, Flink has provided Zookeeper HA service and been widely used
>>>> in production environments. It could be integrated in standalone
>>>> cluster,
>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA in K8s
>>>> will take additional cost since we need to manage a Zookeeper cluster.
>>>> In the meantime, K8s has provided some public API for leader election[2]
>>>> and configuration storage(i.e. ConfigMap[3]). We could leverage these
>>>> features and make running HA configured Flink cluster on K8s more
>>>> convenient.
>>>>
>>>> Both the standalone on K8s and native K8s could benefit from the new
>>>> introduced KubernetesHaService.
>>>>
>>>> [1].
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>> [2].
>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>>>>
>>>> Looking forward to your feedback.
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Yang Wang <da...@gmail.com>.
Hi Xintong and Stephan,

Thanks a lot for your attention on this FLIP. I will address the comments
inline.

# Architecture -> One or two ConfigMaps

Both of you are right. One ConfigMap will make the design and
implementation easier. Actually, in my POC codes,
I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for rest server
component) for the leader election
and storage. Once a JobManager win the election, it will update the
ConfigMap with leader address and periodically
renew the lock annotation to keep as the active leader. I will update the
FLIP document, including the architecture diagram,
to avoid the misunderstanding.


# HA storage > Lock and release

This is a valid concern. Since for Zookeeper ephemeral nodes, it will be
deleted by the ZK server automatically when
the client is timeout. It could happen in a bad network environment or the
ZK client crashed exceptionally. For Kubernetes,
we need to implement a similar mechanism. First, when we want to lock a
specific key in ConfigMap, we will put the owner identify,
lease duration, renew time in the ConfigMap annotation. The annotation will
be cleaned up when releasing the lock. When
we want to remove a job graph or checkpoints, it should satisfy the
following conditions. If not, the delete operation could not be done.
* Current instance is the owner of the key.
* The owner annotation is empty, which means the owner has released the
lock.
* The owner annotation timed out, which usually indicate the owner died.


# HA storage > HA data clean up

Sorry for that I do not describe how the HA related ConfigMap is retained
clearly. Benefit from the Kubernetes OwnerReference[1],
we set owner of the flink-conf configmap, service and TaskManager pods to
JobManager Deployment. So when we want to
destroy a Flink cluster, we just need to delete the deployment[2]. For the
HA related ConfigMaps, we do not set the owner
so that they could be retained even though we delete the whole Flink
cluster.


[1].
https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
[2].
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session


Best,
Yang


Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:

> This is a very cool feature proposal.
>
> One lesson-learned from the ZooKeeper-based HA is that it is overly
> complicated to have the Leader RPC address in a different node than the
> LeaderLock. There is extra code needed to make sure these converge and the
> can be temporarily out of sync.
>
> A much easier design would be to have the RPC address as payload in the
> lock entry (ZNode in ZK), the same way that the leader fencing token is
> stored as payload of the lock.
> I think for the design above it would mean having a single ConfigMap for
> both leader lock and leader RPC address discovery.
>
> This probably serves as a good design principle in general - not divide
> information that is updated together over different resources.
>
> Best,
> Stephan
>
>
> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <to...@gmail.com>
> wrote:
>
>> Thanks for preparing this FLIP, @Yang.
>>
>> In general, I'm +1 for this new feature. Leveraging Kubernetes's buildtin
>> ConfigMap for Flink's HA services should significantly reduce the
>> maintenance overhead compared to deploying a ZK cluster. I think this is an
>> attractive feature for users.
>>
>> Concerning the proposed design, I have some questions. Might not be
>> problems, just trying to understand.
>>
>> ## Architecture
>>
>> Why does the leader election need two ConfigMaps (`lock for contending
>> leader`, and `leader RPC address`)? What happens if the two ConfigMaps are
>> not updated consistently? E.g., a TM learns about a new JM becoming leader
>> (lock for contending leader updated), but still gets the old leader's
>> address when trying to read `leader RPC address`?
>>
>> ## HA storage > Lock and release
>>
>> It seems to me that the owner needs to explicitly release the lock so
>> that other peers can write/remove the stored object. What if the previous
>> owner failed to release the lock (e.g., dead before releasing)? Would there
>> be any problem?
>>
>> ## HA storage > HA data clean up
>>
>> If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`, how
>> are the HA dada retained?
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <da...@gmail.com> wrote:
>>
>>> Hi devs and users,
>>>
>>> I would like to start the discussion about FLIP-144[1], which will
>>> introduce
>>> a new native high availability service for Kubernetes.
>>>
>>> Currently, Flink has provided Zookeeper HA service and been widely used
>>> in production environments. It could be integrated in standalone cluster,
>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA in K8s
>>> will take additional cost since we need to manage a Zookeeper cluster.
>>> In the meantime, K8s has provided some public API for leader election[2]
>>> and configuration storage(i.e. ConfigMap[3]). We could leverage these
>>> features and make running HA configured Flink cluster on K8s more
>>> convenient.
>>>
>>> Both the standalone on K8s and native K8s could benefit from the new
>>> introduced KubernetesHaService.
>>>
>>> [1].
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>> [2].
>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>>>
>>> Looking forward to your feedback.
>>>
>>> Best,
>>> Yang
>>>
>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Yang Wang <da...@gmail.com>.
Hi Xintong and Stephan,

Thanks a lot for your attention on this FLIP. I will address the comments
inline.

# Architecture -> One or two ConfigMaps

Both of you are right. One ConfigMap will make the design and
implementation easier. Actually, in my POC codes,
I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for rest server
component) for the leader election
and storage. Once a JobManager win the election, it will update the
ConfigMap with leader address and periodically
renew the lock annotation to keep as the active leader. I will update the
FLIP document, including the architecture diagram,
to avoid the misunderstanding.


# HA storage > Lock and release

This is a valid concern. Since for Zookeeper ephemeral nodes, it will be
deleted by the ZK server automatically when
the client is timeout. It could happen in a bad network environment or the
ZK client crashed exceptionally. For Kubernetes,
we need to implement a similar mechanism. First, when we want to lock a
specific key in ConfigMap, we will put the owner identify,
lease duration, renew time in the ConfigMap annotation. The annotation will
be cleaned up when releasing the lock. When
we want to remove a job graph or checkpoints, it should satisfy the
following conditions. If not, the delete operation could not be done.
* Current instance is the owner of the key.
* The owner annotation is empty, which means the owner has released the
lock.
* The owner annotation timed out, which usually indicate the owner died.


# HA storage > HA data clean up

Sorry for that I do not describe how the HA related ConfigMap is retained
clearly. Benefit from the Kubernetes OwnerReference[1],
we set owner of the flink-conf configmap, service and TaskManager pods to
JobManager Deployment. So when we want to
destroy a Flink cluster, we just need to delete the deployment[2]. For the
HA related ConfigMaps, we do not set the owner
so that they could be retained even though we delete the whole Flink
cluster.


[1].
https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
[2].
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session


Best,
Yang


Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:

> This is a very cool feature proposal.
>
> One lesson-learned from the ZooKeeper-based HA is that it is overly
> complicated to have the Leader RPC address in a different node than the
> LeaderLock. There is extra code needed to make sure these converge and the
> can be temporarily out of sync.
>
> A much easier design would be to have the RPC address as payload in the
> lock entry (ZNode in ZK), the same way that the leader fencing token is
> stored as payload of the lock.
> I think for the design above it would mean having a single ConfigMap for
> both leader lock and leader RPC address discovery.
>
> This probably serves as a good design principle in general - not divide
> information that is updated together over different resources.
>
> Best,
> Stephan
>
>
> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <to...@gmail.com>
> wrote:
>
>> Thanks for preparing this FLIP, @Yang.
>>
>> In general, I'm +1 for this new feature. Leveraging Kubernetes's buildtin
>> ConfigMap for Flink's HA services should significantly reduce the
>> maintenance overhead compared to deploying a ZK cluster. I think this is an
>> attractive feature for users.
>>
>> Concerning the proposed design, I have some questions. Might not be
>> problems, just trying to understand.
>>
>> ## Architecture
>>
>> Why does the leader election need two ConfigMaps (`lock for contending
>> leader`, and `leader RPC address`)? What happens if the two ConfigMaps are
>> not updated consistently? E.g., a TM learns about a new JM becoming leader
>> (lock for contending leader updated), but still gets the old leader's
>> address when trying to read `leader RPC address`?
>>
>> ## HA storage > Lock and release
>>
>> It seems to me that the owner needs to explicitly release the lock so
>> that other peers can write/remove the stored object. What if the previous
>> owner failed to release the lock (e.g., dead before releasing)? Would there
>> be any problem?
>>
>> ## HA storage > HA data clean up
>>
>> If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`, how
>> are the HA dada retained?
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <da...@gmail.com> wrote:
>>
>>> Hi devs and users,
>>>
>>> I would like to start the discussion about FLIP-144[1], which will
>>> introduce
>>> a new native high availability service for Kubernetes.
>>>
>>> Currently, Flink has provided Zookeeper HA service and been widely used
>>> in production environments. It could be integrated in standalone cluster,
>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA in K8s
>>> will take additional cost since we need to manage a Zookeeper cluster.
>>> In the meantime, K8s has provided some public API for leader election[2]
>>> and configuration storage(i.e. ConfigMap[3]). We could leverage these
>>> features and make running HA configured Flink cluster on K8s more
>>> convenient.
>>>
>>> Both the standalone on K8s and native K8s could benefit from the new
>>> introduced KubernetesHaService.
>>>
>>> [1].
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>> [2].
>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>>>
>>> Looking forward to your feedback.
>>>
>>> Best,
>>> Yang
>>>
>>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Stephan Ewen <se...@apache.org>.
This is a very cool feature proposal.

One lesson-learned from the ZooKeeper-based HA is that it is overly
complicated to have the Leader RPC address in a different node than the
LeaderLock. There is extra code needed to make sure these converge and the
can be temporarily out of sync.

A much easier design would be to have the RPC address as payload in the
lock entry (ZNode in ZK), the same way that the leader fencing token is
stored as payload of the lock.
I think for the design above it would mean having a single ConfigMap for
both leader lock and leader RPC address discovery.

This probably serves as a good design principle in general - not divide
information that is updated together over different resources.

Best,
Stephan


On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <to...@gmail.com> wrote:

> Thanks for preparing this FLIP, @Yang.
>
> In general, I'm +1 for this new feature. Leveraging Kubernetes's buildtin
> ConfigMap for Flink's HA services should significantly reduce the
> maintenance overhead compared to deploying a ZK cluster. I think this is an
> attractive feature for users.
>
> Concerning the proposed design, I have some questions. Might not be
> problems, just trying to understand.
>
> ## Architecture
>
> Why does the leader election need two ConfigMaps (`lock for contending
> leader`, and `leader RPC address`)? What happens if the two ConfigMaps are
> not updated consistently? E.g., a TM learns about a new JM becoming leader
> (lock for contending leader updated), but still gets the old leader's
> address when trying to read `leader RPC address`?
>
> ## HA storage > Lock and release
>
> It seems to me that the owner needs to explicitly release the lock so that
> other peers can write/remove the stored object. What if the previous owner
> failed to release the lock (e.g., dead before releasing)? Would there be
> any problem?
>
> ## HA storage > HA data clean up
>
> If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`, how
> are the HA dada retained?
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <da...@gmail.com> wrote:
>
>> Hi devs and users,
>>
>> I would like to start the discussion about FLIP-144[1], which will
>> introduce
>> a new native high availability service for Kubernetes.
>>
>> Currently, Flink has provided Zookeeper HA service and been widely used
>> in production environments. It could be integrated in standalone cluster,
>> Yarn, Kubernetes deployments. However, using the Zookeeper HA in K8s
>> will take additional cost since we need to manage a Zookeeper cluster.
>> In the meantime, K8s has provided some public API for leader election[2]
>> and configuration storage(i.e. ConfigMap[3]). We could leverage these
>> features and make running HA configured Flink cluster on K8s more
>> convenient.
>>
>> Both the standalone on K8s and native K8s could benefit from the new
>> introduced KubernetesHaService.
>>
>> [1].
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>> [2].
>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>>
>> Looking forward to your feedback.
>>
>> Best,
>> Yang
>>
>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Stephan Ewen <se...@apache.org>.
This is a very cool feature proposal.

One lesson-learned from the ZooKeeper-based HA is that it is overly
complicated to have the Leader RPC address in a different node than the
LeaderLock. There is extra code needed to make sure these converge and the
can be temporarily out of sync.

A much easier design would be to have the RPC address as payload in the
lock entry (ZNode in ZK), the same way that the leader fencing token is
stored as payload of the lock.
I think for the design above it would mean having a single ConfigMap for
both leader lock and leader RPC address discovery.

This probably serves as a good design principle in general - not divide
information that is updated together over different resources.

Best,
Stephan


On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <to...@gmail.com> wrote:

> Thanks for preparing this FLIP, @Yang.
>
> In general, I'm +1 for this new feature. Leveraging Kubernetes's buildtin
> ConfigMap for Flink's HA services should significantly reduce the
> maintenance overhead compared to deploying a ZK cluster. I think this is an
> attractive feature for users.
>
> Concerning the proposed design, I have some questions. Might not be
> problems, just trying to understand.
>
> ## Architecture
>
> Why does the leader election need two ConfigMaps (`lock for contending
> leader`, and `leader RPC address`)? What happens if the two ConfigMaps are
> not updated consistently? E.g., a TM learns about a new JM becoming leader
> (lock for contending leader updated), but still gets the old leader's
> address when trying to read `leader RPC address`?
>
> ## HA storage > Lock and release
>
> It seems to me that the owner needs to explicitly release the lock so that
> other peers can write/remove the stored object. What if the previous owner
> failed to release the lock (e.g., dead before releasing)? Would there be
> any problem?
>
> ## HA storage > HA data clean up
>
> If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`, how
> are the HA dada retained?
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <da...@gmail.com> wrote:
>
>> Hi devs and users,
>>
>> I would like to start the discussion about FLIP-144[1], which will
>> introduce
>> a new native high availability service for Kubernetes.
>>
>> Currently, Flink has provided Zookeeper HA service and been widely used
>> in production environments. It could be integrated in standalone cluster,
>> Yarn, Kubernetes deployments. However, using the Zookeeper HA in K8s
>> will take additional cost since we need to manage a Zookeeper cluster.
>> In the meantime, K8s has provided some public API for leader election[2]
>> and configuration storage(i.e. ConfigMap[3]). We could leverage these
>> features and make running HA configured Flink cluster on K8s more
>> convenient.
>>
>> Both the standalone on K8s and native K8s could benefit from the new
>> introduced KubernetesHaService.
>>
>> [1].
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>> [2].
>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>>
>> Looking forward to your feedback.
>>
>> Best,
>> Yang
>>
>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Xintong Song <to...@gmail.com>.
Thanks for preparing this FLIP, @Yang.

In general, I'm +1 for this new feature. Leveraging Kubernetes's buildtin
ConfigMap for Flink's HA services should significantly reduce the
maintenance overhead compared to deploying a ZK cluster. I think this is an
attractive feature for users.

Concerning the proposed design, I have some questions. Might not be
problems, just trying to understand.

## Architecture

Why does the leader election need two ConfigMaps (`lock for contending
leader`, and `leader RPC address`)? What happens if the two ConfigMaps are
not updated consistently? E.g., a TM learns about a new JM becoming leader
(lock for contending leader updated), but still gets the old leader's
address when trying to read `leader RPC address`?

## HA storage > Lock and release

It seems to me that the owner needs to explicitly release the lock so that
other peers can write/remove the stored object. What if the previous owner
failed to release the lock (e.g., dead before releasing)? Would there be
any problem?

## HA storage > HA data clean up

If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`, how
are the HA dada retained?


Thank you~

Xintong Song



On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <da...@gmail.com> wrote:

> Hi devs and users,
>
> I would like to start the discussion about FLIP-144[1], which will
> introduce
> a new native high availability service for Kubernetes.
>
> Currently, Flink has provided Zookeeper HA service and been widely used
> in production environments. It could be integrated in standalone cluster,
> Yarn, Kubernetes deployments. However, using the Zookeeper HA in K8s
> will take additional cost since we need to manage a Zookeeper cluster.
> In the meantime, K8s has provided some public API for leader election[2]
> and configuration storage(i.e. ConfigMap[3]). We could leverage these
> features and make running HA configured Flink cluster on K8s more
> convenient.
>
> Both the standalone on K8s and native K8s could benefit from the new
> introduced KubernetesHaService.
>
> [1].
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
> [2].
> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>
> Looking forward to your feedback.
>
> Best,
> Yang
>

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

Posted by Xintong Song <to...@gmail.com>.
Thanks for preparing this FLIP, @Yang.

In general, I'm +1 for this new feature. Leveraging Kubernetes's buildtin
ConfigMap for Flink's HA services should significantly reduce the
maintenance overhead compared to deploying a ZK cluster. I think this is an
attractive feature for users.

Concerning the proposed design, I have some questions. Might not be
problems, just trying to understand.

## Architecture

Why does the leader election need two ConfigMaps (`lock for contending
leader`, and `leader RPC address`)? What happens if the two ConfigMaps are
not updated consistently? E.g., a TM learns about a new JM becoming leader
(lock for contending leader updated), but still gets the old leader's
address when trying to read `leader RPC address`?

## HA storage > Lock and release

It seems to me that the owner needs to explicitly release the lock so that
other peers can write/remove the stored object. What if the previous owner
failed to release the lock (e.g., dead before releasing)? Would there be
any problem?

## HA storage > HA data clean up

If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`, how
are the HA dada retained?


Thank you~

Xintong Song



On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <da...@gmail.com> wrote:

> Hi devs and users,
>
> I would like to start the discussion about FLIP-144[1], which will
> introduce
> a new native high availability service for Kubernetes.
>
> Currently, Flink has provided Zookeeper HA service and been widely used
> in production environments. It could be integrated in standalone cluster,
> Yarn, Kubernetes deployments. However, using the Zookeeper HA in K8s
> will take additional cost since we need to manage a Zookeeper cluster.
> In the meantime, K8s has provided some public API for leader election[2]
> and configuration storage(i.e. ConfigMap[3]). We could leverage these
> features and make running HA configured Flink cluster on K8s more
> convenient.
>
> Both the standalone on K8s and native K8s could benefit from the new
> introduced KubernetesHaService.
>
> [1].
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
> [2].
> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>
> Looking forward to your feedback.
>
> Best,
> Yang
>