You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Sean Hester <se...@bettercloud.com> on 2019/09/24 14:02:07 UTC

Challenges Deploying Flink With Savepoints On Kubernetes

hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
when deploying Flink jobs to start from savepoints using the job-cluster
mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of
Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all
long-running streaming jobs, all essentially acting as microservices. we're
using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a
savepoint to replay recent events, i.e. when we've enhanced the job logic
or fixed a bug. but after the deployment we want to have the job resume
it's "long-running" behavior, where any unplanned restarts resume from the
latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes
deployment includes the savepoint argument in the configuration. if the Job
Manager container(s) have an unplanned restart, when they come back up they
will start from the savepoint instead of resuming from the latest
checkpoint. everything is working as configured, but that's not exactly
what we want. we want the savepoint argument to be transient somehow (only
used during the initial deployment), but Kubernetes doesn't really support
the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code
in the jobs or custom logic in the container (i.e. a custom entrypoint
script that records that the configured savepoint has already been used in
a file on a persistent volume or GCS, and potentially when/why/by which
deployment). but these seem like unexpected and hacky solutions. before we
head down that road i wanted to ask:

   - is this is already a solved problem that i've missed?
   - is this issue already on the community's radar?

thanks in advance!

-- 
*Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
<http://www.bettercloud.com> <http://www.bettercloud.com>
*Altitude 2019 in San Francisco | Sept. 23 - 25*
It’s not just an IT conference, it’s “a complete learning and networking
experience”
<https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Vijay Bhaskar <bh...@gmail.com>.
One of the way you should do is, have a separate cluster job manager
program in kubernetes, which is actually managing jobs. So that you can
decouple the job control. While restarting the job, make sure to follow the
below steps:

a) First job manager takes save point by killing the job and notes down the
save point path by using the save point rest api
b) After  that job manager starts the new job by supplying the save point
path. So that it starts from the latest save point.

So that you no need to rely on yaml configuration.

Also above steps helps only for manual restart of the flink job.
There are another 2 cases possible:

case 1 => Your job restarts by it self with the help of flink cluster, then
latest check point is going to take care of the job state, no need to worry
about
case 2 => Your job is failed. Then state is lost. To overcome this, as per
the documentation best thing is: Take periodic save points. So that while
restarting the job from crashes,
provide the argument of latest save point path  as argument to your job
manager program.

So the key is, have a seprate job manager of flink jobs so that you will
have the flexibility

Regards
Bhaskar


On Wed, Sep 25, 2019 at 6:38 PM Sean Hester <se...@bettercloud.com>
wrote:

> thanks for all replies! i'll definitely take a look at the Flink k8s
> Operator project.
>
> i'll try to restate the issue to clarify. this issue is specific to
> starting a job from a savepoint in job-cluster mode. in these cases the Job
> Manager container is configured to run a single Flink job at start-up. the
> savepoint needs to be provided as an argument to the entrypoint. the Flink
> documentation for this approach is here:
>
>
> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>
> the issue is that taking this approach means that the job will *always*
> start from the savepoint provided as the start argument in the Kubernetes
> YAML. this includes unplanned restarts of the job manager, but we'd really
> prefer any *unplanned* restarts resume for the most recent checkpoint
> instead of restarting from the configured savepoint. so in a sense we want
> the savepoint argument to be transient, only being used during the initial
> deployment, but this runs counter to the design of Kubernetes which always
> wants to restore a deployment to the "goal state" as defined in the YAML.
>
> i hope this helps. if you want more details please let me know, and thanks
> again for your time.
>
>
> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com> wrote:
>
>> I think I overlooked it. Good point. I am using Redis to save the path to
>> my savepoint, I might be able to set a TTL to avoid such issue.
>>
>> Hao Sun
>>
>>
>> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yu...@gmail.com>
>> wrote:
>>
>>> Hi Hao,
>>>
>>> I think he's exactly talking about the usecase where the JM/TM restart
>>> and they come back up from the latest savepoint which might be stale by
>>> that time.
>>>
>>> On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com> wrote:
>>>
>>>> We always make a savepoint before we shutdown the job-cluster. So the
>>>> savepoint is always the latest. When we fix a bug or change the job graph,
>>>> it can resume well.
>>>> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
>>>> uncaught exception, etc.
>>>>
>>>> Maybe I do not understand your use case well, I do not see a need to
>>>> start from checkpoint after a bug fix.
>>>> From what I know, currently you can use checkpoint as a savepoint as
>>>> well
>>>>
>>>> Hao Sun
>>>>
>>>>
>>>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com>
>>>> wrote:
>>>>
>>>>> AFAIK there's currently nothing implemented to solve this problem, but
>>>>> working on a possible fix can be implemented on top of
>>>>> https://github.com/lyft/flinkk8soperator which already has a pretty
>>>>> fancy state machine for rolling upgrades. I'd love to be involved as this
>>>>> is an issue I've been thinking about as well.
>>>>>
>>>>> Yuval
>>>>>
>>>>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <
>>>>> sean.hester@bettercloud.com> wrote:
>>>>>
>>>>>> hi all--we've run into a gap (knowledge? design? tbd?) for our use
>>>>>> cases when deploying Flink jobs to start from savepoints using the
>>>>>> job-cluster mode in Kubernetes.
>>>>>>
>>>>>> we're running a ~15 different jobs, all in job-cluster mode, using a
>>>>>> mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these
>>>>>> are all long-running streaming jobs, all essentially acting as
>>>>>> microservices. we're using Helm charts to configure all of our deployments.
>>>>>>
>>>>>> we have a number of use cases where we want to restart jobs from a
>>>>>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>>>>>> or fixed a bug. but after the deployment we want to have the job resume
>>>>>> it's "long-running" behavior, where any unplanned restarts resume from the
>>>>>> latest checkpoint.
>>>>>>
>>>>>> the issue we run into is that any obvious/standard/idiomatic
>>>>>> Kubernetes deployment includes the savepoint argument in the configuration.
>>>>>> if the Job Manager container(s) have an unplanned restart, when they come
>>>>>> back up they will start from the savepoint instead of resuming from the
>>>>>> latest checkpoint. everything is working as configured, but that's not
>>>>>> exactly what we want. we want the savepoint argument to be transient
>>>>>> somehow (only used during the initial deployment), but Kubernetes doesn't
>>>>>> really support the concept of transient configuration.
>>>>>>
>>>>>> i can see a couple of potential solutions that either involve custom
>>>>>> code in the jobs or custom logic in the container (i.e. a custom entrypoint
>>>>>> script that records that the configured savepoint has already been used in
>>>>>> a file on a persistent volume or GCS, and potentially when/why/by which
>>>>>> deployment). but these seem like unexpected and hacky solutions. before we
>>>>>> head down that road i wanted to ask:
>>>>>>
>>>>>>    - is this is already a solved problem that i've missed?
>>>>>>    - is this issue already on the community's radar?
>>>>>>
>>>>>> thanks in advance!
>>>>>>
>>>>>> --
>>>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>>>> <http://www.bettercloud.com> <http://www.bettercloud.com>
>>>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>>>> It’s not just an IT conference, it’s “a complete learning and
>>>>>> networking experience”
>>>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Yuval Itzchakov.
>>>>>
>>>>
>
> --
> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
> <http://www.bettercloud.com> <http://www.bettercloud.com>
> *Altitude 2019 in San Francisco | Sept. 23 - 25*
> It’s not just an IT conference, it’s “a complete learning and networking
> experience”
> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>
>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Vijay Bhaskar <bh...@gmail.com>.
Thanks you till. We will try to shift to latest flink version.

Regards
Bhaskar

On Mon, Oct 14, 2019 at 7:43 PM Till Rohrmann <tr...@apache.org> wrote:

> Hi Vijay,
>
> Flink usually writes first the checkpoint data to disk and then writes the
> pointer to the files to ZooKeeper. Hence, if you see a ZooKeeper entry,
> then the files should be there. I assume that there is no other process
> accessing and potentially removing files from the checkpoint directories,
> right?
>
> Have you tried to run one of the latest Flink versions? Flink 1.6.2 is no
> longer actively supported by the community.
>
> Cheers,
> Till
>
> On Fri, Oct 11, 2019 at 11:39 AM Vijay Bhaskar <bh...@gmail.com>
> wrote:
>
>> Apart from these we have other environment and there check point worked
>> fine in HA mode with complete cluster restart. But one of the job we are
>> seeing an issue, in zookeeper the check point path is retrieved and its
>> unable to find the check point path in persistent storage. I am wondering
>> why this would happen first of all?
>> Is there any sync issue between file writing over persistent path and
>> file registration with HA service? For example check point has been
>> registered in zookeeper but has not been written yet while restarting the
>> cluster?  I suspect this kind of problem can happen. We are using flink
>> 1.6.2 in production. Is this an issue already known before and fixed
>> recently
>>
>> Regards
>> Bhaskar
>>
>> On Fri, Oct 11, 2019 at 2:08 PM Vijay Bhaskar <bh...@gmail.com>
>> wrote:
>>
>>> We are seeing below logs in production sometime ago, after that we
>>> stopped HA. Do you people think HA is enabled properly from the below logs?
>>>
>>> Regards
>>> Bhaskar
>>>
>>> 2019-09-24 17:40:17,675 INFO
>>>  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
>>> Starting ZooKeeperLeaderElectionService
>>> ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}.
>>> 2019-09-24 17:40:17,675 INFO
>>>  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService
>>>  - Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
>>> 2019-09-24 17:40:20,975 WARN  akka.remote.transport.netty.NettyTransport
>>>                    - Remote connection to [null] failed with
>>> java.net.NoRouteToHostException: No route to host
>>> 2019-09-24 17:40:20,976 WARN  akka.remote.ReliableDeliverySupervisor
>>>                        - Association with remote system [akka.tcp:
>>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>>> has failed, address is now gated for [50] ms. Reason: [Association failed
>>> with [akka.tcp:
>>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>>> Caused by: [No route to host]
>>> 2019-09-24 17:40:23,976 WARN  akka.remote.transport.netty.NettyTransport
>>>                    - Remote connection to [null] failed with
>>> java.net.NoRouteToHostException: No route to host
>>> 2019-09-24 17:40:23,977 WARN  akka.remote.ReliableDeliverySupervisor
>>>                        - Association with remote system [akka.tcp:
>>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>>> has failed, address is now gated for [50] ms. Reason: [Association failed
>>> with [akka.tcp:
>>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>>> Caused by: [No route to host]
>>> 2019-09-24 17:40:26,982 WARN  akka.remote.transport.netty.NettyTransport
>>>                    - Remote connection to [null] failed with
>>> java.net.NoRouteToHostException: No route to host
>>> 2019-09-24 17:40:26,983 WARN  akka.remote.ReliableDeliverySupervisor
>>>                        - Association with remote system [akka.tcp:
>>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>>> has failed, address is now gated for [50] ms. Reason: [Association failed
>>> with [akka.tcp:
>>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>>> Caused by: [No route to host]
>>> 2019-09-24 17:40:29,988 WARN  akka.remote.transport.netty.NettyTransport
>>>                    - Remote connection to [null] failed with
>>> java.net.NoRouteToHostException: No route to host
>>> 2019-09-24 17:40:29,988 WARN  akka.remote.ReliableDeliverySupervisor
>>>                        - Association with remote system [akka.tcp:
>>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>>> has failed, address is now gated for [50] ms. Reason: [Association failed
>>> with [akka.tcp:
>>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>>> Caused by: [No route to host]
>>> 2019-09-24 17:40:32,994 WARN  akka.remote.transport.netty.NettyTransport
>>>                    - Remote connection to [null] failed with
>>> java.net.NoRouteToHostException: No route to host
>>> 2019-09-24 17:40:32,995 WARN  akka.remote.ReliableDeliverySupervisor
>>>                        - Association with remote system [akka.tcp:
>>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>>> has failed, address is now gated for [50] ms. Reason: [Association failed
>>> with [akka.tcp:
>>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>>> Caused by: [No route to host]
>>> 2019-09-24 17:40:36,000 WARN  akka.remote.transport.netty.NettyTransport
>>>                    - Remote connection to [null] failed with
>>> java.net.NoRouteToHostException: No route to host
>>> 2019-09-24 17:40:36,001 WARN  akka.remote.ReliableDeliverySupervisor
>>>                        - Association with remote system [akka.tcp:
>>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>>> has failed, address is now gated for [50] ms. Reason: [Association failed
>>> with [akka.tcp:
>>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>>> Caused by: [No route to host]
>>> 2019-09-24 17:40:39,006 WARN  akka.remote.transport.netty.NettyTransport
>>>                    - Remote connection to [null] failed with
>>> java.net.NoRouteToHostException: No route to host
>>>
>>> On Fri, Oct 11, 2019 at 9:39 AM Yun Tang <my...@live.com> wrote:
>>>
>>>> Hi Hao
>>>>
>>>> It seems that I misunderstood the background of usage for your cases.
>>>> High availability configuration targets for fault tolerance not for general
>>>> development evolution. If you want to change your job topology, just follow
>>>> the general rule to restore from savepoint/checkpoint, do not rely on HA to
>>>> do job migration things.
>>>>
>>>> Best
>>>> Yun Tang
>>>> ------------------------------
>>>> *From:* Hao Sun <ha...@zendesk.com>
>>>> *Sent:* Friday, October 11, 2019 8:33
>>>> *To:* Yun Tang <my...@live.com>
>>>> *Cc:* Vijay Bhaskar <bh...@gmail.com>; Yang Wang <
>>>> danrtsey.wy@gmail.com>; Sean Hester <se...@bettercloud.com>;
>>>> Aleksandar Mastilovic <am...@sightmachine.com>; Yuval Itzchakov <
>>>> yuvalos@gmail.com>; user <us...@flink.apache.org>
>>>> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>>>>
>>>> Yep I know that option. That's where get me confused as well. In a HA
>>>> setup, where do I supply this option (allowNonRestoredState)?
>>>> This option requires a savepoint path when I start a flink job I
>>>> remember. And HA does not require the path
>>>>
>>>> Hao Sun
>>>>
>>>>
>>>> On Thu, Oct 10, 2019 at 11:16 AM Yun Tang <my...@live.com> wrote:
>>>>
>>>> Just a minor supplement @Hao Sun <ha...@zendesk.com>, if you decided
>>>> to drop a operator, don't forget to add --allowNonRestoredState
>>>> (short: -n) option [1]
>>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state
>>>>
>>>> Best
>>>> Yun Tang
>>>>
>>>> ------------------------------
>>>> *From:* Vijay Bhaskar <bh...@gmail.com>
>>>> *Sent:* Thursday, October 10, 2019 19:24
>>>> *To:* Yang Wang <da...@gmail.com>
>>>> *Cc:* Sean Hester <se...@bettercloud.com>; Aleksandar Mastilovic
>>>> <am...@sightmachine.com>; Yun Tang <my...@live.com>; Hao Sun <
>>>> hasun@zendesk.com>; Yuval Itzchakov <yu...@gmail.com>; user <
>>>> user@flink.apache.org>
>>>> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>>>>
>>>> Thanks Yang. We will try and let you know if any issues arise
>>>>
>>>> Regards
>>>> Bhaskar
>>>>
>>>> On Thu, Oct 10, 2019 at 1:53 PM Yang Wang <da...@gmail.com>
>>>> wrote:
>>>>
>>>> @ Hao Sun,
>>>> I have made a confirmation that even we change parallelism and/or
>>>> modify operators, add new operators,
>>>> the flink cluster could also recover from latest checkpoint.
>>>>
>>>> @ Vijay
>>>> a) Some individual jobmanager/taskmanager crashed
>>>> exceptionally(someother jobmanagers
>>>> and taskmanagers are alive), it could recover from the latest
>>>> checkpoint.
>>>> b) All jobmanagers and taskmanagers fails, it could still recover from
>>>> the latest checkpoint if the cluster-id
>>>> is not changed.
>>>>
>>>> When we enable the HA, The meta of jobgraph and checkpoint is saved on
>>>> zookeeper and the real files are save
>>>> on high-availability storage(HDFS). So when the flink application is
>>>> submitted again with same cluster-id, it could
>>>> recover jobs and checkpoint from zookeeper. I think it has been
>>>> supported for a long time. Maybe you could have a
>>>> try with flink-1.8 or 1.9.
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>>
>>>> Vijay Bhaskar <bh...@gmail.com> 于2019年10月10日周四 下午2:26写道:
>>>>
>>>> Thanks Yang and Sean. I have couple of questions:
>>>>
>>>> 1) Suppose the scenario of , bringing back entire cluster,
>>>>      a) In that case, at least one job manager out of HA group should
>>>> be up and running right? or
>>>>      b) All the job managers fails, then also this works? In that case
>>>> please let me know the procedure/share the documentation?
>>>>          How to start from previous check point?
>>>>          What Flink version onwards this feature is stable?
>>>>
>>>> Regards
>>>> Bhaskar
>>>>
>>>>
>>>> On Wed, Oct 9, 2019 at 8:51 AM Yang Wang <da...@gmail.com> wrote:
>>>>
>>>> Hi Vijay,
>>>>
>>>> If you are using HA solution, i think you do not need to specify the
>>>> savepoint. Instead the checkpoint is used.
>>>> The checkpoint is done automatically and periodically based on your
>>>> configuration.When the
>>>> jobmanager/taskmanager fails or the whole cluster crashes, it could
>>>> always recover from the latest
>>>> checkpoint. Does this meed your requirement?
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>> Sean Hester <se...@bettercloud.com> 于2019年10月1日周二 上午1:47写道:
>>>>
>>>> Vijay,
>>>>
>>>> That is my understanding as well: the HA solution only solves the
>>>> problem up to the point all job managers fail/restart at the same time.
>>>> That's where my original concern was.
>>>>
>>>> But to Aleksandar and Yun's point, running in HA with 2 or 3 Job
>>>> Managers per cluster--as long as they are all deployed to separate GKE
>>>> nodes--would provide a very high uptime/low failure rate, at least on
>>>> paper. It's a promising enough option that we're going to run in HA for a
>>>> month or two and monitor results before we put in any extra work to
>>>> customize the savepoint start-up behavior.
>>>>
>>>> On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar <bh...@gmail.com>
>>>> wrote:
>>>>
>>>> I don't think HA will help to recover from cluster crash, for that we
>>>> should take periodic savepoint right? Please correct me in case i am wrong
>>>>
>>>> Regards
>>>> Bhaskar
>>>>
>>>> On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <
>>>> bhaskar.ebay77@gmail.com> wrote:
>>>>
>>>> Suppose my cluster got crashed and need to bring up the entire cluster
>>>> back? Does HA still helps to run the cluster from latest save point?
>>>>
>>>> Regards
>>>> Bhaskar
>>>>
>>>> On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <
>>>> sean.hester@bettercloud.com> wrote:
>>>>
>>>> thanks to everyone for all the replies.
>>>>
>>>> i think the original concern here with "just" relying on the HA option
>>>> is that there are some disaster recovery and data center migration use
>>>> cases where the continuity of the job managers is difficult to preserve.
>>>> but those are admittedly very edgy use cases. i think it's definitely worth
>>>> reviewing the SLAs with our site reliability engineers to see how likely it
>>>> would be to completely lose all job managers under an HA configuration.
>>>> that small a risk might be acceptable/preferable to a one-off solution.
>>>>
>>>> @Aleksander, would love to learn more about Zookeeper-less HA. i
>>>> think i spotted a thread somewhere between Till and someone (perhaps you)
>>>> about that. feel free to DM me.
>>>>
>>>> thanks again to everyone!
>>>>
>>>> On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <da...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi, Aleksandar
>>>>
>>>> Savepoint option in standalone job cluster is optional. If you want to
>>>> always recover
>>>> from the latest checkpoint, just as Aleksandar and Yun Tang said you
>>>> could use the
>>>> high-availability configuration. Make sure the cluster-id is not
>>>> changed, i think the job
>>>> could recover both at exceptionally crash and restart by expectation.
>>>>
>>>> @Aleksandar Mastilovic <am...@sightmachine.com>, we are also
>>>> have an zookeeper-less high-availability implementation[1].
>>>> Maybe we could have some discussion and contribute this useful feature
>>>> to the community.
>>>>
>>>> [1].
>>>> https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>> Aleksandar Mastilovic <am...@sightmachine.com> 于2019年9月26日周四
>>>> 上午4:11写道:
>>>>
>>>> Would you guys (Flink devs) be interested in our solution for
>>>> zookeeper-less HA? I could ask the managers how they feel about
>>>> open-sourcing the improvement.
>>>>
>>>> On Sep 25, 2019, at 11:49 AM, Yun Tang <my...@live.com> wrote:
>>>>
>>>> As Aleksandar said, k8s with HA configuration could solve your problem.
>>>> There already have some discussion about how to implement such HA in k8s if
>>>> we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2].
>>>> Currently, you might only have to choose zookeeper as high-availability
>>>> service.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-11105
>>>> [2] https://issues.apache.org/jira/browse/FLINK-12884
>>>>
>>>> Best
>>>> Yun Tang
>>>> ------------------------------
>>>> *From:* Aleksandar Mastilovic <am...@sightmachine.com>
>>>> *Sent:* Thursday, September 26, 2019 1:57
>>>> *To:* Sean Hester <se...@bettercloud.com>
>>>> *Cc:* Hao Sun <ha...@zendesk.com>; Yuval Itzchakov <yu...@gmail.com>;
>>>> user <us...@flink.apache.org>
>>>> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>>>>
>>>> Can’t you simply use JobManager in HA mode? It would pick up where it
>>>> left off if you don’t provide a Savepoint.
>>>>
>>>> On Sep 25, 2019, at 6:07 AM, Sean Hester <se...@bettercloud.com>
>>>> wrote:
>>>>
>>>> thanks for all replies! i'll definitely take a look at the Flink k8s
>>>> Operator project.
>>>>
>>>> i'll try to restate the issue to clarify. this issue is specific to
>>>> starting a job from a savepoint in job-cluster mode. in these cases the Job
>>>> Manager container is configured to run a single Flink job at start-up. the
>>>> savepoint needs to be provided as an argument to the entrypoint. the Flink
>>>> documentation for this approach is here:
>>>>
>>>>
>>>> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>>>>
>>>> the issue is that taking this approach means that the job will *always*
>>>>  start from the savepoint provided as the start argument in the
>>>> Kubernetes YAML. this includes unplanned restarts of the job manager, but
>>>> we'd really prefer any *unplanned* restarts resume for the most recent
>>>> checkpoint instead of restarting from the configured savepoint. so in a
>>>> sense we want the savepoint argument to be transient, only being used
>>>> during the initial deployment, but this runs counter to the design of
>>>> Kubernetes which always wants to restore a deployment to the "goal state"
>>>> as defined in the YAML.
>>>>
>>>> i hope this helps. if you want more details please let me know, and
>>>> thanks again for your time.
>>>>
>>>>
>>>> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com> wrote:
>>>>
>>>> I think I overlooked it. Good point. I am using Redis to save the path
>>>> to my savepoint, I might be able to set a TTL to avoid such issue.
>>>>
>>>> Hao Sun
>>>>
>>>>
>>>> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yu...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi Hao,
>>>>
>>>> I think he's exactly talking about the usecase where the JM/TM restart
>>>> and they come back up from the latest savepoint which might be stale by
>>>> that time.
>>>>
>>>> On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com> wrote:
>>>>
>>>> We always make a savepoint before we shutdown the job-cluster. So the
>>>> savepoint is always the latest. When we fix a bug or change the job graph,
>>>> it can resume well.
>>>> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
>>>> uncaught exception, etc.
>>>>
>>>> Maybe I do not understand your use case well, I do not see a need to
>>>> start from checkpoint after a bug fix.
>>>> From what I know, currently you can use checkpoint as a savepoint as
>>>> well
>>>>
>>>> Hao Sun
>>>>
>>>>
>>>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com>
>>>> wrote:
>>>>
>>>> AFAIK there's currently nothing implemented to solve this problem, but
>>>> working on a possible fix can be implemented on top of
>>>> https://github.com/lyft/flinkk8soperator which already has a pretty
>>>> fancy state machine for rolling upgrades. I'd love to be involved as this
>>>> is an issue I've been thinking about as well.
>>>>
>>>> Yuval
>>>>
>>>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <
>>>> sean.hester@bettercloud.com> wrote:
>>>>
>>>> hi all--we've run into a gap (knowledge? design? tbd?) for our use
>>>> cases when deploying Flink jobs to start from savepoints using the
>>>> job-cluster mode in Kubernetes.
>>>>
>>>> we're running a ~15 different jobs, all in job-cluster mode, using a
>>>> mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these
>>>> are all long-running streaming jobs, all essentially acting as
>>>> microservices. we're using Helm charts to configure all of our deployments.
>>>>
>>>> we have a number of use cases where we want to restart jobs from a
>>>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>>>> or fixed a bug. but after the deployment we want to have the job resume
>>>> it's "long-running" behavior, where any unplanned restarts resume from the
>>>> latest checkpoint.
>>>>
>>>> the issue we run into is that any obvious/standard/idiomatic Kubernetes
>>>> deployment includes the savepoint argument in the configuration. if the Job
>>>> Manager container(s) have an unplanned restart, when they come back up they
>>>> will start from the savepoint instead of resuming from the latest
>>>> checkpoint. everything is working as configured, but that's not exactly
>>>> what we want. we want the savepoint argument to be transient somehow (only
>>>> used during the initial deployment), but Kubernetes doesn't really support
>>>> the concept of transient configuration.
>>>>
>>>> i can see a couple of potential solutions that either involve custom
>>>> code in the jobs or custom logic in the container (i.e. a custom entrypoint
>>>> script that records that the configured savepoint has already been used in
>>>> a file on a persistent volume or GCS, and potentially when/why/by which
>>>> deployment). but these seem like unexpected and hacky solutions. before we
>>>> head down that road i wanted to ask:
>>>>
>>>>    - is this is already a solved problem that i've missed?
>>>>    - is this issue already on the community's radar?
>>>>
>>>> thanks in advance!
>>>>
>>>> --
>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>> It’s not just an IT conference, it’s “a complete learning and
>>>> networking experience”
>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Yuval Itzchakov.
>>>>
>>>>
>>>>
>>>> --
>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>> It’s not just an IT conference, it’s “a complete learning and
>>>> networking experience”
>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>> <http://www.bettercloud.com> <http://www.bettercloud.com>
>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>> It’s not just an IT conference, it’s “a complete learning and
>>>> networking experience”
>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>
>>>>
>>>>
>>>> --
>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>> <http://www.bettercloud.com> <http://www.bettercloud.com>
>>>> *Introducing the BetterCloud Integration Center *
>>>> Automate actions across every app and own SaaSOps
>>>> <https://www.bettercloud.com/integrations-webinar/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-integration-center>
>>>>
>>>>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Till Rohrmann <tr...@apache.org>.
Hi Vijay,

Flink usually writes first the checkpoint data to disk and then writes the
pointer to the files to ZooKeeper. Hence, if you see a ZooKeeper entry,
then the files should be there. I assume that there is no other process
accessing and potentially removing files from the checkpoint directories,
right?

Have you tried to run one of the latest Flink versions? Flink 1.6.2 is no
longer actively supported by the community.

Cheers,
Till

On Fri, Oct 11, 2019 at 11:39 AM Vijay Bhaskar <bh...@gmail.com>
wrote:

> Apart from these we have other environment and there check point worked
> fine in HA mode with complete cluster restart. But one of the job we are
> seeing an issue, in zookeeper the check point path is retrieved and its
> unable to find the check point path in persistent storage. I am wondering
> why this would happen first of all?
> Is there any sync issue between file writing over persistent path and file
> registration with HA service? For example check point has been registered
> in zookeeper but has not been written yet while restarting the cluster?  I
> suspect this kind of problem can happen. We are using flink 1.6.2 in
> production. Is this an issue already known before and fixed recently
>
> Regards
> Bhaskar
>
> On Fri, Oct 11, 2019 at 2:08 PM Vijay Bhaskar <bh...@gmail.com>
> wrote:
>
>> We are seeing below logs in production sometime ago, after that we
>> stopped HA. Do you people think HA is enabled properly from the below logs?
>>
>> Regards
>> Bhaskar
>>
>> 2019-09-24 17:40:17,675 INFO
>>  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
>> Starting ZooKeeperLeaderElectionService
>> ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}.
>> 2019-09-24 17:40:17,675 INFO
>>  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService
>>  - Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
>> 2019-09-24 17:40:20,975 WARN  akka.remote.transport.netty.NettyTransport
>>                    - Remote connection to [null] failed with
>> java.net.NoRouteToHostException: No route to host
>> 2019-09-24 17:40:20,976 WARN  akka.remote.ReliableDeliverySupervisor
>>                        - Association with remote system [akka.tcp:
>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>> has failed, address is now gated for [50] ms. Reason: [Association failed
>> with [akka.tcp:
>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>> Caused by: [No route to host]
>> 2019-09-24 17:40:23,976 WARN  akka.remote.transport.netty.NettyTransport
>>                    - Remote connection to [null] failed with
>> java.net.NoRouteToHostException: No route to host
>> 2019-09-24 17:40:23,977 WARN  akka.remote.ReliableDeliverySupervisor
>>                        - Association with remote system [akka.tcp:
>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>> has failed, address is now gated for [50] ms. Reason: [Association failed
>> with [akka.tcp:
>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>> Caused by: [No route to host]
>> 2019-09-24 17:40:26,982 WARN  akka.remote.transport.netty.NettyTransport
>>                    - Remote connection to [null] failed with
>> java.net.NoRouteToHostException: No route to host
>> 2019-09-24 17:40:26,983 WARN  akka.remote.ReliableDeliverySupervisor
>>                        - Association with remote system [akka.tcp:
>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>> has failed, address is now gated for [50] ms. Reason: [Association failed
>> with [akka.tcp:
>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>> Caused by: [No route to host]
>> 2019-09-24 17:40:29,988 WARN  akka.remote.transport.netty.NettyTransport
>>                    - Remote connection to [null] failed with
>> java.net.NoRouteToHostException: No route to host
>> 2019-09-24 17:40:29,988 WARN  akka.remote.ReliableDeliverySupervisor
>>                        - Association with remote system [akka.tcp:
>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>> has failed, address is now gated for [50] ms. Reason: [Association failed
>> with [akka.tcp:
>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>> Caused by: [No route to host]
>> 2019-09-24 17:40:32,994 WARN  akka.remote.transport.netty.NettyTransport
>>                    - Remote connection to [null] failed with
>> java.net.NoRouteToHostException: No route to host
>> 2019-09-24 17:40:32,995 WARN  akka.remote.ReliableDeliverySupervisor
>>                        - Association with remote system [akka.tcp:
>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>> has failed, address is now gated for [50] ms. Reason: [Association failed
>> with [akka.tcp:
>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>> Caused by: [No route to host]
>> 2019-09-24 17:40:36,000 WARN  akka.remote.transport.netty.NettyTransport
>>                    - Remote connection to [null] failed with
>> java.net.NoRouteToHostException: No route to host
>> 2019-09-24 17:40:36,001 WARN  akka.remote.ReliableDeliverySupervisor
>>                        - Association with remote system [akka.tcp:
>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
>> has failed, address is now gated for [50] ms. Reason: [Association failed
>> with [akka.tcp:
>> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
>> Caused by: [No route to host]
>> 2019-09-24 17:40:39,006 WARN  akka.remote.transport.netty.NettyTransport
>>                    - Remote connection to [null] failed with
>> java.net.NoRouteToHostException: No route to host
>>
>> On Fri, Oct 11, 2019 at 9:39 AM Yun Tang <my...@live.com> wrote:
>>
>>> Hi Hao
>>>
>>> It seems that I misunderstood the background of usage for your cases.
>>> High availability configuration targets for fault tolerance not for general
>>> development evolution. If you want to change your job topology, just follow
>>> the general rule to restore from savepoint/checkpoint, do not rely on HA to
>>> do job migration things.
>>>
>>> Best
>>> Yun Tang
>>> ------------------------------
>>> *From:* Hao Sun <ha...@zendesk.com>
>>> *Sent:* Friday, October 11, 2019 8:33
>>> *To:* Yun Tang <my...@live.com>
>>> *Cc:* Vijay Bhaskar <bh...@gmail.com>; Yang Wang <
>>> danrtsey.wy@gmail.com>; Sean Hester <se...@bettercloud.com>;
>>> Aleksandar Mastilovic <am...@sightmachine.com>; Yuval Itzchakov <
>>> yuvalos@gmail.com>; user <us...@flink.apache.org>
>>> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>>>
>>> Yep I know that option. That's where get me confused as well. In a HA
>>> setup, where do I supply this option (allowNonRestoredState)?
>>> This option requires a savepoint path when I start a flink job I
>>> remember. And HA does not require the path
>>>
>>> Hao Sun
>>>
>>>
>>> On Thu, Oct 10, 2019 at 11:16 AM Yun Tang <my...@live.com> wrote:
>>>
>>> Just a minor supplement @Hao Sun <ha...@zendesk.com>, if you decided to
>>> drop a operator, don't forget to add --allowNonRestoredState (short: -n)
>>> option [1]
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state
>>>
>>> Best
>>> Yun Tang
>>>
>>> ------------------------------
>>> *From:* Vijay Bhaskar <bh...@gmail.com>
>>> *Sent:* Thursday, October 10, 2019 19:24
>>> *To:* Yang Wang <da...@gmail.com>
>>> *Cc:* Sean Hester <se...@bettercloud.com>; Aleksandar Mastilovic <
>>> amastilovic@sightmachine.com>; Yun Tang <my...@live.com>; Hao Sun <
>>> hasun@zendesk.com>; Yuval Itzchakov <yu...@gmail.com>; user <
>>> user@flink.apache.org>
>>> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>>>
>>> Thanks Yang. We will try and let you know if any issues arise
>>>
>>> Regards
>>> Bhaskar
>>>
>>> On Thu, Oct 10, 2019 at 1:53 PM Yang Wang <da...@gmail.com> wrote:
>>>
>>> @ Hao Sun,
>>> I have made a confirmation that even we change parallelism and/or modify
>>> operators, add new operators,
>>> the flink cluster could also recover from latest checkpoint.
>>>
>>> @ Vijay
>>> a) Some individual jobmanager/taskmanager crashed
>>> exceptionally(someother jobmanagers
>>> and taskmanagers are alive), it could recover from the latest checkpoint.
>>> b) All jobmanagers and taskmanagers fails, it could still recover from
>>> the latest checkpoint if the cluster-id
>>> is not changed.
>>>
>>> When we enable the HA, The meta of jobgraph and checkpoint is saved on
>>> zookeeper and the real files are save
>>> on high-availability storage(HDFS). So when the flink application is
>>> submitted again with same cluster-id, it could
>>> recover jobs and checkpoint from zookeeper. I think it has been
>>> supported for a long time. Maybe you could have a
>>> try with flink-1.8 or 1.9.
>>>
>>> Best,
>>> Yang
>>>
>>>
>>> Vijay Bhaskar <bh...@gmail.com> 于2019年10月10日周四 下午2:26写道:
>>>
>>> Thanks Yang and Sean. I have couple of questions:
>>>
>>> 1) Suppose the scenario of , bringing back entire cluster,
>>>      a) In that case, at least one job manager out of HA group should be
>>> up and running right? or
>>>      b) All the job managers fails, then also this works? In that case
>>> please let me know the procedure/share the documentation?
>>>          How to start from previous check point?
>>>          What Flink version onwards this feature is stable?
>>>
>>> Regards
>>> Bhaskar
>>>
>>>
>>> On Wed, Oct 9, 2019 at 8:51 AM Yang Wang <da...@gmail.com> wrote:
>>>
>>> Hi Vijay,
>>>
>>> If you are using HA solution, i think you do not need to specify the
>>> savepoint. Instead the checkpoint is used.
>>> The checkpoint is done automatically and periodically based on your
>>> configuration.When the
>>> jobmanager/taskmanager fails or the whole cluster crashes, it could
>>> always recover from the latest
>>> checkpoint. Does this meed your requirement?
>>>
>>> Best,
>>> Yang
>>>
>>> Sean Hester <se...@bettercloud.com> 于2019年10月1日周二 上午1:47写道:
>>>
>>> Vijay,
>>>
>>> That is my understanding as well: the HA solution only solves the
>>> problem up to the point all job managers fail/restart at the same time.
>>> That's where my original concern was.
>>>
>>> But to Aleksandar and Yun's point, running in HA with 2 or 3 Job
>>> Managers per cluster--as long as they are all deployed to separate GKE
>>> nodes--would provide a very high uptime/low failure rate, at least on
>>> paper. It's a promising enough option that we're going to run in HA for a
>>> month or two and monitor results before we put in any extra work to
>>> customize the savepoint start-up behavior.
>>>
>>> On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar <bh...@gmail.com>
>>> wrote:
>>>
>>> I don't think HA will help to recover from cluster crash, for that we
>>> should take periodic savepoint right? Please correct me in case i am wrong
>>>
>>> Regards
>>> Bhaskar
>>>
>>> On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <bh...@gmail.com>
>>> wrote:
>>>
>>> Suppose my cluster got crashed and need to bring up the entire cluster
>>> back? Does HA still helps to run the cluster from latest save point?
>>>
>>> Regards
>>> Bhaskar
>>>
>>> On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <se...@bettercloud.com>
>>> wrote:
>>>
>>> thanks to everyone for all the replies.
>>>
>>> i think the original concern here with "just" relying on the HA option
>>> is that there are some disaster recovery and data center migration use
>>> cases where the continuity of the job managers is difficult to preserve.
>>> but those are admittedly very edgy use cases. i think it's definitely worth
>>> reviewing the SLAs with our site reliability engineers to see how likely it
>>> would be to completely lose all job managers under an HA configuration.
>>> that small a risk might be acceptable/preferable to a one-off solution.
>>>
>>> @Aleksander, would love to learn more about Zookeeper-less HA. i think i
>>> spotted a thread somewhere between Till and someone (perhaps you) about
>>> that. feel free to DM me.
>>>
>>> thanks again to everyone!
>>>
>>> On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <da...@gmail.com> wrote:
>>>
>>> Hi, Aleksandar
>>>
>>> Savepoint option in standalone job cluster is optional. If you want to
>>> always recover
>>> from the latest checkpoint, just as Aleksandar and Yun Tang said you
>>> could use the
>>> high-availability configuration. Make sure the cluster-id is not
>>> changed, i think the job
>>> could recover both at exceptionally crash and restart by expectation.
>>>
>>> @Aleksandar Mastilovic <am...@sightmachine.com>, we are also have
>>> an zookeeper-less high-availability implementation[1].
>>> Maybe we could have some discussion and contribute this useful feature
>>> to the community.
>>>
>>> [1].
>>> https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit
>>>
>>> Best,
>>> Yang
>>>
>>> Aleksandar Mastilovic <am...@sightmachine.com> 于2019年9月26日周四
>>> 上午4:11写道:
>>>
>>> Would you guys (Flink devs) be interested in our solution for
>>> zookeeper-less HA? I could ask the managers how they feel about
>>> open-sourcing the improvement.
>>>
>>> On Sep 25, 2019, at 11:49 AM, Yun Tang <my...@live.com> wrote:
>>>
>>> As Aleksandar said, k8s with HA configuration could solve your problem.
>>> There already have some discussion about how to implement such HA in k8s if
>>> we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2].
>>> Currently, you might only have to choose zookeeper as high-availability
>>> service.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-11105
>>> [2] https://issues.apache.org/jira/browse/FLINK-12884
>>>
>>> Best
>>> Yun Tang
>>> ------------------------------
>>> *From:* Aleksandar Mastilovic <am...@sightmachine.com>
>>> *Sent:* Thursday, September 26, 2019 1:57
>>> *To:* Sean Hester <se...@bettercloud.com>
>>> *Cc:* Hao Sun <ha...@zendesk.com>; Yuval Itzchakov <yu...@gmail.com>;
>>> user <us...@flink.apache.org>
>>> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>>>
>>> Can’t you simply use JobManager in HA mode? It would pick up where it
>>> left off if you don’t provide a Savepoint.
>>>
>>> On Sep 25, 2019, at 6:07 AM, Sean Hester <se...@bettercloud.com>
>>> wrote:
>>>
>>> thanks for all replies! i'll definitely take a look at the Flink k8s
>>> Operator project.
>>>
>>> i'll try to restate the issue to clarify. this issue is specific to
>>> starting a job from a savepoint in job-cluster mode. in these cases the Job
>>> Manager container is configured to run a single Flink job at start-up. the
>>> savepoint needs to be provided as an argument to the entrypoint. the Flink
>>> documentation for this approach is here:
>>>
>>>
>>> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>>>
>>> the issue is that taking this approach means that the job will *always* start
>>> from the savepoint provided as the start argument in the Kubernetes YAML.
>>> this includes unplanned restarts of the job manager, but we'd really prefer
>>> any *unplanned* restarts resume for the most recent checkpoint instead
>>> of restarting from the configured savepoint. so in a sense we want the
>>> savepoint argument to be transient, only being used during the initial
>>> deployment, but this runs counter to the design of Kubernetes which always
>>> wants to restore a deployment to the "goal state" as defined in the YAML.
>>>
>>> i hope this helps. if you want more details please let me know, and
>>> thanks again for your time.
>>>
>>>
>>> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com> wrote:
>>>
>>> I think I overlooked it. Good point. I am using Redis to save the path
>>> to my savepoint, I might be able to set a TTL to avoid such issue.
>>>
>>> Hao Sun
>>>
>>>
>>> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yu...@gmail.com>
>>> wrote:
>>>
>>> Hi Hao,
>>>
>>> I think he's exactly talking about the usecase where the JM/TM restart
>>> and they come back up from the latest savepoint which might be stale by
>>> that time.
>>>
>>> On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com> wrote:
>>>
>>> We always make a savepoint before we shutdown the job-cluster. So the
>>> savepoint is always the latest. When we fix a bug or change the job graph,
>>> it can resume well.
>>> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
>>> uncaught exception, etc.
>>>
>>> Maybe I do not understand your use case well, I do not see a need to
>>> start from checkpoint after a bug fix.
>>> From what I know, currently you can use checkpoint as a savepoint as well
>>>
>>> Hao Sun
>>>
>>>
>>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com>
>>> wrote:
>>>
>>> AFAIK there's currently nothing implemented to solve this problem, but
>>> working on a possible fix can be implemented on top of
>>> https://github.com/lyft/flinkk8soperator which already has a pretty
>>> fancy state machine for rolling upgrades. I'd love to be involved as this
>>> is an issue I've been thinking about as well.
>>>
>>> Yuval
>>>
>>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <se...@bettercloud.com>
>>> wrote:
>>>
>>> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
>>> when deploying Flink jobs to start from savepoints using the job-cluster
>>> mode in Kubernetes.
>>>
>>> we're running a ~15 different jobs, all in job-cluster mode, using a mix
>>> of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are
>>> all long-running streaming jobs, all essentially acting as microservices.
>>> we're using Helm charts to configure all of our deployments.
>>>
>>> we have a number of use cases where we want to restart jobs from a
>>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>>> or fixed a bug. but after the deployment we want to have the job resume
>>> it's "long-running" behavior, where any unplanned restarts resume from the
>>> latest checkpoint.
>>>
>>> the issue we run into is that any obvious/standard/idiomatic Kubernetes
>>> deployment includes the savepoint argument in the configuration. if the Job
>>> Manager container(s) have an unplanned restart, when they come back up they
>>> will start from the savepoint instead of resuming from the latest
>>> checkpoint. everything is working as configured, but that's not exactly
>>> what we want. we want the savepoint argument to be transient somehow (only
>>> used during the initial deployment), but Kubernetes doesn't really support
>>> the concept of transient configuration.
>>>
>>> i can see a couple of potential solutions that either involve custom
>>> code in the jobs or custom logic in the container (i.e. a custom entrypoint
>>> script that records that the configured savepoint has already been used in
>>> a file on a persistent volume or GCS, and potentially when/why/by which
>>> deployment). but these seem like unexpected and hacky solutions. before we
>>> head down that road i wanted to ask:
>>>
>>>    - is this is already a solved problem that i've missed?
>>>    - is this issue already on the community's radar?
>>>
>>> thanks in advance!
>>>
>>> --
>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>> It’s not just an IT conference, it’s “a complete learning and networking
>>> experience”
>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Yuval Itzchakov.
>>>
>>>
>>>
>>> --
>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>> It’s not just an IT conference, it’s “a complete learning and networking
>>> experience”
>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>
>>>
>>>
>>>
>>> --
>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>> <http://www.bettercloud.com> <http://www.bettercloud.com>
>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>> It’s not just an IT conference, it’s “a complete learning and networking
>>> experience”
>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>
>>>
>>>
>>> --
>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>> <http://www.bettercloud.com> <http://www.bettercloud.com>
>>> *Introducing the BetterCloud Integration Center *
>>> Automate actions across every app and own SaaSOps
>>> <https://www.bettercloud.com/integrations-webinar/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-integration-center>
>>>
>>>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Vijay Bhaskar <bh...@gmail.com>.
Apart from these we have other environment and there check point worked
fine in HA mode with complete cluster restart. But one of the job we are
seeing an issue, in zookeeper the check point path is retrieved and its
unable to find the check point path in persistent storage. I am wondering
why this would happen first of all?
Is there any sync issue between file writing over persistent path and file
registration with HA service? For example check point has been registered
in zookeeper but has not been written yet while restarting the cluster?  I
suspect this kind of problem can happen. We are using flink 1.6.2 in
production. Is this an issue already known before and fixed recently

Regards
Bhaskar

On Fri, Oct 11, 2019 at 2:08 PM Vijay Bhaskar <bh...@gmail.com>
wrote:

> We are seeing below logs in production sometime ago, after that we stopped
> HA. Do you people think HA is enabled properly from the below logs?
>
> Regards
> Bhaskar
>
> 2019-09-24 17:40:17,675 INFO
>  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
> Starting ZooKeeperLeaderElectionService
> ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}.
> 2019-09-24 17:40:17,675 INFO
>  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService
>  - Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
> 2019-09-24 17:40:20,975 WARN  akka.remote.transport.netty.NettyTransport
>                    - Remote connection to [null] failed with
> java.net.NoRouteToHostException: No route to host
> 2019-09-24 17:40:20,976 WARN  akka.remote.ReliableDeliverySupervisor
>                        - Association with remote system [akka.tcp:
> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp:
> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
> Caused by: [No route to host]
> 2019-09-24 17:40:23,976 WARN  akka.remote.transport.netty.NettyTransport
>                    - Remote connection to [null] failed with
> java.net.NoRouteToHostException: No route to host
> 2019-09-24 17:40:23,977 WARN  akka.remote.ReliableDeliverySupervisor
>                        - Association with remote system [akka.tcp:
> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp:
> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
> Caused by: [No route to host]
> 2019-09-24 17:40:26,982 WARN  akka.remote.transport.netty.NettyTransport
>                    - Remote connection to [null] failed with
> java.net.NoRouteToHostException: No route to host
> 2019-09-24 17:40:26,983 WARN  akka.remote.ReliableDeliverySupervisor
>                        - Association with remote system [akka.tcp:
> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp:
> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
> Caused by: [No route to host]
> 2019-09-24 17:40:29,988 WARN  akka.remote.transport.netty.NettyTransport
>                    - Remote connection to [null] failed with
> java.net.NoRouteToHostException: No route to host
> 2019-09-24 17:40:29,988 WARN  akka.remote.ReliableDeliverySupervisor
>                        - Association with remote system [akka.tcp:
> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp:
> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
> Caused by: [No route to host]
> 2019-09-24 17:40:32,994 WARN  akka.remote.transport.netty.NettyTransport
>                    - Remote connection to [null] failed with
> java.net.NoRouteToHostException: No route to host
> 2019-09-24 17:40:32,995 WARN  akka.remote.ReliableDeliverySupervisor
>                        - Association with remote system [akka.tcp:
> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp:
> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
> Caused by: [No route to host]
> 2019-09-24 17:40:36,000 WARN  akka.remote.transport.netty.NettyTransport
>                    - Remote connection to [null] failed with
> java.net.NoRouteToHostException: No route to host
> 2019-09-24 17:40:36,001 WARN  akka.remote.ReliableDeliverySupervisor
>                        - Association with remote system [akka.tcp:
> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp:
> //flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
> Caused by: [No route to host]
> 2019-09-24 17:40:39,006 WARN  akka.remote.transport.netty.NettyTransport
>                    - Remote connection to [null] failed with
> java.net.NoRouteToHostException: No route to host
>
> On Fri, Oct 11, 2019 at 9:39 AM Yun Tang <my...@live.com> wrote:
>
>> Hi Hao
>>
>> It seems that I misunderstood the background of usage for your cases.
>> High availability configuration targets for fault tolerance not for general
>> development evolution. If you want to change your job topology, just follow
>> the general rule to restore from savepoint/checkpoint, do not rely on HA to
>> do job migration things.
>>
>> Best
>> Yun Tang
>> ------------------------------
>> *From:* Hao Sun <ha...@zendesk.com>
>> *Sent:* Friday, October 11, 2019 8:33
>> *To:* Yun Tang <my...@live.com>
>> *Cc:* Vijay Bhaskar <bh...@gmail.com>; Yang Wang <
>> danrtsey.wy@gmail.com>; Sean Hester <se...@bettercloud.com>;
>> Aleksandar Mastilovic <am...@sightmachine.com>; Yuval Itzchakov <
>> yuvalos@gmail.com>; user <us...@flink.apache.org>
>> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>>
>> Yep I know that option. That's where get me confused as well. In a HA
>> setup, where do I supply this option (allowNonRestoredState)?
>> This option requires a savepoint path when I start a flink job I
>> remember. And HA does not require the path
>>
>> Hao Sun
>>
>>
>> On Thu, Oct 10, 2019 at 11:16 AM Yun Tang <my...@live.com> wrote:
>>
>> Just a minor supplement @Hao Sun <ha...@zendesk.com>, if you decided to
>> drop a operator, don't forget to add --allowNonRestoredState (short: -n)
>> option [1]
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state
>>
>> Best
>> Yun Tang
>>
>> ------------------------------
>> *From:* Vijay Bhaskar <bh...@gmail.com>
>> *Sent:* Thursday, October 10, 2019 19:24
>> *To:* Yang Wang <da...@gmail.com>
>> *Cc:* Sean Hester <se...@bettercloud.com>; Aleksandar Mastilovic <
>> amastilovic@sightmachine.com>; Yun Tang <my...@live.com>; Hao Sun <
>> hasun@zendesk.com>; Yuval Itzchakov <yu...@gmail.com>; user <
>> user@flink.apache.org>
>> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>>
>> Thanks Yang. We will try and let you know if any issues arise
>>
>> Regards
>> Bhaskar
>>
>> On Thu, Oct 10, 2019 at 1:53 PM Yang Wang <da...@gmail.com> wrote:
>>
>> @ Hao Sun,
>> I have made a confirmation that even we change parallelism and/or modify
>> operators, add new operators,
>> the flink cluster could also recover from latest checkpoint.
>>
>> @ Vijay
>> a) Some individual jobmanager/taskmanager crashed exceptionally(someother
>> jobmanagers
>> and taskmanagers are alive), it could recover from the latest checkpoint.
>> b) All jobmanagers and taskmanagers fails, it could still recover from
>> the latest checkpoint if the cluster-id
>> is not changed.
>>
>> When we enable the HA, The meta of jobgraph and checkpoint is saved on
>> zookeeper and the real files are save
>> on high-availability storage(HDFS). So when the flink application is
>> submitted again with same cluster-id, it could
>> recover jobs and checkpoint from zookeeper. I think it has been supported
>> for a long time. Maybe you could have a
>> try with flink-1.8 or 1.9.
>>
>> Best,
>> Yang
>>
>>
>> Vijay Bhaskar <bh...@gmail.com> 于2019年10月10日周四 下午2:26写道:
>>
>> Thanks Yang and Sean. I have couple of questions:
>>
>> 1) Suppose the scenario of , bringing back entire cluster,
>>      a) In that case, at least one job manager out of HA group should be
>> up and running right? or
>>      b) All the job managers fails, then also this works? In that case
>> please let me know the procedure/share the documentation?
>>          How to start from previous check point?
>>          What Flink version onwards this feature is stable?
>>
>> Regards
>> Bhaskar
>>
>>
>> On Wed, Oct 9, 2019 at 8:51 AM Yang Wang <da...@gmail.com> wrote:
>>
>> Hi Vijay,
>>
>> If you are using HA solution, i think you do not need to specify the
>> savepoint. Instead the checkpoint is used.
>> The checkpoint is done automatically and periodically based on your
>> configuration.When the
>> jobmanager/taskmanager fails or the whole cluster crashes, it could
>> always recover from the latest
>> checkpoint. Does this meed your requirement?
>>
>> Best,
>> Yang
>>
>> Sean Hester <se...@bettercloud.com> 于2019年10月1日周二 上午1:47写道:
>>
>> Vijay,
>>
>> That is my understanding as well: the HA solution only solves the problem
>> up to the point all job managers fail/restart at the same time. That's
>> where my original concern was.
>>
>> But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers
>> per cluster--as long as they are all deployed to separate GKE nodes--would
>> provide a very high uptime/low failure rate, at least on paper. It's a
>> promising enough option that we're going to run in HA for a month or two
>> and monitor results before we put in any extra work to customize the
>> savepoint start-up behavior.
>>
>> On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar <bh...@gmail.com>
>> wrote:
>>
>> I don't think HA will help to recover from cluster crash, for that we
>> should take periodic savepoint right? Please correct me in case i am wrong
>>
>> Regards
>> Bhaskar
>>
>> On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <bh...@gmail.com>
>> wrote:
>>
>> Suppose my cluster got crashed and need to bring up the entire cluster
>> back? Does HA still helps to run the cluster from latest save point?
>>
>> Regards
>> Bhaskar
>>
>> On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <se...@bettercloud.com>
>> wrote:
>>
>> thanks to everyone for all the replies.
>>
>> i think the original concern here with "just" relying on the HA option is
>> that there are some disaster recovery and data center migration use cases
>> where the continuity of the job managers is difficult to preserve. but
>> those are admittedly very edgy use cases. i think it's definitely worth
>> reviewing the SLAs with our site reliability engineers to see how likely it
>> would be to completely lose all job managers under an HA configuration.
>> that small a risk might be acceptable/preferable to a one-off solution.
>>
>> @Aleksander, would love to learn more about Zookeeper-less HA. i think i
>> spotted a thread somewhere between Till and someone (perhaps you) about
>> that. feel free to DM me.
>>
>> thanks again to everyone!
>>
>> On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <da...@gmail.com> wrote:
>>
>> Hi, Aleksandar
>>
>> Savepoint option in standalone job cluster is optional. If you want to
>> always recover
>> from the latest checkpoint, just as Aleksandar and Yun Tang said you
>> could use the
>> high-availability configuration. Make sure the cluster-id is not changed,
>> i think the job
>> could recover both at exceptionally crash and restart by expectation.
>>
>> @Aleksandar Mastilovic <am...@sightmachine.com>, we are also have
>> an zookeeper-less high-availability implementation[1].
>> Maybe we could have some discussion and contribute this useful feature to
>> the community.
>>
>> [1].
>> https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit
>>
>> Best,
>> Yang
>>
>> Aleksandar Mastilovic <am...@sightmachine.com> 于2019年9月26日周四
>> 上午4:11写道:
>>
>> Would you guys (Flink devs) be interested in our solution for
>> zookeeper-less HA? I could ask the managers how they feel about
>> open-sourcing the improvement.
>>
>> On Sep 25, 2019, at 11:49 AM, Yun Tang <my...@live.com> wrote:
>>
>> As Aleksandar said, k8s with HA configuration could solve your problem.
>> There already have some discussion about how to implement such HA in k8s if
>> we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2].
>> Currently, you might only have to choose zookeeper as high-availability
>> service.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-11105
>> [2] https://issues.apache.org/jira/browse/FLINK-12884
>>
>> Best
>> Yun Tang
>> ------------------------------
>> *From:* Aleksandar Mastilovic <am...@sightmachine.com>
>> *Sent:* Thursday, September 26, 2019 1:57
>> *To:* Sean Hester <se...@bettercloud.com>
>> *Cc:* Hao Sun <ha...@zendesk.com>; Yuval Itzchakov <yu...@gmail.com>;
>> user <us...@flink.apache.org>
>> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>>
>> Can’t you simply use JobManager in HA mode? It would pick up where it
>> left off if you don’t provide a Savepoint.
>>
>> On Sep 25, 2019, at 6:07 AM, Sean Hester <se...@bettercloud.com>
>> wrote:
>>
>> thanks for all replies! i'll definitely take a look at the Flink k8s
>> Operator project.
>>
>> i'll try to restate the issue to clarify. this issue is specific to
>> starting a job from a savepoint in job-cluster mode. in these cases the Job
>> Manager container is configured to run a single Flink job at start-up. the
>> savepoint needs to be provided as an argument to the entrypoint. the Flink
>> documentation for this approach is here:
>>
>>
>> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>>
>> the issue is that taking this approach means that the job will *always* start
>> from the savepoint provided as the start argument in the Kubernetes YAML.
>> this includes unplanned restarts of the job manager, but we'd really prefer
>> any *unplanned* restarts resume for the most recent checkpoint instead
>> of restarting from the configured savepoint. so in a sense we want the
>> savepoint argument to be transient, only being used during the initial
>> deployment, but this runs counter to the design of Kubernetes which always
>> wants to restore a deployment to the "goal state" as defined in the YAML.
>>
>> i hope this helps. if you want more details please let me know, and
>> thanks again for your time.
>>
>>
>> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com> wrote:
>>
>> I think I overlooked it. Good point. I am using Redis to save the path to
>> my savepoint, I might be able to set a TTL to avoid such issue.
>>
>> Hao Sun
>>
>>
>> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yu...@gmail.com>
>> wrote:
>>
>> Hi Hao,
>>
>> I think he's exactly talking about the usecase where the JM/TM restart
>> and they come back up from the latest savepoint which might be stale by
>> that time.
>>
>> On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com> wrote:
>>
>> We always make a savepoint before we shutdown the job-cluster. So the
>> savepoint is always the latest. When we fix a bug or change the job graph,
>> it can resume well.
>> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
>> uncaught exception, etc.
>>
>> Maybe I do not understand your use case well, I do not see a need to
>> start from checkpoint after a bug fix.
>> From what I know, currently you can use checkpoint as a savepoint as well
>>
>> Hao Sun
>>
>>
>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com>
>> wrote:
>>
>> AFAIK there's currently nothing implemented to solve this problem, but
>> working on a possible fix can be implemented on top of
>> https://github.com/lyft/flinkk8soperator which already has a pretty
>> fancy state machine for rolling upgrades. I'd love to be involved as this
>> is an issue I've been thinking about as well.
>>
>> Yuval
>>
>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <se...@bettercloud.com>
>> wrote:
>>
>> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
>> when deploying Flink jobs to start from savepoints using the job-cluster
>> mode in Kubernetes.
>>
>> we're running a ~15 different jobs, all in job-cluster mode, using a mix
>> of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are
>> all long-running streaming jobs, all essentially acting as microservices.
>> we're using Helm charts to configure all of our deployments.
>>
>> we have a number of use cases where we want to restart jobs from a
>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>> or fixed a bug. but after the deployment we want to have the job resume
>> it's "long-running" behavior, where any unplanned restarts resume from the
>> latest checkpoint.
>>
>> the issue we run into is that any obvious/standard/idiomatic Kubernetes
>> deployment includes the savepoint argument in the configuration. if the Job
>> Manager container(s) have an unplanned restart, when they come back up they
>> will start from the savepoint instead of resuming from the latest
>> checkpoint. everything is working as configured, but that's not exactly
>> what we want. we want the savepoint argument to be transient somehow (only
>> used during the initial deployment), but Kubernetes doesn't really support
>> the concept of transient configuration.
>>
>> i can see a couple of potential solutions that either involve custom code
>> in the jobs or custom logic in the container (i.e. a custom entrypoint
>> script that records that the configured savepoint has already been used in
>> a file on a persistent volume or GCS, and potentially when/why/by which
>> deployment). but these seem like unexpected and hacky solutions. before we
>> head down that road i wanted to ask:
>>
>>    - is this is already a solved problem that i've missed?
>>    - is this issue already on the community's radar?
>>
>> thanks in advance!
>>
>> --
>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>> It’s not just an IT conference, it’s “a complete learning and networking
>> experience”
>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>
>>
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>>
>>
>> --
>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>> It’s not just an IT conference, it’s “a complete learning and networking
>> experience”
>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>
>>
>>
>>
>> --
>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>> <http://www.bettercloud.com> <http://www.bettercloud.com>
>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>> It’s not just an IT conference, it’s “a complete learning and networking
>> experience”
>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>
>>
>>
>> --
>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>> <http://www.bettercloud.com> <http://www.bettercloud.com>
>> *Introducing the BetterCloud Integration Center *
>> Automate actions across every app and own SaaSOps
>> <https://www.bettercloud.com/integrations-webinar/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-integration-center>
>>
>>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Vijay Bhaskar <bh...@gmail.com>.
We are seeing below logs in production sometime ago, after that we stopped
HA. Do you people think HA is enabled properly from the below logs?

Regards
Bhaskar

2019-09-24 17:40:17,675 INFO
 org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
Starting ZooKeeperLeaderElectionService
ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}.
2019-09-24 17:40:17,675 INFO
 org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService
 - Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-09-24 17:40:20,975 WARN  akka.remote.transport.netty.NettyTransport
                   - Remote connection to [null] failed with
java.net.NoRouteToHostException: No route to host
2019-09-24 17:40:20,976 WARN  akka.remote.ReliableDeliverySupervisor
                       - Association with remote system [akka.tcp:
//flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123] has
failed, address is now gated for [50] ms. Reason: [Association failed with
[akka.tcp://flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
Caused by: [No route to host]
2019-09-24 17:40:23,976 WARN  akka.remote.transport.netty.NettyTransport
                   - Remote connection to [null] failed with
java.net.NoRouteToHostException: No route to host
2019-09-24 17:40:23,977 WARN  akka.remote.ReliableDeliverySupervisor
                       - Association with remote system [akka.tcp:
//flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123] has
failed, address is now gated for [50] ms. Reason: [Association failed with
[akka.tcp://flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
Caused by: [No route to host]
2019-09-24 17:40:26,982 WARN  akka.remote.transport.netty.NettyTransport
                   - Remote connection to [null] failed with
java.net.NoRouteToHostException: No route to host
2019-09-24 17:40:26,983 WARN  akka.remote.ReliableDeliverySupervisor
                       - Association with remote system [akka.tcp:
//flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123] has
failed, address is now gated for [50] ms. Reason: [Association failed with
[akka.tcp://flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
Caused by: [No route to host]
2019-09-24 17:40:29,988 WARN  akka.remote.transport.netty.NettyTransport
                   - Remote connection to [null] failed with
java.net.NoRouteToHostException: No route to host
2019-09-24 17:40:29,988 WARN  akka.remote.ReliableDeliverySupervisor
                       - Association with remote system [akka.tcp:
//flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123] has
failed, address is now gated for [50] ms. Reason: [Association failed with
[akka.tcp://flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
Caused by: [No route to host]
2019-09-24 17:40:32,994 WARN  akka.remote.transport.netty.NettyTransport
                   - Remote connection to [null] failed with
java.net.NoRouteToHostException: No route to host
2019-09-24 17:40:32,995 WARN  akka.remote.ReliableDeliverySupervisor
                       - Association with remote system [akka.tcp:
//flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123] has
failed, address is now gated for [50] ms. Reason: [Association failed with
[akka.tcp://flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
Caused by: [No route to host]
2019-09-24 17:40:36,000 WARN  akka.remote.transport.netty.NettyTransport
                   - Remote connection to [null] failed with
java.net.NoRouteToHostException: No route to host
2019-09-24 17:40:36,001 WARN  akka.remote.ReliableDeliverySupervisor
                       - Association with remote system [akka.tcp:
//flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123] has
failed, address is now gated for [50] ms. Reason: [Association failed with
[akka.tcp://flink@service-flink-jobmanager-1.cloudsecure.svc.cluster.local:6123]]
Caused by: [No route to host]
2019-09-24 17:40:39,006 WARN  akka.remote.transport.netty.NettyTransport
                   - Remote connection to [null] failed with
java.net.NoRouteToHostException: No route to host

On Fri, Oct 11, 2019 at 9:39 AM Yun Tang <my...@live.com> wrote:

> Hi Hao
>
> It seems that I misunderstood the background of usage for your cases. High
> availability configuration targets for fault tolerance not for general
> development evolution. If you want to change your job topology, just follow
> the general rule to restore from savepoint/checkpoint, do not rely on HA to
> do job migration things.
>
> Best
> Yun Tang
> ------------------------------
> *From:* Hao Sun <ha...@zendesk.com>
> *Sent:* Friday, October 11, 2019 8:33
> *To:* Yun Tang <my...@live.com>
> *Cc:* Vijay Bhaskar <bh...@gmail.com>; Yang Wang <
> danrtsey.wy@gmail.com>; Sean Hester <se...@bettercloud.com>;
> Aleksandar Mastilovic <am...@sightmachine.com>; Yuval Itzchakov <
> yuvalos@gmail.com>; user <us...@flink.apache.org>
> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>
> Yep I know that option. That's where get me confused as well. In a HA
> setup, where do I supply this option (allowNonRestoredState)?
> This option requires a savepoint path when I start a flink job I remember.
> And HA does not require the path
>
> Hao Sun
>
>
> On Thu, Oct 10, 2019 at 11:16 AM Yun Tang <my...@live.com> wrote:
>
> Just a minor supplement @Hao Sun <ha...@zendesk.com>, if you decided to
> drop a operator, don't forget to add --allowNonRestoredState (short: -n)
> option [1]
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state
>
> Best
> Yun Tang
>
> ------------------------------
> *From:* Vijay Bhaskar <bh...@gmail.com>
> *Sent:* Thursday, October 10, 2019 19:24
> *To:* Yang Wang <da...@gmail.com>
> *Cc:* Sean Hester <se...@bettercloud.com>; Aleksandar Mastilovic <
> amastilovic@sightmachine.com>; Yun Tang <my...@live.com>; Hao Sun <
> hasun@zendesk.com>; Yuval Itzchakov <yu...@gmail.com>; user <
> user@flink.apache.org>
> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>
> Thanks Yang. We will try and let you know if any issues arise
>
> Regards
> Bhaskar
>
> On Thu, Oct 10, 2019 at 1:53 PM Yang Wang <da...@gmail.com> wrote:
>
> @ Hao Sun,
> I have made a confirmation that even we change parallelism and/or modify
> operators, add new operators,
> the flink cluster could also recover from latest checkpoint.
>
> @ Vijay
> a) Some individual jobmanager/taskmanager crashed exceptionally(someother
> jobmanagers
> and taskmanagers are alive), it could recover from the latest checkpoint.
> b) All jobmanagers and taskmanagers fails, it could still recover from the
> latest checkpoint if the cluster-id
> is not changed.
>
> When we enable the HA, The meta of jobgraph and checkpoint is saved on
> zookeeper and the real files are save
> on high-availability storage(HDFS). So when the flink application is
> submitted again with same cluster-id, it could
> recover jobs and checkpoint from zookeeper. I think it has been supported
> for a long time. Maybe you could have a
> try with flink-1.8 or 1.9.
>
> Best,
> Yang
>
>
> Vijay Bhaskar <bh...@gmail.com> 于2019年10月10日周四 下午2:26写道:
>
> Thanks Yang and Sean. I have couple of questions:
>
> 1) Suppose the scenario of , bringing back entire cluster,
>      a) In that case, at least one job manager out of HA group should be
> up and running right? or
>      b) All the job managers fails, then also this works? In that case
> please let me know the procedure/share the documentation?
>          How to start from previous check point?
>          What Flink version onwards this feature is stable?
>
> Regards
> Bhaskar
>
>
> On Wed, Oct 9, 2019 at 8:51 AM Yang Wang <da...@gmail.com> wrote:
>
> Hi Vijay,
>
> If you are using HA solution, i think you do not need to specify the
> savepoint. Instead the checkpoint is used.
> The checkpoint is done automatically and periodically based on your
> configuration.When the
> jobmanager/taskmanager fails or the whole cluster crashes, it could always
> recover from the latest
> checkpoint. Does this meed your requirement?
>
> Best,
> Yang
>
> Sean Hester <se...@bettercloud.com> 于2019年10月1日周二 上午1:47写道:
>
> Vijay,
>
> That is my understanding as well: the HA solution only solves the problem
> up to the point all job managers fail/restart at the same time. That's
> where my original concern was.
>
> But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers
> per cluster--as long as they are all deployed to separate GKE nodes--would
> provide a very high uptime/low failure rate, at least on paper. It's a
> promising enough option that we're going to run in HA for a month or two
> and monitor results before we put in any extra work to customize the
> savepoint start-up behavior.
>
> On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar <bh...@gmail.com>
> wrote:
>
> I don't think HA will help to recover from cluster crash, for that we
> should take periodic savepoint right? Please correct me in case i am wrong
>
> Regards
> Bhaskar
>
> On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <bh...@gmail.com>
> wrote:
>
> Suppose my cluster got crashed and need to bring up the entire cluster
> back? Does HA still helps to run the cluster from latest save point?
>
> Regards
> Bhaskar
>
> On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <se...@bettercloud.com>
> wrote:
>
> thanks to everyone for all the replies.
>
> i think the original concern here with "just" relying on the HA option is
> that there are some disaster recovery and data center migration use cases
> where the continuity of the job managers is difficult to preserve. but
> those are admittedly very edgy use cases. i think it's definitely worth
> reviewing the SLAs with our site reliability engineers to see how likely it
> would be to completely lose all job managers under an HA configuration.
> that small a risk might be acceptable/preferable to a one-off solution.
>
> @Aleksander, would love to learn more about Zookeeper-less HA. i think i
> spotted a thread somewhere between Till and someone (perhaps you) about
> that. feel free to DM me.
>
> thanks again to everyone!
>
> On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <da...@gmail.com> wrote:
>
> Hi, Aleksandar
>
> Savepoint option in standalone job cluster is optional. If you want to
> always recover
> from the latest checkpoint, just as Aleksandar and Yun Tang said you could
> use the
> high-availability configuration. Make sure the cluster-id is not changed,
> i think the job
> could recover both at exceptionally crash and restart by expectation.
>
> @Aleksandar Mastilovic <am...@sightmachine.com>, we are also have
> an zookeeper-less high-availability implementation[1].
> Maybe we could have some discussion and contribute this useful feature to
> the community.
>
> [1].
> https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit
>
> Best,
> Yang
>
> Aleksandar Mastilovic <am...@sightmachine.com> 于2019年9月26日周四
> 上午4:11写道:
>
> Would you guys (Flink devs) be interested in our solution for
> zookeeper-less HA? I could ask the managers how they feel about
> open-sourcing the improvement.
>
> On Sep 25, 2019, at 11:49 AM, Yun Tang <my...@live.com> wrote:
>
> As Aleksandar said, k8s with HA configuration could solve your problem.
> There already have some discussion about how to implement such HA in k8s if
> we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2].
> Currently, you might only have to choose zookeeper as high-availability
> service.
>
> [1] https://issues.apache.org/jira/browse/FLINK-11105
> [2] https://issues.apache.org/jira/browse/FLINK-12884
>
> Best
> Yun Tang
> ------------------------------
> *From:* Aleksandar Mastilovic <am...@sightmachine.com>
> *Sent:* Thursday, September 26, 2019 1:57
> *To:* Sean Hester <se...@bettercloud.com>
> *Cc:* Hao Sun <ha...@zendesk.com>; Yuval Itzchakov <yu...@gmail.com>;
> user <us...@flink.apache.org>
> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>
> Can’t you simply use JobManager in HA mode? It would pick up where it left
> off if you don’t provide a Savepoint.
>
> On Sep 25, 2019, at 6:07 AM, Sean Hester <se...@bettercloud.com>
> wrote:
>
> thanks for all replies! i'll definitely take a look at the Flink k8s
> Operator project.
>
> i'll try to restate the issue to clarify. this issue is specific to
> starting a job from a savepoint in job-cluster mode. in these cases the Job
> Manager container is configured to run a single Flink job at start-up. the
> savepoint needs to be provided as an argument to the entrypoint. the Flink
> documentation for this approach is here:
>
>
> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>
> the issue is that taking this approach means that the job will *always* start
> from the savepoint provided as the start argument in the Kubernetes YAML.
> this includes unplanned restarts of the job manager, but we'd really prefer
> any *unplanned* restarts resume for the most recent checkpoint instead of
> restarting from the configured savepoint. so in a sense we want the
> savepoint argument to be transient, only being used during the initial
> deployment, but this runs counter to the design of Kubernetes which always
> wants to restore a deployment to the "goal state" as defined in the YAML.
>
> i hope this helps. if you want more details please let me know, and thanks
> again for your time.
>
>
> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com> wrote:
>
> I think I overlooked it. Good point. I am using Redis to save the path to
> my savepoint, I might be able to set a TTL to avoid such issue.
>
> Hao Sun
>
>
> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yu...@gmail.com> wrote:
>
> Hi Hao,
>
> I think he's exactly talking about the usecase where the JM/TM restart and
> they come back up from the latest savepoint which might be stale by that
> time.
>
> On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com> wrote:
>
> We always make a savepoint before we shutdown the job-cluster. So the
> savepoint is always the latest. When we fix a bug or change the job graph,
> it can resume well.
> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
> uncaught exception, etc.
>
> Maybe I do not understand your use case well, I do not see a need to start
> from checkpoint after a bug fix.
> From what I know, currently you can use checkpoint as a savepoint as well
>
> Hao Sun
>
>
> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com> wrote:
>
> AFAIK there's currently nothing implemented to solve this problem, but
> working on a possible fix can be implemented on top of
> https://github.com/lyft/flinkk8soperator which already has a pretty fancy
> state machine for rolling upgrades. I'd love to be involved as this is an
> issue I've been thinking about as well.
>
> Yuval
>
> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <se...@bettercloud.com>
> wrote:
>
> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
> when deploying Flink jobs to start from savepoints using the job-cluster
> mode in Kubernetes.
>
> we're running a ~15 different jobs, all in job-cluster mode, using a mix
> of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are
> all long-running streaming jobs, all essentially acting as microservices.
> we're using Helm charts to configure all of our deployments.
>
> we have a number of use cases where we want to restart jobs from a
> savepoint to replay recent events, i.e. when we've enhanced the job logic
> or fixed a bug. but after the deployment we want to have the job resume
> it's "long-running" behavior, where any unplanned restarts resume from the
> latest checkpoint.
>
> the issue we run into is that any obvious/standard/idiomatic Kubernetes
> deployment includes the savepoint argument in the configuration. if the Job
> Manager container(s) have an unplanned restart, when they come back up they
> will start from the savepoint instead of resuming from the latest
> checkpoint. everything is working as configured, but that's not exactly
> what we want. we want the savepoint argument to be transient somehow (only
> used during the initial deployment), but Kubernetes doesn't really support
> the concept of transient configuration.
>
> i can see a couple of potential solutions that either involve custom code
> in the jobs or custom logic in the container (i.e. a custom entrypoint
> script that records that the configured savepoint has already been used in
> a file on a persistent volume or GCS, and potentially when/why/by which
> deployment). but these seem like unexpected and hacky solutions. before we
> head down that road i wanted to ask:
>
>    - is this is already a solved problem that i've missed?
>    - is this issue already on the community's radar?
>
> thanks in advance!
>
> --
> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
> *Altitude 2019 in San Francisco | Sept. 23 - 25*
> It’s not just an IT conference, it’s “a complete learning and networking
> experience”
> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>
>
>
> --
> Best Regards,
> Yuval Itzchakov.
>
>
>
> --
> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
> *Altitude 2019 in San Francisco | Sept. 23 - 25*
> It’s not just an IT conference, it’s “a complete learning and networking
> experience”
> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>
>
>
>
> --
> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
> <http://www.bettercloud.com> <http://www.bettercloud.com>
> *Altitude 2019 in San Francisco | Sept. 23 - 25*
> It’s not just an IT conference, it’s “a complete learning and networking
> experience”
> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>
>
>
> --
> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
> <http://www.bettercloud.com> <http://www.bettercloud.com>
> *Introducing the BetterCloud Integration Center *
> Automate actions across every app and own SaaSOps
> <https://www.bettercloud.com/integrations-webinar/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-integration-center>
>
>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Yun Tang <my...@live.com>.
Hi Hao

It seems that I misunderstood the background of usage for your cases. High availability configuration targets for fault tolerance not for general development evolution. If you want to change your job topology, just follow the general rule to restore from savepoint/checkpoint, do not rely on HA to do job migration things.

Best
Yun Tang
________________________________
From: Hao Sun <ha...@zendesk.com>
Sent: Friday, October 11, 2019 8:33
To: Yun Tang <my...@live.com>
Cc: Vijay Bhaskar <bh...@gmail.com>; Yang Wang <da...@gmail.com>; Sean Hester <se...@bettercloud.com>; Aleksandar Mastilovic <am...@sightmachine.com>; Yuval Itzchakov <yu...@gmail.com>; user <us...@flink.apache.org>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes

Yep I know that option. That's where get me confused as well. In a HA setup, where do I supply this option (allowNonRestoredState)?
This option requires a savepoint path when I start a flink job I remember. And HA does not require the path

Hao Sun


On Thu, Oct 10, 2019 at 11:16 AM Yun Tang <my...@live.com>> wrote:
Just a minor supplement @Hao Sun<ma...@zendesk.com>, if you decided to drop a operator, don't forget to add --allowNonRestoredState (short: -n) option [1]


[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state

Best
Yun Tang

________________________________
From: Vijay Bhaskar <bh...@gmail.com>>
Sent: Thursday, October 10, 2019 19:24
To: Yang Wang <da...@gmail.com>>
Cc: Sean Hester <se...@bettercloud.com>>; Aleksandar Mastilovic <am...@sightmachine.com>>; Yun Tang <my...@live.com>>; Hao Sun <ha...@zendesk.com>>; Yuval Itzchakov <yu...@gmail.com>>; user <us...@flink.apache.org>>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes

Thanks Yang. We will try and let you know if any issues arise

Regards
Bhaskar

On Thu, Oct 10, 2019 at 1:53 PM Yang Wang <da...@gmail.com>> wrote:
@ Hao Sun,
I have made a confirmation that even we change parallelism and/or modify operators, add new operators,
the flink cluster could also recover from latest checkpoint.

@ Vijay
a) Some individual jobmanager/taskmanager crashed exceptionally(someother jobmanagers
and taskmanagers are alive), it could recover from the latest checkpoint.
b) All jobmanagers and taskmanagers fails, it could still recover from the latest checkpoint if the cluster-id
is not changed.

When we enable the HA, The meta of jobgraph and checkpoint is saved on zookeeper and the real files are save
on high-availability storage(HDFS). So when the flink application is submitted again with same cluster-id, it could
recover jobs and checkpoint from zookeeper. I think it has been supported for a long time. Maybe you could have a
try with flink-1.8 or 1.9.

Best,
Yang


Vijay Bhaskar <bh...@gmail.com>> 于2019年10月10日周四 下午2:26写道:
Thanks Yang and Sean. I have couple of questions:

1) Suppose the scenario of , bringing back entire cluster,
     a) In that case, at least one job manager out of HA group should be up and running right? or
     b) All the job managers fails, then also this works? In that case please let me know the procedure/share the documentation?
         How to start from previous check point?
         What Flink version onwards this feature is stable?

Regards
Bhaskar


On Wed, Oct 9, 2019 at 8:51 AM Yang Wang <da...@gmail.com>> wrote:
Hi Vijay,

If you are using HA solution, i think you do not need to specify the savepoint. Instead the checkpoint is used.
The checkpoint is done automatically and periodically based on your configuration.When the
jobmanager/taskmanager fails or the whole cluster crashes, it could always recover from the latest
checkpoint. Does this meed your requirement?

Best,
Yang

Sean Hester <se...@bettercloud.com>> 于2019年10月1日周二 上午1:47写道:
Vijay,

That is my understanding as well: the HA solution only solves the problem up to the point all job managers fail/restart at the same time. That's where my original concern was.

But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers per cluster--as long as they are all deployed to separate GKE nodes--would provide a very high uptime/low failure rate, at least on paper. It's a promising enough option that we're going to run in HA for a month or two and monitor results before we put in any extra work to customize the savepoint start-up behavior.

On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar <bh...@gmail.com>> wrote:
I don't think HA will help to recover from cluster crash, for that we should take periodic savepoint right? Please correct me in case i am wrong

Regards
Bhaskar

On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <bh...@gmail.com>> wrote:
Suppose my cluster got crashed and need to bring up the entire cluster back? Does HA still helps to run the cluster from latest save point?

Regards
Bhaskar

On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <se...@bettercloud.com>> wrote:
thanks to everyone for all the replies.

i think the original concern here with "just" relying on the HA option is that there are some disaster recovery and data center migration use cases where the continuity of the job managers is difficult to preserve. but those are admittedly very edgy use cases. i think it's definitely worth reviewing the SLAs with our site reliability engineers to see how likely it would be to completely lose all job managers under an HA configuration. that small a risk might be acceptable/preferable to a one-off solution.

@Aleksander, would love to learn more about Zookeeper-less HA. i think i spotted a thread somewhere between Till and someone (perhaps you) about that. feel free to DM me.

thanks again to everyone!

On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <da...@gmail.com>> wrote:
Hi, Aleksandar

Savepoint option in standalone job cluster is optional. If you want to always recover
from the latest checkpoint, just as Aleksandar and Yun Tang said you could use the
high-availability configuration. Make sure the cluster-id is not changed, i think the job
could recover both at exceptionally crash and restart by expectation.

@Aleksandar Mastilovic<ma...@sightmachine.com>, we are also have an zookeeper-less high-availability implementation[1].
Maybe we could have some discussion and contribute this useful feature to the community.

[1]. https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit

Best,
Yang

Aleksandar Mastilovic <am...@sightmachine.com>> 于2019年9月26日周四 上午4:11写道:
Would you guys (Flink devs) be interested in our solution for zookeeper-less HA? I could ask the managers how they feel about open-sourcing the improvement.

On Sep 25, 2019, at 11:49 AM, Yun Tang <my...@live.com>> wrote:

As Aleksandar said, k8s with HA configuration could solve your problem. There already have some discussion about how to implement such HA in k8s if we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2]. Currently, you might only have to choose zookeeper as high-availability service.

[1] https://issues.apache.org/jira/browse/FLINK-11105
[2] https://issues.apache.org/jira/browse/FLINK-12884

Best
Yun Tang
________________________________
From: Aleksandar Mastilovic <am...@sightmachine.com>>
Sent: Thursday, September 26, 2019 1:57
To: Sean Hester <se...@bettercloud.com>>
Cc: Hao Sun <ha...@zendesk.com>>; Yuval Itzchakov <yu...@gmail.com>>; user <us...@flink.apache.org>>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes

Can’t you simply use JobManager in HA mode? It would pick up where it left off if you don’t provide a Savepoint.

On Sep 25, 2019, at 6:07 AM, Sean Hester <se...@bettercloud.com>> wrote:

thanks for all replies! i'll definitely take a look at the Flink k8s Operator project.

i'll try to restate the issue to clarify. this issue is specific to starting a job from a savepoint in job-cluster mode. in these cases the Job Manager container is configured to run a single Flink job at start-up. the savepoint needs to be provided as an argument to the entrypoint. the Flink documentation for this approach is here:

https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint

the issue is that taking this approach means that the job will always start from the savepoint provided as the start argument in the Kubernetes YAML. this includes unplanned restarts of the job manager, but we'd really prefer any unplanned restarts resume for the most recent checkpoint instead of restarting from the configured savepoint. so in a sense we want the savepoint argument to be transient, only being used during the initial deployment, but this runs counter to the design of Kubernetes which always wants to restore a deployment to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com>> wrote:
I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yu...@gmail.com>> wrote:
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com>> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com>> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <se...@bettercloud.com>> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:

  *   is this is already a solved problem that i've missed?
  *   is this issue already on the community's radar?

thanks in advance!

--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
<http://www.bettercloud.com/>[https://www.bettercloud.com/monitor/wp-content/uploads/sites/3/2016/12/bettercloud-emaillogo.png]<http://www.bettercloud.com/>
Altitude 2019 in San Francisco | Sept. 23 - 25
It’s not just an IT conference, it’s “a complete learning and networking experience”<https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>



--
Best Regards,
Yuval Itzchakov.


--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
<http://www.bettercloud.com/>[https://www.bettercloud.com/monitor/wp-content/uploads/sites/3/2016/12/bettercloud-emaillogo.png]<http://www.bettercloud.com/>
Altitude 2019 in San Francisco | Sept. 23 - 25
It’s not just an IT conference, it’s “a complete learning and networking experience”<https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>



--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
<http://www.bettercloud.com>[https://www.bettercloud.com/monitor/wp-content/uploads/sites/3/2016/12/bettercloud-emaillogo.png]<http://www.bettercloud.com>
Altitude 2019 in San Francisco | Sept. 23 - 25
It’s not just an IT conference, it’s “a complete learning and networking experience”<https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>



--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
<http://www.bettercloud.com>[https://www.bettercloud.com/monitor/wp-content/uploads/sites/3/2016/12/bettercloud-emaillogo.png]<http://www.bettercloud.com>
Introducing the BetterCloud Integration Center
Automate actions across every app and own SaaSOps<https://www.bettercloud.com/integrations-webinar/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-integration-center>


Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Hao Sun <ha...@zendesk.com>.
Yep I know that option. That's where get me confused as well. In a HA
setup, where do I supply this option (allowNonRestoredState)?
This option requires a savepoint path when I start a flink job I remember.
And HA does not require the path

Hao Sun


On Thu, Oct 10, 2019 at 11:16 AM Yun Tang <my...@live.com> wrote:

> Just a minor supplement @Hao Sun <ha...@zendesk.com>, if you decided to
> drop a operator, don't forget to add --allowNonRestoredState (short: -n)
> option [1]
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state
> <https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state>
>
> Best
> Yun Tang
>
> ------------------------------
> *From:* Vijay Bhaskar <bh...@gmail.com>
> *Sent:* Thursday, October 10, 2019 19:24
> *To:* Yang Wang <da...@gmail.com>
> *Cc:* Sean Hester <se...@bettercloud.com>; Aleksandar Mastilovic <
> amastilovic@sightmachine.com>; Yun Tang <my...@live.com>; Hao Sun <
> hasun@zendesk.com>; Yuval Itzchakov <yu...@gmail.com>; user <
> user@flink.apache.org>
> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>
> Thanks Yang. We will try and let you know if any issues arise
>
> Regards
> Bhaskar
>
> On Thu, Oct 10, 2019 at 1:53 PM Yang Wang <da...@gmail.com> wrote:
>
> @ Hao Sun,
> I have made a confirmation that even we change parallelism and/or modify
> operators, add new operators,
> the flink cluster could also recover from latest checkpoint.
>
> @ Vijay
> a) Some individual jobmanager/taskmanager crashed exceptionally(someother
> jobmanagers
> and taskmanagers are alive), it could recover from the latest checkpoint.
> b) All jobmanagers and taskmanagers fails, it could still recover from the
> latest checkpoint if the cluster-id
> is not changed.
>
> When we enable the HA, The meta of jobgraph and checkpoint is saved on
> zookeeper and the real files are save
> on high-availability storage(HDFS). So when the flink application is
> submitted again with same cluster-id, it could
> recover jobs and checkpoint from zookeeper. I think it has been supported
> for a long time. Maybe you could have a
> try with flink-1.8 or 1.9.
>
> Best,
> Yang
>
>
> Vijay Bhaskar <bh...@gmail.com> 于2019年10月10日周四 下午2:26写道:
>
> Thanks Yang and Sean. I have couple of questions:
>
> 1) Suppose the scenario of , bringing back entire cluster,
>      a) In that case, at least one job manager out of HA group should be
> up and running right? or
>      b) All the job managers fails, then also this works? In that case
> please let me know the procedure/share the documentation?
>          How to start from previous check point?
>          What Flink version onwards this feature is stable?
>
> Regards
> Bhaskar
>
>
> On Wed, Oct 9, 2019 at 8:51 AM Yang Wang <da...@gmail.com> wrote:
>
> Hi Vijay,
>
> If you are using HA solution, i think you do not need to specify the
> savepoint. Instead the checkpoint is used.
> The checkpoint is done automatically and periodically based on your
> configuration.When the
> jobmanager/taskmanager fails or the whole cluster crashes, it could always
> recover from the latest
> checkpoint. Does this meed your requirement?
>
> Best,
> Yang
>
> Sean Hester <se...@bettercloud.com> 于2019年10月1日周二 上午1:47写道:
>
> Vijay,
>
> That is my understanding as well: the HA solution only solves the problem
> up to the point all job managers fail/restart at the same time. That's
> where my original concern was.
>
> But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers
> per cluster--as long as they are all deployed to separate GKE nodes--would
> provide a very high uptime/low failure rate, at least on paper. It's a
> promising enough option that we're going to run in HA for a month or two
> and monitor results before we put in any extra work to customize the
> savepoint start-up behavior.
>
> On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar <bh...@gmail.com>
> wrote:
>
> I don't think HA will help to recover from cluster crash, for that we
> should take periodic savepoint right? Please correct me in case i am wrong
>
> Regards
> Bhaskar
>
> On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <bh...@gmail.com>
> wrote:
>
> Suppose my cluster got crashed and need to bring up the entire cluster
> back? Does HA still helps to run the cluster from latest save point?
>
> Regards
> Bhaskar
>
> On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <se...@bettercloud.com>
> wrote:
>
> thanks to everyone for all the replies.
>
> i think the original concern here with "just" relying on the HA option is
> that there are some disaster recovery and data center migration use cases
> where the continuity of the job managers is difficult to preserve. but
> those are admittedly very edgy use cases. i think it's definitely worth
> reviewing the SLAs with our site reliability engineers to see how likely it
> would be to completely lose all job managers under an HA configuration.
> that small a risk might be acceptable/preferable to a one-off solution.
>
> @Aleksander, would love to learn more about Zookeeper-less HA. i think i
> spotted a thread somewhere between Till and someone (perhaps you) about
> that. feel free to DM me.
>
> thanks again to everyone!
>
> On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <da...@gmail.com> wrote:
>
> Hi, Aleksandar
>
> Savepoint option in standalone job cluster is optional. If you want to
> always recover
> from the latest checkpoint, just as Aleksandar and Yun Tang said you could
> use the
> high-availability configuration. Make sure the cluster-id is not changed,
> i think the job
> could recover both at exceptionally crash and restart by expectation.
>
> @Aleksandar Mastilovic <am...@sightmachine.com>, we are also have
> an zookeeper-less high-availability implementation[1].
> Maybe we could have some discussion and contribute this useful feature to
> the community.
>
> [1].
> https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit
> <https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit>
>
> Best,
> Yang
>
> Aleksandar Mastilovic <am...@sightmachine.com> 于2019年9月26日周四
> 上午4:11写道:
>
> Would you guys (Flink devs) be interested in our solution for
> zookeeper-less HA? I could ask the managers how they feel about
> open-sourcing the improvement.
>
> On Sep 25, 2019, at 11:49 AM, Yun Tang <my...@live.com> wrote:
>
> As Aleksandar said, k8s with HA configuration could solve your problem.
> There already have some discussion about how to implement such HA in k8s if
> we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2].
> Currently, you might only have to choose zookeeper as high-availability
> service.
>
> [1] https://issues.apache.org/jira/browse/FLINK-11105
> <https://issues.apache.org/jira/browse/FLINK-11105>
> [2] https://issues.apache.org/jira/browse/FLINK-12884
> <https://issues.apache.org/jira/browse/FLINK-12884>
>
> Best
> Yun Tang
> ------------------------------
> *From:* Aleksandar Mastilovic <am...@sightmachine.com>
> *Sent:* Thursday, September 26, 2019 1:57
> *To:* Sean Hester <se...@bettercloud.com>
> *Cc:* Hao Sun <ha...@zendesk.com>; Yuval Itzchakov <yu...@gmail.com>;
> user <us...@flink.apache.org>
> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>
> Can’t you simply use JobManager in HA mode? It would pick up where it left
> off if you don’t provide a Savepoint.
>
> On Sep 25, 2019, at 6:07 AM, Sean Hester <se...@bettercloud.com>
> wrote:
>
> thanks for all replies! i'll definitely take a look at the Flink k8s
> Operator project.
>
> i'll try to restate the issue to clarify. this issue is specific to
> starting a job from a savepoint in job-cluster mode. in these cases the Job
> Manager container is configured to run a single Flink job at start-up. the
> savepoint needs to be provided as an argument to the entrypoint. the Flink
> documentation for this approach is here:
>
>
> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
> <https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint>
>
> the issue is that taking this approach means that the job will *always* start
> from the savepoint provided as the start argument in the Kubernetes YAML.
> this includes unplanned restarts of the job manager, but we'd really prefer
> any *unplanned* restarts resume for the most recent checkpoint instead of
> restarting from the configured savepoint. so in a sense we want the
> savepoint argument to be transient, only being used during the initial
> deployment, but this runs counter to the design of Kubernetes which always
> wants to restore a deployment to the "goal state" as defined in the YAML.
>
> i hope this helps. if you want more details please let me know, and thanks
> again for your time.
>
>
> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com> wrote:
>
> I think I overlooked it. Good point. I am using Redis to save the path to
> my savepoint, I might be able to set a TTL to avoid such issue.
>
> Hao Sun
>
>
> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yu...@gmail.com> wrote:
>
> Hi Hao,
>
> I think he's exactly talking about the usecase where the JM/TM restart and
> they come back up from the latest savepoint which might be stale by that
> time.
>
> On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com> wrote:
>
> We always make a savepoint before we shutdown the job-cluster. So the
> savepoint is always the latest. When we fix a bug or change the job graph,
> it can resume well.
> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
> uncaught exception, etc.
>
> Maybe I do not understand your use case well, I do not see a need to start
> from checkpoint after a bug fix.
> From what I know, currently you can use checkpoint as a savepoint as well
>
> Hao Sun
>
>
> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com> wrote:
>
> AFAIK there's currently nothing implemented to solve this problem, but
> working on a possible fix can be implemented on top of
> https://github.com/lyft/flinkk8soperator
> <https://github.com/lyft/flinkk8soperator> which already
> has a pretty fancy state machine for rolling upgrades. I'd love to be
> involved as this is an issue I've been thinking about as well.
>
> Yuval
>
> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <se...@bettercloud.com>
> wrote:
>
> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
> when deploying Flink jobs to start from savepoints using the job-cluster
> mode in Kubernetes.
>
> we're running a ~15 different jobs, all in job-cluster mode, using a mix
> of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are
> all long-running streaming jobs, all essentially acting as microservices.
> we're using Helm charts to configure all of our deployments.
>
> we have a number of use cases where we want to restart jobs from a
> savepoint to replay recent events, i.e. when we've enhanced the job logic
> or fixed a bug. but after the deployment we want to have the job resume
> it's "long-running" behavior, where any unplanned restarts resume from the
> latest checkpoint.
>
> the issue we run into is that any obvious/standard/idiomatic Kubernetes
> deployment includes the savepoint argument in the configuration. if the Job
> Manager container(s) have an unplanned restart, when they come back up they
> will start from the savepoint instead of resuming from the latest
> checkpoint. everything is working as configured, but that's not exactly
> what we want. we want the savepoint argument to be transient somehow (only
> used during the initial deployment), but Kubernetes doesn't really support
> the concept of transient configuration.
>
> i can see a couple of potential solutions that either involve custom code
> in the jobs or custom logic in the container (i.e. a custom entrypoint
> script that records that the configured savepoint has already been used in
> a file on a persistent volume or GCS, and potentially when/why/by which
> deployment). but these seem like unexpected and hacky solutions. before we
> head down that road i wanted to ask:
>
>    - is this is already a solved problem that i've missed?
>    - is this issue already on the community's radar?
>
> thanks in advance!
>
> --
> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
> <http://www.bettercloud.com/>
> <http://www.bettercloud.com/>
> *Altitude 2019 in San Francisco | Sept. 23 - 25*
> It’s not just an IT conference, it’s “a complete learning and networking
> experience” <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>
>
>
> --
> Best Regards,
> Yuval Itzchakov.
>
>
>
> --
> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
> <http://www.bettercloud.com/>
> <http://www.bettercloud.com/>
> *Altitude 2019 in San Francisco | Sept. 23 - 25*
> It’s not just an IT conference, it’s “a complete learning and networking
> experience” <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>
>
>
>
> --
> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
> <http://www.bettercloud.com>
> <http://www.bettercloud.com>
> *Altitude 2019 in San Francisco | Sept. 23 - 25*
> It’s not just an IT conference, it’s “a complete learning and networking
> experience” <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>
>
>
> --
> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
> <http://www.bettercloud.com>
> <http://www.bettercloud.com>
> *Introducing the BetterCloud Integration Center *
> Automate actions across every app and own SaaSOps
> <https://www.bettercloud.com/integrations-webinar/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-integration-center>
>
>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Yun Tang <my...@live.com>.
Just a minor supplement @Hao Sun<ma...@zendesk.com>, if you decided to drop a operator, don't forget to add --allowNonRestoredState (short: -n) option [1]


[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state

Best
Yun Tang

________________________________
From: Vijay Bhaskar <bh...@gmail.com>
Sent: Thursday, October 10, 2019 19:24
To: Yang Wang <da...@gmail.com>
Cc: Sean Hester <se...@bettercloud.com>; Aleksandar Mastilovic <am...@sightmachine.com>; Yun Tang <my...@live.com>; Hao Sun <ha...@zendesk.com>; Yuval Itzchakov <yu...@gmail.com>; user <us...@flink.apache.org>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes

Thanks Yang. We will try and let you know if any issues arise

Regards
Bhaskar

On Thu, Oct 10, 2019 at 1:53 PM Yang Wang <da...@gmail.com>> wrote:
@ Hao Sun,
I have made a confirmation that even we change parallelism and/or modify operators, add new operators,
the flink cluster could also recover from latest checkpoint.

@ Vijay
a) Some individual jobmanager/taskmanager crashed exceptionally(someother jobmanagers
and taskmanagers are alive), it could recover from the latest checkpoint.
b) All jobmanagers and taskmanagers fails, it could still recover from the latest checkpoint if the cluster-id
is not changed.

When we enable the HA, The meta of jobgraph and checkpoint is saved on zookeeper and the real files are save
on high-availability storage(HDFS). So when the flink application is submitted again with same cluster-id, it could
recover jobs and checkpoint from zookeeper. I think it has been supported for a long time. Maybe you could have a
try with flink-1.8 or 1.9.

Best,
Yang


Vijay Bhaskar <bh...@gmail.com>> 于2019年10月10日周四 下午2:26写道:
Thanks Yang and Sean. I have couple of questions:

1) Suppose the scenario of , bringing back entire cluster,
     a) In that case, at least one job manager out of HA group should be up and running right? or
     b) All the job managers fails, then also this works? In that case please let me know the procedure/share the documentation?
         How to start from previous check point?
         What Flink version onwards this feature is stable?

Regards
Bhaskar


On Wed, Oct 9, 2019 at 8:51 AM Yang Wang <da...@gmail.com>> wrote:
Hi Vijay,

If you are using HA solution, i think you do not need to specify the savepoint. Instead the checkpoint is used.
The checkpoint is done automatically and periodically based on your configuration.When the
jobmanager/taskmanager fails or the whole cluster crashes, it could always recover from the latest
checkpoint. Does this meed your requirement?

Best,
Yang

Sean Hester <se...@bettercloud.com>> 于2019年10月1日周二 上午1:47写道:
Vijay,

That is my understanding as well: the HA solution only solves the problem up to the point all job managers fail/restart at the same time. That's where my original concern was.

But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers per cluster--as long as they are all deployed to separate GKE nodes--would provide a very high uptime/low failure rate, at least on paper. It's a promising enough option that we're going to run in HA for a month or two and monitor results before we put in any extra work to customize the savepoint start-up behavior.

On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar <bh...@gmail.com>> wrote:
I don't think HA will help to recover from cluster crash, for that we should take periodic savepoint right? Please correct me in case i am wrong

Regards
Bhaskar

On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <bh...@gmail.com>> wrote:
Suppose my cluster got crashed and need to bring up the entire cluster back? Does HA still helps to run the cluster from latest save point?

Regards
Bhaskar

On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <se...@bettercloud.com>> wrote:
thanks to everyone for all the replies.

i think the original concern here with "just" relying on the HA option is that there are some disaster recovery and data center migration use cases where the continuity of the job managers is difficult to preserve. but those are admittedly very edgy use cases. i think it's definitely worth reviewing the SLAs with our site reliability engineers to see how likely it would be to completely lose all job managers under an HA configuration. that small a risk might be acceptable/preferable to a one-off solution.

@Aleksander, would love to learn more about Zookeeper-less HA. i think i spotted a thread somewhere between Till and someone (perhaps you) about that. feel free to DM me.

thanks again to everyone!

On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <da...@gmail.com>> wrote:
Hi, Aleksandar

Savepoint option in standalone job cluster is optional. If you want to always recover
from the latest checkpoint, just as Aleksandar and Yun Tang said you could use the
high-availability configuration. Make sure the cluster-id is not changed, i think the job
could recover both at exceptionally crash and restart by expectation.

@Aleksandar Mastilovic<ma...@sightmachine.com>, we are also have an zookeeper-less high-availability implementation[1].
Maybe we could have some discussion and contribute this useful feature to the community.

[1]. https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit

Best,
Yang

Aleksandar Mastilovic <am...@sightmachine.com>> 于2019年9月26日周四 上午4:11写道:
Would you guys (Flink devs) be interested in our solution for zookeeper-less HA? I could ask the managers how they feel about open-sourcing the improvement.

On Sep 25, 2019, at 11:49 AM, Yun Tang <my...@live.com>> wrote:

As Aleksandar said, k8s with HA configuration could solve your problem. There already have some discussion about how to implement such HA in k8s if we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2]. Currently, you might only have to choose zookeeper as high-availability service.

[1] https://issues.apache.org/jira/browse/FLINK-11105
[2] https://issues.apache.org/jira/browse/FLINK-12884

Best
Yun Tang
________________________________
From: Aleksandar Mastilovic <am...@sightmachine.com>>
Sent: Thursday, September 26, 2019 1:57
To: Sean Hester <se...@bettercloud.com>>
Cc: Hao Sun <ha...@zendesk.com>>; Yuval Itzchakov <yu...@gmail.com>>; user <us...@flink.apache.org>>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes

Can’t you simply use JobManager in HA mode? It would pick up where it left off if you don’t provide a Savepoint.

On Sep 25, 2019, at 6:07 AM, Sean Hester <se...@bettercloud.com>> wrote:

thanks for all replies! i'll definitely take a look at the Flink k8s Operator project.

i'll try to restate the issue to clarify. this issue is specific to starting a job from a savepoint in job-cluster mode. in these cases the Job Manager container is configured to run a single Flink job at start-up. the savepoint needs to be provided as an argument to the entrypoint. the Flink documentation for this approach is here:

https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint

the issue is that taking this approach means that the job will always start from the savepoint provided as the start argument in the Kubernetes YAML. this includes unplanned restarts of the job manager, but we'd really prefer any unplanned restarts resume for the most recent checkpoint instead of restarting from the configured savepoint. so in a sense we want the savepoint argument to be transient, only being used during the initial deployment, but this runs counter to the design of Kubernetes which always wants to restore a deployment to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com>> wrote:
I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yu...@gmail.com>> wrote:
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com>> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com>> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <se...@bettercloud.com>> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:

  *   is this is already a solved problem that i've missed?
  *   is this issue already on the community's radar?

thanks in advance!

--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
<http://www.bettercloud.com/>[https://www.bettercloud.com/monitor/wp-content/uploads/sites/3/2016/12/bettercloud-emaillogo.png]<http://www.bettercloud.com/>
Altitude 2019 in San Francisco | Sept. 23 - 25
It’s not just an IT conference, it’s “a complete learning and networking experience”<https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>



--
Best Regards,
Yuval Itzchakov.


--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
<http://www.bettercloud.com/>[https://www.bettercloud.com/monitor/wp-content/uploads/sites/3/2016/12/bettercloud-emaillogo.png]<http://www.bettercloud.com/>
Altitude 2019 in San Francisco | Sept. 23 - 25
It’s not just an IT conference, it’s “a complete learning and networking experience”<https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>



--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
<http://www.bettercloud.com>[https://www.bettercloud.com/monitor/wp-content/uploads/sites/3/2016/12/bettercloud-emaillogo.png]<http://www.bettercloud.com>
Altitude 2019 in San Francisco | Sept. 23 - 25
It’s not just an IT conference, it’s “a complete learning and networking experience”<https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>



--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
<http://www.bettercloud.com>[https://www.bettercloud.com/monitor/wp-content/uploads/sites/3/2016/12/bettercloud-emaillogo.png]<http://www.bettercloud.com>
Introducing the BetterCloud Integration Center
Automate actions across every app and own SaaSOps<https://www.bettercloud.com/integrations-webinar/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-integration-center>


Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Vijay Bhaskar <bh...@gmail.com>.
Thanks Yang. We will try and let you know if any issues arise

Regards
Bhaskar

On Thu, Oct 10, 2019 at 1:53 PM Yang Wang <da...@gmail.com> wrote:

> @ Hao Sun,
> I have made a confirmation that even we change parallelism and/or modify
> operators, add new operators,
> the flink cluster could also recover from latest checkpoint.
>
> @ Vijay
> a) Some individual jobmanager/taskmanager crashed exceptionally(someother
> jobmanagers
> and taskmanagers are alive), it could recover from the latest checkpoint.
> b) All jobmanagers and taskmanagers fails, it could still recover from the
> latest checkpoint if the cluster-id
> is not changed.
>
> When we enable the HA, The meta of jobgraph and checkpoint is saved on
> zookeeper and the real files are save
> on high-availability storage(HDFS). So when the flink application is
> submitted again with same cluster-id, it could
> recover jobs and checkpoint from zookeeper. I think it has been supported
> for a long time. Maybe you could have a
> try with flink-1.8 or 1.9.
>
> Best,
> Yang
>
>
> Vijay Bhaskar <bh...@gmail.com> 于2019年10月10日周四 下午2:26写道:
>
>> Thanks Yang and Sean. I have couple of questions:
>>
>> 1) Suppose the scenario of , bringing back entire cluster,
>>      a) In that case, at least one job manager out of HA group should be
>> up and running right? or
>>      b) All the job managers fails, then also this works? In that case
>> please let me know the procedure/share the documentation?
>>          How to start from previous check point?
>>          What Flink version onwards this feature is stable?
>>
>> Regards
>> Bhaskar
>>
>>
>> On Wed, Oct 9, 2019 at 8:51 AM Yang Wang <da...@gmail.com> wrote:
>>
>>> Hi Vijay,
>>>
>>> If you are using HA solution, i think you do not need to specify the
>>> savepoint. Instead the checkpoint is used.
>>> The checkpoint is done automatically and periodically based on your
>>> configuration.When the
>>> jobmanager/taskmanager fails or the whole cluster crashes, it could
>>> always recover from the latest
>>> checkpoint. Does this meed your requirement?
>>>
>>> Best,
>>> Yang
>>>
>>> Sean Hester <se...@bettercloud.com> 于2019年10月1日周二 上午1:47写道:
>>>
>>>> Vijay,
>>>>
>>>> That is my understanding as well: the HA solution only solves the
>>>> problem up to the point all job managers fail/restart at the same time.
>>>> That's where my original concern was.
>>>>
>>>> But to Aleksandar and Yun's point, running in HA with 2 or 3 Job
>>>> Managers per cluster--as long as they are all deployed to separate GKE
>>>> nodes--would provide a very high uptime/low failure rate, at least on
>>>> paper. It's a promising enough option that we're going to run in HA for a
>>>> month or two and monitor results before we put in any extra work to
>>>> customize the savepoint start-up behavior.
>>>>
>>>> On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar <bh...@gmail.com>
>>>> wrote:
>>>>
>>>>> I don't think HA will help to recover from cluster crash, for that we
>>>>> should take periodic savepoint right? Please correct me in case i am wrong
>>>>>
>>>>> Regards
>>>>> Bhaskar
>>>>>
>>>>> On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <
>>>>> bhaskar.ebay77@gmail.com> wrote:
>>>>>
>>>>>> Suppose my cluster got crashed and need to bring up the entire
>>>>>> cluster back? Does HA still helps to run the cluster from latest save
>>>>>> point?
>>>>>>
>>>>>> Regards
>>>>>> Bhaskar
>>>>>>
>>>>>> On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <
>>>>>> sean.hester@bettercloud.com> wrote:
>>>>>>
>>>>>>> thanks to everyone for all the replies.
>>>>>>>
>>>>>>> i think the original concern here with "just" relying on the HA
>>>>>>> option is that there are some disaster recovery and data center migration
>>>>>>> use cases where the continuity of the job managers is difficult to
>>>>>>> preserve. but those are admittedly very edgy use cases. i think it's
>>>>>>> definitely worth reviewing the SLAs with our site reliability engineers to
>>>>>>> see how likely it would be to completely lose all job managers under an HA
>>>>>>> configuration. that small a risk might be acceptable/preferable to a
>>>>>>> one-off solution.
>>>>>>>
>>>>>>> @Aleksander, would love to learn more about Zookeeper-less HA. i
>>>>>>> think i spotted a thread somewhere between Till and someone (perhaps you)
>>>>>>> about that. feel free to DM me.
>>>>>>>
>>>>>>> thanks again to everyone!
>>>>>>>
>>>>>>> On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <da...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi, Aleksandar
>>>>>>>>
>>>>>>>> Savepoint option in standalone job cluster is optional. If you want
>>>>>>>> to always recover
>>>>>>>> from the latest checkpoint, just as Aleksandar and Yun Tang said
>>>>>>>> you could use the
>>>>>>>> high-availability configuration. Make sure the cluster-id is not
>>>>>>>> changed, i think the job
>>>>>>>> could recover both at exceptionally crash and restart by
>>>>>>>> expectation.
>>>>>>>>
>>>>>>>> @Aleksandar Mastilovic <am...@sightmachine.com>, we are also
>>>>>>>> have an zookeeper-less high-availability implementation[1].
>>>>>>>> Maybe we could have some discussion and contribute this useful
>>>>>>>> feature to the community.
>>>>>>>>
>>>>>>>> [1].
>>>>>>>> https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Yang
>>>>>>>>
>>>>>>>> Aleksandar Mastilovic <am...@sightmachine.com> 于2019年9月26日周四
>>>>>>>> 上午4:11写道:
>>>>>>>>
>>>>>>>>> Would you guys (Flink devs) be interested in our solution for
>>>>>>>>> zookeeper-less HA? I could ask the managers how they feel about
>>>>>>>>> open-sourcing the improvement.
>>>>>>>>>
>>>>>>>>> On Sep 25, 2019, at 11:49 AM, Yun Tang <my...@live.com> wrote:
>>>>>>>>>
>>>>>>>>> As Aleksandar said, k8s with HA configuration could solve your
>>>>>>>>> problem. There already have some discussion about how to implement such HA
>>>>>>>>> in k8s if we don't have a zookeeper service: FLINK-11105 [1] and
>>>>>>>>> FLINK-12884 [2]. Currently, you might only have to choose zookeeper as
>>>>>>>>> high-availability service.
>>>>>>>>>
>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11105
>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-12884
>>>>>>>>>
>>>>>>>>> Best
>>>>>>>>> Yun Tang
>>>>>>>>> ------------------------------
>>>>>>>>> *From:* Aleksandar Mastilovic <am...@sightmachine.com>
>>>>>>>>> *Sent:* Thursday, September 26, 2019 1:57
>>>>>>>>> *To:* Sean Hester <se...@bettercloud.com>
>>>>>>>>> *Cc:* Hao Sun <ha...@zendesk.com>; Yuval Itzchakov <
>>>>>>>>> yuvalos@gmail.com>; user <us...@flink.apache.org>
>>>>>>>>> *Subject:* Re: Challenges Deploying Flink With Savepoints On
>>>>>>>>> Kubernetes
>>>>>>>>>
>>>>>>>>> Can’t you simply use JobManager in HA mode? It would pick up where
>>>>>>>>> it left off if you don’t provide a Savepoint.
>>>>>>>>>
>>>>>>>>> On Sep 25, 2019, at 6:07 AM, Sean Hester <
>>>>>>>>> sean.hester@bettercloud.com> wrote:
>>>>>>>>>
>>>>>>>>> thanks for all replies! i'll definitely take a look at the Flink
>>>>>>>>> k8s Operator project.
>>>>>>>>>
>>>>>>>>> i'll try to restate the issue to clarify. this issue is specific
>>>>>>>>> to starting a job from a savepoint in job-cluster mode. in these cases the
>>>>>>>>> Job Manager container is configured to run a single Flink job at start-up.
>>>>>>>>> the savepoint needs to be provided as an argument to the entrypoint. the
>>>>>>>>> Flink documentation for this approach is here:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>>>>>>>>>
>>>>>>>>> the issue is that taking this approach means that the job will
>>>>>>>>> *always* start from the savepoint provided as the start argument
>>>>>>>>> in the Kubernetes YAML. this includes unplanned restarts of the job
>>>>>>>>> manager, but we'd really prefer any *unplanned* restarts resume
>>>>>>>>> for the most recent checkpoint instead of restarting from the configured
>>>>>>>>> savepoint. so in a sense we want the savepoint argument to be transient,
>>>>>>>>> only being used during the initial deployment, but this runs counter to the
>>>>>>>>> design of Kubernetes which always wants to restore a deployment to the
>>>>>>>>> "goal state" as defined in the YAML.
>>>>>>>>>
>>>>>>>>> i hope this helps. if you want more details please let me know,
>>>>>>>>> and thanks again for your time.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com> wrote:
>>>>>>>>>
>>>>>>>>> I think I overlooked it. Good point. I am using Redis to save the
>>>>>>>>> path to my savepoint, I might be able to set a TTL to avoid such issue.
>>>>>>>>>
>>>>>>>>> Hao Sun
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yu...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi Hao,
>>>>>>>>>
>>>>>>>>> I think he's exactly talking about the usecase where the JM/TM
>>>>>>>>> restart and they come back up from the latest savepoint which might be
>>>>>>>>> stale by that time.
>>>>>>>>>
>>>>>>>>> On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com> wrote:
>>>>>>>>>
>>>>>>>>> We always make a savepoint before we shutdown the job-cluster. So
>>>>>>>>> the savepoint is always the latest. When we fix a bug or change the job
>>>>>>>>> graph, it can resume well.
>>>>>>>>> We only use checkpoints for unplanned downtime, e.g. K8S killed
>>>>>>>>> JM/TM, uncaught exception, etc.
>>>>>>>>>
>>>>>>>>> Maybe I do not understand your use case well, I do not see a need
>>>>>>>>> to start from checkpoint after a bug fix.
>>>>>>>>> From what I know, currently you can use checkpoint as a savepoint
>>>>>>>>> as well
>>>>>>>>>
>>>>>>>>> Hao Sun
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> AFAIK there's currently nothing implemented to solve this problem,
>>>>>>>>> but working on a possible fix can be implemented on top of
>>>>>>>>> https://github.com/lyft/flinkk8soperator which already has a
>>>>>>>>> pretty fancy state machine for rolling upgrades. I'd love to be involved as
>>>>>>>>> this is an issue I've been thinking about as well.
>>>>>>>>>
>>>>>>>>> Yuval
>>>>>>>>>
>>>>>>>>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <
>>>>>>>>> sean.hester@bettercloud.com> wrote:
>>>>>>>>>
>>>>>>>>> hi all--we've run into a gap (knowledge? design? tbd?) for our use
>>>>>>>>> cases when deploying Flink jobs to start from savepoints using the
>>>>>>>>> job-cluster mode in Kubernetes.
>>>>>>>>>
>>>>>>>>> we're running a ~15 different jobs, all in job-cluster mode, using
>>>>>>>>> a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these
>>>>>>>>> are all long-running streaming jobs, all essentially acting as
>>>>>>>>> microservices. we're using Helm charts to configure all of our deployments.
>>>>>>>>>
>>>>>>>>> we have a number of use cases where we want to restart jobs from a
>>>>>>>>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>>>>>>>>> or fixed a bug. but after the deployment we want to have the job resume
>>>>>>>>> it's "long-running" behavior, where any unplanned restarts resume from the
>>>>>>>>> latest checkpoint.
>>>>>>>>>
>>>>>>>>> the issue we run into is that any obvious/standard/idiomatic
>>>>>>>>> Kubernetes deployment includes the savepoint argument in the configuration.
>>>>>>>>> if the Job Manager container(s) have an unplanned restart, when they come
>>>>>>>>> back up they will start from the savepoint instead of resuming from the
>>>>>>>>> latest checkpoint. everything is working as configured, but that's not
>>>>>>>>> exactly what we want. we want the savepoint argument to be transient
>>>>>>>>> somehow (only used during the initial deployment), but Kubernetes doesn't
>>>>>>>>> really support the concept of transient configuration.
>>>>>>>>>
>>>>>>>>> i can see a couple of potential solutions that either involve
>>>>>>>>> custom code in the jobs or custom logic in the container (i.e. a custom
>>>>>>>>> entrypoint script that records that the configured savepoint has already
>>>>>>>>> been used in a file on a persistent volume or GCS, and potentially
>>>>>>>>> when/why/by which deployment). but these seem like unexpected and hacky
>>>>>>>>> solutions. before we head down that road i wanted to ask:
>>>>>>>>>
>>>>>>>>>    - is this is already a solved problem that i've missed?
>>>>>>>>>    - is this issue already on the community's radar?
>>>>>>>>>
>>>>>>>>> thanks in advance!
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>>>>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>>>>>>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>>>>>>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>>>>>>> It’s not just an IT conference, it’s “a complete learning and
>>>>>>>>> networking experience”
>>>>>>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best Regards,
>>>>>>>>> Yuval Itzchakov.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>>>>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>>>>>>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>>>>>>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>>>>>>> It’s not just an IT conference, it’s “a complete learning and
>>>>>>>>> networking experience”
>>>>>>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>>>>> <http://www.bettercloud.com> <http://www.bettercloud.com>
>>>>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>>>>> It’s not just an IT conference, it’s “a complete learning and
>>>>>>> networking experience”
>>>>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>>>>
>>>>>>>
>>>>
>>>> --
>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>> <http://www.bettercloud.com> <http://www.bettercloud.com>
>>>> *Introducing the BetterCloud Integration Center *
>>>> Automate actions across every app and own SaaSOps
>>>> <https://www.bettercloud.com/integrations-webinar/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-integration-center>
>>>>
>>>>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Yang Wang <da...@gmail.com>.
@ Hao Sun,
I have made a confirmation that even we change parallelism and/or modify
operators, add new operators,
the flink cluster could also recover from latest checkpoint.

@ Vijay
a) Some individual jobmanager/taskmanager crashed exceptionally(someother
jobmanagers
and taskmanagers are alive), it could recover from the latest checkpoint.
b) All jobmanagers and taskmanagers fails, it could still recover from the
latest checkpoint if the cluster-id
is not changed.

When we enable the HA, The meta of jobgraph and checkpoint is saved on
zookeeper and the real files are save
on high-availability storage(HDFS). So when the flink application is
submitted again with same cluster-id, it could
recover jobs and checkpoint from zookeeper. I think it has been supported
for a long time. Maybe you could have a
try with flink-1.8 or 1.9.

Best,
Yang


Vijay Bhaskar <bh...@gmail.com> 于2019年10月10日周四 下午2:26写道:

> Thanks Yang and Sean. I have couple of questions:
>
> 1) Suppose the scenario of , bringing back entire cluster,
>      a) In that case, at least one job manager out of HA group should be
> up and running right? or
>      b) All the job managers fails, then also this works? In that case
> please let me know the procedure/share the documentation?
>          How to start from previous check point?
>          What Flink version onwards this feature is stable?
>
> Regards
> Bhaskar
>
>
> On Wed, Oct 9, 2019 at 8:51 AM Yang Wang <da...@gmail.com> wrote:
>
>> Hi Vijay,
>>
>> If you are using HA solution, i think you do not need to specify the
>> savepoint. Instead the checkpoint is used.
>> The checkpoint is done automatically and periodically based on your
>> configuration.When the
>> jobmanager/taskmanager fails or the whole cluster crashes, it could
>> always recover from the latest
>> checkpoint. Does this meed your requirement?
>>
>> Best,
>> Yang
>>
>> Sean Hester <se...@bettercloud.com> 于2019年10月1日周二 上午1:47写道:
>>
>>> Vijay,
>>>
>>> That is my understanding as well: the HA solution only solves the
>>> problem up to the point all job managers fail/restart at the same time.
>>> That's where my original concern was.
>>>
>>> But to Aleksandar and Yun's point, running in HA with 2 or 3 Job
>>> Managers per cluster--as long as they are all deployed to separate GKE
>>> nodes--would provide a very high uptime/low failure rate, at least on
>>> paper. It's a promising enough option that we're going to run in HA for a
>>> month or two and monitor results before we put in any extra work to
>>> customize the savepoint start-up behavior.
>>>
>>> On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar <bh...@gmail.com>
>>> wrote:
>>>
>>>> I don't think HA will help to recover from cluster crash, for that we
>>>> should take periodic savepoint right? Please correct me in case i am wrong
>>>>
>>>> Regards
>>>> Bhaskar
>>>>
>>>> On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <
>>>> bhaskar.ebay77@gmail.com> wrote:
>>>>
>>>>> Suppose my cluster got crashed and need to bring up the entire cluster
>>>>> back? Does HA still helps to run the cluster from latest save point?
>>>>>
>>>>> Regards
>>>>> Bhaskar
>>>>>
>>>>> On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <
>>>>> sean.hester@bettercloud.com> wrote:
>>>>>
>>>>>> thanks to everyone for all the replies.
>>>>>>
>>>>>> i think the original concern here with "just" relying on the HA
>>>>>> option is that there are some disaster recovery and data center migration
>>>>>> use cases where the continuity of the job managers is difficult to
>>>>>> preserve. but those are admittedly very edgy use cases. i think it's
>>>>>> definitely worth reviewing the SLAs with our site reliability engineers to
>>>>>> see how likely it would be to completely lose all job managers under an HA
>>>>>> configuration. that small a risk might be acceptable/preferable to a
>>>>>> one-off solution.
>>>>>>
>>>>>> @Aleksander, would love to learn more about Zookeeper-less HA. i
>>>>>> think i spotted a thread somewhere between Till and someone (perhaps you)
>>>>>> about that. feel free to DM me.
>>>>>>
>>>>>> thanks again to everyone!
>>>>>>
>>>>>> On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <da...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi, Aleksandar
>>>>>>>
>>>>>>> Savepoint option in standalone job cluster is optional. If you want
>>>>>>> to always recover
>>>>>>> from the latest checkpoint, just as Aleksandar and Yun Tang said you
>>>>>>> could use the
>>>>>>> high-availability configuration. Make sure the cluster-id is not
>>>>>>> changed, i think the job
>>>>>>> could recover both at exceptionally crash and restart by expectation.
>>>>>>>
>>>>>>> @Aleksandar Mastilovic <am...@sightmachine.com>, we are also
>>>>>>> have an zookeeper-less high-availability implementation[1].
>>>>>>> Maybe we could have some discussion and contribute this useful
>>>>>>> feature to the community.
>>>>>>>
>>>>>>> [1].
>>>>>>> https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit
>>>>>>>
>>>>>>> Best,
>>>>>>> Yang
>>>>>>>
>>>>>>> Aleksandar Mastilovic <am...@sightmachine.com> 于2019年9月26日周四
>>>>>>> 上午4:11写道:
>>>>>>>
>>>>>>>> Would you guys (Flink devs) be interested in our solution for
>>>>>>>> zookeeper-less HA? I could ask the managers how they feel about
>>>>>>>> open-sourcing the improvement.
>>>>>>>>
>>>>>>>> On Sep 25, 2019, at 11:49 AM, Yun Tang <my...@live.com> wrote:
>>>>>>>>
>>>>>>>> As Aleksandar said, k8s with HA configuration could solve your
>>>>>>>> problem. There already have some discussion about how to implement such HA
>>>>>>>> in k8s if we don't have a zookeeper service: FLINK-11105 [1] and
>>>>>>>> FLINK-12884 [2]. Currently, you might only have to choose zookeeper as
>>>>>>>> high-availability service.
>>>>>>>>
>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11105
>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-12884
>>>>>>>>
>>>>>>>> Best
>>>>>>>> Yun Tang
>>>>>>>> ------------------------------
>>>>>>>> *From:* Aleksandar Mastilovic <am...@sightmachine.com>
>>>>>>>> *Sent:* Thursday, September 26, 2019 1:57
>>>>>>>> *To:* Sean Hester <se...@bettercloud.com>
>>>>>>>> *Cc:* Hao Sun <ha...@zendesk.com>; Yuval Itzchakov <
>>>>>>>> yuvalos@gmail.com>; user <us...@flink.apache.org>
>>>>>>>> *Subject:* Re: Challenges Deploying Flink With Savepoints On
>>>>>>>> Kubernetes
>>>>>>>>
>>>>>>>> Can’t you simply use JobManager in HA mode? It would pick up where
>>>>>>>> it left off if you don’t provide a Savepoint.
>>>>>>>>
>>>>>>>> On Sep 25, 2019, at 6:07 AM, Sean Hester <
>>>>>>>> sean.hester@bettercloud.com> wrote:
>>>>>>>>
>>>>>>>> thanks for all replies! i'll definitely take a look at the Flink
>>>>>>>> k8s Operator project.
>>>>>>>>
>>>>>>>> i'll try to restate the issue to clarify. this issue is specific to
>>>>>>>> starting a job from a savepoint in job-cluster mode. in these cases the Job
>>>>>>>> Manager container is configured to run a single Flink job at start-up. the
>>>>>>>> savepoint needs to be provided as an argument to the entrypoint. the Flink
>>>>>>>> documentation for this approach is here:
>>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>>>>>>>>
>>>>>>>> the issue is that taking this approach means that the job will
>>>>>>>> *always* start from the savepoint provided as the start argument
>>>>>>>> in the Kubernetes YAML. this includes unplanned restarts of the job
>>>>>>>> manager, but we'd really prefer any *unplanned* restarts resume
>>>>>>>> for the most recent checkpoint instead of restarting from the configured
>>>>>>>> savepoint. so in a sense we want the savepoint argument to be transient,
>>>>>>>> only being used during the initial deployment, but this runs counter to the
>>>>>>>> design of Kubernetes which always wants to restore a deployment to the
>>>>>>>> "goal state" as defined in the YAML.
>>>>>>>>
>>>>>>>> i hope this helps. if you want more details please let me know, and
>>>>>>>> thanks again for your time.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com> wrote:
>>>>>>>>
>>>>>>>> I think I overlooked it. Good point. I am using Redis to save the
>>>>>>>> path to my savepoint, I might be able to set a TTL to avoid such issue.
>>>>>>>>
>>>>>>>> Hao Sun
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yu...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi Hao,
>>>>>>>>
>>>>>>>> I think he's exactly talking about the usecase where the JM/TM
>>>>>>>> restart and they come back up from the latest savepoint which might be
>>>>>>>> stale by that time.
>>>>>>>>
>>>>>>>> On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com> wrote:
>>>>>>>>
>>>>>>>> We always make a savepoint before we shutdown the job-cluster. So
>>>>>>>> the savepoint is always the latest. When we fix a bug or change the job
>>>>>>>> graph, it can resume well.
>>>>>>>> We only use checkpoints for unplanned downtime, e.g. K8S killed
>>>>>>>> JM/TM, uncaught exception, etc.
>>>>>>>>
>>>>>>>> Maybe I do not understand your use case well, I do not see a need
>>>>>>>> to start from checkpoint after a bug fix.
>>>>>>>> From what I know, currently you can use checkpoint as a savepoint
>>>>>>>> as well
>>>>>>>>
>>>>>>>> Hao Sun
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> AFAIK there's currently nothing implemented to solve this problem,
>>>>>>>> but working on a possible fix can be implemented on top of
>>>>>>>> https://github.com/lyft/flinkk8soperator which already has a
>>>>>>>> pretty fancy state machine for rolling upgrades. I'd love to be involved as
>>>>>>>> this is an issue I've been thinking about as well.
>>>>>>>>
>>>>>>>> Yuval
>>>>>>>>
>>>>>>>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <
>>>>>>>> sean.hester@bettercloud.com> wrote:
>>>>>>>>
>>>>>>>> hi all--we've run into a gap (knowledge? design? tbd?) for our use
>>>>>>>> cases when deploying Flink jobs to start from savepoints using the
>>>>>>>> job-cluster mode in Kubernetes.
>>>>>>>>
>>>>>>>> we're running a ~15 different jobs, all in job-cluster mode, using
>>>>>>>> a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these
>>>>>>>> are all long-running streaming jobs, all essentially acting as
>>>>>>>> microservices. we're using Helm charts to configure all of our deployments.
>>>>>>>>
>>>>>>>> we have a number of use cases where we want to restart jobs from a
>>>>>>>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>>>>>>>> or fixed a bug. but after the deployment we want to have the job resume
>>>>>>>> it's "long-running" behavior, where any unplanned restarts resume from the
>>>>>>>> latest checkpoint.
>>>>>>>>
>>>>>>>> the issue we run into is that any obvious/standard/idiomatic
>>>>>>>> Kubernetes deployment includes the savepoint argument in the configuration.
>>>>>>>> if the Job Manager container(s) have an unplanned restart, when they come
>>>>>>>> back up they will start from the savepoint instead of resuming from the
>>>>>>>> latest checkpoint. everything is working as configured, but that's not
>>>>>>>> exactly what we want. we want the savepoint argument to be transient
>>>>>>>> somehow (only used during the initial deployment), but Kubernetes doesn't
>>>>>>>> really support the concept of transient configuration.
>>>>>>>>
>>>>>>>> i can see a couple of potential solutions that either involve
>>>>>>>> custom code in the jobs or custom logic in the container (i.e. a custom
>>>>>>>> entrypoint script that records that the configured savepoint has already
>>>>>>>> been used in a file on a persistent volume or GCS, and potentially
>>>>>>>> when/why/by which deployment). but these seem like unexpected and hacky
>>>>>>>> solutions. before we head down that road i wanted to ask:
>>>>>>>>
>>>>>>>>    - is this is already a solved problem that i've missed?
>>>>>>>>    - is this issue already on the community's radar?
>>>>>>>>
>>>>>>>> thanks in advance!
>>>>>>>>
>>>>>>>> --
>>>>>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>>>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>>>>>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>>>>>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>>>>>> It’s not just an IT conference, it’s “a complete learning and
>>>>>>>> networking experience”
>>>>>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best Regards,
>>>>>>>> Yuval Itzchakov.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>>>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>>>>>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>>>>>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>>>>>> It’s not just an IT conference, it’s “a complete learning and
>>>>>>>> networking experience”
>>>>>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>> --
>>>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>>>> <http://www.bettercloud.com> <http://www.bettercloud.com>
>>>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>>>> It’s not just an IT conference, it’s “a complete learning and
>>>>>> networking experience”
>>>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>>>
>>>>>>
>>>
>>> --
>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>> <http://www.bettercloud.com> <http://www.bettercloud.com>
>>> *Introducing the BetterCloud Integration Center *
>>> Automate actions across every app and own SaaSOps
>>> <https://www.bettercloud.com/integrations-webinar/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-integration-center>
>>>
>>>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Vijay Bhaskar <bh...@gmail.com>.
Thanks Yang and Sean. I have couple of questions:

1) Suppose the scenario of , bringing back entire cluster,
     a) In that case, at least one job manager out of HA group should be up
and running right? or
     b) All the job managers fails, then also this works? In that case
please let me know the procedure/share the documentation?
         How to start from previous check point?
         What Flink version onwards this feature is stable?

Regards
Bhaskar


On Wed, Oct 9, 2019 at 8:51 AM Yang Wang <da...@gmail.com> wrote:

> Hi Vijay,
>
> If you are using HA solution, i think you do not need to specify the
> savepoint. Instead the checkpoint is used.
> The checkpoint is done automatically and periodically based on your
> configuration.When the
> jobmanager/taskmanager fails or the whole cluster crashes, it could always
> recover from the latest
> checkpoint. Does this meed your requirement?
>
> Best,
> Yang
>
> Sean Hester <se...@bettercloud.com> 于2019年10月1日周二 上午1:47写道:
>
>> Vijay,
>>
>> That is my understanding as well: the HA solution only solves the problem
>> up to the point all job managers fail/restart at the same time. That's
>> where my original concern was.
>>
>> But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers
>> per cluster--as long as they are all deployed to separate GKE nodes--would
>> provide a very high uptime/low failure rate, at least on paper. It's a
>> promising enough option that we're going to run in HA for a month or two
>> and monitor results before we put in any extra work to customize the
>> savepoint start-up behavior.
>>
>> On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar <bh...@gmail.com>
>> wrote:
>>
>>> I don't think HA will help to recover from cluster crash, for that we
>>> should take periodic savepoint right? Please correct me in case i am wrong
>>>
>>> Regards
>>> Bhaskar
>>>
>>> On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <bh...@gmail.com>
>>> wrote:
>>>
>>>> Suppose my cluster got crashed and need to bring up the entire cluster
>>>> back? Does HA still helps to run the cluster from latest save point?
>>>>
>>>> Regards
>>>> Bhaskar
>>>>
>>>> On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <
>>>> sean.hester@bettercloud.com> wrote:
>>>>
>>>>> thanks to everyone for all the replies.
>>>>>
>>>>> i think the original concern here with "just" relying on the HA option
>>>>> is that there are some disaster recovery and data center migration use
>>>>> cases where the continuity of the job managers is difficult to preserve.
>>>>> but those are admittedly very edgy use cases. i think it's definitely worth
>>>>> reviewing the SLAs with our site reliability engineers to see how likely it
>>>>> would be to completely lose all job managers under an HA configuration.
>>>>> that small a risk might be acceptable/preferable to a one-off solution.
>>>>>
>>>>> @Aleksander, would love to learn more about Zookeeper-less HA. i
>>>>> think i spotted a thread somewhere between Till and someone (perhaps you)
>>>>> about that. feel free to DM me.
>>>>>
>>>>> thanks again to everyone!
>>>>>
>>>>> On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <da...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi, Aleksandar
>>>>>>
>>>>>> Savepoint option in standalone job cluster is optional. If you want
>>>>>> to always recover
>>>>>> from the latest checkpoint, just as Aleksandar and Yun Tang said you
>>>>>> could use the
>>>>>> high-availability configuration. Make sure the cluster-id is not
>>>>>> changed, i think the job
>>>>>> could recover both at exceptionally crash and restart by expectation.
>>>>>>
>>>>>> @Aleksandar Mastilovic <am...@sightmachine.com>, we are also
>>>>>> have an zookeeper-less high-availability implementation[1].
>>>>>> Maybe we could have some discussion and contribute this useful
>>>>>> feature to the community.
>>>>>>
>>>>>> [1].
>>>>>> https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit
>>>>>>
>>>>>> Best,
>>>>>> Yang
>>>>>>
>>>>>> Aleksandar Mastilovic <am...@sightmachine.com> 于2019年9月26日周四
>>>>>> 上午4:11写道:
>>>>>>
>>>>>>> Would you guys (Flink devs) be interested in our solution for
>>>>>>> zookeeper-less HA? I could ask the managers how they feel about
>>>>>>> open-sourcing the improvement.
>>>>>>>
>>>>>>> On Sep 25, 2019, at 11:49 AM, Yun Tang <my...@live.com> wrote:
>>>>>>>
>>>>>>> As Aleksandar said, k8s with HA configuration could solve your
>>>>>>> problem. There already have some discussion about how to implement such HA
>>>>>>> in k8s if we don't have a zookeeper service: FLINK-11105 [1] and
>>>>>>> FLINK-12884 [2]. Currently, you might only have to choose zookeeper as
>>>>>>> high-availability service.
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11105
>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-12884
>>>>>>>
>>>>>>> Best
>>>>>>> Yun Tang
>>>>>>> ------------------------------
>>>>>>> *From:* Aleksandar Mastilovic <am...@sightmachine.com>
>>>>>>> *Sent:* Thursday, September 26, 2019 1:57
>>>>>>> *To:* Sean Hester <se...@bettercloud.com>
>>>>>>> *Cc:* Hao Sun <ha...@zendesk.com>; Yuval Itzchakov <
>>>>>>> yuvalos@gmail.com>; user <us...@flink.apache.org>
>>>>>>> *Subject:* Re: Challenges Deploying Flink With Savepoints On
>>>>>>> Kubernetes
>>>>>>>
>>>>>>> Can’t you simply use JobManager in HA mode? It would pick up where
>>>>>>> it left off if you don’t provide a Savepoint.
>>>>>>>
>>>>>>> On Sep 25, 2019, at 6:07 AM, Sean Hester <
>>>>>>> sean.hester@bettercloud.com> wrote:
>>>>>>>
>>>>>>> thanks for all replies! i'll definitely take a look at the Flink k8s
>>>>>>> Operator project.
>>>>>>>
>>>>>>> i'll try to restate the issue to clarify. this issue is specific to
>>>>>>> starting a job from a savepoint in job-cluster mode. in these cases the Job
>>>>>>> Manager container is configured to run a single Flink job at start-up. the
>>>>>>> savepoint needs to be provided as an argument to the entrypoint. the Flink
>>>>>>> documentation for this approach is here:
>>>>>>>
>>>>>>>
>>>>>>> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>>>>>>>
>>>>>>> the issue is that taking this approach means that the job will
>>>>>>> *always* start from the savepoint provided as the start argument in
>>>>>>> the Kubernetes YAML. this includes unplanned restarts of the job manager,
>>>>>>> but we'd really prefer any *unplanned* restarts resume for the most
>>>>>>> recent checkpoint instead of restarting from the configured savepoint. so
>>>>>>> in a sense we want the savepoint argument to be transient, only being used
>>>>>>> during the initial deployment, but this runs counter to the design of
>>>>>>> Kubernetes which always wants to restore a deployment to the "goal state"
>>>>>>> as defined in the YAML.
>>>>>>>
>>>>>>> i hope this helps. if you want more details please let me know, and
>>>>>>> thanks again for your time.
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com> wrote:
>>>>>>>
>>>>>>> I think I overlooked it. Good point. I am using Redis to save the
>>>>>>> path to my savepoint, I might be able to set a TTL to avoid such issue.
>>>>>>>
>>>>>>> Hao Sun
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yu...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Hao,
>>>>>>>
>>>>>>> I think he's exactly talking about the usecase where the JM/TM
>>>>>>> restart and they come back up from the latest savepoint which might be
>>>>>>> stale by that time.
>>>>>>>
>>>>>>> On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com> wrote:
>>>>>>>
>>>>>>> We always make a savepoint before we shutdown the job-cluster. So
>>>>>>> the savepoint is always the latest. When we fix a bug or change the job
>>>>>>> graph, it can resume well.
>>>>>>> We only use checkpoints for unplanned downtime, e.g. K8S killed
>>>>>>> JM/TM, uncaught exception, etc.
>>>>>>>
>>>>>>> Maybe I do not understand your use case well, I do not see a need to
>>>>>>> start from checkpoint after a bug fix.
>>>>>>> From what I know, currently you can use checkpoint as a savepoint as
>>>>>>> well
>>>>>>>
>>>>>>> Hao Sun
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> AFAIK there's currently nothing implemented to solve this problem,
>>>>>>> but working on a possible fix can be implemented on top of
>>>>>>> https://github.com/lyft/flinkk8soperator which already has a pretty
>>>>>>> fancy state machine for rolling upgrades. I'd love to be involved as this
>>>>>>> is an issue I've been thinking about as well.
>>>>>>>
>>>>>>> Yuval
>>>>>>>
>>>>>>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <
>>>>>>> sean.hester@bettercloud.com> wrote:
>>>>>>>
>>>>>>> hi all--we've run into a gap (knowledge? design? tbd?) for our use
>>>>>>> cases when deploying Flink jobs to start from savepoints using the
>>>>>>> job-cluster mode in Kubernetes.
>>>>>>>
>>>>>>> we're running a ~15 different jobs, all in job-cluster mode, using a
>>>>>>> mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these
>>>>>>> are all long-running streaming jobs, all essentially acting as
>>>>>>> microservices. we're using Helm charts to configure all of our deployments.
>>>>>>>
>>>>>>> we have a number of use cases where we want to restart jobs from a
>>>>>>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>>>>>>> or fixed a bug. but after the deployment we want to have the job resume
>>>>>>> it's "long-running" behavior, where any unplanned restarts resume from the
>>>>>>> latest checkpoint.
>>>>>>>
>>>>>>> the issue we run into is that any obvious/standard/idiomatic
>>>>>>> Kubernetes deployment includes the savepoint argument in the configuration.
>>>>>>> if the Job Manager container(s) have an unplanned restart, when they come
>>>>>>> back up they will start from the savepoint instead of resuming from the
>>>>>>> latest checkpoint. everything is working as configured, but that's not
>>>>>>> exactly what we want. we want the savepoint argument to be transient
>>>>>>> somehow (only used during the initial deployment), but Kubernetes doesn't
>>>>>>> really support the concept of transient configuration.
>>>>>>>
>>>>>>> i can see a couple of potential solutions that either involve custom
>>>>>>> code in the jobs or custom logic in the container (i.e. a custom entrypoint
>>>>>>> script that records that the configured savepoint has already been used in
>>>>>>> a file on a persistent volume or GCS, and potentially when/why/by which
>>>>>>> deployment). but these seem like unexpected and hacky solutions. before we
>>>>>>> head down that road i wanted to ask:
>>>>>>>
>>>>>>>    - is this is already a solved problem that i've missed?
>>>>>>>    - is this issue already on the community's radar?
>>>>>>>
>>>>>>> thanks in advance!
>>>>>>>
>>>>>>> --
>>>>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>>>>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>>>>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>>>>> It’s not just an IT conference, it’s “a complete learning and
>>>>>>> networking experience”
>>>>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best Regards,
>>>>>>> Yuval Itzchakov.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>>>>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>>>>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>>>>> It’s not just an IT conference, it’s “a complete learning and
>>>>>>> networking experience”
>>>>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>>> <http://www.bettercloud.com> <http://www.bettercloud.com>
>>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>>> It’s not just an IT conference, it’s “a complete learning and
>>>>> networking experience”
>>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>>
>>>>>
>>
>> --
>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>> <http://www.bettercloud.com> <http://www.bettercloud.com>
>> *Introducing the BetterCloud Integration Center *
>> Automate actions across every app and own SaaSOps
>> <https://www.bettercloud.com/integrations-webinar/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-integration-center>
>>
>>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Yang Wang <da...@gmail.com>.
Hi Vijay,

If you are using HA solution, i think you do not need to specify the
savepoint. Instead the checkpoint is used.
The checkpoint is done automatically and periodically based on your
configuration.When the
jobmanager/taskmanager fails or the whole cluster crashes, it could always
recover from the latest
checkpoint. Does this meed your requirement?

Best,
Yang

Sean Hester <se...@bettercloud.com> 于2019年10月1日周二 上午1:47写道:

> Vijay,
>
> That is my understanding as well: the HA solution only solves the problem
> up to the point all job managers fail/restart at the same time. That's
> where my original concern was.
>
> But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers
> per cluster--as long as they are all deployed to separate GKE nodes--would
> provide a very high uptime/low failure rate, at least on paper. It's a
> promising enough option that we're going to run in HA for a month or two
> and monitor results before we put in any extra work to customize the
> savepoint start-up behavior.
>
> On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar <bh...@gmail.com>
> wrote:
>
>> I don't think HA will help to recover from cluster crash, for that we
>> should take periodic savepoint right? Please correct me in case i am wrong
>>
>> Regards
>> Bhaskar
>>
>> On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <bh...@gmail.com>
>> wrote:
>>
>>> Suppose my cluster got crashed and need to bring up the entire cluster
>>> back? Does HA still helps to run the cluster from latest save point?
>>>
>>> Regards
>>> Bhaskar
>>>
>>> On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <se...@bettercloud.com>
>>> wrote:
>>>
>>>> thanks to everyone for all the replies.
>>>>
>>>> i think the original concern here with "just" relying on the HA option
>>>> is that there are some disaster recovery and data center migration use
>>>> cases where the continuity of the job managers is difficult to preserve.
>>>> but those are admittedly very edgy use cases. i think it's definitely worth
>>>> reviewing the SLAs with our site reliability engineers to see how likely it
>>>> would be to completely lose all job managers under an HA configuration.
>>>> that small a risk might be acceptable/preferable to a one-off solution.
>>>>
>>>> @Aleksander, would love to learn more about Zookeeper-less HA. i
>>>> think i spotted a thread somewhere between Till and someone (perhaps you)
>>>> about that. feel free to DM me.
>>>>
>>>> thanks again to everyone!
>>>>
>>>> On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <da...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi, Aleksandar
>>>>>
>>>>> Savepoint option in standalone job cluster is optional. If you want to
>>>>> always recover
>>>>> from the latest checkpoint, just as Aleksandar and Yun Tang said you
>>>>> could use the
>>>>> high-availability configuration. Make sure the cluster-id is not
>>>>> changed, i think the job
>>>>> could recover both at exceptionally crash and restart by expectation.
>>>>>
>>>>> @Aleksandar Mastilovic <am...@sightmachine.com>, we are also
>>>>> have an zookeeper-less high-availability implementation[1].
>>>>> Maybe we could have some discussion and contribute this useful feature
>>>>> to the community.
>>>>>
>>>>> [1].
>>>>> https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit
>>>>>
>>>>> Best,
>>>>> Yang
>>>>>
>>>>> Aleksandar Mastilovic <am...@sightmachine.com> 于2019年9月26日周四
>>>>> 上午4:11写道:
>>>>>
>>>>>> Would you guys (Flink devs) be interested in our solution for
>>>>>> zookeeper-less HA? I could ask the managers how they feel about
>>>>>> open-sourcing the improvement.
>>>>>>
>>>>>> On Sep 25, 2019, at 11:49 AM, Yun Tang <my...@live.com> wrote:
>>>>>>
>>>>>> As Aleksandar said, k8s with HA configuration could solve your
>>>>>> problem. There already have some discussion about how to implement such HA
>>>>>> in k8s if we don't have a zookeeper service: FLINK-11105 [1] and
>>>>>> FLINK-12884 [2]. Currently, you might only have to choose zookeeper as
>>>>>> high-availability service.
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11105
>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-12884
>>>>>>
>>>>>> Best
>>>>>> Yun Tang
>>>>>> ------------------------------
>>>>>> *From:* Aleksandar Mastilovic <am...@sightmachine.com>
>>>>>> *Sent:* Thursday, September 26, 2019 1:57
>>>>>> *To:* Sean Hester <se...@bettercloud.com>
>>>>>> *Cc:* Hao Sun <ha...@zendesk.com>; Yuval Itzchakov <yu...@gmail.com>;
>>>>>> user <us...@flink.apache.org>
>>>>>> *Subject:* Re: Challenges Deploying Flink With Savepoints On
>>>>>> Kubernetes
>>>>>>
>>>>>> Can’t you simply use JobManager in HA mode? It would pick up where it
>>>>>> left off if you don’t provide a Savepoint.
>>>>>>
>>>>>> On Sep 25, 2019, at 6:07 AM, Sean Hester <se...@bettercloud.com>
>>>>>> wrote:
>>>>>>
>>>>>> thanks for all replies! i'll definitely take a look at the Flink k8s
>>>>>> Operator project.
>>>>>>
>>>>>> i'll try to restate the issue to clarify. this issue is specific to
>>>>>> starting a job from a savepoint in job-cluster mode. in these cases the Job
>>>>>> Manager container is configured to run a single Flink job at start-up. the
>>>>>> savepoint needs to be provided as an argument to the entrypoint. the Flink
>>>>>> documentation for this approach is here:
>>>>>>
>>>>>>
>>>>>> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>>>>>>
>>>>>> the issue is that taking this approach means that the job will
>>>>>> *always* start from the savepoint provided as the start argument in
>>>>>> the Kubernetes YAML. this includes unplanned restarts of the job manager,
>>>>>> but we'd really prefer any *unplanned* restarts resume for the most
>>>>>> recent checkpoint instead of restarting from the configured savepoint. so
>>>>>> in a sense we want the savepoint argument to be transient, only being used
>>>>>> during the initial deployment, but this runs counter to the design of
>>>>>> Kubernetes which always wants to restore a deployment to the "goal state"
>>>>>> as defined in the YAML.
>>>>>>
>>>>>> i hope this helps. if you want more details please let me know, and
>>>>>> thanks again for your time.
>>>>>>
>>>>>>
>>>>>> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com> wrote:
>>>>>>
>>>>>> I think I overlooked it. Good point. I am using Redis to save the
>>>>>> path to my savepoint, I might be able to set a TTL to avoid such issue.
>>>>>>
>>>>>> Hao Sun
>>>>>>
>>>>>>
>>>>>> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yu...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Hi Hao,
>>>>>>
>>>>>> I think he's exactly talking about the usecase where the JM/TM
>>>>>> restart and they come back up from the latest savepoint which might be
>>>>>> stale by that time.
>>>>>>
>>>>>> On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com> wrote:
>>>>>>
>>>>>> We always make a savepoint before we shutdown the job-cluster. So the
>>>>>> savepoint is always the latest. When we fix a bug or change the job graph,
>>>>>> it can resume well.
>>>>>> We only use checkpoints for unplanned downtime, e.g. K8S killed
>>>>>> JM/TM, uncaught exception, etc.
>>>>>>
>>>>>> Maybe I do not understand your use case well, I do not see a need to
>>>>>> start from checkpoint after a bug fix.
>>>>>> From what I know, currently you can use checkpoint as a savepoint as
>>>>>> well
>>>>>>
>>>>>> Hao Sun
>>>>>>
>>>>>>
>>>>>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> AFAIK there's currently nothing implemented to solve this problem,
>>>>>> but working on a possible fix can be implemented on top of
>>>>>> https://github.com/lyft/flinkk8soperator which already has a pretty
>>>>>> fancy state machine for rolling upgrades. I'd love to be involved as this
>>>>>> is an issue I've been thinking about as well.
>>>>>>
>>>>>> Yuval
>>>>>>
>>>>>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <
>>>>>> sean.hester@bettercloud.com> wrote:
>>>>>>
>>>>>> hi all--we've run into a gap (knowledge? design? tbd?) for our use
>>>>>> cases when deploying Flink jobs to start from savepoints using the
>>>>>> job-cluster mode in Kubernetes.
>>>>>>
>>>>>> we're running a ~15 different jobs, all in job-cluster mode, using a
>>>>>> mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these
>>>>>> are all long-running streaming jobs, all essentially acting as
>>>>>> microservices. we're using Helm charts to configure all of our deployments.
>>>>>>
>>>>>> we have a number of use cases where we want to restart jobs from a
>>>>>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>>>>>> or fixed a bug. but after the deployment we want to have the job resume
>>>>>> it's "long-running" behavior, where any unplanned restarts resume from the
>>>>>> latest checkpoint.
>>>>>>
>>>>>> the issue we run into is that any obvious/standard/idiomatic
>>>>>> Kubernetes deployment includes the savepoint argument in the configuration.
>>>>>> if the Job Manager container(s) have an unplanned restart, when they come
>>>>>> back up they will start from the savepoint instead of resuming from the
>>>>>> latest checkpoint. everything is working as configured, but that's not
>>>>>> exactly what we want. we want the savepoint argument to be transient
>>>>>> somehow (only used during the initial deployment), but Kubernetes doesn't
>>>>>> really support the concept of transient configuration.
>>>>>>
>>>>>> i can see a couple of potential solutions that either involve custom
>>>>>> code in the jobs or custom logic in the container (i.e. a custom entrypoint
>>>>>> script that records that the configured savepoint has already been used in
>>>>>> a file on a persistent volume or GCS, and potentially when/why/by which
>>>>>> deployment). but these seem like unexpected and hacky solutions. before we
>>>>>> head down that road i wanted to ask:
>>>>>>
>>>>>>    - is this is already a solved problem that i've missed?
>>>>>>    - is this issue already on the community's radar?
>>>>>>
>>>>>> thanks in advance!
>>>>>>
>>>>>> --
>>>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>>>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>>>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>>>> It’s not just an IT conference, it’s “a complete learning and
>>>>>> networking experience”
>>>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best Regards,
>>>>>> Yuval Itzchakov.
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>>>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>>>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>>>> It’s not just an IT conference, it’s “a complete learning and
>>>>>> networking experience”
>>>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>>>
>>>>>>
>>>>>>
>>>>
>>>> --
>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>> <http://www.bettercloud.com> <http://www.bettercloud.com>
>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>> It’s not just an IT conference, it’s “a complete learning and
>>>> networking experience”
>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>
>>>>
>
> --
> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
> <http://www.bettercloud.com> <http://www.bettercloud.com>
> *Introducing the BetterCloud Integration Center *
> Automate actions across every app and own SaaSOps
> <https://www.bettercloud.com/integrations-webinar/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-integration-center>
>
>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Sean Hester <se...@bettercloud.com>.
Vijay,

That is my understanding as well: the HA solution only solves the problem
up to the point all job managers fail/restart at the same time. That's
where my original concern was.

But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers
per cluster--as long as they are all deployed to separate GKE nodes--would
provide a very high uptime/low failure rate, at least on paper. It's a
promising enough option that we're going to run in HA for a month or two
and monitor results before we put in any extra work to customize the
savepoint start-up behavior.

On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar <bh...@gmail.com>
wrote:

> I don't think HA will help to recover from cluster crash, for that we
> should take periodic savepoint right? Please correct me in case i am wrong
>
> Regards
> Bhaskar
>
> On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <bh...@gmail.com>
> wrote:
>
>> Suppose my cluster got crashed and need to bring up the entire cluster
>> back? Does HA still helps to run the cluster from latest save point?
>>
>> Regards
>> Bhaskar
>>
>> On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <se...@bettercloud.com>
>> wrote:
>>
>>> thanks to everyone for all the replies.
>>>
>>> i think the original concern here with "just" relying on the HA option
>>> is that there are some disaster recovery and data center migration use
>>> cases where the continuity of the job managers is difficult to preserve.
>>> but those are admittedly very edgy use cases. i think it's definitely worth
>>> reviewing the SLAs with our site reliability engineers to see how likely it
>>> would be to completely lose all job managers under an HA configuration.
>>> that small a risk might be acceptable/preferable to a one-off solution.
>>>
>>> @Aleksander, would love to learn more about Zookeeper-less HA. i think i
>>> spotted a thread somewhere between Till and someone (perhaps you) about
>>> that. feel free to DM me.
>>>
>>> thanks again to everyone!
>>>
>>> On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <da...@gmail.com> wrote:
>>>
>>>> Hi, Aleksandar
>>>>
>>>> Savepoint option in standalone job cluster is optional. If you want to
>>>> always recover
>>>> from the latest checkpoint, just as Aleksandar and Yun Tang said you
>>>> could use the
>>>> high-availability configuration. Make sure the cluster-id is not
>>>> changed, i think the job
>>>> could recover both at exceptionally crash and restart by expectation.
>>>>
>>>> @Aleksandar Mastilovic <am...@sightmachine.com>, we are also
>>>> have an zookeeper-less high-availability implementation[1].
>>>> Maybe we could have some discussion and contribute this useful feature
>>>> to the community.
>>>>
>>>> [1].
>>>> https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>> Aleksandar Mastilovic <am...@sightmachine.com> 于2019年9月26日周四
>>>> 上午4:11写道:
>>>>
>>>>> Would you guys (Flink devs) be interested in our solution for
>>>>> zookeeper-less HA? I could ask the managers how they feel about
>>>>> open-sourcing the improvement.
>>>>>
>>>>> On Sep 25, 2019, at 11:49 AM, Yun Tang <my...@live.com> wrote:
>>>>>
>>>>> As Aleksandar said, k8s with HA configuration could solve your
>>>>> problem. There already have some discussion about how to implement such HA
>>>>> in k8s if we don't have a zookeeper service: FLINK-11105 [1] and
>>>>> FLINK-12884 [2]. Currently, you might only have to choose zookeeper as
>>>>> high-availability service.
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11105
>>>>> [2] https://issues.apache.org/jira/browse/FLINK-12884
>>>>>
>>>>> Best
>>>>> Yun Tang
>>>>> ------------------------------
>>>>> *From:* Aleksandar Mastilovic <am...@sightmachine.com>
>>>>> *Sent:* Thursday, September 26, 2019 1:57
>>>>> *To:* Sean Hester <se...@bettercloud.com>
>>>>> *Cc:* Hao Sun <ha...@zendesk.com>; Yuval Itzchakov <yu...@gmail.com>;
>>>>> user <us...@flink.apache.org>
>>>>> *Subject:* Re: Challenges Deploying Flink With Savepoints On
>>>>> Kubernetes
>>>>>
>>>>> Can’t you simply use JobManager in HA mode? It would pick up where it
>>>>> left off if you don’t provide a Savepoint.
>>>>>
>>>>> On Sep 25, 2019, at 6:07 AM, Sean Hester <se...@bettercloud.com>
>>>>> wrote:
>>>>>
>>>>> thanks for all replies! i'll definitely take a look at the Flink k8s
>>>>> Operator project.
>>>>>
>>>>> i'll try to restate the issue to clarify. this issue is specific to
>>>>> starting a job from a savepoint in job-cluster mode. in these cases the Job
>>>>> Manager container is configured to run a single Flink job at start-up. the
>>>>> savepoint needs to be provided as an argument to the entrypoint. the Flink
>>>>> documentation for this approach is here:
>>>>>
>>>>>
>>>>> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>>>>>
>>>>> the issue is that taking this approach means that the job will
>>>>> *always* start from the savepoint provided as the start argument in
>>>>> the Kubernetes YAML. this includes unplanned restarts of the job manager,
>>>>> but we'd really prefer any *unplanned* restarts resume for the most
>>>>> recent checkpoint instead of restarting from the configured savepoint. so
>>>>> in a sense we want the savepoint argument to be transient, only being used
>>>>> during the initial deployment, but this runs counter to the design of
>>>>> Kubernetes which always wants to restore a deployment to the "goal state"
>>>>> as defined in the YAML.
>>>>>
>>>>> i hope this helps. if you want more details please let me know, and
>>>>> thanks again for your time.
>>>>>
>>>>>
>>>>> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com> wrote:
>>>>>
>>>>> I think I overlooked it. Good point. I am using Redis to save the path
>>>>> to my savepoint, I might be able to set a TTL to avoid such issue.
>>>>>
>>>>> Hao Sun
>>>>>
>>>>>
>>>>> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Hi Hao,
>>>>>
>>>>> I think he's exactly talking about the usecase where the JM/TM restart
>>>>> and they come back up from the latest savepoint which might be stale by
>>>>> that time.
>>>>>
>>>>> On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com> wrote:
>>>>>
>>>>> We always make a savepoint before we shutdown the job-cluster. So the
>>>>> savepoint is always the latest. When we fix a bug or change the job graph,
>>>>> it can resume well.
>>>>> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
>>>>> uncaught exception, etc.
>>>>>
>>>>> Maybe I do not understand your use case well, I do not see a need to
>>>>> start from checkpoint after a bug fix.
>>>>> From what I know, currently you can use checkpoint as a savepoint as
>>>>> well
>>>>>
>>>>> Hao Sun
>>>>>
>>>>>
>>>>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> AFAIK there's currently nothing implemented to solve this problem, but
>>>>> working on a possible fix can be implemented on top of
>>>>> https://github.com/lyft/flinkk8soperator which already has a pretty
>>>>> fancy state machine for rolling upgrades. I'd love to be involved as this
>>>>> is an issue I've been thinking about as well.
>>>>>
>>>>> Yuval
>>>>>
>>>>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <
>>>>> sean.hester@bettercloud.com> wrote:
>>>>>
>>>>> hi all--we've run into a gap (knowledge? design? tbd?) for our use
>>>>> cases when deploying Flink jobs to start from savepoints using the
>>>>> job-cluster mode in Kubernetes.
>>>>>
>>>>> we're running a ~15 different jobs, all in job-cluster mode, using a
>>>>> mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these
>>>>> are all long-running streaming jobs, all essentially acting as
>>>>> microservices. we're using Helm charts to configure all of our deployments.
>>>>>
>>>>> we have a number of use cases where we want to restart jobs from a
>>>>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>>>>> or fixed a bug. but after the deployment we want to have the job resume
>>>>> it's "long-running" behavior, where any unplanned restarts resume from the
>>>>> latest checkpoint.
>>>>>
>>>>> the issue we run into is that any obvious/standard/idiomatic
>>>>> Kubernetes deployment includes the savepoint argument in the configuration.
>>>>> if the Job Manager container(s) have an unplanned restart, when they come
>>>>> back up they will start from the savepoint instead of resuming from the
>>>>> latest checkpoint. everything is working as configured, but that's not
>>>>> exactly what we want. we want the savepoint argument to be transient
>>>>> somehow (only used during the initial deployment), but Kubernetes doesn't
>>>>> really support the concept of transient configuration.
>>>>>
>>>>> i can see a couple of potential solutions that either involve custom
>>>>> code in the jobs or custom logic in the container (i.e. a custom entrypoint
>>>>> script that records that the configured savepoint has already been used in
>>>>> a file on a persistent volume or GCS, and potentially when/why/by which
>>>>> deployment). but these seem like unexpected and hacky solutions. before we
>>>>> head down that road i wanted to ask:
>>>>>
>>>>>    - is this is already a solved problem that i've missed?
>>>>>    - is this issue already on the community's radar?
>>>>>
>>>>> thanks in advance!
>>>>>
>>>>> --
>>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>>> It’s not just an IT conference, it’s “a complete learning and
>>>>> networking experience”
>>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Yuval Itzchakov.
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>>> It’s not just an IT conference, it’s “a complete learning and
>>>>> networking experience”
>>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>>
>>>>>
>>>>>
>>>
>>> --
>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>> <http://www.bettercloud.com> <http://www.bettercloud.com>
>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>> It’s not just an IT conference, it’s “a complete learning and networking
>>> experience”
>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>
>>>

-- 
*Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
<http://www.bettercloud.com> <http://www.bettercloud.com>
*Introducing the BetterCloud Integration Center *
Automate actions across every app and own SaaSOps
<https://www.bettercloud.com/integrations-webinar/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-integration-center>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Vijay Bhaskar <bh...@gmail.com>.
I don't think HA will help to recover from cluster crash, for that we
should take periodic savepoint right? Please correct me in case i am wrong

Regards
Bhaskar

On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <bh...@gmail.com>
wrote:

> Suppose my cluster got crashed and need to bring up the entire cluster
> back? Does HA still helps to run the cluster from latest save point?
>
> Regards
> Bhaskar
>
> On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <se...@bettercloud.com>
> wrote:
>
>> thanks to everyone for all the replies.
>>
>> i think the original concern here with "just" relying on the HA option is
>> that there are some disaster recovery and data center migration use cases
>> where the continuity of the job managers is difficult to preserve. but
>> those are admittedly very edgy use cases. i think it's definitely worth
>> reviewing the SLAs with our site reliability engineers to see how likely it
>> would be to completely lose all job managers under an HA configuration.
>> that small a risk might be acceptable/preferable to a one-off solution.
>>
>> @Aleksander, would love to learn more about Zookeeper-less HA. i think i
>> spotted a thread somewhere between Till and someone (perhaps you) about
>> that. feel free to DM me.
>>
>> thanks again to everyone!
>>
>> On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <da...@gmail.com> wrote:
>>
>>> Hi, Aleksandar
>>>
>>> Savepoint option in standalone job cluster is optional. If you want to
>>> always recover
>>> from the latest checkpoint, just as Aleksandar and Yun Tang said you
>>> could use the
>>> high-availability configuration. Make sure the cluster-id is not
>>> changed, i think the job
>>> could recover both at exceptionally crash and restart by expectation.
>>>
>>> @Aleksandar Mastilovic <am...@sightmachine.com>, we are also have
>>> an zookeeper-less high-availability implementation[1].
>>> Maybe we could have some discussion and contribute this useful feature
>>> to the community.
>>>
>>> [1].
>>> https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit
>>>
>>> Best,
>>> Yang
>>>
>>> Aleksandar Mastilovic <am...@sightmachine.com> 于2019年9月26日周四
>>> 上午4:11写道:
>>>
>>>> Would you guys (Flink devs) be interested in our solution for
>>>> zookeeper-less HA? I could ask the managers how they feel about
>>>> open-sourcing the improvement.
>>>>
>>>> On Sep 25, 2019, at 11:49 AM, Yun Tang <my...@live.com> wrote:
>>>>
>>>> As Aleksandar said, k8s with HA configuration could solve your problem.
>>>> There already have some discussion about how to implement such HA in k8s if
>>>> we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2].
>>>> Currently, you might only have to choose zookeeper as high-availability
>>>> service.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-11105
>>>> [2] https://issues.apache.org/jira/browse/FLINK-12884
>>>>
>>>> Best
>>>> Yun Tang
>>>> ------------------------------
>>>> *From:* Aleksandar Mastilovic <am...@sightmachine.com>
>>>> *Sent:* Thursday, September 26, 2019 1:57
>>>> *To:* Sean Hester <se...@bettercloud.com>
>>>> *Cc:* Hao Sun <ha...@zendesk.com>; Yuval Itzchakov <yu...@gmail.com>;
>>>> user <us...@flink.apache.org>
>>>> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>>>>
>>>> Can’t you simply use JobManager in HA mode? It would pick up where it
>>>> left off if you don’t provide a Savepoint.
>>>>
>>>> On Sep 25, 2019, at 6:07 AM, Sean Hester <se...@bettercloud.com>
>>>> wrote:
>>>>
>>>> thanks for all replies! i'll definitely take a look at the Flink k8s
>>>> Operator project.
>>>>
>>>> i'll try to restate the issue to clarify. this issue is specific to
>>>> starting a job from a savepoint in job-cluster mode. in these cases the Job
>>>> Manager container is configured to run a single Flink job at start-up. the
>>>> savepoint needs to be provided as an argument to the entrypoint. the Flink
>>>> documentation for this approach is here:
>>>>
>>>>
>>>> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>>>>
>>>> the issue is that taking this approach means that the job will *always*
>>>>  start from the savepoint provided as the start argument in the
>>>> Kubernetes YAML. this includes unplanned restarts of the job manager, but
>>>> we'd really prefer any *unplanned* restarts resume for the most recent
>>>> checkpoint instead of restarting from the configured savepoint. so in a
>>>> sense we want the savepoint argument to be transient, only being used
>>>> during the initial deployment, but this runs counter to the design of
>>>> Kubernetes which always wants to restore a deployment to the "goal state"
>>>> as defined in the YAML.
>>>>
>>>> i hope this helps. if you want more details please let me know, and
>>>> thanks again for your time.
>>>>
>>>>
>>>> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com> wrote:
>>>>
>>>> I think I overlooked it. Good point. I am using Redis to save the path
>>>> to my savepoint, I might be able to set a TTL to avoid such issue.
>>>>
>>>> Hao Sun
>>>>
>>>>
>>>> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yu...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi Hao,
>>>>
>>>> I think he's exactly talking about the usecase where the JM/TM restart
>>>> and they come back up from the latest savepoint which might be stale by
>>>> that time.
>>>>
>>>> On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com> wrote:
>>>>
>>>> We always make a savepoint before we shutdown the job-cluster. So the
>>>> savepoint is always the latest. When we fix a bug or change the job graph,
>>>> it can resume well.
>>>> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
>>>> uncaught exception, etc.
>>>>
>>>> Maybe I do not understand your use case well, I do not see a need to
>>>> start from checkpoint after a bug fix.
>>>> From what I know, currently you can use checkpoint as a savepoint as
>>>> well
>>>>
>>>> Hao Sun
>>>>
>>>>
>>>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com>
>>>> wrote:
>>>>
>>>> AFAIK there's currently nothing implemented to solve this problem, but
>>>> working on a possible fix can be implemented on top of
>>>> https://github.com/lyft/flinkk8soperator which already has a pretty
>>>> fancy state machine for rolling upgrades. I'd love to be involved as this
>>>> is an issue I've been thinking about as well.
>>>>
>>>> Yuval
>>>>
>>>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <
>>>> sean.hester@bettercloud.com> wrote:
>>>>
>>>> hi all--we've run into a gap (knowledge? design? tbd?) for our use
>>>> cases when deploying Flink jobs to start from savepoints using the
>>>> job-cluster mode in Kubernetes.
>>>>
>>>> we're running a ~15 different jobs, all in job-cluster mode, using a
>>>> mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these
>>>> are all long-running streaming jobs, all essentially acting as
>>>> microservices. we're using Helm charts to configure all of our deployments.
>>>>
>>>> we have a number of use cases where we want to restart jobs from a
>>>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>>>> or fixed a bug. but after the deployment we want to have the job resume
>>>> it's "long-running" behavior, where any unplanned restarts resume from the
>>>> latest checkpoint.
>>>>
>>>> the issue we run into is that any obvious/standard/idiomatic Kubernetes
>>>> deployment includes the savepoint argument in the configuration. if the Job
>>>> Manager container(s) have an unplanned restart, when they come back up they
>>>> will start from the savepoint instead of resuming from the latest
>>>> checkpoint. everything is working as configured, but that's not exactly
>>>> what we want. we want the savepoint argument to be transient somehow (only
>>>> used during the initial deployment), but Kubernetes doesn't really support
>>>> the concept of transient configuration.
>>>>
>>>> i can see a couple of potential solutions that either involve custom
>>>> code in the jobs or custom logic in the container (i.e. a custom entrypoint
>>>> script that records that the configured savepoint has already been used in
>>>> a file on a persistent volume or GCS, and potentially when/why/by which
>>>> deployment). but these seem like unexpected and hacky solutions. before we
>>>> head down that road i wanted to ask:
>>>>
>>>>    - is this is already a solved problem that i've missed?
>>>>    - is this issue already on the community's radar?
>>>>
>>>> thanks in advance!
>>>>
>>>> --
>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>> It’s not just an IT conference, it’s “a complete learning and
>>>> networking experience”
>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Yuval Itzchakov.
>>>>
>>>>
>>>>
>>>> --
>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>> It’s not just an IT conference, it’s “a complete learning and
>>>> networking experience”
>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>
>>>>
>>>>
>>
>> --
>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>> <http://www.bettercloud.com> <http://www.bettercloud.com>
>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>> It’s not just an IT conference, it’s “a complete learning and networking
>> experience”
>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>
>>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Vijay Bhaskar <bh...@gmail.com>.
Suppose my cluster got crashed and need to bring up the entire cluster
back? Does HA still helps to run the cluster from latest save point?

Regards
Bhaskar

On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <se...@bettercloud.com>
wrote:

> thanks to everyone for all the replies.
>
> i think the original concern here with "just" relying on the HA option is
> that there are some disaster recovery and data center migration use cases
> where the continuity of the job managers is difficult to preserve. but
> those are admittedly very edgy use cases. i think it's definitely worth
> reviewing the SLAs with our site reliability engineers to see how likely it
> would be to completely lose all job managers under an HA configuration.
> that small a risk might be acceptable/preferable to a one-off solution.
>
> @Aleksander, would love to learn more about Zookeeper-less HA. i think i
> spotted a thread somewhere between Till and someone (perhaps you) about
> that. feel free to DM me.
>
> thanks again to everyone!
>
> On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <da...@gmail.com> wrote:
>
>> Hi, Aleksandar
>>
>> Savepoint option in standalone job cluster is optional. If you want to
>> always recover
>> from the latest checkpoint, just as Aleksandar and Yun Tang said you
>> could use the
>> high-availability configuration. Make sure the cluster-id is not changed,
>> i think the job
>> could recover both at exceptionally crash and restart by expectation.
>>
>> @Aleksandar Mastilovic <am...@sightmachine.com>, we are also have
>> an zookeeper-less high-availability implementation[1].
>> Maybe we could have some discussion and contribute this useful feature to
>> the community.
>>
>> [1].
>> https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit
>>
>> Best,
>> Yang
>>
>> Aleksandar Mastilovic <am...@sightmachine.com> 于2019年9月26日周四
>> 上午4:11写道:
>>
>>> Would you guys (Flink devs) be interested in our solution for
>>> zookeeper-less HA? I could ask the managers how they feel about
>>> open-sourcing the improvement.
>>>
>>> On Sep 25, 2019, at 11:49 AM, Yun Tang <my...@live.com> wrote:
>>>
>>> As Aleksandar said, k8s with HA configuration could solve your problem.
>>> There already have some discussion about how to implement such HA in k8s if
>>> we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2].
>>> Currently, you might only have to choose zookeeper as high-availability
>>> service.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-11105
>>> [2] https://issues.apache.org/jira/browse/FLINK-12884
>>>
>>> Best
>>> Yun Tang
>>> ------------------------------
>>> *From:* Aleksandar Mastilovic <am...@sightmachine.com>
>>> *Sent:* Thursday, September 26, 2019 1:57
>>> *To:* Sean Hester <se...@bettercloud.com>
>>> *Cc:* Hao Sun <ha...@zendesk.com>; Yuval Itzchakov <yu...@gmail.com>;
>>> user <us...@flink.apache.org>
>>> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>>>
>>> Can’t you simply use JobManager in HA mode? It would pick up where it
>>> left off if you don’t provide a Savepoint.
>>>
>>> On Sep 25, 2019, at 6:07 AM, Sean Hester <se...@bettercloud.com>
>>> wrote:
>>>
>>> thanks for all replies! i'll definitely take a look at the Flink k8s
>>> Operator project.
>>>
>>> i'll try to restate the issue to clarify. this issue is specific to
>>> starting a job from a savepoint in job-cluster mode. in these cases the Job
>>> Manager container is configured to run a single Flink job at start-up. the
>>> savepoint needs to be provided as an argument to the entrypoint. the Flink
>>> documentation for this approach is here:
>>>
>>>
>>> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>>>
>>> the issue is that taking this approach means that the job will *always* start
>>> from the savepoint provided as the start argument in the Kubernetes YAML.
>>> this includes unplanned restarts of the job manager, but we'd really prefer
>>> any *unplanned* restarts resume for the most recent checkpoint instead
>>> of restarting from the configured savepoint. so in a sense we want the
>>> savepoint argument to be transient, only being used during the initial
>>> deployment, but this runs counter to the design of Kubernetes which always
>>> wants to restore a deployment to the "goal state" as defined in the YAML.
>>>
>>> i hope this helps. if you want more details please let me know, and
>>> thanks again for your time.
>>>
>>>
>>> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com> wrote:
>>>
>>> I think I overlooked it. Good point. I am using Redis to save the path
>>> to my savepoint, I might be able to set a TTL to avoid such issue.
>>>
>>> Hao Sun
>>>
>>>
>>> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yu...@gmail.com>
>>> wrote:
>>>
>>> Hi Hao,
>>>
>>> I think he's exactly talking about the usecase where the JM/TM restart
>>> and they come back up from the latest savepoint which might be stale by
>>> that time.
>>>
>>> On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com> wrote:
>>>
>>> We always make a savepoint before we shutdown the job-cluster. So the
>>> savepoint is always the latest. When we fix a bug or change the job graph,
>>> it can resume well.
>>> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
>>> uncaught exception, etc.
>>>
>>> Maybe I do not understand your use case well, I do not see a need to
>>> start from checkpoint after a bug fix.
>>> From what I know, currently you can use checkpoint as a savepoint as well
>>>
>>> Hao Sun
>>>
>>>
>>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com>
>>> wrote:
>>>
>>> AFAIK there's currently nothing implemented to solve this problem, but
>>> working on a possible fix can be implemented on top of
>>> https://github.com/lyft/flinkk8soperator which already has a pretty
>>> fancy state machine for rolling upgrades. I'd love to be involved as this
>>> is an issue I've been thinking about as well.
>>>
>>> Yuval
>>>
>>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <se...@bettercloud.com>
>>> wrote:
>>>
>>> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
>>> when deploying Flink jobs to start from savepoints using the job-cluster
>>> mode in Kubernetes.
>>>
>>> we're running a ~15 different jobs, all in job-cluster mode, using a mix
>>> of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are
>>> all long-running streaming jobs, all essentially acting as microservices.
>>> we're using Helm charts to configure all of our deployments.
>>>
>>> we have a number of use cases where we want to restart jobs from a
>>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>>> or fixed a bug. but after the deployment we want to have the job resume
>>> it's "long-running" behavior, where any unplanned restarts resume from the
>>> latest checkpoint.
>>>
>>> the issue we run into is that any obvious/standard/idiomatic Kubernetes
>>> deployment includes the savepoint argument in the configuration. if the Job
>>> Manager container(s) have an unplanned restart, when they come back up they
>>> will start from the savepoint instead of resuming from the latest
>>> checkpoint. everything is working as configured, but that's not exactly
>>> what we want. we want the savepoint argument to be transient somehow (only
>>> used during the initial deployment), but Kubernetes doesn't really support
>>> the concept of transient configuration.
>>>
>>> i can see a couple of potential solutions that either involve custom
>>> code in the jobs or custom logic in the container (i.e. a custom entrypoint
>>> script that records that the configured savepoint has already been used in
>>> a file on a persistent volume or GCS, and potentially when/why/by which
>>> deployment). but these seem like unexpected and hacky solutions. before we
>>> head down that road i wanted to ask:
>>>
>>>    - is this is already a solved problem that i've missed?
>>>    - is this issue already on the community's radar?
>>>
>>> thanks in advance!
>>>
>>> --
>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>> It’s not just an IT conference, it’s “a complete learning and networking
>>> experience”
>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Yuval Itzchakov.
>>>
>>>
>>>
>>> --
>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>> It’s not just an IT conference, it’s “a complete learning and networking
>>> experience”
>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>
>>>
>>>
>
> --
> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
> <http://www.bettercloud.com> <http://www.bettercloud.com>
> *Altitude 2019 in San Francisco | Sept. 23 - 25*
> It’s not just an IT conference, it’s “a complete learning and networking
> experience”
> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>
>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Sean Hester <se...@bettercloud.com>.
thanks to everyone for all the replies.

i think the original concern here with "just" relying on the HA option is
that there are some disaster recovery and data center migration use cases
where the continuity of the job managers is difficult to preserve. but
those are admittedly very edgy use cases. i think it's definitely worth
reviewing the SLAs with our site reliability engineers to see how likely it
would be to completely lose all job managers under an HA configuration.
that small a risk might be acceptable/preferable to a one-off solution.

@Aleksander, would love to learn more about Zookeeper-less HA. i think i
spotted a thread somewhere between Till and someone (perhaps you) about
that. feel free to DM me.

thanks again to everyone!

On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <da...@gmail.com> wrote:

> Hi, Aleksandar
>
> Savepoint option in standalone job cluster is optional. If you want to
> always recover
> from the latest checkpoint, just as Aleksandar and Yun Tang said you could
> use the
> high-availability configuration. Make sure the cluster-id is not changed,
> i think the job
> could recover both at exceptionally crash and restart by expectation.
>
> @Aleksandar Mastilovic <am...@sightmachine.com>, we are also have
> an zookeeper-less high-availability implementation[1].
> Maybe we could have some discussion and contribute this useful feature to
> the community.
>
> [1].
> https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit
>
> Best,
> Yang
>
> Aleksandar Mastilovic <am...@sightmachine.com> 于2019年9月26日周四
> 上午4:11写道:
>
>> Would you guys (Flink devs) be interested in our solution for
>> zookeeper-less HA? I could ask the managers how they feel about
>> open-sourcing the improvement.
>>
>> On Sep 25, 2019, at 11:49 AM, Yun Tang <my...@live.com> wrote:
>>
>> As Aleksandar said, k8s with HA configuration could solve your problem.
>> There already have some discussion about how to implement such HA in k8s if
>> we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2].
>> Currently, you might only have to choose zookeeper as high-availability
>> service.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-11105
>> [2] https://issues.apache.org/jira/browse/FLINK-12884
>>
>> Best
>> Yun Tang
>> ------------------------------
>> *From:* Aleksandar Mastilovic <am...@sightmachine.com>
>> *Sent:* Thursday, September 26, 2019 1:57
>> *To:* Sean Hester <se...@bettercloud.com>
>> *Cc:* Hao Sun <ha...@zendesk.com>; Yuval Itzchakov <yu...@gmail.com>;
>> user <us...@flink.apache.org>
>> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>>
>> Can’t you simply use JobManager in HA mode? It would pick up where it
>> left off if you don’t provide a Savepoint.
>>
>> On Sep 25, 2019, at 6:07 AM, Sean Hester <se...@bettercloud.com>
>> wrote:
>>
>> thanks for all replies! i'll definitely take a look at the Flink k8s
>> Operator project.
>>
>> i'll try to restate the issue to clarify. this issue is specific to
>> starting a job from a savepoint in job-cluster mode. in these cases the Job
>> Manager container is configured to run a single Flink job at start-up. the
>> savepoint needs to be provided as an argument to the entrypoint. the Flink
>> documentation for this approach is here:
>>
>>
>> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>>
>> the issue is that taking this approach means that the job will *always* start
>> from the savepoint provided as the start argument in the Kubernetes YAML.
>> this includes unplanned restarts of the job manager, but we'd really prefer
>> any *unplanned* restarts resume for the most recent checkpoint instead
>> of restarting from the configured savepoint. so in a sense we want the
>> savepoint argument to be transient, only being used during the initial
>> deployment, but this runs counter to the design of Kubernetes which always
>> wants to restore a deployment to the "goal state" as defined in the YAML.
>>
>> i hope this helps. if you want more details please let me know, and
>> thanks again for your time.
>>
>>
>> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com> wrote:
>>
>> I think I overlooked it. Good point. I am using Redis to save the path to
>> my savepoint, I might be able to set a TTL to avoid such issue.
>>
>> Hao Sun
>>
>>
>> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yu...@gmail.com>
>> wrote:
>>
>> Hi Hao,
>>
>> I think he's exactly talking about the usecase where the JM/TM restart
>> and they come back up from the latest savepoint which might be stale by
>> that time.
>>
>> On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com> wrote:
>>
>> We always make a savepoint before we shutdown the job-cluster. So the
>> savepoint is always the latest. When we fix a bug or change the job graph,
>> it can resume well.
>> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
>> uncaught exception, etc.
>>
>> Maybe I do not understand your use case well, I do not see a need to
>> start from checkpoint after a bug fix.
>> From what I know, currently you can use checkpoint as a savepoint as well
>>
>> Hao Sun
>>
>>
>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com>
>> wrote:
>>
>> AFAIK there's currently nothing implemented to solve this problem, but
>> working on a possible fix can be implemented on top of
>> https://github.com/lyft/flinkk8soperator which already has a pretty
>> fancy state machine for rolling upgrades. I'd love to be involved as this
>> is an issue I've been thinking about as well.
>>
>> Yuval
>>
>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <se...@bettercloud.com>
>> wrote:
>>
>> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
>> when deploying Flink jobs to start from savepoints using the job-cluster
>> mode in Kubernetes.
>>
>> we're running a ~15 different jobs, all in job-cluster mode, using a mix
>> of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are
>> all long-running streaming jobs, all essentially acting as microservices.
>> we're using Helm charts to configure all of our deployments.
>>
>> we have a number of use cases where we want to restart jobs from a
>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>> or fixed a bug. but after the deployment we want to have the job resume
>> it's "long-running" behavior, where any unplanned restarts resume from the
>> latest checkpoint.
>>
>> the issue we run into is that any obvious/standard/idiomatic Kubernetes
>> deployment includes the savepoint argument in the configuration. if the Job
>> Manager container(s) have an unplanned restart, when they come back up they
>> will start from the savepoint instead of resuming from the latest
>> checkpoint. everything is working as configured, but that's not exactly
>> what we want. we want the savepoint argument to be transient somehow (only
>> used during the initial deployment), but Kubernetes doesn't really support
>> the concept of transient configuration.
>>
>> i can see a couple of potential solutions that either involve custom code
>> in the jobs or custom logic in the container (i.e. a custom entrypoint
>> script that records that the configured savepoint has already been used in
>> a file on a persistent volume or GCS, and potentially when/why/by which
>> deployment). but these seem like unexpected and hacky solutions. before we
>> head down that road i wanted to ask:
>>
>>    - is this is already a solved problem that i've missed?
>>    - is this issue already on the community's radar?
>>
>> thanks in advance!
>>
>> --
>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>> It’s not just an IT conference, it’s “a complete learning and networking
>> experience”
>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>
>>
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>>
>>
>> --
>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>> It’s not just an IT conference, it’s “a complete learning and networking
>> experience”
>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>
>>
>>

-- 
*Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
<http://www.bettercloud.com> <http://www.bettercloud.com>
*Altitude 2019 in San Francisco | Sept. 23 - 25*
It’s not just an IT conference, it’s “a complete learning and networking
experience”
<https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Yang Wang <da...@gmail.com>.
Hi, Aleksandar

Savepoint option in standalone job cluster is optional. If you want to
always recover
from the latest checkpoint, just as Aleksandar and Yun Tang said you could
use the
high-availability configuration. Make sure the cluster-id is not changed, i
think the job
could recover both at exceptionally crash and restart by expectation.

@Aleksandar Mastilovic <am...@sightmachine.com>, we are also have an
zookeeper-less high-availability implementation[1].
Maybe we could have some discussion and contribute this useful feature to
the community.

[1].
https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit

Best,
Yang

Aleksandar Mastilovic <am...@sightmachine.com> 于2019年9月26日周四 上午4:11写道:

> Would you guys (Flink devs) be interested in our solution for
> zookeeper-less HA? I could ask the managers how they feel about
> open-sourcing the improvement.
>
> On Sep 25, 2019, at 11:49 AM, Yun Tang <my...@live.com> wrote:
>
> As Aleksandar said, k8s with HA configuration could solve your problem.
> There already have some discussion about how to implement such HA in k8s if
> we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2].
> Currently, you might only have to choose zookeeper as high-availability
> service.
>
> [1] https://issues.apache.org/jira/browse/FLINK-11105
> [2] https://issues.apache.org/jira/browse/FLINK-12884
>
> Best
> Yun Tang
> ------------------------------
> *From:* Aleksandar Mastilovic <am...@sightmachine.com>
> *Sent:* Thursday, September 26, 2019 1:57
> *To:* Sean Hester <se...@bettercloud.com>
> *Cc:* Hao Sun <ha...@zendesk.com>; Yuval Itzchakov <yu...@gmail.com>;
> user <us...@flink.apache.org>
> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>
> Can’t you simply use JobManager in HA mode? It would pick up where it left
> off if you don’t provide a Savepoint.
>
> On Sep 25, 2019, at 6:07 AM, Sean Hester <se...@bettercloud.com>
> wrote:
>
> thanks for all replies! i'll definitely take a look at the Flink k8s
> Operator project.
>
> i'll try to restate the issue to clarify. this issue is specific to
> starting a job from a savepoint in job-cluster mode. in these cases the Job
> Manager container is configured to run a single Flink job at start-up. the
> savepoint needs to be provided as an argument to the entrypoint. the Flink
> documentation for this approach is here:
>
>
> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>
> the issue is that taking this approach means that the job will *always* start
> from the savepoint provided as the start argument in the Kubernetes YAML.
> this includes unplanned restarts of the job manager, but we'd really prefer
> any *unplanned* restarts resume for the most recent checkpoint instead of
> restarting from the configured savepoint. so in a sense we want the
> savepoint argument to be transient, only being used during the initial
> deployment, but this runs counter to the design of Kubernetes which always
> wants to restore a deployment to the "goal state" as defined in the YAML.
>
> i hope this helps. if you want more details please let me know, and thanks
> again for your time.
>
>
> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com> wrote:
>
> I think I overlooked it. Good point. I am using Redis to save the path to
> my savepoint, I might be able to set a TTL to avoid such issue.
>
> Hao Sun
>
>
> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yu...@gmail.com> wrote:
>
> Hi Hao,
>
> I think he's exactly talking about the usecase where the JM/TM restart and
> they come back up from the latest savepoint which might be stale by that
> time.
>
> On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com> wrote:
>
> We always make a savepoint before we shutdown the job-cluster. So the
> savepoint is always the latest. When we fix a bug or change the job graph,
> it can resume well.
> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
> uncaught exception, etc.
>
> Maybe I do not understand your use case well, I do not see a need to start
> from checkpoint after a bug fix.
> From what I know, currently you can use checkpoint as a savepoint as well
>
> Hao Sun
>
>
> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com> wrote:
>
> AFAIK there's currently nothing implemented to solve this problem, but
> working on a possible fix can be implemented on top of
> https://github.com/lyft/flinkk8soperator which already has a pretty fancy
> state machine for rolling upgrades. I'd love to be involved as this is an
> issue I've been thinking about as well.
>
> Yuval
>
> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <se...@bettercloud.com>
> wrote:
>
> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
> when deploying Flink jobs to start from savepoints using the job-cluster
> mode in Kubernetes.
>
> we're running a ~15 different jobs, all in job-cluster mode, using a mix
> of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are
> all long-running streaming jobs, all essentially acting as microservices.
> we're using Helm charts to configure all of our deployments.
>
> we have a number of use cases where we want to restart jobs from a
> savepoint to replay recent events, i.e. when we've enhanced the job logic
> or fixed a bug. but after the deployment we want to have the job resume
> it's "long-running" behavior, where any unplanned restarts resume from the
> latest checkpoint.
>
> the issue we run into is that any obvious/standard/idiomatic Kubernetes
> deployment includes the savepoint argument in the configuration. if the Job
> Manager container(s) have an unplanned restart, when they come back up they
> will start from the savepoint instead of resuming from the latest
> checkpoint. everything is working as configured, but that's not exactly
> what we want. we want the savepoint argument to be transient somehow (only
> used during the initial deployment), but Kubernetes doesn't really support
> the concept of transient configuration.
>
> i can see a couple of potential solutions that either involve custom code
> in the jobs or custom logic in the container (i.e. a custom entrypoint
> script that records that the configured savepoint has already been used in
> a file on a persistent volume or GCS, and potentially when/why/by which
> deployment). but these seem like unexpected and hacky solutions. before we
> head down that road i wanted to ask:
>
>    - is this is already a solved problem that i've missed?
>    - is this issue already on the community's radar?
>
> thanks in advance!
>
> --
> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
> *Altitude 2019 in San Francisco | Sept. 23 - 25*
> It’s not just an IT conference, it’s “a complete learning and networking
> experience”
> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>
>
>
> --
> Best Regards,
> Yuval Itzchakov.
>
>
>
> --
> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
> <http://www.bettercloud.com/> <http://www.bettercloud.com/>
> *Altitude 2019 in San Francisco | Sept. 23 - 25*
> It’s not just an IT conference, it’s “a complete learning and networking
> experience”
> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>
>
>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Aleksandar Mastilovic <am...@sightmachine.com>.
Would you guys (Flink devs) be interested in our solution for zookeeper-less HA? I could ask the managers how they feel about open-sourcing the improvement.

> On Sep 25, 2019, at 11:49 AM, Yun Tang <my...@live.com> wrote:
> 
> As Aleksandar said, k8s with HA configuration could solve your problem. There already have some discussion about how to implement such HA in k8s if we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2]. Currently, you might only have to choose zookeeper as high-availability service.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-11105 <https://issues.apache.org/jira/browse/FLINK-11105>
> [2] https://issues.apache.org/jira/browse/FLINK-12884 <https://issues.apache.org/jira/browse/FLINK-12884>
> 
> Best
> Yun Tang
> From: Aleksandar Mastilovic <am...@sightmachine.com>
> Sent: Thursday, September 26, 2019 1:57
> To: Sean Hester <se...@bettercloud.com>
> Cc: Hao Sun <ha...@zendesk.com>; Yuval Itzchakov <yu...@gmail.com>; user <us...@flink.apache.org>
> Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes
>  
> Can’t you simply use JobManager in HA mode? It would pick up where it left off if you don’t provide a Savepoint.
> 
>> On Sep 25, 2019, at 6:07 AM, Sean Hester <sean.hester@bettercloud.com <ma...@bettercloud.com>> wrote:
>> 
>> thanks for all replies! i'll definitely take a look at the Flink k8s Operator project.
>> 
>> i'll try to restate the issue to clarify. this issue is specific to starting a job from a savepoint in job-cluster mode. in these cases the Job Manager container is configured to run a single Flink job at start-up. the savepoint needs to be provided as an argument to the entrypoint. the Flink documentation for this approach is here:
>> 
>> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint <https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint>
>> 
>> the issue is that taking this approach means that the job will always start from the savepoint provided as the start argument in the Kubernetes YAML. this includes unplanned restarts of the job manager, but we'd really prefer any unplanned restarts resume for the most recent checkpoint instead of restarting from the configured savepoint. so in a sense we want the savepoint argument to be transient, only being used during the initial deployment, but this runs counter to the design of Kubernetes which always wants to restore a deployment to the "goal state" as defined in the YAML.
>> 
>> i hope this helps. if you want more details please let me know, and thanks again for your time.
>> 
>> 
>> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <hasun@zendesk.com <ma...@zendesk.com>> wrote:
>> I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.
>> 
>> Hao Sun
>> 
>> 
>> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yuvalos@gmail.com <ma...@gmail.com>> wrote:
>> Hi Hao,
>> 
>> I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.
>> 
>> On Tue, 24 Sep 2019, 19:24 Hao Sun, <hasun@zendesk.com <ma...@zendesk.com>> wrote:
>> We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
>> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.
>> 
>> Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
>> From what I know, currently you can use checkpoint as a savepoint as well
>> 
>> Hao Sun
>> 
>> 
>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yuvalos@gmail.com <ma...@gmail.com>> wrote:
>> AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator <https://github.com/lyft/flinkk8soperator> which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.
>> 
>> Yuval
>> 
>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <sean.hester@bettercloud.com <ma...@bettercloud.com>> wrote:
>> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.
>> 
>> we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.
>> 
>> we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.
>> 
>> the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.
>> 
>> i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
>> is this is already a solved problem that i've missed?
>> is this issue already on the community's radar?
>> thanks in advance!
>> 
>> -- 
>> Sean Hester | Senior Staff Software Engineer | m. 404-828-0865 <>
>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 
>>  <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>> Altitude 2019 in San Francisco | Sept. 23 - 25
>> It’s not just an IT conference, it’s “a complete learning and networking experience” <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>> 
>> 
>> 
>> -- 
>> Best Regards,
>> Yuval Itzchakov.
>> 
>> 
>> -- 
>> Sean Hester | Senior Staff Software Engineer | m. 404-828-0865 <>
>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 
>>  <http://www.bettercloud.com/> <http://www.bettercloud.com/>
>> Altitude 2019 in San Francisco | Sept. 23 - 25
>> It’s not just an IT conference, it’s “a complete learning and networking experience” <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by vishalovercome <vi...@moengage.com>.
Thanks for your reply!

What I have seen is that the job terminates when there's intermittent loss
of connectivity with zookeeper. This is in-fact the most common reason why
our jobs are terminating at this point. Worse, it's unable to restore from
checkpoint during some (not all) of these terminations. Under these
scenarios, won't the job try to recover from a savepoint?

I've gone through various tickets reporting stability issues due to
zookeeper that you've mentioned you intend to resolve soon. But until the
zookeeper based HA is stable, should we assume that it will repeatedly
restore from savepoints? I would rather rely on kafka offsets to resume
where it left off rather than savepoints.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Till Rohrmann <tr...@apache.org>.
Flink should try to pick the latest checkpoint and will only use the
savepoint if no newer checkpoint could be found.

Cheers,
Till

On Wed, Dec 16, 2020 at 10:13 PM vishalovercome <vi...@moengage.com> wrote:

> I'm not sure if this addresses the original concern. For instance consider
> this sequence:
>
> 1. Job starts from savepoint
> 2. Job creates a few checkpoints
> 3. Job manager (just one in kubernetes) crashes and restarts with the
> commands specified in the kubernetes manifest which has the savepoint path
>
> Will Zookeeper based HA ensure that this savepoint path will be ignored?
>
> I've asked this and various other questions here -
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Will-job-manager-restarts-lead-to-repeated-savepoint-restoration-tp40188.html
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by vishalovercome <vi...@moengage.com>.
I'm not sure if this addresses the original concern. For instance consider
this sequence:

1. Job starts from savepoint
2. Job creates a few checkpoints
3. Job manager (just one in kubernetes) crashes and restarts with the
commands specified in the kubernetes manifest which has the savepoint path

Will Zookeeper based HA ensure that this savepoint path will be ignored? 

I've asked this and various other questions here -
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Will-job-manager-restarts-lead-to-repeated-savepoint-restoration-tp40188.html



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Yun Tang <my...@live.com>.
As Aleksandar said, k8s with HA configuration could solve your problem. There already have some discussion about how to implement such HA in k8s if we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2]. Currently, you might only have to choose zookeeper as high-availability service.

[1] https://issues.apache.org/jira/browse/FLINK-11105
[2] https://issues.apache.org/jira/browse/FLINK-12884

Best
Yun Tang
________________________________
From: Aleksandar Mastilovic <am...@sightmachine.com>
Sent: Thursday, September 26, 2019 1:57
To: Sean Hester <se...@bettercloud.com>
Cc: Hao Sun <ha...@zendesk.com>; Yuval Itzchakov <yu...@gmail.com>; user <us...@flink.apache.org>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes

Can’t you simply use JobManager in HA mode? It would pick up where it left off if you don’t provide a Savepoint.

On Sep 25, 2019, at 6:07 AM, Sean Hester <se...@bettercloud.com>> wrote:

thanks for all replies! i'll definitely take a look at the Flink k8s Operator project.

i'll try to restate the issue to clarify. this issue is specific to starting a job from a savepoint in job-cluster mode. in these cases the Job Manager container is configured to run a single Flink job at start-up. the savepoint needs to be provided as an argument to the entrypoint. the Flink documentation for this approach is here:

https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint

the issue is that taking this approach means that the job will always start from the savepoint provided as the start argument in the Kubernetes YAML. this includes unplanned restarts of the job manager, but we'd really prefer any unplanned restarts resume for the most recent checkpoint instead of restarting from the configured savepoint. so in a sense we want the savepoint argument to be transient, only being used during the initial deployment, but this runs counter to the design of Kubernetes which always wants to restore a deployment to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com>> wrote:
I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yu...@gmail.com>> wrote:
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com>> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com>> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <se...@bettercloud.com>> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:

  *   is this is already a solved problem that i've missed?
  *   is this issue already on the community's radar?

thanks in advance!

--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
<http://www.bettercloud.com/>[https://www.bettercloud.com/monitor/wp-content/uploads/sites/3/2016/12/bettercloud-emaillogo.png]<http://www.bettercloud.com/>
Altitude 2019 in San Francisco | Sept. 23 - 25
It’s not just an IT conference, it’s “a complete learning and networking experience”<https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>



--
Best Regards,
Yuval Itzchakov.


--
Sean Hester | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
<http://www.bettercloud.com/>[https://www.bettercloud.com/monitor/wp-content/uploads/sites/3/2016/12/bettercloud-emaillogo.png]<http://www.bettercloud.com/>
Altitude 2019 in San Francisco | Sept. 23 - 25
It’s not just an IT conference, it’s “a complete learning and networking experience”<https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>



Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Aleksandar Mastilovic <am...@sightmachine.com>.
Can’t you simply use JobManager in HA mode? It would pick up where it left off if you don’t provide a Savepoint.

> On Sep 25, 2019, at 6:07 AM, Sean Hester <se...@bettercloud.com> wrote:
> 
> thanks for all replies! i'll definitely take a look at the Flink k8s Operator project.
> 
> i'll try to restate the issue to clarify. this issue is specific to starting a job from a savepoint in job-cluster mode. in these cases the Job Manager container is configured to run a single Flink job at start-up. the savepoint needs to be provided as an argument to the entrypoint. the Flink documentation for this approach is here:
> 
> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint <https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint>
> 
> the issue is that taking this approach means that the job will always start from the savepoint provided as the start argument in the Kubernetes YAML. this includes unplanned restarts of the job manager, but we'd really prefer any unplanned restarts resume for the most recent checkpoint instead of restarting from the configured savepoint. so in a sense we want the savepoint argument to be transient, only being used during the initial deployment, but this runs counter to the design of Kubernetes which always wants to restore a deployment to the "goal state" as defined in the YAML.
> 
> i hope this helps. if you want more details please let me know, and thanks again for your time.
> 
> 
> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <hasun@zendesk.com <ma...@zendesk.com>> wrote:
> I think I overlooked it. Good point. I am using Redis to save the path to my savepoint, I might be able to set a TTL to avoid such issue.
> 
> Hao Sun
> 
> 
> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yuvalos@gmail.com <ma...@gmail.com>> wrote:
> Hi Hao,
> 
> I think he's exactly talking about the usecase where the JM/TM restart and they come back up from the latest savepoint which might be stale by that time.
> 
> On Tue, 24 Sep 2019, 19:24 Hao Sun, <hasun@zendesk.com <ma...@zendesk.com>> wrote:
> We always make a savepoint before we shutdown the job-cluster. So the savepoint is always the latest. When we fix a bug or change the job graph, it can resume well.
> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught exception, etc.
> 
> Maybe I do not understand your use case well, I do not see a need to start from checkpoint after a bug fix.
> From what I know, currently you can use checkpoint as a savepoint as well
> 
> Hao Sun
> 
> 
> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yuvalos@gmail.com <ma...@gmail.com>> wrote:
> AFAIK there's currently nothing implemented to solve this problem, but working on a possible fix can be implemented on top of https://github.com/lyft/flinkk8soperator <https://github.com/lyft/flinkk8soperator> which already has a pretty fancy state machine for rolling upgrades. I'd love to be involved as this is an issue I've been thinking about as well.
> 
> Yuval
> 
> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <sean.hester@bettercloud.com <ma...@bettercloud.com>> wrote:
> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when deploying Flink jobs to start from savepoints using the job-cluster mode in Kubernetes.
> 
> we're running a ~15 different jobs, all in job-cluster mode, using a mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all long-running streaming jobs, all essentially acting as microservices. we're using Helm charts to configure all of our deployments.
> 
> we have a number of use cases where we want to restart jobs from a savepoint to replay recent events, i.e. when we've enhanced the job logic or fixed a bug. but after the deployment we want to have the job resume it's "long-running" behavior, where any unplanned restarts resume from the latest checkpoint.
> 
> the issue we run into is that any obvious/standard/idiomatic Kubernetes deployment includes the savepoint argument in the configuration. if the Job Manager container(s) have an unplanned restart, when they come back up they will start from the savepoint instead of resuming from the latest checkpoint. everything is working as configured, but that's not exactly what we want. we want the savepoint argument to be transient somehow (only used during the initial deployment), but Kubernetes doesn't really support the concept of transient configuration.
> 
> i can see a couple of potential solutions that either involve custom code in the jobs or custom logic in the container (i.e. a custom entrypoint script that records that the configured savepoint has already been used in a file on a persistent volume or GCS, and potentially when/why/by which deployment). but these seem like unexpected and hacky solutions. before we head down that road i wanted to ask:
> is this is already a solved problem that i've missed?
> is this issue already on the community's radar?
> thanks in advance!
> 
> -- 
> Sean Hester | Senior Staff Software Engineer | m. 404-828-0865 <>
> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 
>  <http://www.bettercloud.com/> <http://www.bettercloud.com/>
> Altitude 2019 in San Francisco | Sept. 23 - 25
> It’s not just an IT conference, it’s “a complete learning and networking experience” <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
> 
> 
> 
> -- 
> Best Regards,
> Yuval Itzchakov.
> 
> 
> -- 
> Sean Hester | Senior Staff Software Engineer | m. 404-828-0865 <>
> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305 
>  <http://www.bettercloud.com/> <http://www.bettercloud.com/>
> Altitude 2019 in San Francisco | Sept. 23 - 25
> It’s not just an IT conference, it’s “a complete learning and networking experience” <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
> 


Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Sean Hester <se...@bettercloud.com>.
thanks for all replies! i'll definitely take a look at the Flink k8s
Operator project.

i'll try to restate the issue to clarify. this issue is specific to
starting a job from a savepoint in job-cluster mode. in these cases the Job
Manager container is configured to run a single Flink job at start-up. the
savepoint needs to be provided as an argument to the entrypoint. the Flink
documentation for this approach is here:

https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint

the issue is that taking this approach means that the job will *always*
start from the savepoint provided as the start argument in the Kubernetes
YAML. this includes unplanned restarts of the job manager, but we'd really
prefer any *unplanned* restarts resume for the most recent checkpoint
instead of restarting from the configured savepoint. so in a sense we want
the savepoint argument to be transient, only being used during the initial
deployment, but this runs counter to the design of Kubernetes which always
wants to restore a deployment to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks
again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com> wrote:

> I think I overlooked it. Good point. I am using Redis to save the path to
> my savepoint, I might be able to set a TTL to avoid such issue.
>
> Hao Sun
>
>
> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yu...@gmail.com> wrote:
>
>> Hi Hao,
>>
>> I think he's exactly talking about the usecase where the JM/TM restart
>> and they come back up from the latest savepoint which might be stale by
>> that time.
>>
>> On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com> wrote:
>>
>>> We always make a savepoint before we shutdown the job-cluster. So the
>>> savepoint is always the latest. When we fix a bug or change the job graph,
>>> it can resume well.
>>> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
>>> uncaught exception, etc.
>>>
>>> Maybe I do not understand your use case well, I do not see a need to
>>> start from checkpoint after a bug fix.
>>> From what I know, currently you can use checkpoint as a savepoint as well
>>>
>>> Hao Sun
>>>
>>>
>>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com>
>>> wrote:
>>>
>>>> AFAIK there's currently nothing implemented to solve this problem, but
>>>> working on a possible fix can be implemented on top of
>>>> https://github.com/lyft/flinkk8soperator which already has a pretty
>>>> fancy state machine for rolling upgrades. I'd love to be involved as this
>>>> is an issue I've been thinking about as well.
>>>>
>>>> Yuval
>>>>
>>>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <
>>>> sean.hester@bettercloud.com> wrote:
>>>>
>>>>> hi all--we've run into a gap (knowledge? design? tbd?) for our use
>>>>> cases when deploying Flink jobs to start from savepoints using the
>>>>> job-cluster mode in Kubernetes.
>>>>>
>>>>> we're running a ~15 different jobs, all in job-cluster mode, using a
>>>>> mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these
>>>>> are all long-running streaming jobs, all essentially acting as
>>>>> microservices. we're using Helm charts to configure all of our deployments.
>>>>>
>>>>> we have a number of use cases where we want to restart jobs from a
>>>>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>>>>> or fixed a bug. but after the deployment we want to have the job resume
>>>>> it's "long-running" behavior, where any unplanned restarts resume from the
>>>>> latest checkpoint.
>>>>>
>>>>> the issue we run into is that any obvious/standard/idiomatic
>>>>> Kubernetes deployment includes the savepoint argument in the configuration.
>>>>> if the Job Manager container(s) have an unplanned restart, when they come
>>>>> back up they will start from the savepoint instead of resuming from the
>>>>> latest checkpoint. everything is working as configured, but that's not
>>>>> exactly what we want. we want the savepoint argument to be transient
>>>>> somehow (only used during the initial deployment), but Kubernetes doesn't
>>>>> really support the concept of transient configuration.
>>>>>
>>>>> i can see a couple of potential solutions that either involve custom
>>>>> code in the jobs or custom logic in the container (i.e. a custom entrypoint
>>>>> script that records that the configured savepoint has already been used in
>>>>> a file on a persistent volume or GCS, and potentially when/why/by which
>>>>> deployment). but these seem like unexpected and hacky solutions. before we
>>>>> head down that road i wanted to ask:
>>>>>
>>>>>    - is this is already a solved problem that i've missed?
>>>>>    - is this issue already on the community's radar?
>>>>>
>>>>> thanks in advance!
>>>>>
>>>>> --
>>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>>> <http://www.bettercloud.com> <http://www.bettercloud.com>
>>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>>> It’s not just an IT conference, it’s “a complete learning and
>>>>> networking experience”
>>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>>
>>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Yuval Itzchakov.
>>>>
>>>

-- 
*Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
<http://www.bettercloud.com> <http://www.bettercloud.com>
*Altitude 2019 in San Francisco | Sept. 23 - 25*
It’s not just an IT conference, it’s “a complete learning and networking
experience”
<https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Hao Sun <ha...@zendesk.com>.
I think I overlooked it. Good point. I am using Redis to save the path to
my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yu...@gmail.com> wrote:

> Hi Hao,
>
> I think he's exactly talking about the usecase where the JM/TM restart and
> they come back up from the latest savepoint which might be stale by that
> time.
>
> On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com> wrote:
>
>> We always make a savepoint before we shutdown the job-cluster. So the
>> savepoint is always the latest. When we fix a bug or change the job graph,
>> it can resume well.
>> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
>> uncaught exception, etc.
>>
>> Maybe I do not understand your use case well, I do not see a need to
>> start from checkpoint after a bug fix.
>> From what I know, currently you can use checkpoint as a savepoint as well
>>
>> Hao Sun
>>
>>
>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com>
>> wrote:
>>
>>> AFAIK there's currently nothing implemented to solve this problem, but
>>> working on a possible fix can be implemented on top of
>>> https://github.com/lyft/flinkk8soperator
>>> <https://github.com/lyft/flinkk8soperator> which
>>> already has a pretty fancy state machine for rolling upgrades. I'd love to
>>> be involved as this is an issue I've been thinking about as well.
>>>
>>> Yuval
>>>
>>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <se...@bettercloud.com>
>>> wrote:
>>>
>>>> hi all--we've run into a gap (knowledge? design? tbd?) for our use
>>>> cases when deploying Flink jobs to start from savepoints using the
>>>> job-cluster mode in Kubernetes.
>>>>
>>>> we're running a ~15 different jobs, all in job-cluster mode, using a
>>>> mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these
>>>> are all long-running streaming jobs, all essentially acting as
>>>> microservices. we're using Helm charts to configure all of our deployments.
>>>>
>>>> we have a number of use cases where we want to restart jobs from a
>>>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>>>> or fixed a bug. but after the deployment we want to have the job resume
>>>> it's "long-running" behavior, where any unplanned restarts resume from the
>>>> latest checkpoint.
>>>>
>>>> the issue we run into is that any obvious/standard/idiomatic Kubernetes
>>>> deployment includes the savepoint argument in the configuration. if the Job
>>>> Manager container(s) have an unplanned restart, when they come back up they
>>>> will start from the savepoint instead of resuming from the latest
>>>> checkpoint. everything is working as configured, but that's not exactly
>>>> what we want. we want the savepoint argument to be transient somehow (only
>>>> used during the initial deployment), but Kubernetes doesn't really support
>>>> the concept of transient configuration.
>>>>
>>>> i can see a couple of potential solutions that either involve custom
>>>> code in the jobs or custom logic in the container (i.e. a custom entrypoint
>>>> script that records that the configured savepoint has already been used in
>>>> a file on a persistent volume or GCS, and potentially when/why/by which
>>>> deployment). but these seem like unexpected and hacky solutions. before we
>>>> head down that road i wanted to ask:
>>>>
>>>>    - is this is already a solved problem that i've missed?
>>>>    - is this issue already on the community's radar?
>>>>
>>>> thanks in advance!
>>>>
>>>> --
>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>> <http://www.bettercloud.com>
>>>> <http://www.bettercloud.com>
>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>> It’s not just an IT conference, it’s “a complete learning and
>>>> networking experience”
>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>
>>>>
>>>
>>> --
>>> Best Regards,
>>> Yuval Itzchakov.
>>>
>>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Yuval Itzchakov <yu...@gmail.com>.
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and
they come back up from the latest savepoint which might be stale by that
time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, <ha...@zendesk.com> wrote:

> We always make a savepoint before we shutdown the job-cluster. So the
> savepoint is always the latest. When we fix a bug or change the job graph,
> it can resume well.
> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
> uncaught exception, etc.
>
> Maybe I do not understand your use case well, I do not see a need to start
> from checkpoint after a bug fix.
> From what I know, currently you can use checkpoint as a savepoint as well
>
> Hao Sun
>
>
> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com> wrote:
>
>> AFAIK there's currently nothing implemented to solve this problem, but
>> working on a possible fix can be implemented on top of
>> https://github.com/lyft/flinkk8soperator which already has a pretty
>> fancy state machine for rolling upgrades. I'd love to be involved as this
>> is an issue I've been thinking about as well.
>>
>> Yuval
>>
>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <se...@bettercloud.com>
>> wrote:
>>
>>> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
>>> when deploying Flink jobs to start from savepoints using the job-cluster
>>> mode in Kubernetes.
>>>
>>> we're running a ~15 different jobs, all in job-cluster mode, using a mix
>>> of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are
>>> all long-running streaming jobs, all essentially acting as microservices.
>>> we're using Helm charts to configure all of our deployments.
>>>
>>> we have a number of use cases where we want to restart jobs from a
>>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>>> or fixed a bug. but after the deployment we want to have the job resume
>>> it's "long-running" behavior, where any unplanned restarts resume from the
>>> latest checkpoint.
>>>
>>> the issue we run into is that any obvious/standard/idiomatic Kubernetes
>>> deployment includes the savepoint argument in the configuration. if the Job
>>> Manager container(s) have an unplanned restart, when they come back up they
>>> will start from the savepoint instead of resuming from the latest
>>> checkpoint. everything is working as configured, but that's not exactly
>>> what we want. we want the savepoint argument to be transient somehow (only
>>> used during the initial deployment), but Kubernetes doesn't really support
>>> the concept of transient configuration.
>>>
>>> i can see a couple of potential solutions that either involve custom
>>> code in the jobs or custom logic in the container (i.e. a custom entrypoint
>>> script that records that the configured savepoint has already been used in
>>> a file on a persistent volume or GCS, and potentially when/why/by which
>>> deployment). but these seem like unexpected and hacky solutions. before we
>>> head down that road i wanted to ask:
>>>
>>>    - is this is already a solved problem that i've missed?
>>>    - is this issue already on the community's radar?
>>>
>>> thanks in advance!
>>>
>>> --
>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>> <http://www.bettercloud.com> <http://www.bettercloud.com>
>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>> It’s not just an IT conference, it’s “a complete learning and networking
>>> experience”
>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>
>>>
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Matt Magsombol <ra...@gmail.com>.
I'm not the original poster, but I'm running into this same issue. What you just described is exactly what I want. I presume you guys are using some variant of this helm https://github.com/docker-flink/examples/tree/master/helm/flink to configure your k8s cluster? I'm also assuming that this cluster is running as a job cluster and not a session cluster right?
If so, how did you guys set up the deployments.yaml file such that it picks up the latest savepoint from a savepoint directory ( and what happens if that savepoint directory is empty? This is for cases when we're starting a new cluster, new job from scratch and there's no need to recover from previous savepoint ).

On 2019/09/24 16:23:52, Hao Sun <ha...@zendesk.com> wrote: 
> We always make a savepoint before we shutdown the job-cluster. So the
> savepoint is always the latest. When we fix a bug or change the job graph,
> it can resume well.
> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
> uncaught exception, etc.
> 
> Maybe I do not understand your use case well, I do not see a need to start
> from checkpoint after a bug fix.
> From what I know, currently you can use checkpoint as a savepoint as well
> 
> Hao Sun
> 
> 
> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com> wrote:
> 
> > AFAIK there's currently nothing implemented to solve this problem, but
> > working on a possible fix can be implemented on top of
> > https://github.com/lyft/flinkk8soperator
> > <https://github.com/lyft/flinkk8soperator> which already
> > has a pretty fancy state machine for rolling upgrades. I'd love to be
> > involved as this is an issue I've been thinking about as well.
> >
> > Yuval
> >
> > On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <se...@bettercloud.com>
> > wrote:
> >
> >> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
> >> when deploying Flink jobs to start from savepoints using the job-cluster
> >> mode in Kubernetes.
> >>
> >> we're running a ~15 different jobs, all in job-cluster mode, using a mix
> >> of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are
> >> all long-running streaming jobs, all essentially acting as microservices.
> >> we're using Helm charts to configure all of our deployments.
> >>
> >> we have a number of use cases where we want to restart jobs from a
> >> savepoint to replay recent events, i.e. when we've enhanced the job logic
> >> or fixed a bug. but after the deployment we want to have the job resume
> >> it's "long-running" behavior, where any unplanned restarts resume from the
> >> latest checkpoint.
> >>
> >> the issue we run into is that any obvious/standard/idiomatic Kubernetes
> >> deployment includes the savepoint argument in the configuration. if the Job
> >> Manager container(s) have an unplanned restart, when they come back up they
> >> will start from the savepoint instead of resuming from the latest
> >> checkpoint. everything is working as configured, but that's not exactly
> >> what we want. we want the savepoint argument to be transient somehow (only
> >> used during the initial deployment), but Kubernetes doesn't really support
> >> the concept of transient configuration.
> >>
> >> i can see a couple of potential solutions that either involve custom code
> >> in the jobs or custom logic in the container (i.e. a custom entrypoint
> >> script that records that the configured savepoint has already been used in
> >> a file on a persistent volume or GCS, and potentially when/why/by which
> >> deployment). but these seem like unexpected and hacky solutions. before we
> >> head down that road i wanted to ask:
> >>
> >>    - is this is already a solved problem that i've missed?
> >>    - is this issue already on the community's radar?
> >>
> >> thanks in advance!
> >>
> >> --
> >> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
> >> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
> >> <http://www.bettercloud.com>
> >> <http://www.bettercloud.com>
> >> *Altitude 2019 in San Francisco | Sept. 23 - 25*
> >> It’s not just an IT conference, it’s “a complete learning and networking
> >> experience” <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
> >>
> >>
> >
> > --
> > Best Regards,
> > Yuval Itzchakov.
> >
> 

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Hao Sun <ha...@zendesk.com>.
We always make a savepoint before we shutdown the job-cluster. So the
savepoint is always the latest. When we fix a bug or change the job graph,
it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start
from checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yu...@gmail.com> wrote:

> AFAIK there's currently nothing implemented to solve this problem, but
> working on a possible fix can be implemented on top of
> https://github.com/lyft/flinkk8soperator
> <https://github.com/lyft/flinkk8soperator> which already
> has a pretty fancy state machine for rolling upgrades. I'd love to be
> involved as this is an issue I've been thinking about as well.
>
> Yuval
>
> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <se...@bettercloud.com>
> wrote:
>
>> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
>> when deploying Flink jobs to start from savepoints using the job-cluster
>> mode in Kubernetes.
>>
>> we're running a ~15 different jobs, all in job-cluster mode, using a mix
>> of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are
>> all long-running streaming jobs, all essentially acting as microservices.
>> we're using Helm charts to configure all of our deployments.
>>
>> we have a number of use cases where we want to restart jobs from a
>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>> or fixed a bug. but after the deployment we want to have the job resume
>> it's "long-running" behavior, where any unplanned restarts resume from the
>> latest checkpoint.
>>
>> the issue we run into is that any obvious/standard/idiomatic Kubernetes
>> deployment includes the savepoint argument in the configuration. if the Job
>> Manager container(s) have an unplanned restart, when they come back up they
>> will start from the savepoint instead of resuming from the latest
>> checkpoint. everything is working as configured, but that's not exactly
>> what we want. we want the savepoint argument to be transient somehow (only
>> used during the initial deployment), but Kubernetes doesn't really support
>> the concept of transient configuration.
>>
>> i can see a couple of potential solutions that either involve custom code
>> in the jobs or custom logic in the container (i.e. a custom entrypoint
>> script that records that the configured savepoint has already been used in
>> a file on a persistent volume or GCS, and potentially when/why/by which
>> deployment). but these seem like unexpected and hacky solutions. before we
>> head down that road i wanted to ask:
>>
>>    - is this is already a solved problem that i've missed?
>>    - is this issue already on the community's radar?
>>
>> thanks in advance!
>>
>> --
>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>> <http://www.bettercloud.com>
>> <http://www.bettercloud.com>
>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>> It’s not just an IT conference, it’s “a complete learning and networking
>> experience” <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>
>>
>
> --
> Best Regards,
> Yuval Itzchakov.
>

Re: Challenges Deploying Flink With Savepoints On Kubernetes

Posted by Yuval Itzchakov <yu...@gmail.com>.
AFAIK there's currently nothing implemented to solve this problem, but
working on a possible fix can be implemented on top of
https://github.com/lyft/flinkk8soperator which already has a pretty fancy
state machine for rolling upgrades. I'd love to be involved as this is an
issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <se...@bettercloud.com>
wrote:

> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
> when deploying Flink jobs to start from savepoints using the job-cluster
> mode in Kubernetes.
>
> we're running a ~15 different jobs, all in job-cluster mode, using a mix
> of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are
> all long-running streaming jobs, all essentially acting as microservices.
> we're using Helm charts to configure all of our deployments.
>
> we have a number of use cases where we want to restart jobs from a
> savepoint to replay recent events, i.e. when we've enhanced the job logic
> or fixed a bug. but after the deployment we want to have the job resume
> it's "long-running" behavior, where any unplanned restarts resume from the
> latest checkpoint.
>
> the issue we run into is that any obvious/standard/idiomatic Kubernetes
> deployment includes the savepoint argument in the configuration. if the Job
> Manager container(s) have an unplanned restart, when they come back up they
> will start from the savepoint instead of resuming from the latest
> checkpoint. everything is working as configured, but that's not exactly
> what we want. we want the savepoint argument to be transient somehow (only
> used during the initial deployment), but Kubernetes doesn't really support
> the concept of transient configuration.
>
> i can see a couple of potential solutions that either involve custom code
> in the jobs or custom logic in the container (i.e. a custom entrypoint
> script that records that the configured savepoint has already been used in
> a file on a persistent volume or GCS, and potentially when/why/by which
> deployment). but these seem like unexpected and hacky solutions. before we
> head down that road i wanted to ask:
>
>    - is this is already a solved problem that i've missed?
>    - is this issue already on the community's radar?
>
> thanks in advance!
>
> --
> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
> <http://www.bettercloud.com> <http://www.bettercloud.com>
> *Altitude 2019 in San Francisco | Sept. 23 - 25*
> It’s not just an IT conference, it’s “a complete learning and networking
> experience”
> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>
>

-- 
Best Regards,
Yuval Itzchakov.