You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Yang Wang <da...@gmail.com> on 2020/09/01 08:58:00 UTC

Re: FileSystemHaServices and BlobStore

Hi Alexey,

Glad to hear that your are interested the K8s HA support.

Roman's answer is just on point.

"FileSystemBlobStore" is trying to store the user jars, job graph, etc. on
the distributed storage(e.g. HDFS, S3, GFS). So when the
JobManager failover, it could fetch the blob data from remote storage. It
is very important for standalone and Yarn deployment since
the local blob store is ephemeral and will be cleaned up after JobManager
terminated.

However, in your case, benefit from the K8s persistent volume, all the
local blob data could be recovered after JobManager pod restarted.
Then you could find that the jobs are recovered and keeps to running.
Please also remember that the checkpoint meta and counter also
need to be stored in local file. After then the Flink job could recover
from the latest checkpoint successfully.

> About the FileSystemHaService
I am a little skeptical about this feature. Currently, we are using K8s
deployment for the JobManager. And it is not always guaranteed only
one JobManager is running. For example, the kubelet is down and never be
pulled up again. I am trying to work on another ticket "native K8s HA"[1],
in which we will get a fully functional HA service, including leader
election/retrieval, jobgraph meta store, checkpoint meta store, running
registry, etc.
It could be used for standalone K8s and native K8s deployment.


[1]. https://issues.apache.org/jira/browse/FLINK-12884

Best,
Yang


Khachatryan Roman <kh...@gmail.com> 于2020年8月31日周一 下午8:52写道:

> + dev
>
> Blob store is used for jars, serialized job, and task information and logs.
> You can find some information at
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture
>
>
> I guess in your setup, Flink was able to pick up local files.
> HA setup presumes that Flink can survive the loss of that JM host and its
> local files.
>
> I'm not sure about K8s native setup - probably VoidBlobStore is enough if
> there is a persistent volume.
> But in the general case, FileSystemBlobStore should be used to store files
> on some DFS.
>
>
> Regards,
> Roman
>
>
> On Sat, Aug 29, 2020 at 6:42 PM Alexey Trenikhun <ye...@msn.com> wrote:
>
> > Did test with streaming job and FileSystemHaService using VoidBlobStore
> > (no HA Blob), looks like job was able to recover from both JM restart and
> > TM restart. Any idea in what use cases HA Blob is needed?
> >
> > Thanks,
> > Alexey
> > ------------------------------
> > *From:* Alexey Trenikhun <ye...@msn.com>
> > *Sent:* Friday, August 28, 2020 11:31 AM
> > *To:* Khachatryan Roman <kh...@gmail.com>
> > *Cc:* Flink User Mail List <us...@flink.apache.org>
> > *Subject:* Re: FileSystemHaServices and BlobStore
> >
> > Motivation is to have k8s HA setup without extra component - Zookeeper,
> > see [1]
> >
> > Purpose of BlobStore is vague to me, what kind of BLOBs are stored? Looks
> > like  if we start job from savepoint, then persistence of BlobStore is
> > not necessary, but is it needed if we recover from checkpoint?
> >
> > Thanks,
> > Alexey
> >
> > [1]. https://issues.apache.org/jira/browse/FLINK-17598
> >
> >
> > ------------------------------
> > *From:* Khachatryan Roman <kh...@gmail.com>
> > *Sent:* Friday, August 28, 2020 9:24 AM
> > *To:* Alexey Trenikhun <ye...@msn.com>
> > *Cc:* Flink User Mail List <us...@flink.apache.org>
> > *Subject:* Re: FileSystemHaServices and BlobStore
> >
> > Hello Alexey,
> >
> > I think you need FileSystemBlobStore as you are implementing HA Services,
> > and BLOBs should be highly available too.
> > However, I'm a bit concerned about the direction in general: it
> > essentially means re-implementing ZK functionality on top of FS.
> > What are the motivation and the use case?
> >
> > Regards,
> > Roman
> >
> >
> > On Fri, Aug 28, 2020 at 5:15 PM Alexey Trenikhun <ye...@msn.com> wrote:
> >
> > Hello,
> > I'm thinking about implementing FileSystemHaServices - single leader, but
> > persistent RunningJobRegistry, CheckpointIDCounter,
> > CompletedCheckpointStore and JobGraphStore. I'm not sure do you need
> > FileSystemBlobStore or VoidBlobStore is enough. Can't figure out, should
> > BlobStore survive JobManager crash. I see that ZookeeperHaServices use
> FileSystemBlobStore,
> > but not clear is to due to having multiple JobManagers (leader +
> follower)
> > or necessity to preserve BLOBs on restart.
> >
> > Thanks,
> > Alexey
> >
> >
>

Re: FileSystemHaServices and BlobStore

Posted by Alexey Trenikhun <ye...@msn.com>.
Hi Yang,
I saw this FLIP, it is very good feature, I think overall for Kubernetes, it is preferred over “StatefulSet + PV + FileSystemHAService” approach, when it will be available we plan to use it. On other hand looks like FileSystemHAService is easier to implement, I thought about contributing to FileSystemHAService, maybe it will be useful to someone else until FLIP-144 will be available

Thanks,
Alexey
________________________________
From: Yang Wang <da...@gmail.com>
Sent: Wednesday, September 16, 2020 8:25 PM
To: Alexey Trenikhun
Cc: dev; Flink User Mail List
Subject: Re: FileSystemHaServices and BlobStore

Hi Alexey,

I have created FLIP-144[1] for the native Kubernetes HA support. Please have a look if you are interested.
Frankly speaking, I am not against the "StatefulSet + PV + FileSystemHAService". Maybe in the future,
we could have the both in Flink.

[1]. https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink

Best,
Yang

Alexey Trenikhun <ye...@msn.com>> 于2020年9月3日周四 下午11:44写道:
Hi Yang,
Yes, I’ve persisted CompletedCheckpointStore, CheckpointIDCounter and RunningJobsRegistry

Thanks,
Alexey

________________________________
From: Yang Wang <da...@gmail.com>>
Sent: Wednesday, September 2, 2020 8:21:20 PM
To: Alexey Trenikhun <ye...@msn.com>>
Cc: dev <de...@flink.apache.org>>; Flink User Mail List <us...@flink.apache.org>>
Subject: Re: FileSystemHaServices and BlobStore

Hi Alexey,

Thanks for the feedback. You are right. StatefulSet + PersistentVolume + FileSystemHaService could be another
bundle of services for Flink HA support on K8s. The user jars could be built into the image or downloaded by init-container
or mount via the PV. So they do not need to be recovered from HA storage. But i think the checkpoint path and counter
should be persisted so that we could recover from the latest checkpoint.


Best,
Yang

Alexey Trenikhun <ye...@msn.com>> 于2020年9月2日周三 上午7:36写道:
Judging from FLIP-19 (thank you Roman for the link), of 3 use cases (jars, RPC messages and log files) only jar files need HA guarantee, and in my particular case, job cluster with jar as part of image, it seems doesn't matter, I guess it explains why in my test I was able to recover from failure even VoidBlobStore. I also use StatefulSet instead of Deployment

Thanks,
Alexey

________________________________
From: Yang Wang <da...@gmail.com>>
Sent: Tuesday, September 1, 2020 1:58 AM
To: dev <de...@flink.apache.org>>
Cc: Alexey Trenikhun <ye...@msn.com>>; Flink User Mail List <us...@flink.apache.org>>
Subject: Re: FileSystemHaServices and BlobStore

Hi Alexey,

Glad to hear that your are interested the K8s HA support.

Roman's answer is just on point.

"FileSystemBlobStore" is trying to store the user jars, job graph, etc. on the distributed storage(e.g. HDFS, S3, GFS). So when the
JobManager failover, it could fetch the blob data from remote storage. It is very important for standalone and Yarn deployment since
the local blob store is ephemeral and will be cleaned up after JobManager terminated.

However, in your case, benefit from the K8s persistent volume, all the local blob data could be recovered after JobManager pod restarted.
Then you could find that the jobs are recovered and keeps to running. Please also remember that the checkpoint meta and counter also
need to be stored in local file. After then the Flink job could recover from the latest checkpoint successfully.

> About the FileSystemHaService
I am a little skeptical about this feature. Currently, we are using K8s deployment for the JobManager. And it is not always guaranteed only
one JobManager is running. For example, the kubelet is down and never be pulled up again. I am trying to work on another ticket "native K8s HA"[1],
in which we will get a fully functional HA service, including leader election/retrieval, jobgraph meta store, checkpoint meta store, running registry, etc.
It could be used for standalone K8s and native K8s deployment.


[1]. https://issues.apache.org/jira/browse/FLINK-12884

Best,
Yang


Khachatryan Roman <kh...@gmail.com>> 于2020年8月31日周一 下午8:52写道:
+ dev

Blob store is used for jars, serialized job, and task information and logs.
You can find some information at
https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture


I guess in your setup, Flink was able to pick up local files.
HA setup presumes that Flink can survive the loss of that JM host and its
local files.

I'm not sure about K8s native setup - probably VoidBlobStore is enough if
there is a persistent volume.
But in the general case, FileSystemBlobStore should be used to store files
on some DFS.


Regards,
Roman


On Sat, Aug 29, 2020 at 6:42 PM Alexey Trenikhun <ye...@msn.com>> wrote:

> Did test with streaming job and FileSystemHaService using VoidBlobStore
> (no HA Blob), looks like job was able to recover from both JM restart and
> TM restart. Any idea in what use cases HA Blob is needed?
>
> Thanks,
> Alexey
> ------------------------------
> *From:* Alexey Trenikhun <ye...@msn.com>>
> *Sent:* Friday, August 28, 2020 11:31 AM
> *To:* Khachatryan Roman <kh...@gmail.com>>
> *Cc:* Flink User Mail List <us...@flink.apache.org>>
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Motivation is to have k8s HA setup without extra component - Zookeeper,
> see [1]
>
> Purpose of BlobStore is vague to me, what kind of BLOBs are stored? Looks
> like  if we start job from savepoint, then persistence of BlobStore is
> not necessary, but is it needed if we recover from checkpoint?
>
> Thanks,
> Alexey
>
> [1]. https://issues.apache.org/jira/browse/FLINK-17598
>
>
> ------------------------------
> *From:* Khachatryan Roman <kh...@gmail.com>>
> *Sent:* Friday, August 28, 2020 9:24 AM
> *To:* Alexey Trenikhun <ye...@msn.com>>
> *Cc:* Flink User Mail List <us...@flink.apache.org>>
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Hello Alexey,
>
> I think you need FileSystemBlobStore as you are implementing HA Services,
> and BLOBs should be highly available too.
> However, I'm a bit concerned about the direction in general: it
> essentially means re-implementing ZK functionality on top of FS.
> What are the motivation and the use case?
>
> Regards,
> Roman
>
>
> On Fri, Aug 28, 2020 at 5:15 PM Alexey Trenikhun <ye...@msn.com>> wrote:
>
> Hello,
> I'm thinking about implementing FileSystemHaServices - single leader, but
> persistent RunningJobRegistry, CheckpointIDCounter,
> CompletedCheckpointStore and JobGraphStore. I'm not sure do you need
> FileSystemBlobStore or VoidBlobStore is enough. Can't figure out, should
> BlobStore survive JobManager crash. I see that ZookeeperHaServices use FileSystemBlobStore,
> but not clear is to due to having multiple JobManagers (leader + follower)
> or necessity to preserve BLOBs on restart.
>
> Thanks,
> Alexey
>
>

Re: FileSystemHaServices and BlobStore

Posted by Alexey Trenikhun <ye...@msn.com>.
Hi Yang,
I saw this FLIP, it is very good feature, I think overall for Kubernetes, it is preferred over “StatefulSet + PV + FileSystemHAService” approach, when it will be available we plan to use it. On other hand looks like FileSystemHAService is easier to implement, I thought about contributing to FileSystemHAService, maybe it will be useful to someone else until FLIP-144 will be available

Thanks,
Alexey
________________________________
From: Yang Wang <da...@gmail.com>
Sent: Wednesday, September 16, 2020 8:25 PM
To: Alexey Trenikhun
Cc: dev; Flink User Mail List
Subject: Re: FileSystemHaServices and BlobStore

Hi Alexey,

I have created FLIP-144[1] for the native Kubernetes HA support. Please have a look if you are interested.
Frankly speaking, I am not against the "StatefulSet + PV + FileSystemHAService". Maybe in the future,
we could have the both in Flink.

[1]. https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink

Best,
Yang

Alexey Trenikhun <ye...@msn.com>> 于2020年9月3日周四 下午11:44写道:
Hi Yang,
Yes, I’ve persisted CompletedCheckpointStore, CheckpointIDCounter and RunningJobsRegistry

Thanks,
Alexey

________________________________
From: Yang Wang <da...@gmail.com>>
Sent: Wednesday, September 2, 2020 8:21:20 PM
To: Alexey Trenikhun <ye...@msn.com>>
Cc: dev <de...@flink.apache.org>>; Flink User Mail List <us...@flink.apache.org>>
Subject: Re: FileSystemHaServices and BlobStore

Hi Alexey,

Thanks for the feedback. You are right. StatefulSet + PersistentVolume + FileSystemHaService could be another
bundle of services for Flink HA support on K8s. The user jars could be built into the image or downloaded by init-container
or mount via the PV. So they do not need to be recovered from HA storage. But i think the checkpoint path and counter
should be persisted so that we could recover from the latest checkpoint.


Best,
Yang

Alexey Trenikhun <ye...@msn.com>> 于2020年9月2日周三 上午7:36写道:
Judging from FLIP-19 (thank you Roman for the link), of 3 use cases (jars, RPC messages and log files) only jar files need HA guarantee, and in my particular case, job cluster with jar as part of image, it seems doesn't matter, I guess it explains why in my test I was able to recover from failure even VoidBlobStore. I also use StatefulSet instead of Deployment

Thanks,
Alexey

________________________________
From: Yang Wang <da...@gmail.com>>
Sent: Tuesday, September 1, 2020 1:58 AM
To: dev <de...@flink.apache.org>>
Cc: Alexey Trenikhun <ye...@msn.com>>; Flink User Mail List <us...@flink.apache.org>>
Subject: Re: FileSystemHaServices and BlobStore

Hi Alexey,

Glad to hear that your are interested the K8s HA support.

Roman's answer is just on point.

"FileSystemBlobStore" is trying to store the user jars, job graph, etc. on the distributed storage(e.g. HDFS, S3, GFS). So when the
JobManager failover, it could fetch the blob data from remote storage. It is very important for standalone and Yarn deployment since
the local blob store is ephemeral and will be cleaned up after JobManager terminated.

