Posted to user@flink.apache.org by Anil <an...@gmail.com> on 2018/05/31 20:01:48 UTC

Triggering Savepoint for the Flink Job

I am using Flink 1.4.2. I have forked Uber's AthenaX project
<https://github.com/uber/AthenaX>.

The Flink jobs are deployed in a YARN cluster. I need to take a savepoint of
every job every day.

ClusterClient
<https://github.com/apache/flink/blob/release-1.4.2/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java#L672>
provides a method for triggering a savepoint given a Flink job ID.
YarnClusterClient
<https://github.com/apache/flink/blob/release-1.4.2/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java>
is an implementation of ClusterClient.

My initial thought was to use a YarnClusterClient instance with the Flink job
ID (which I save when the job is deployed to the YARN cluster) to trigger the
savepoint. So I created a YarnClusterClient instance once and kept it so that
I could use it anywhere in the application. But this doesn't seem to work: it
can neither cancel a job nor trigger a savepoint, even with a valid Flink job
ID. When I try to cancel a valid Flink job, it throws an error saying the ID
is invalid.

I would appreciate it if someone could help me out here.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Triggering Savepoint for the Flink Job

Posted by Anil <an...@gmail.com>.
Sorry about the late reply. 

This reply is specific to Uber's AthenaX project
<https://github.com/uber/AthenaX>.

To trigger a savepoint, we simply need to create an instance of
YarnClusterClient, which implements savepoint triggering. To trigger a
savepoint for any job running in Flink, we need its Flink job ID (flink_id).
When we submit the job to the YARN cluster, we also get the corresponding
Flink job ID and save it so that we can trigger savepoints later.

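To create the client, you need an instance of ApplicationReport. Code for it:

    // cluster.client() is assumed to return a YARN YarnClient;
    // applicationId is the YARN ApplicationId of the deployed job.
    ApplicationReport report =
        cluster.client().getApplicationReport(applicationId);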

Then instantiate YarnClusterClient with this report and trigger the savepoint.
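
The applicationId passed to getApplicationReport has to be a YARN ApplicationId object, so if the application ID was stored as a string at deployment time, it has to be converted back. As a JDK-only sketch (the class name YarnAppId is hypothetical), this is what that string encodes; with Hadoop on the classpath, the two parsed fields correspond to the arguments of ApplicationId.newInstance(clusterTimestamp, id).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical JDK-only value type for application_<clusterTimestamp>_<sequenceNumber>. */
final class YarnAppId {
    private static final Pattern FORMAT = Pattern.compile("application_(\\d+)_(\\d+)");

    final long clusterTimestamp; // start time of the ResourceManager that issued the ID
    final int sequenceNumber;    // per-ResourceManager application counter

    private YarnAppId(long clusterTimestamp, int sequenceNumber) {
        this.clusterTimestamp = clusterTimestamp;
        this.sequenceNumber = sequenceNumber;
    }

    /** Parses the saved string form; rejects anything that is not a YARN application ID. */
    static YarnAppId parse(String s) {
        Matcher m = FORMAT.matcher(s);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a YARN application ID: " + s);
        }
        return new YarnAppId(Long.parseLong(m.group(1)), Integer.parseInt(m.group(2)));
    }

    @Override
    public String toString() {
        // YARN prints the sequence number with at least four digits.
        return String.format("application_%d_%04d", clusterTimestamp, sequenceNumber);
    }
}
```

Keeping the conversion in one place means a malformed stored value fails loudly at parse time instead of surfacing later as a confusing lookup error.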

Re: Triggering Savepoint for the Flink Job

Posted by Rong Rong <wa...@gmail.com>.
Glad to know you found a solution. Would you mind sharing your workaround in
Flink 1.4.2? I am pretty sure other users would benefit from your effort
too :-)

Regarding the checkpoint/savepoint backend, we use HDFS as our state backend
instead of RocksDB. We are working on adding that logic to the open-source
AthenaX soon.
We will definitely consider RocksDB integration as well in our next AthenaX
release, and would love to take your input into account in the final design.

Thanks,
Rong

On Mon, Jun 4, 2018 at 11:12 AM, Anil <an...@gmail.com> wrote:

Re: Triggering Savepoint for the Flink Job

Posted by Anil <an...@gmail.com>.
Just out of curiosity, how do you store your checkpoints? Currently I'm using
the filesystem backend, but I'm migrating to RocksDB, which allows
asynchronous operations to avoid latency at higher scale as we grow.

Re: Triggering Savepoint for the Flink Job

Posted by Anil <an...@gmail.com>.
Hi Rong. Thanks for your help. I was about to look into the CLI API, but then
I figured out how to trigger a savepoint, and restore a job from its
savepoint, with the job running in detached mode.

Re: Triggering Savepoint for the Flink Job

Posted by Rong Rong <wa...@gmail.com>.
Hi Anil,

Glad to know that you upgraded the system to 1.4. From our experience, if I
remember correctly, quite a few changes are required to adapt to the new
deployment model in 1.4.
The "run detach" deployment model in AthenaX does not support reattaching to
the job; we use the REST API for all subsequent life-cycle management.

There are a couple of workarounds I can think of if upgrading to 1.5 is not
an option:
- Try using the CLI API [1] instead of the REST API, by replacing the
life-cycle management component in WatchdogPolicy, so that you can trigger
savepoints.
- Try changing AthenaX's deployment model so that it does not use "run
detach" mode, by modifying the "YarnClusterDescriptor".

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints
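
For the CLI route, the 1.4 CLI documentation [1] describes "flink savepoint <jobId> [savepointDirectory] -yid <yarnAppId>" for jobs running on YARN, and "flink run -s <savepointPath> ..." to resume a job from a savepoint. A minimal sketch, assuming a custom WatchdogPolicy shells out to the Flink binary; the helper name and the binary path are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper that builds Flink 1.4 CLI argument lists, following
 * the savepoint section of the CLI docs:
 *   flink savepoint <jobId> [savepointDirectory] -yid <yarnAppId>
 *   flink run -s <savepointPath> [runArgs...]
 */
final class FlinkCliCommands {
    private FlinkCliCommands() {}

    /** Triggers a savepoint for a job running in the given YARN application. */
    static List<String> triggerSavepoint(String flinkBin, String jobId,
                                         String savepointDir, String yarnAppId) {
        List<String> cmd = new ArrayList<>();
        cmd.add(flinkBin);
        cmd.add("savepoint");
        cmd.add(jobId);
        if (savepointDir != null) {
            // Optional: without it, the configured default savepoint directory is used.
            cmd.add(savepointDir);
        }
        cmd.add("-yid");
        cmd.add(yarnAppId);
        return cmd;
    }

    /** Submits a job that resumes from the given savepoint path. */
    static List<String> runFromSavepoint(String flinkBin, String savepointPath, List<String> runArgs) {
        List<String> cmd = new ArrayList<>();
        cmd.add(flinkBin);
        cmd.add("run");
        cmd.add("-s");
        cmd.add(savepointPath);
        cmd.addAll(runArgs);
        return cmd;
    }
}
```

A caller would start the process with new ProcessBuilder(cmd).redirectErrorStream(true).start(), read its output to obtain the savepoint path, and check the exit code from waitFor(). Building an argument list rather than one shell string avoids quoting problems with paths.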

Hope this can help your use case.

Thanks,
Rong

On Thu, May 31, 2018 at 8:38 PM, Anil <an...@gmail.com> wrote:

Re: Triggering Savepoint for the Flink Job

Posted by Anil <an...@gmail.com>.
Thanks for the reply, Rong. We have updated AthenaX to version 1.4.

I checked Flink 1.4: its REST endpoint does not support creating a savepoint
on its own, only cancel-with-savepoint. I think triggering a standalone
savepoint is supported in 1.5. Since we can't upgrade to 1.5 at the moment, I
would like to find a workaround for now.
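
For reference, the 1.4 cancel-with-savepoint call mentioned here is exposed by the legacy monitoring REST API under the paths below, as listed in the 1.4 REST API docs; the HTTP method and response format should be checked against your deployment. Because it cancels the job, a daily savepoint taken this way means resubmitting the job from the returned savepoint path each time. A JDK-only sketch; the class name is hypothetical.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper building the legacy Flink 1.4 monitoring REST paths
 * for cancel-with-savepoint. Verify the paths against your Flink version.
 */
final class CancelWithSavepointPaths {
    private CancelWithSavepointPaths() {}

    /** Cancel with a savepoint written to the cluster's default savepoint directory. */
    static String cancelWithDefaultDir(String jobId) {
        return "/jobs/" + jobId + "/cancel-with-savepoint/";
    }

    /** Cancel with a savepoint written to an explicit directory, encoded as one path segment. */
    static String cancelWithTargetDir(String jobId, String targetDirectory) {
        // URLEncoder form-encodes spaces as '+'; a path segment needs %20 instead.
        String encoded = URLEncoder.encode(targetDirectory, StandardCharsets.UTF_8).replace("+", "%20");
        return "/jobs/" + jobId + "/cancel-with-savepoint/target-directory/" + encoded;
    }

    /** Status path for a pending request, using the request ID returned by the trigger call. */
    static String inProgress(String jobId, String requestId) {
        return "/jobs/" + jobId + "/cancel-with-savepoint/in-progress/" + requestId;
    }
}
```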

Can you tell me how to reattach to a running job in the cluster?

Re: Triggering Savepoint for the Flink Job

Posted by Rong Rong <wa...@gmail.com>.
Hi Anil,

I am actually one of the engineers maintaining Uber's open-source AthenaX
platform. For now, AthenaX still runs on Flink 1.3.2, so following the 1.4.2
release documentation may be confusing. (We are working on upgrading to the
latest 1.5 release.)

Regarding your question about savepoints: AthenaX does not support savepoints
natively at the moment. However, we have a separate API, WatchdogPolicy, that
you can customize. It exposes monitoring/management hooks that are called
periodically, which you can use to trigger your daily savepoints.
As for how to take the savepoint, I think the REST API [1] is a good starting
point in AthenaX's case, because we launch jobs in detached mode.
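
Because WatchdogPolicy hooks are called periodically rather than exactly once a day, the policy needs to remember when each job last had a savepoint. A minimal sketch of that bookkeeping, assuming the hook can see the IDs of the running jobs; DailySavepointSchedule is hypothetical and not part of AthenaX.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical per-job bookkeeping for a periodically invoked hook. */
final class DailySavepointSchedule {
    private final Duration interval;
    private final Map<String, Instant> lastTriggered = new HashMap<>();

    DailySavepointSchedule(Duration interval) {
        this.interval = interval;
    }

    /** Returns, in input order, the jobs never savepointed or last savepointed at least one interval ago. */
    List<String> dueJobs(Collection<String> runningJobIds, Instant now) {
        List<String> due = new ArrayList<>();
        for (String jobId : runningJobIds) {
            Instant last = lastTriggered.get(jobId);
            if (last == null || !now.isBefore(last.plus(interval))) {
                due.add(jobId);
            }
        }
        return due;
    }

    /** Records that a savepoint was successfully triggered for the job at the given time. */
    void markTriggered(String jobId, Instant when) {
        lastTriggered.put(jobId, when);
    }
}
```

On each call, trigger a savepoint for every ID returned by dueJobs(...) and call markTriggered only once the trigger was accepted, so failed attempts are retried on the next call. The map lives in memory, so a restart makes every job due again; persisting it is a possible refinement.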

Please let me know if this is helpful.

Thanks,
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/rest_api.html#available-requests
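
Once an upgrade to 1.5 is possible, the savepoint endpoints in [1] allow triggering a savepoint without cancelling the job: POST /jobs/:jobid/savepoints with a JSON body containing target-directory and cancel-job, then polling GET /jobs/:jobid/savepoints/:triggerid. A minimal JDK-only sketch; the class name and the base URL of the JobManager's REST endpoint are assumptions, and extracting the trigger ID from the response is left out.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

/** Hypothetical client for the Flink 1.5 savepoint REST calls. */
final class FlinkSavepointRest {
    private FlinkSavepointRest() {}

    /** Path that triggers a savepoint for the given job. */
    static String triggerPath(String jobId) {
        return "/jobs/" + jobId + "/savepoints";
    }

    /** Path for polling the status of a previously triggered savepoint. */
    static String statusPath(String jobId, String triggerId) {
        return triggerPath(jobId) + "/" + triggerId;
    }

    /** JSON request body; escapes only backslashes and double quotes, enough for ordinary paths. */
    static String triggerBody(String targetDirectory, boolean cancelJob) {
        String dir = targetDirectory.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"target-directory\":\"" + dir + "\",\"cancel-job\":" + cancelJob + "}";
    }

    /** POSTs the trigger request (job keeps running) and returns the raw JSON response. */
    static String trigger(String baseUrl, String jobId, String targetDirectory) throws IOException {
        HttpURLConnection conn =
            (HttpURLConnection) URI.create(baseUrl + triggerPath(jobId)).toURL().openConnection();
        try {
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            conn.setRequestProperty("Content-Type", "application/json");
            try (OutputStream out = conn.getOutputStream()) {
                out.write(triggerBody(targetDirectory, false).getBytes(StandardCharsets.UTF_8));
            }
            try (InputStream in = conn.getInputStream()) {
                return new String(in.readAllBytes(), StandardCharsets.UTF_8);
            }
        } finally {
            conn.disconnect();
        }
    }
}
```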

On Thu, May 31, 2018 at 1:01 PM, Anil <an...@gmail.com> wrote: