You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Reo Lei <le...@gmail.com> on 2019/11/25 16:36:34 UTC

How to recover state from savepoint on embedded mode?

Hi,
I have a job need running on embedded mode, but need to init some rule data
from a database before start. So I used the State Processor API to
construct my state data and save it to the local disk. When I want to used
this savepoint to recover my job, I found resume a job from a savepoint
need to use the command `bin/flink run -s :savepointPath [:runArgs]` to
submit a job to flink cluster. That is mean the job is run on remote mode,
not embedded mode.

And I was wondering why I can't resume a job from a savepoint on embedded
mode. If that is possible, what should I do?
BTW, if we can not  resume a job from a savepoint on embedded mode, how to
know the savepoint is constructed correctly in develop environment and use
idea to debug it?

BR,
Reo

Re: How to recover state from savepoint on embedded mode?

Posted by Biao Liu <mm...@gmail.com>.
Hi Reo,

Maybe we could find another way.

> why I am not use the standalnoe mode to run the job is because the
running env haven't zookeeper, and would not install the zookeeper. So I
need to depend on the embedded mode to run my job.

You could set up a standalone cluster without zookeeper.
Do not set "high-availability" in flink-conf.yaml or set it to "NONE". And
provide the "jobmanager.rpc.address" and "jobmanager.rpc.port" in
flink-conf.yaml at the same time.
In this way, you could build a standalone cluster, see more details in [1].

Could it satisfy your requirement?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/cluster_setup.html

Thanks,
Biao /'bɪ.aʊ/



On Fri, 29 Nov 2019 at 18:45, Dawid Wysakowicz <dw...@apache.org>
wrote:

> Hi,
>
> I would like to clarify previous responses a bit.
>
> 1. From the architectural point of view yes it is true it is possible to
> restore from a savepoint from a local jvm as long as this jvm has access to
> the checkpoint.
>
> 2. Unfortunately the configuration you pass to the ctor of
> LocalStreamEnvironment is not passed to the StreamGraphGenerator which sets
> the savepoint configuration. That said, unless I am wrong this approach
> will not work.
>
> 3. There is no easy and officially supported way to do this. The official
> way would be to start a local cluster and submit your job remotely to that
> local cluster, which you can also debug remotely.
>
> I know this is not perfect. A different workaround I can offer would be to
> modify/reuse the LocalExecutionEnvironment a bit.
>
> You can
>
>    1.  get a StreamGraph from a StreamExecutionEnvironment (via
>    StreamExecutionEnvironment#getStreamGraph),
>    2.  generate a JobGraph out of it,
>    3.  set the savepoint settings
>    4.  and submit it locally to a MiniCluster.
>
> You can reuse majority of the code from the
> LocalStreamEnvironment#execute(StreamGraph) method. The thing you have to
> add is once you get the jobGraph:
>
> jobGrap.setSavepointRestoreSettings(...)
>
> I know this is not the nicest solution, but some of my colleagues are
> currently working on improving the job submission api. (Some of the FLIPs
> around the topic are:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-73%3A+Introducing+Executors+for+job+submission
> and
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API
> ).
>
> Best,
>
> Dawid
> On 28/11/2019 19:56, Arvid Heise wrote:
>
> Just to add up, if you use LocalStreamEnvironment, you can pass a
> configuration and you can set "execution.savepoint.path" to point to your
> savepoint.
>
> Best,
>
> Arvid
>
> On Wed, Nov 27, 2019 at 1:00 PM Congxian Qiu <qc...@gmail.com>
> wrote:
>
>> Hi,
>>
>> You can recovery from checkpoint/savepoint if JM&TM can read from the
>> given path. no math which mode the job is running on.
>>
>> Best,
>> Congxian
>>
>>
>> Reo Lei <le...@gmail.com> 于2019年11月26日周二 下午12:18写道:
>>
>>>
>>>
>>> ---------- Forwarded message ---------
>>> 发件人: Reo Lei <le...@gmail.com>
>>> Date: 2019年11月26日周二 上午9:53
>>> Subject: Re: How to recover state from savepoint on embedded mode?
>>> To: Yun Tang <my...@live.com>
>>>
>>>
>>> Hi Yun,
>>> Thanks for your reply. what I say the embedded mode is the whole flink
>>> cluster and job, include jobmanager, taskmanager and the job application
>>> itself, running within a local JVM progress, which is use the "
>>> LocalStreamEnvironment" within the job. And the start command look like
>>> this: "java -Xmx512M -XX:... -Dlog.file=... -cp flink-job.jar
>>> com.a.b.c.MyJob > /dev/null &"
>>>
>>> why I am not use the standalnoe mode to run the job is because the
>>> running env haven't zookeeper, and would not install the zookeeper. So I
>>> need to depend on the embedded mode to run my job.
>>>
>>> BR,
>>> Reo
>>>
>>> Yun Tang <my...@live.com> 于2019年11月26日周二 上午2:38写道:
>>>
>>>> What is the embedded mode mean here? If you refer to SQL embedded mode,
>>>> you cannot resume from savepoint now; if you refer to local standalone
>>>> cluster, you could use `bin/flink run -s` to resume on a local cluster.
>>>>
>>>>
>>>>
>>>> Best
>>>>
>>>> Yun Tang
>>>>
>>>>
>>>>
>>>> *From: *Reo Lei <le...@gmail.com>
>>>> *Date: *Tuesday, November 26, 2019 at 12:37 AM
>>>> *To: *"user@flink.apache.org" <us...@flink.apache.org>
>>>> *Subject: *How to recover state from savepoint on embedded mode?
>>>>
>>>>
>>>>
>>>> Hi,
>>>>
>>>> I have a job need running on embedded mode, but need to init some rule
>>>> data from a database before start. So I used the State Processor API to
>>>> construct my state data and save it to the local disk. When I want to used
>>>> this savepoint to recover my job, I found resume a job from a savepoint
>>>> need to use the command `bin/flink run -s :savepointPath *[*:runArgs]`
>>>> to submit a job to flink cluster. That is mean the job is run on remote
>>>> mode, not embedded mode.
>>>>
>>>>
>>>>
>>>> And I was wondering why I can't resume a job from a savepoint on
>>>> embedded mode. If that is possible, what should I do?
>>>>
>>>> BTW, if we can not  resume a job from a savepoint on embedded mode, how
>>>> to know the savepoint is constructed correctly in develop environment and
>>>> use idea to debug it?
>>>>
>>>>
>>>>
>>>> BR,
>>>>
>>>> Reo
>>>>
>>>>
>>>>
>>>

Re: How to recover state from savepoint on embedded mode?

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,

I would like to clarify previous responses a bit.

1. From the architectural point of view yes it is true it is possible to
restore from a savepoint from a local jvm as long as this jvm has access
to the checkpoint.

2. Unfortunately the configuration you pass to the ctor of
LocalStreamEnvironment is not passed to the StreamGraphGenerator which
sets the savepoint configuration. That said, unless I am wrong this
approach will not work.

3. There is no easy and officially supported way to do this. The
official way would be to start a local cluster and submit your job
remotely to that local cluster, which you can also debug remotely.

I know this is not perfect. A different workaround I can offer would be
to modify/reuse the LocalExecutionEnvironment a bit.

You can

 1.  get a StreamGraph from a StreamExecutionEnvironment (via
    StreamExecutionEnvironment#getStreamGraph),
 2.  generate a JobGraph out of it,
 3.  set the savepoint settings
 4.  and submit it locally to a MiniCluster.

You can reuse majority of the code from the
LocalStreamEnvironment#execute(StreamGraph) method. The thing you have
to add is once you get the jobGraph:

jobGrap.setSavepointRestoreSettings(...)

I know this is not the nicest solution, but some of my colleagues are
currently working on improving the job submission api. (Some of the
FLIPs around the topic are:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-73%3A+Introducing+Executors+for+job+submission
and
https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API).

Best,

Dawid

On 28/11/2019 19:56, Arvid Heise wrote:
> Just to add up, if you use LocalStreamEnvironment, you can pass a
> configuration and you can set"execution.savepoint.path" to point to
> your savepoint.
>
> Best,
>
> Arvid
>
> On Wed, Nov 27, 2019 at 1:00 PM Congxian Qiu <qcx978132955@gmail.com
> <ma...@gmail.com>> wrote:
>
>     Hi,
>
>     You can recovery from checkpoint/savepoint if JM&TM can read from
>     the given path. no math which mode the job is running on.
>
>     Best,
>     Congxian
>
>
>     Reo Lei <leinuowen@gmail.com <ma...@gmail.com>>
>     于2019年11月26日周二 下午12:18写道:
>
>
>
>         ---------- Forwarded message ---------
>         发件人: *Reo Lei* <leinuowen@gmail.com
>         <ma...@gmail.com>>
>         Date: 2019年11月26日周二 上午9:53
>         Subject: Re: How to recover state from savepoint on embedded mode?
>         To: Yun Tang <myasuka@live.com <ma...@live.com>>
>
>
>         Hi Yun,
>         Thanks for your reply. what I say the embedded mode is the
>         whole flink cluster and job, include jobmanager, taskmanager
>         and the job application itself, running within a local JVM
>         progress, which is use the "LocalStreamEnvironment" within the
>         job. And the start command look like this: "java -Xmx512M
>         -XX:... -Dlog.file=... -cp flink-job.jar com.a.b.c.MyJob >
>         /dev/null &"
>
>         why I am not use the standalnoe mode to run the job is because
>         the running env haven't zookeeper, and would not install the
>         zookeeper. So I need to depend on the embedded mode to run my job.
>
>         BR,
>         Reo
>
>         Yun Tang <myasuka@live.com <ma...@live.com>>
>         于2019年11月26日周二 上午2:38写道:
>
>             What is the embedded mode mean here? If you refer to SQL
>             embedded mode, you cannot resume from savepoint now; if
>             you refer to local standalone cluster, you could use
>             `bin/flink run -s` to resume on a local cluster.
>
>              
>
>             Best
>
>             Yun Tang
>
>              
>
>             *From: *Reo Lei <leinuowen@gmail.com
>             <ma...@gmail.com>>
>             *Date: *Tuesday, November 26, 2019 at 12:37 AM
>             *To: *"user@flink.apache.org
>             <ma...@flink.apache.org>" <user@flink.apache.org
>             <ma...@flink.apache.org>>
>             *Subject: *How to recover state from savepoint on embedded
>             mode?
>
>              
>
>             Hi,
>
>             I have a job need running on embedded mode, but need to
>             init some rule data from a database before start. So I
>             used the State Processor API to construct my state data
>             and save it to the local disk. When I want to used this
>             savepoint to recover my job, I found resume a job from a
>             savepoint need to use the command `bin/flink run -s
>             :savepointPath *[*:runArgs]` to submit a job to flink
>             cluster. That is mean the job is run on remote mode, not
>             embedded mode.
>
>              
>
>             And I was wondering* *why I can't resume a job from a
>             savepoint on embedded mode. If that is possible, what
>             should I do?
>
>             BTW, if we can not  resume a job from a savepoint on
>             embedded mode, how to know the savepoint is constructed
>             correctly in develop environment and use idea to debug it?
>
>              
>
>             BR,
>
>             Reo
>
>              
>

Re: How to recover state from savepoint on embedded mode?

Posted by Arvid Heise <ar...@ververica.com>.
Just to add up, if you use LocalStreamEnvironment, you can pass a
configuration and you can set "execution.savepoint.path" to point to your
savepoint.

Best,

Arvid

On Wed, Nov 27, 2019 at 1:00 PM Congxian Qiu <qc...@gmail.com> wrote:

> Hi,
>
> You can recovery from checkpoint/savepoint if JM&TM can read from the
> given path. no math which mode the job is running on.
>
> Best,
> Congxian
>
>
> Reo Lei <le...@gmail.com> 于2019年11月26日周二 下午12:18写道:
>
>>
>>
>> ---------- Forwarded message ---------
>> 发件人: Reo Lei <le...@gmail.com>
>> Date: 2019年11月26日周二 上午9:53
>> Subject: Re: How to recover state from savepoint on embedded mode?
>> To: Yun Tang <my...@live.com>
>>
>>
>> Hi Yun,
>> Thanks for your reply. what I say the embedded mode is the whole flink
>> cluster and job, include jobmanager, taskmanager and the job application
>> itself, running within a local JVM progress, which is use the "
>> LocalStreamEnvironment" within the job. And the start command look like
>> this: "java -Xmx512M -XX:... -Dlog.file=... -cp flink-job.jar
>> com.a.b.c.MyJob > /dev/null &"
>>
>> why I am not use the standalnoe mode to run the job is because the
>> running env haven't zookeeper, and would not install the zookeeper. So I
>> need to depend on the embedded mode to run my job.
>>
>> BR,
>> Reo
>>
>> Yun Tang <my...@live.com> 于2019年11月26日周二 上午2:38写道:
>>
>>> What is the embedded mode mean here? If you refer to SQL embedded mode,
>>> you cannot resume from savepoint now; if you refer to local standalone
>>> cluster, you could use `bin/flink run -s` to resume on a local cluster.
>>>
>>>
>>>
>>> Best
>>>
>>> Yun Tang
>>>
>>>
>>>
>>> *From: *Reo Lei <le...@gmail.com>
>>> *Date: *Tuesday, November 26, 2019 at 12:37 AM
>>> *To: *"user@flink.apache.org" <us...@flink.apache.org>
>>> *Subject: *How to recover state from savepoint on embedded mode?
>>>
>>>
>>>
>>> Hi,
>>>
>>> I have a job need running on embedded mode, but need to init some rule
>>> data from a database before start. So I used the State Processor API to
>>> construct my state data and save it to the local disk. When I want to used
>>> this savepoint to recover my job, I found resume a job from a savepoint
>>> need to use the command `bin/flink run -s :savepointPath *[*:runArgs]`
>>> to submit a job to flink cluster. That is mean the job is run on remote
>>> mode, not embedded mode.
>>>
>>>
>>>
>>> And I was wondering why I can't resume a job from a savepoint on
>>> embedded mode. If that is possible, what should I do?
>>>
>>> BTW, if we can not  resume a job from a savepoint on embedded mode, how
>>> to know the savepoint is constructed correctly in develop environment and
>>> use idea to debug it?
>>>
>>>
>>>
>>> BR,
>>>
>>> Reo
>>>
>>>
>>>
>>

Re: How to recover state from savepoint on embedded mode?

Posted by Congxian Qiu <qc...@gmail.com>.
Hi,

You can recovery from checkpoint/savepoint if JM&TM can read from the given
path. no math which mode the job is running on.

Best,
Congxian


Reo Lei <le...@gmail.com> 于2019年11月26日周二 下午12:18写道:

>
>
> ---------- Forwarded message ---------
> 发件人: Reo Lei <le...@gmail.com>
> Date: 2019年11月26日周二 上午9:53
> Subject: Re: How to recover state from savepoint on embedded mode?
> To: Yun Tang <my...@live.com>
>
>
> Hi Yun,
> Thanks for your reply. what I say the embedded mode is the whole flink
> cluster and job, include jobmanager, taskmanager and the job application
> itself, running within a local JVM progress, which is use the "
> LocalStreamEnvironment" within the job. And the start command look like
> this: "java -Xmx512M -XX:... -Dlog.file=... -cp flink-job.jar
> com.a.b.c.MyJob > /dev/null &"
>
> why I am not use the standalnoe mode to run the job is because the running
> env haven't zookeeper, and would not install the zookeeper. So I need to depend
> on the embedded mode to run my job.
>
> BR,
> Reo
>
> Yun Tang <my...@live.com> 于2019年11月26日周二 上午2:38写道:
>
>> What is the embedded mode mean here? If you refer to SQL embedded mode,
>> you cannot resume from savepoint now; if you refer to local standalone
>> cluster, you could use `bin/flink run -s` to resume on a local cluster.
>>
>>
>>
>> Best
>>
>> Yun Tang
>>
>>
>>
>> *From: *Reo Lei <le...@gmail.com>
>> *Date: *Tuesday, November 26, 2019 at 12:37 AM
>> *To: *"user@flink.apache.org" <us...@flink.apache.org>
>> *Subject: *How to recover state from savepoint on embedded mode?
>>
>>
>>
>> Hi,
>>
>> I have a job need running on embedded mode, but need to init some rule
>> data from a database before start. So I used the State Processor API to
>> construct my state data and save it to the local disk. When I want to used
>> this savepoint to recover my job, I found resume a job from a savepoint
>> need to use the command `bin/flink run -s :savepointPath *[*:runArgs]`
>> to submit a job to flink cluster. That is mean the job is run on remote
>> mode, not embedded mode.
>>
>>
>>
>> And I was wondering why I can't resume a job from a savepoint on
>> embedded mode. If that is possible, what should I do?
>>
>> BTW, if we can not  resume a job from a savepoint on embedded mode, how
>> to know the savepoint is constructed correctly in develop environment and
>> use idea to debug it?
>>
>>
>>
>> BR,
>>
>> Reo
>>
>>
>>
>

Fwd: How to recover state from savepoint on embedded mode?

Posted by Reo Lei <le...@gmail.com>.
---------- Forwarded message ---------
发件人: Reo Lei <le...@gmail.com>
Date: 2019年11月26日周二 上午9:53
Subject: Re: How to recover state from savepoint on embedded mode?
To: Yun Tang <my...@live.com>


Hi Yun,
Thanks for your reply. what I say the embedded mode is the whole flink
cluster and job, include jobmanager, taskmanager and the job application
itself, running within a local JVM progress, which is use the "
LocalStreamEnvironment" within the job. And the start command look like
this: "java -Xmx512M -XX:... -Dlog.file=... -cp flink-job.jar
com.a.b.c.MyJob > /dev/null &"

why I am not use the standalnoe mode to run the job is because the running
env haven't zookeeper, and would not install the zookeeper. So I need to depend
on the embedded mode to run my job.

BR,
Reo

Yun Tang <my...@live.com> 于2019年11月26日周二 上午2:38写道:

> What is the embedded mode mean here? If you refer to SQL embedded mode,
> you cannot resume from savepoint now; if you refer to local standalone
> cluster, you could use `bin/flink run -s` to resume on a local cluster.
>
>
>
> Best
>
> Yun Tang
>
>
>
> *From: *Reo Lei <le...@gmail.com>
> *Date: *Tuesday, November 26, 2019 at 12:37 AM
> *To: *"user@flink.apache.org" <us...@flink.apache.org>
> *Subject: *How to recover state from savepoint on embedded mode?
>
>
>
> Hi,
>
> I have a job need running on embedded mode, but need to init some rule
> data from a database before start. So I used the State Processor API to
> construct my state data and save it to the local disk. When I want to used
> this savepoint to recover my job, I found resume a job from a savepoint
> need to use the command `bin/flink run -s :savepointPath *[*:runArgs]` to
> submit a job to flink cluster. That is mean the job is run on remote mode,
> not embedded mode.
>
>
>
> And I was wondering why I can't resume a job from a savepoint on embedded
> mode. If that is possible, what should I do?
>
> BTW, if we can not  resume a job from a savepoint on embedded mode, how to
> know the savepoint is constructed correctly in develop environment and use
> idea to debug it?
>
>
>
> BR,
>
> Reo
>
>
>

Re: How to recover state from savepoint on embedded mode?

Posted by Yun Tang <my...@live.com>.
What is the embedded mode mean here? If you refer to SQL embedded mode, you cannot resume from savepoint now; if you refer to local standalone cluster, you could use `bin/flink run -s` to resume on a local cluster.

Best
Yun Tang

From: Reo Lei <le...@gmail.com>
Date: Tuesday, November 26, 2019 at 12:37 AM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: How to recover state from savepoint on embedded mode?

Hi,
I have a job need running on embedded mode, but need to init some rule data from a database before start. So I used the State Processor API to construct my state data and save it to the local disk. When I want to used this savepoint to recover my job, I found resume a job from a savepoint need to use the command `bin/flink run -s :savepointPath [:runArgs]` to submit a job to flink cluster. That is mean the job is run on remote mode, not embedded mode.

And I was wondering why I can't resume a job from a savepoint on embedded mode. If that is possible, what should I do?
BTW, if we can not  resume a job from a savepoint on embedded mode, how to know the savepoint is constructed correctly in develop environment and use idea to debug it?

BR,
Reo