However, in your case, benefit from the K8s persistent volume, all the local blob data could be recovered after JobManager pod restarted.
Then you could find that the jobs are recovered and keeps to running. Please also remember that the checkpoint meta and counter also
need to be stored in local file. After then the Flink job could recover from the latest checkpoint successfully.

> About the FileSystemHaService
I am a little skeptical about this feature. Currently, we are using K8s deployment for the JobManager. And it is not always guaranteed only
one JobManager is running. For example, the kubelet is down and never be pulled up again. I am trying to work on another ticket "native K8s HA"[1],
in which we will get a fully functional HA service, including leader election/retrieval, jobgraph meta store, checkpoint meta store, running registry, etc.
It could be used for standalone K8s and native K8s deployment.


[1]. https://issues.apache.org/jira/browse/FLINK-12884

Best,
Yang


Khachatryan Roman <kh...@gmail.com>> 于2020年8月31日周一 下午8:52写道:
+ dev

Blob store is used for jars, serialized job, and task information and logs.
You can find some information at
https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture


I guess in your setup, Flink was able to pick up local files.
HA setup presumes that Flink can survive the loss of that JM host and its
local files.

I'm not sure about K8s native setup - probably VoidBlobStore is enough if
there is a persistent volume.
But in the general case, FileSystemBlobStore should be used to store files
on some DFS.


Regards,
Roman


On Sat, Aug 29, 2020 at 6:42 PM Alexey Trenikhun <ye...@msn.com>> wrote:

> Did test with streaming job and FileSystemHaService using VoidBlobStore
> (no HA Blob), looks like job was able to recover from both JM restart and
> TM restart. Any idea in what use cases HA Blob is needed?
>
> Thanks,
> Alexey
> ------------------------------
> *From:* Alexey Trenikhun <ye...@msn.com>>
> *Sent:* Friday, August 28, 2020 11:31 AM
> *To:* Khachatryan Roman <kh...@gmail.com>>
> *Cc:* Flink User Mail List <us...@flink.apache.org>>
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Motivation is to have k8s HA setup without extra component - Zookeeper,
> see [1]
>
> Purpose of BlobStore is vague to me, what kind of BLOBs are stored? Looks
> like  if we start job from savepoint, then persistence of BlobStore is
> not necessary, but is it needed if we recover from checkpoint?
>
> Thanks,
> Alexey
>
> [1]. https://issues.apache.org/jira/browse/FLINK-17598
>
>
> ------------------------------
> *From:* Khachatryan Roman <kh...@gmail.com>>
> *Sent:* Friday, August 28, 2020 9:24 AM
> *To:* Alexey Trenikhun <ye...@msn.com>>
> *Cc:* Flink User Mail List <us...@flink.apache.org>>
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Hello Alexey,
>
> I think you need FileSystemBlobStore as you are implementing HA Services,
> and BLOBs should be highly available too.
> However, I'm a bit concerned about the direction in general: it
> essentially means re-implementing ZK functionality on top of FS.
> What are the motivation and the use case?
>
> Regards,
> Roman
>
>
> On Fri, Aug 28, 2020 at 5:15 PM Alexey Trenikhun <ye...@msn.com>> wrote:
>
> Hello,
> I'm thinking about implementing FileSystemHaServices - single leader, but
> persistent RunningJobRegistry, CheckpointIDCounter,
> CompletedCheckpointStore and JobGraphStore. I'm not sure do you need
> FileSystemBlobStore or VoidBlobStore is enough. Can't figure out, should
> BlobStore survive JobManager crash. I see that ZookeeperHaServices use FileSystemBlobStore,
> but not clear is to due to having multiple JobManagers (leader + follower)
> or necessity to preserve BLOBs on restart.
>
> Thanks,
> Alexey
>
>

Re: FileSystemHaServices and BlobStore

Posted by Yang Wang <da...@gmail.com>.
Hi Alexey,

I have created FLIP-144[1] for the native Kubernetes HA support. Please
have a look if you are interested.
Frankly speaking, I am not against the "StatefulSet + PV +
FileSystemHAService". Maybe in the future,
we could have the both in Flink.

[1].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink

Best,
Yang

Alexey Trenikhun <ye...@msn.com> 于2020年9月3日周四 下午11:44写道:

> Hi Yang,
> Yes, I’ve persisted CompletedCheckpointStore, CheckpointIDCounter and
> RunningJobsRegistry
>
> Thanks,
> Alexey
>
> ------------------------------
> *From:* Yang Wang <da...@gmail.com>
> *Sent:* Wednesday, September 2, 2020 8:21:20 PM
> *To:* Alexey Trenikhun <ye...@msn.com>
> *Cc:* dev <de...@flink.apache.org>; Flink User Mail List <
> user@flink.apache.org>
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Hi Alexey,
>
> Thanks for the feedback. You are right. StatefulSet + PersistentVolume +
> FileSystemHaService could be another
> bundle of services for Flink HA support on K8s. The user jars could be
> built into the image or downloaded by init-container
> or mount via the PV. So they do not need to be recovered from HA storage.
> But i think the checkpoint path and counter
> should be persisted so that we could recover from the latest checkpoint.
>
>
> Best,
> Yang
>
> Alexey Trenikhun <ye...@msn.com> 于2020年9月2日周三 上午7:36写道:
>
> Judging from FLIP-19 (thank you Roman for the link), of 3 use cases (jars,
> RPC messages and log files) only jar files need HA guarantee, and in my
> particular case, job cluster with jar as part of image, it seems doesn't
> matter, I guess it explains why in my test I was able to recover from
> failure even VoidBlobStore. I also use StatefulSet instead of Deployment
>
> Thanks,
> Alexey
>
> ------------------------------
> *From:* Yang Wang <da...@gmail.com>
> *Sent:* Tuesday, September 1, 2020 1:58 AM
> *To:* dev <de...@flink.apache.org>
> *Cc:* Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <
> user@flink.apache.org>
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Hi Alexey,
>
> Glad to hear that your are interested the K8s HA support.
>
> Roman's answer is just on point.
>
> "FileSystemBlobStore" is trying to store the user jars, job graph, etc. on
> the distributed storage(e.g. HDFS, S3, GFS). So when the
> JobManager failover, it could fetch the blob data from remote storage. It
> is very important for standalone and Yarn deployment since
> the local blob store is ephemeral and will be cleaned up after JobManager
> terminated.
>
> However, in your case, benefit from the K8s persistent volume, all the
> local blob data could be recovered after JobManager pod restarted.
> Then you could find that the jobs are recovered and keeps to running.
> Please also remember that the checkpoint meta and counter also
> need to be stored in local file. After then the Flink job could recover
> from the latest checkpoint successfully.
>
> > About the FileSystemHaService
> I am a little skeptical about this feature. Currently, we are using K8s
> deployment for the JobManager. And it is not always guaranteed only
> one JobManager is running. For example, the kubelet is down and never be
> pulled up again. I am trying to work on another ticket "native K8s HA"[1],
> in which we will get a fully functional HA service, including leader
> election/retrieval, jobgraph meta store, checkpoint meta store, running
> registry, etc.
> It could be used for standalone K8s and native K8s deployment.
>
>
> [1]. https://issues.apache.org/jira/browse/FLINK-12884
>
> Best,
> Yang
>
>
> Khachatryan Roman <kh...@gmail.com> 于2020年8月31日周一 下午8:52写道:
>
> + dev
>
> Blob store is used for jars, serialized job, and task information and logs.
> You can find some information at
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture
>
>
> I guess in your setup, Flink was able to pick up local files.
> HA setup presumes that Flink can survive the loss of that JM host and its
> local files.
>
> I'm not sure about K8s native setup - probably VoidBlobStore is enough if
> there is a persistent volume.
> But in the general case, FileSystemBlobStore should be used to store files
> on some DFS.
>
>
> Regards,
> Roman
>
>
> On Sat, Aug 29, 2020 at 6:42 PM Alexey Trenikhun <ye...@msn.com> wrote:
>
> > Did test with streaming job and FileSystemHaService using VoidBlobStore
> > (no HA Blob), looks like job was able to recover from both JM restart and
> > TM restart. Any idea in what use cases HA Blob is needed?
> >
> > Thanks,
> > Alexey
> > ------------------------------
> > *From:* Alexey Trenikhun <ye...@msn.com>
> > *Sent:* Friday, August 28, 2020 11:31 AM
> > *To:* Khachatryan Roman <kh...@gmail.com>
> > *Cc:* Flink User Mail List <us...@flink.apache.org>
> > *Subject:* Re: FileSystemHaServices and BlobStore
> >
> > Motivation is to have k8s HA setup without extra component - Zookeeper,
> > see [1]
> >
> > Purpose of BlobStore is vague to me, what kind of BLOBs are stored? Looks
> > like  if we start job from savepoint, then persistence of BlobStore is
> > not necessary, but is it needed if we recover from checkpoint?
> >
> > Thanks,
> > Alexey
> >
> > [1]. https://issues.apache.org/jira/browse/FLINK-17598
> >
> >
> > ------------------------------
> > *From:* Khachatryan Roman <kh...@gmail.com>
> > *Sent:* Friday, August 28, 2020 9:24 AM
> > *To:* Alexey Trenikhun <ye...@msn.com>
> > *Cc:* Flink User Mail List <us...@flink.apache.org>
> > *Subject:* Re: FileSystemHaServices and BlobStore
> >
> > Hello Alexey,
> >
> > I think you need FileSystemBlobStore as you are implementing HA Services,
> > and BLOBs should be highly available too.
> > However, I'm a bit concerned about the direction in general: it
> > essentially means re-implementing ZK functionality on top of FS.
> > What are the motivation and the use case?
> >
> > Regards,
> > Roman
> >
> >
> > On Fri, Aug 28, 2020 at 5:15 PM Alexey Trenikhun <ye...@msn.com> wrote:
> >
> > Hello,
> > I'm thinking about implementing FileSystemHaServices - single leader, but
> > persistent RunningJobRegistry, CheckpointIDCounter,
> > CompletedCheckpointStore and JobGraphStore. I'm not sure do you need
> > FileSystemBlobStore or VoidBlobStore is enough. Can't figure out, should
> > BlobStore survive JobManager crash. I see that ZookeeperHaServices use
> FileSystemBlobStore,
> > but not clear is to due to having multiple JobManagers (leader +
> follower)
> > or necessity to preserve BLOBs on restart.
> >
> > Thanks,
> > Alexey
> >
> >
>
>

Re: FileSystemHaServices and BlobStore

Posted by Yang Wang <da...@gmail.com>.
Hi Alexey,

I have created FLIP-144[1] for the native Kubernetes HA support. Please
have a look if you are interested.
Frankly speaking, I am not against the "StatefulSet + PV +
FileSystemHAService". Maybe in the future,
we could have the both in Flink.

[1].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink

Best,
Yang

Alexey Trenikhun <ye...@msn.com> 于2020年9月3日周四 下午11:44写道:

> Hi Yang,
> Yes, I’ve persisted CompletedCheckpointStore, CheckpointIDCounter and
> RunningJobsRegistry
>
> Thanks,
> Alexey
>
> ------------------------------
> *From:* Yang Wang <da...@gmail.com>
> *Sent:* Wednesday, September 2, 2020 8:21:20 PM
> *To:* Alexey Trenikhun <ye...@msn.com>
> *Cc:* dev <de...@flink.apache.org>; Flink User Mail List <
> user@flink.apache.org>
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Hi Alexey,
>
> Thanks for the feedback. You are right. StatefulSet + PersistentVolume +
> FileSystemHaService could be another
> bundle of services for Flink HA support on K8s. The user jars could be
> built into the image or downloaded by init-container
> or mount via the PV. So they do not need to be recovered from HA storage.
> But i think the checkpoint path and counter
> should be persisted so that we could recover from the latest checkpoint.
>
>
> Best,
> Yang
>
> Alexey Trenikhun <ye...@msn.com> 于2020年9月2日周三 上午7:36写道:
>
> Judging from FLIP-19 (thank you Roman for the link), of 3 use cases (jars,
> RPC messages and log files) only jar files need HA guarantee, and in my
> particular case, job cluster with jar as part of image, it seems doesn't
> matter, I guess it explains why in my test I was able to recover from
> failure even VoidBlobStore. I also use StatefulSet instead of Deployment
>
> Thanks,
> Alexey
>
> ------------------------------
> *From:* Yang Wang <da...@gmail.com>
> *Sent:* Tuesday, September 1, 2020 1:58 AM
> *To:* dev <de...@flink.apache.org>
> *Cc:* Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <
> user@flink.apache.org>
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Hi Alexey,
>
> Glad to hear that your are interested the K8s HA support.
>
> Roman's answer is just on point.
>
> "FileSystemBlobStore" is trying to store the user jars, job graph, etc. on
> the distributed storage(e.g. HDFS, S3, GFS). So when the
> JobManager failover, it could fetch the blob data from remote storage. It
> is very important for standalone and Yarn deployment since
> the local blob store is ephemeral and will be cleaned up after JobManager
> terminated.
>
> However, in your case, benefit from the K8s persistent volume, all the
> local blob data could be recovered after JobManager pod restarted.
> Then you could find that the jobs are recovered and keeps to running.
> Please also remember that the checkpoint meta and counter also
> need to be stored in local file. After then the Flink job could recover
> from the latest checkpoint successfully.
>
> > About the FileSystemHaService
> I am a little skeptical about this feature. Currently, we are using K8s
> deployment for the JobManager. And it is not always guaranteed only
> one JobManager is running. For example, the kubelet is down and never be
> pulled up again. I am trying to work on another ticket "native K8s HA"[1],
> in which we will get a fully functional HA service, including leader
> election/retrieval, jobgraph meta store, checkpoint meta store, running
> registry, etc.
> It could be used for standalone K8s and native K8s deployment.
>
>
> [1]. https://issues.apache.org/jira/browse/FLINK-12884
>
> Best,
> Yang
>
>
> Khachatryan Roman <kh...@gmail.com> 于2020年8月31日周一 下午8:52写道:
>
> + dev
>
> Blob store is used for jars, serialized job, and task information and logs.
> You can find some information at
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture
>
>
> I guess in your setup, Flink was able to pick up local files.
> HA setup presumes that Flink can survive the loss of that JM host and its
> local files.
>
> I'm not sure about K8s native setup - probably VoidBlobStore is enough if
> there is a persistent volume.
> But in the general case, FileSystemBlobStore should be used to store files
> on some DFS.
>
>
> Regards,
> Roman
>
>
> On Sat, Aug 29, 2020 at 6:42 PM Alexey Trenikhun <ye...@msn.com> wrote:
>
> > Did test with streaming job and FileSystemHaService using VoidBlobStore
> > (no HA Blob), looks like job was able to recover from both JM restart and
> > TM restart. Any idea in what use cases HA Blob is needed?
> >
> > Thanks,
> > Alexey
> > ------------------------------
> > *From:* Alexey Trenikhun <ye...@msn.com>
> > *Sent:* Friday, August 28, 2020 11:31 AM
> > *To:* Khachatryan Roman <kh...@gmail.com>
> > *Cc:* Flink User Mail List <us...@flink.apache.org>
> > *Subject:* Re: FileSystemHaServices and BlobStore
> >
> > Motivation is to have k8s HA setup without extra component - Zookeeper,
> > see [1]
> >
> > Purpose of BlobStore is vague to me, what kind of BLOBs are stored? Looks
> > like  if we start job from savepoint, then persistence of BlobStore is
> > not necessary, but is it needed if we recover from checkpoint?
> >
> > Thanks,
> > Alexey
> >
> > [1]. https://issues.apache.org/jira/browse/FLINK-17598
> >
> >
> > ------------------------------
> > *From:* Khachatryan Roman <kh...@gmail.com>
> > *Sent:* Friday, August 28, 2020 9:24 AM
> > *To:* Alexey Trenikhun <ye...@msn.com>
> > *Cc:* Flink User Mail List <us...@flink.apache.org>
> > *Subject:* Re: FileSystemHaServices and BlobStore
> >
> > Hello Alexey,
> >
> > I think you need FileSystemBlobStore as you are implementing HA Services,
> > and BLOBs should be highly available too.
> > However, I'm a bit concerned about the direction in general: it
> > essentially means re-implementing ZK functionality on top of FS.
> > What are the motivation and the use case?
> >
> > Regards,
> > Roman
> >
> >
> > On Fri, Aug 28, 2020 at 5:15 PM Alexey Trenikhun <ye...@msn.com> wrote:
> >
> > Hello,
> > I'm thinking about implementing FileSystemHaServices - single leader, but
> > persistent RunningJobRegistry, CheckpointIDCounter,
> > CompletedCheckpointStore and JobGraphStore. I'm not sure do you need
> > FileSystemBlobStore or VoidBlobStore is enough. Can't figure out, should
> > BlobStore survive JobManager crash. I see that ZookeeperHaServices use
> FileSystemBlobStore,
> > but not clear is to due to having multiple JobManagers (leader +
> follower)
> > or necessity to preserve BLOBs on restart.
> >
> > Thanks,
> > Alexey
> >
> >
>
>

Re: FileSystemHaServices and BlobStore

Posted by Alexey Trenikhun <ye...@msn.com>.
Hi Yang,
Yes, I’ve persisted CompletedCheckpointStore, CheckpointIDCounter and RunningJobsRegistry

Thanks,
Alexey

________________________________
From: Yang Wang <da...@gmail.com>
Sent: Wednesday, September 2, 2020 8:21:20 PM
To: Alexey Trenikhun <ye...@msn.com>
Cc: dev <de...@flink.apache.org>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: FileSystemHaServices and BlobStore

Hi Alexey,

Thanks for the feedback. You are right. StatefulSet + PersistentVolume + FileSystemHaService could be another
bundle of services for Flink HA support on K8s. The user jars could be built into the image or downloaded by init-container
or mount via the PV. So they do not need to be recovered from HA storage. But i think the checkpoint path and counter
should be persisted so that we could recover from the latest checkpoint.


Best,
Yang

Alexey Trenikhun <ye...@msn.com>> 于2020年9月2日周三 上午7:36写道:
Judging from FLIP-19 (thank you Roman for the link), of 3 use cases (jars, RPC messages and log files) only jar files need HA guarantee, and in my particular case, job cluster with jar as part of image, it seems doesn't matter, I guess it explains why in my test I was able to recover from failure even VoidBlobStore. I also use StatefulSet instead of Deployment

Thanks,
Alexey

________________________________
From: Yang Wang <da...@gmail.com>>
Sent: Tuesday, September 1, 2020 1:58 AM
To: dev <de...@flink.apache.org>>
Cc: Alexey Trenikhun <ye...@msn.com>>; Flink User Mail List <us...@flink.apache.org>>
Subject: Re: FileSystemHaServices and BlobStore

Hi Alexey,

Glad to hear that your are interested the K8s HA support.

Roman's answer is just on point.

"FileSystemBlobStore" is trying to store the user jars, job graph, etc. on the distributed storage(e.g. HDFS, S3, GFS). So when the
JobManager failover, it could fetch the blob data from remote storage. It is very important for standalone and Yarn deployment since
the local blob store is ephemeral and will be cleaned up after JobManager terminated.

However, in your case, benefit from the K8s persistent volume, all the local blob data could be recovered after JobManager pod restarted.
Then you could find that the jobs are recovered and keeps to running. Please also remember that the checkpoint meta and counter also
need to be stored in local file. After then the Flink job could recover from the latest checkpoint successfully.

> About the FileSystemHaService
I am a little skeptical about this feature. Currently, we are using K8s deployment for the JobManager. And it is not always guaranteed only
one JobManager is running. For example, the kubelet is down and never be pulled up again. I am trying to work on another ticket "native K8s HA"[1],
in which we will get a fully functional HA service, including leader election/retrieval, jobgraph meta store, checkpoint meta store, running registry, etc.
It could be used for standalone K8s and native K8s deployment.


[1]. https://issues.apache.org/jira/browse/FLINK-12884

Best,
Yang


Khachatryan Roman <kh...@gmail.com>> 于2020年8月31日周一 下午8:52写道:
+ dev

Blob store is used for jars, serialized job, and task information and logs.
You can find some information at
https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture


I guess in your setup, Flink was able to pick up local files.
HA setup presumes that Flink can survive the loss of that JM host and its
local files.

I'm not sure about K8s native setup - probably VoidBlobStore is enough if
there is a persistent volume.
But in the general case, FileSystemBlobStore should be used to store files
on some DFS.


Regards,
Roman


On Sat, Aug 29, 2020 at 6:42 PM Alexey Trenikhun <ye...@msn.com>> wrote:

> Did test with streaming job and FileSystemHaService using VoidBlobStore
> (no HA Blob), looks like job was able to recover from both JM restart and
> TM restart. Any idea in what use cases HA Blob is needed?
>
> Thanks,
> Alexey
> ------------------------------
> *From:* Alexey Trenikhun <ye...@msn.com>>
> *Sent:* Friday, August 28, 2020 11:31 AM
> *To:* Khachatryan Roman <kh...@gmail.com>>
> *Cc:* Flink User Mail List <us...@flink.apache.org>>
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Motivation is to have k8s HA setup without extra component - Zookeeper,
> see [1]
>
> Purpose of BlobStore is vague to me, what kind of BLOBs are stored? Looks
> like  if we start job from savepoint, then persistence of BlobStore is
> not necessary, but is it needed if we recover from checkpoint?
>
> Thanks,
> Alexey
>
> [1]. https://issues.apache.org/jira/browse/FLINK-17598
>
>
> ------------------------------
> *From:* Khachatryan Roman <kh...@gmail.com>>
> *Sent:* Friday, August 28, 2020 9:24 AM
> *To:* Alexey Trenikhun <ye...@msn.com>>
> *Cc:* Flink User Mail List <us...@flink.apache.org>>
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Hello Alexey,
>
> I think you need FileSystemBlobStore as you are implementing HA Services,
> and BLOBs should be highly available too.
> However, I'm a bit concerned about the direction in general: it
> essentially means re-implementing ZK functionality on top of FS.
> What are the motivation and the use case?
>
> Regards,
> Roman
>
>
> On Fri, Aug 28, 2020 at 5:15 PM Alexey Trenikhun <ye...@msn.com>> wrote:
>
> Hello,
> I'm thinking about implementing FileSystemHaServices - single leader, but
> persistent RunningJobRegistry, CheckpointIDCounter,
> CompletedCheckpointStore and JobGraphStore. I'm not sure do you need
> FileSystemBlobStore or VoidBlobStore is enough. Can't figure out, should
> BlobStore survive JobManager crash. I see that ZookeeperHaServices use FileSystemBlobStore,
> but not clear is to due to having multiple JobManagers (leader + follower)
> or necessity to preserve BLOBs on restart.
>
> Thanks,
> Alexey
>
>

Re: FileSystemHaServices and BlobStore

Posted by Yang Wang <da...@gmail.com>.
Hi Alexey,

Thanks for the feedback. You are right. StatefulSet + PersistentVolume +
FileSystemHaService could be another
bundle of services for Flink HA support on K8s. The user jars could be
built into the image or downloaded by init-container
or mount via the PV. So they do not need to be recovered from HA storage.
But i think the checkpoint path and counter
should be persisted so that we could recover from the latest checkpoint.


Best,
Yang

Alexey Trenikhun <ye...@msn.com> 于2020年9月2日周三 上午7:36写道:

> Judging from FLIP-19 (thank you Roman for the link), of 3 use cases (jars,
> RPC messages and log files) only jar files need HA guarantee, and in my
> particular case, job cluster with jar as part of image, it seems doesn't
> matter, I guess it explains why in my test I was able to recover from
> failure even VoidBlobStore. I also use StatefulSet instead of Deployment
>
> Thanks,
> Alexey
>
> ------------------------------
> *From:* Yang Wang <da...@gmail.com>
> *Sent:* Tuesday, September 1, 2020 1:58 AM
> *To:* dev <de...@flink.apache.org>
> *Cc:* Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <
> user@flink.apache.org>
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Hi Alexey,
>
> Glad to hear that your are interested the K8s HA support.
>
> Roman's answer is just on point.
>
> "FileSystemBlobStore" is trying to store the user jars, job graph, etc. on
> the distributed storage(e.g. HDFS, S3, GFS). So when the
> JobManager failover, it could fetch the blob data from remote storage. It
> is very important for standalone and Yarn deployment since
> the local blob store is ephemeral and will be cleaned up after JobManager
> terminated.
>
> However, in your case, benefit from the K8s persistent volume, all the
> local blob data could be recovered after JobManager pod restarted.
> Then you could find that the jobs are recovered and keeps to running.
> Please also remember that the checkpoint meta and counter also
> need to be stored in local file. After then the Flink job could recover
> from the latest checkpoint successfully.
>
> > About the FileSystemHaService
> I am a little skeptical about this feature. Currently, we are using K8s
> deployment for the JobManager. And it is not always guaranteed only
> one JobManager is running. For example, the kubelet is down and never be
> pulled up again. I am trying to work on another ticket "native K8s HA"[1],
> in which we will get a fully functional HA service, including leader
> election/retrieval, jobgraph meta store, checkpoint meta store, running
> registry, etc.
> It could be used for standalone K8s and native K8s deployment.
>
>
> [1]. https://issues.apache.org/jira/browse/FLINK-12884
>
> Best,
> Yang
>
>
> Khachatryan Roman <kh...@gmail.com> 于2020年8月31日周一 下午8:52写道:
>
> + dev
>
> Blob store is used for jars, serialized job, and task information and logs.
> You can find some information at
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture
>
>
> I guess in your setup, Flink was able to pick up local files.
> HA setup presumes that Flink can survive the loss of that JM host and its
> local files.
>
> I'm not sure about K8s native setup - probably VoidBlobStore is enough if
> there is a persistent volume.
> But in the general case, FileSystemBlobStore should be used to store files
> on some DFS.
>
>
> Regards,
> Roman
>
>
> On Sat, Aug 29, 2020 at 6:42 PM Alexey Trenikhun <ye...@msn.com> wrote:
>
> > Did test with streaming job and FileSystemHaService using VoidBlobStore
> > (no HA Blob), looks like job was able to recover from both JM restart and
> > TM restart. Any idea in what use cases HA Blob is needed?
> >
> > Thanks,
> > Alexey
> > ------------------------------
> > *From:* Alexey Trenikhun <ye...@msn.com>
> > *Sent:* Friday, August 28, 2020 11:31 AM
> > *To:* Khachatryan Roman <kh...@gmail.com>
> > *Cc:* Flink User Mail List <us...@flink.apache.org>
> > *Subject:* Re: FileSystemHaServices and BlobStore
> >
> > Motivation is to have k8s HA setup without extra component - Zookeeper,
> > see [1]
> >
> > Purpose of BlobStore is vague to me, what kind of BLOBs are stored? Looks
> > like  if we start job from savepoint, then persistence of BlobStore is
> > not necessary, but is it needed if we recover from checkpoint?
> >
> > Thanks,
> > Alexey
> >
> > [1]. https://issues.apache.org/jira/browse/FLINK-17598
> >
> >
> > ------------------------------
> > *From:* Khachatryan Roman <kh...@gmail.com>
> > *Sent:* Friday, August 28, 2020 9:24 AM
> > *To:* Alexey Trenikhun <ye...@msn.com>
> > *Cc:* Flink User Mail List <us...@flink.apache.org>
> > *Subject:* Re: FileSystemHaServices and BlobStore
> >
> > Hello Alexey,
> >
> > I think you need FileSystemBlobStore as you are implementing HA Services,
> > and BLOBs should be highly available too.
> > However, I'm a bit concerned about the direction in general: it
> > essentially means re-implementing ZK functionality on top of FS.
> > What are the motivation and the use case?
> >
> > Regards,
> > Roman
> >
> >
> > On Fri, Aug 28, 2020 at 5:15 PM Alexey Trenikhun <ye...@msn.com> wrote:
> >
> > Hello,
> > I'm thinking about implementing FileSystemHaServices - single leader, but
> > persistent RunningJobRegistry, CheckpointIDCounter,
> > CompletedCheckpointStore and JobGraphStore. I'm not sure do you need
> > FileSystemBlobStore or VoidBlobStore is enough. Can't figure out, should
> > BlobStore survive JobManager crash. I see that ZookeeperHaServices use
> FileSystemBlobStore,
> > but not clear is to due to having multiple JobManagers (leader +
> follower)
> > or necessity to preserve BLOBs on restart.
> >
> > Thanks,
> > Alexey
> >
> >
>
>

Re: FileSystemHaServices and BlobStore

Posted by Yang Wang <da...@gmail.com>.
Hi Alexey,

Thanks for the feedback. You are right. StatefulSet + PersistentVolume +
FileSystemHaService could be another
bundle of services for Flink HA support on K8s. The user jars could be
built into the image or downloaded by init-container
or mount via the PV. So they do not need to be recovered from HA storage.
But i think the checkpoint path and counter
should be persisted so that we could recover from the latest checkpoint.


Best,
Yang

Alexey Trenikhun <ye...@msn.com> 于2020年9月2日周三 上午7:36写道:

> Judging from FLIP-19 (thank you Roman for the link), of 3 use cases (jars,
> RPC messages and log files) only jar files need HA guarantee, and in my
> particular case, job cluster with jar as part of image, it seems doesn't
> matter, I guess it explains why in my test I was able to recover from
> failure even VoidBlobStore. I also use StatefulSet instead of Deployment
>
> Thanks,
> Alexey
>
> ------------------------------
> *From:* Yang Wang <da...@gmail.com>
> *Sent:* Tuesday, September 1, 2020 1:58 AM
> *To:* dev <de...@flink.apache.org>
> *Cc:* Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <
> user@flink.apache.org>
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Hi Alexey,
>
> Glad to hear that your are interested the K8s HA support.
>
> Roman's answer is just on point.
>
> "FileSystemBlobStore" is trying to store the user jars, job graph, etc. on
> the distributed storage(e.g. HDFS, S3, GFS). So when the
> JobManager failover, it could fetch the blob data from remote storage. It
> is very important for standalone and Yarn deployment since
> the local blob store is ephemeral and will be cleaned up after JobManager
> terminated.
>
> However, in your case, benefit from the K8s persistent volume, all the
> local blob data could be recovered after JobManager pod restarted.
> Then you could find that the jobs are recovered and keeps to running.
> Please also remember that the checkpoint meta and counter also
> need to be stored in local file. After then the Flink job could recover
> from the latest checkpoint successfully.
>
> > About the FileSystemHaService
> I am a little skeptical about this feature. Currently, we are using K8s
> deployment for the JobManager. And it is not always guaranteed only
> one JobManager is running. For example, the kubelet is down and never be
> pulled up again. I am trying to work on another ticket "native K8s HA"[1],
> in which we will get a fully functional HA service, including leader
> election/retrieval, jobgraph meta store, checkpoint meta store, running
> registry, etc.
> It could be used for standalone K8s and native K8s deployment.
>
>
> [1]. https://issues.apache.org/jira/browse/FLINK-12884
>
> Best,
> Yang
>
>
> Khachatryan Roman <kh...@gmail.com> 于2020年8月31日周一 下午8:52写道:
>
> + dev
>
> Blob store is used for jars, serialized job, and task information and logs.
> You can find some information at
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture
>
>
> I guess in your setup, Flink was able to pick up local files.
> HA setup presumes that Flink can survive the loss of that JM host and its
> local files.
>
> I'm not sure about K8s native setup - probably VoidBlobStore is enough if
> there is a persistent volume.
> But in the general case, FileSystemBlobStore should be used to store files
> on some DFS.
>
>
> Regards,
> Roman
>
>
> On Sat, Aug 29, 2020 at 6:42 PM Alexey Trenikhun <ye...@msn.com> wrote:
>
> > Did test with streaming job and FileSystemHaService using VoidBlobStore
> > (no HA Blob), looks like job was able to recover from both JM restart and
> > TM restart. Any idea in what use cases HA Blob is needed?
> >
> > Thanks,
> > Alexey
> > ------------------------------
> > *From:* Alexey Trenikhun <ye...@msn.com>
> > *Sent:* Friday, August 28, 2020 11:31 AM
> > *To:* Khachatryan Roman <kh...@gmail.com>
> > *Cc:* Flink User Mail List <us...@flink.apache.org>
> > *Subject:* Re: FileSystemHaServices and BlobStore
> >
> > Motivation is to have k8s HA setup without extra component - Zookeeper,
> > see [1]
> >
> > Purpose of BlobStore is vague to me, what kind of BLOBs are stored? Looks
> > like  if we start job from savepoint, then persistence of BlobStore is
> > not necessary, but is it needed if we recover from checkpoint?
> >
> > Thanks,
> > Alexey
> >
> > [1]. https://issues.apache.org/jira/browse/FLINK-17598
> >
> >
> > ------------------------------
> > *From:* Khachatryan Roman <kh...@gmail.com>
> > *Sent:* Friday, August 28, 2020 9:24 AM
> > *To:* Alexey Trenikhun <ye...@msn.com>
> > *Cc:* Flink User Mail List <us...@flink.apache.org>
> > *Subject:* Re: FileSystemHaServices and BlobStore
> >
> > Hello Alexey,
> >
> > I think you need FileSystemBlobStore as you are implementing HA Services,
> > and BLOBs should be highly available too.
> > However, I'm a bit concerned about the direction in general: it
> > essentially means re-implementing ZK functionality on top of FS.
> > What are the motivation and the use case?
> >
> > Regards,
> > Roman
> >
> >
> > On Fri, Aug 28, 2020 at 5:15 PM Alexey Trenikhun <ye...@msn.com> wrote:
> >
> > Hello,
> > I'm thinking about implementing FileSystemHaServices - single leader, but
> > persistent RunningJobRegistry, CheckpointIDCounter,
> > CompletedCheckpointStore and JobGraphStore. I'm not sure do you need
> > FileSystemBlobStore or VoidBlobStore is enough. Can't figure out, should
> > BlobStore survive JobManager crash. I see that ZookeeperHaServices use
> FileSystemBlobStore,
> > but not clear is to due to having multiple JobManagers (leader +
> follower)
> > or necessity to preserve BLOBs on restart.
> >
> > Thanks,
> > Alexey
> >
> >
>
>

Re: FileSystemHaServices and BlobStore

Posted by Alexey Trenikhun <ye...@msn.com>.
Judging from FLIP-19 (thank you Roman for the link), of 3 use cases (jars, RPC messages and log files) only jar files need HA guarantee, and in my particular case, job cluster with jar as part of image, it seems doesn't matter, I guess it explains why in my test I was able to recover from failure even VoidBlobStore. I also use StatefulSet instead of Deployment

Thanks,
Alexey

________________________________
From: Yang Wang <da...@gmail.com>
Sent: Tuesday, September 1, 2020 1:58 AM
To: dev <de...@flink.apache.org>
Cc: Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: FileSystemHaServices and BlobStore

Hi Alexey,

Glad to hear that your are interested the K8s HA support.

Roman's answer is just on point.

"FileSystemBlobStore" is trying to store the user jars, job graph, etc. on the distributed storage(e.g. HDFS, S3, GFS). So when the
JobManager failover, it could fetch the blob data from remote storage. It is very important for standalone and Yarn deployment since
the local blob store is ephemeral and will be cleaned up after JobManager terminated.

However, in your case, benefit from the K8s persistent volume, all the local blob data could be recovered after JobManager pod restarted.
Then you could find that the jobs are recovered and keeps to running. Please also remember that the checkpoint meta and counter also
need to be stored in local file. After then the Flink job could recover from the latest checkpoint successfully.

> About the FileSystemHaService
I am a little skeptical about this feature. Currently, we are using K8s deployment for the JobManager. And it is not always guaranteed only
one JobManager is running. For example, the kubelet is down and never be pulled up again. I am trying to work on another ticket "native K8s HA"[1],
in which we will get a fully functional HA service, including leader election/retrieval, jobgraph meta store, checkpoint meta store, running registry, etc.
It could be used for standalone K8s and native K8s deployment.


[1]. https://issues.apache.org/jira/browse/FLINK-12884

Best,
Yang


Khachatryan Roman <kh...@gmail.com>> 于2020年8月31日周一 下午8:52写道:
+ dev

Blob store is used for jars, serialized job, and task information and logs.
You can find some information at
https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture


I guess in your setup, Flink was able to pick up local files.
HA setup presumes that Flink can survive the loss of that JM host and its
local files.

I'm not sure about K8s native setup - probably VoidBlobStore is enough if
there is a persistent volume.
But in the general case, FileSystemBlobStore should be used to store files
on some DFS.


Regards,
Roman


On Sat, Aug 29, 2020 at 6:42 PM Alexey Trenikhun <ye...@msn.com>> wrote:

> Did test with streaming job and FileSystemHaService using VoidBlobStore
> (no HA Blob), looks like job was able to recover from both JM restart and
> TM restart. Any idea in what use cases HA Blob is needed?
>
> Thanks,
> Alexey
> ------------------------------
> *From:* Alexey Trenikhun <ye...@msn.com>>
> *Sent:* Friday, August 28, 2020 11:31 AM
> *To:* Khachatryan Roman <kh...@gmail.com>>
> *Cc:* Flink User Mail List <us...@flink.apache.org>>
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Motivation is to have k8s HA setup without extra component - Zookeeper,
> see [1]
>
> Purpose of BlobStore is vague to me, what kind of BLOBs are stored? Looks
> like  if we start job from savepoint, then persistence of BlobStore is
> not necessary, but is it needed if we recover from checkpoint?
>
> Thanks,
> Alexey
>
> [1]. https://issues.apache.org/jira/browse/FLINK-17598
>
>
> ------------------------------
> *From:* Khachatryan Roman <kh...@gmail.com>>
> *Sent:* Friday, August 28, 2020 9:24 AM
> *To:* Alexey Trenikhun <ye...@msn.com>>
> *Cc:* Flink User Mail List <us...@flink.apache.org>>
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Hello Alexey,
>
> I think you need FileSystemBlobStore as you are implementing HA Services,
> and BLOBs should be highly available too.
> However, I'm a bit concerned about the direction in general: it
> essentially means re-implementing ZK functionality on top of FS.
> What are the motivation and the use case?
>
> Regards,
> Roman
>
>
> On Fri, Aug 28, 2020 at 5:15 PM Alexey Trenikhun <ye...@msn.com>> wrote:
>
> Hello,
> I'm thinking about implementing FileSystemHaServices - single leader, but
> persistent RunningJobRegistry, CheckpointIDCounter,
> CompletedCheckpointStore and JobGraphStore. I'm not sure do you need
> FileSystemBlobStore or VoidBlobStore is enough. Can't figure out, should
> BlobStore survive JobManager crash. I see that ZookeeperHaServices use FileSystemBlobStore,
> but not clear is to due to having multiple JobManagers (leader + follower)
> or necessity to preserve BLOBs on restart.
>
> Thanks,
> Alexey
>
>

Re: FileSystemHaServices and BlobStore

Posted by Alexey Trenikhun <ye...@msn.com>.
Judging from FLIP-19 (thank you Roman for the link), of 3 use cases (jars, RPC messages and log files) only jar files need HA guarantee, and in my particular case, job cluster with jar as part of image, it seems doesn't matter, I guess it explains why in my test I was able to recover from failure even VoidBlobStore. I also use StatefulSet instead of Deployment

Thanks,
Alexey

________________________________
From: Yang Wang <da...@gmail.com>
Sent: Tuesday, September 1, 2020 1:58 AM
To: dev <de...@flink.apache.org>
Cc: Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: FileSystemHaServices and BlobStore

Hi Alexey,

Glad to hear that your are interested the K8s HA support.

Roman's answer is just on point.

"FileSystemBlobStore" is trying to store the user jars, job graph, etc. on the distributed storage(e.g. HDFS, S3, GFS). So when the
JobManager failover, it could fetch the blob data from remote storage. It is very important for standalone and Yarn deployment since
the local blob store is ephemeral and will be cleaned up after JobManager terminated.

However, in your case, benefit from the K8s persistent volume, all the local blob data could be recovered after JobManager pod restarted.
Then you could find that the jobs are recovered and keeps to running. Please also remember that the checkpoint meta and counter also
need to be stored in local file. After then the Flink job could recover from the latest checkpoint successfully.

> About the FileSystemHaService
I am a little skeptical about this feature. Currently, we are using K8s deployment for the JobManager. And it is not always guaranteed only
one JobManager is running. For example, the kubelet is down and never be pulled up again. I am trying to work on another ticket "native K8s HA"[1],
in which we will get a fully functional HA service, including leader election/retrieval, jobgraph meta store, checkpoint meta store, running registry, etc.
It could be used for standalone K8s and native K8s deployment.


[1]. https://issues.apache.org/jira/browse/FLINK-12884

Best,
Yang


Khachatryan Roman <kh...@gmail.com>> 于2020年8月31日周一 下午8:52写道:
+ dev

Blob store is used for jars, serialized job, and task information and logs.
You can find some information at
https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture


I guess in your setup, Flink was able to pick up local files.
HA setup presumes that Flink can survive the loss of that JM host and its
local files.

I'm not sure about K8s native setup - probably VoidBlobStore is enough if
there is a persistent volume.
But in the general case, FileSystemBlobStore should be used to store files
on some DFS.


Regards,
Roman


On Sat, Aug 29, 2020 at 6:42 PM Alexey Trenikhun <ye...@msn.com>> wrote:

> Did test with streaming job and FileSystemHaService using VoidBlobStore
> (no HA Blob), looks like job was able to recover from both JM restart and
> TM restart. Any idea in what use cases HA Blob is needed?
>
> Thanks,
> Alexey
> ------------------------------
> *From:* Alexey Trenikhun <ye...@msn.com>>
> *Sent:* Friday, August 28, 2020 11:31 AM
> *To:* Khachatryan Roman <kh...@gmail.com>>
> *Cc:* Flink User Mail List <us...@flink.apache.org>>
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Motivation is to have k8s HA setup without extra component - Zookeeper,
> see [1]
>
> Purpose of BlobStore is vague to me, what kind of BLOBs are stored? Looks
> like  if we start job from savepoint, then persistence of BlobStore is
> not necessary, but is it needed if we recover from checkpoint?
>
> Thanks,
> Alexey
>
> [1]. https://issues.apache.org/jira/browse/FLINK-17598
>
>
> ------------------------------
> *From:* Khachatryan Roman <kh...@gmail.com>>
> *Sent:* Friday, August 28, 2020 9:24 AM
> *To:* Alexey Trenikhun <ye...@msn.com>>
> *Cc:* Flink User Mail List <us...@flink.apache.org>>
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Hello Alexey,
>
> I think you need FileSystemBlobStore as you are implementing HA Services,
> and BLOBs should be highly available too.
> However, I'm a bit concerned about the direction in general: it
> essentially means re-implementing ZK functionality on top of FS.
> What are the motivation and the use case?
>
> Regards,
> Roman
>
>
> On Fri, Aug 28, 2020 at 5:15 PM Alexey Trenikhun <ye...@msn.com>> wrote:
>
> Hello,
> I'm thinking about implementing FileSystemHaServices - single leader, but
> persistent RunningJobRegistry, CheckpointIDCounter,
> CompletedCheckpointStore and JobGraphStore. I'm not sure do you need
> FileSystemBlobStore or VoidBlobStore is enough. Can't figure out, should
> BlobStore survive JobManager crash. I see that ZookeeperHaServices use FileSystemBlobStore,
> but not clear is to due to having multiple JobManagers (leader + follower)
> or necessity to preserve BLOBs on restart.
>
> Thanks,
> Alexey
>
>