Posted to user@flink.apache.org by Thomas Lamirault <th...@ericsson.com> on 2016/02/18 18:59:45 UTC

Flink HA

Hi !



We are trying flink in HA mode.

Our application is a streaming application with a windowing mechanism.

We set in the flink yaml :



state.backend: filesystem

recovery.mode: zookeeper
recovery.zookeeper.quorum:<Our quorum>

recovery.zookeeper.path.root: <path>

recovery.zookeeper.storageDir: <storageDir>

recovery.backend.fs.checkpointdir: <pathcheckpoint>

yarn.application-attempts: 100



We want the pending window to be restored when the application restarts after a crash.

Is pending data stored in the <storageDir>/blob directory ?

Also, we are trying to write a script that restarts the application, with the last pending window, after the maximum number of attempts is exceeded.

How can I do that ? Is a simple restart of the application enough, or do I have to "clean" the recovery.zookeeper.path.root ?



Thanks !



Thomas Lamirault

Re: Flink HA

Posted by Robert Metzger <rm...@apache.org>.
Hi Thomas,

To avoid having jobs forever restarting, you have to cancel them manually
(from the web interface or the /bin/flink client).
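For example, something like "bin/flink cancel <jobID>"; the job ID is shown in the web interface or by "bin/flink list".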
Also, you can set an appropriate restart strategy (in 1.0-SNAPSHOT), which
limits the number of retries. This way the retrying will eventually stop.
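
As an illustration, a minimal sketch of limiting retries from the job itself
(API as of 1.0-SNAPSHOT; the class name, attempt count and delay below are
placeholder values, not taken from this thread):

  import org.apache.flink.api.common.restartstrategy.RestartStrategies;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  public class BoundedRestartJob {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          // Give up after 3 restart attempts, waiting 10 seconds between attempts.
          env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
          // ... define sources, windows and sinks, then call env.execute() ...
      }
  }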

On Fri, Feb 19, 2016 at 4:05 PM, Thomas Lamirault <
thomas.lamirault@ericsson.com> wrote:

> I have resolved my issues.
> It seems that Avro had difficulties with my POJO. I changed how null
> values are handled and it works fine.
>
> But is there a way to cancel the old JobGraphs that are stuck in
> restarting status, and keep only the last one to restart? Other than
> cancelling the JobIds manually?
>
> Thanks
>
> Thomas
> ________________________________________
> From: Thomas Lamirault [thomas.lamirault@ericsson.com]
> Sent: Friday, 19 February 2016 10:56
> To: user@flink.apache.org
> Subject: RE:Flink HA
>
> After setting this configuration, I get some exceptions:
>
> java.lang.Exception: Could not restore checkpointed state to operators and
> functions
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:414)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.InvalidClassException: java.util.HashMap; invalid
> descriptor for field
>         at
> java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:710)
>         at
> java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
>         at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
>         at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>         at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>         at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
>         at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.<init>(WindowOperator.java:446)
>         at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.restoreState(WindowOperator.java:621)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:406)
>         ... 3 more
> Caused by: java.lang.IllegalArgumentException: illegal signature
>         at java.io.ObjectStreamField.<init>(ObjectStreamField.java:122)
>         at
> java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:708)
>         ... 13 more
>
>
> If I run the application in non-HA mode, there is no problem.
> What can cause this kind of error?
>
> Thanks
>
> Thomas
> ________________________________________
> From: Thomas Lamirault [thomas.lamirault@ericsson.com]
> Sent: Friday, 19 February 2016 09:39
> To: user@flink.apache.org
> Subject: RE:Flink HA
>
> Thanks for the quick reply !
>
> > state.backend.fs.checkpointdir
> It is actually pointing to an HDFS directory, but I will modify the
> recovery.zookeeper.path.root
>
> > This is only relevant if you are using YARN. From your complete
> Yes, I omitted to say that we will use YARN.
>
> > Does this help?
> Yes, a lot :-)
>
> Thomas
> ________________________________________
> From: Ufuk Celebi [uce@apache.org]
> Sent: Thursday, 18 February 2016 19:19
> To: user@flink.apache.org
> Subject: Re: Flink HA
>
> On Thu, Feb 18, 2016 at 6:59 PM, Thomas Lamirault
> <th...@ericsson.com> wrote:
> > We are trying flink in HA mode.
>
> Great to hear!
>
> > We set in the flink yaml :
> >
> > state.backend: filesystem
> >
> > recovery.mode: zookeeper
> > recovery.zookeeper.quorum:<Our quorum>
> >
> > recovery.zookeeper.path.root: <path>
> >
> > recovery.zookeeper.storageDir: <storageDir>
> >
> > recovery.backend.fs.checkpointdir: <pathcheckpoint>
>
> It should be state.backend.fs.checkpointdir.
>
> Just to check: Both state.backend.fs.checkpointdir and
> recovery.zookeeper.path.root should point to a file system path.
>
> > yarn.application-attempts: 100
>
> This is only relevant if you are using YARN. From your complete
>
> > We want the pending window to be restored when the application
> > restarts after a crash.
> >
> > Is pending data stored in the <storageDir>/blob directory ?
> >
> > Also, we are trying to write a script that restarts the application,
> > with the last pending window, after the maximum number of attempts is
> > exceeded.
> >
> > How can I do that ? Is a simple restart of the application enough, or
> > do I have to "clean" the recovery.zookeeper.path.root ?
>
> Restore happens automatically to the most recently checkpointed state.
>
> Everything under <storageDir> contains the actual state (including
> JARs and JobGraph). ZooKeeper contains pointers to this state.
> Therefore, you must not delete the ZooKeeper root path.
>
> For the automatic restart, I would recommend using YARN. If you want
> to do it manually, you need to restart the JobManager/TaskManager
> instances. The application will be recovered automatically from
> ZooKeeper/state backend.
>
> Does this help?
>
> – Ufuk
>

RE:Flink HA

Posted by Thomas Lamirault <th...@ericsson.com>.
I have resolved my issues.
It seems that Avro had difficulties with my POJO. I changed how null values are handled and it works fine.

But is there a way to cancel the old JobGraphs that are stuck in restarting status, and keep only the last one to restart? Other than cancelling the JobIds manually?
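
For anyone hitting the same restore error, a purely hypothetical sketch of the
kind of change described above (the class and fields are invented, not the
actual POJO from this job): give every field a non-null default so the
serializer never has to encode a null value.

  import java.util.HashMap;
  import java.util.Map;

  // Hypothetical event POJO; Flink POJOs need a public no-argument constructor
  // and public fields (or getters/setters).
  public class WindowEvent {
      public String key = "";                               // non-null default instead of null
      public Map<String, Long> counters = new HashMap<>();  // initialized, never null
      public long timestamp;

      public WindowEvent() {}
  }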

Thanks

Thomas
________________________________________
From: Thomas Lamirault [thomas.lamirault@ericsson.com]
Sent: Friday, 19 February 2016 10:56
To: user@flink.apache.org
Subject: RE:Flink HA

After setting this configuration, I get some exceptions:

java.lang.Exception: Could not restore checkpointed state to operators and functions
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:414)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: java.util.HashMap; invalid descriptor for field
        at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:710)
        at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.<init>(WindowOperator.java:446)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.restoreState(WindowOperator.java:621)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:406)
        ... 3 more
Caused by: java.lang.IllegalArgumentException: illegal signature
        at java.io.ObjectStreamField.<init>(ObjectStreamField.java:122)
        at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:708)
        ... 13 more


If I run the application in non-HA mode, there is no problem.
What can cause this kind of error?

Thanks

Thomas
________________________________________
From: Thomas Lamirault [thomas.lamirault@ericsson.com]
Sent: Friday, 19 February 2016 09:39
To: user@flink.apache.org
Subject: RE:Flink HA

Thanks for the quick reply !

> state.backend.fs.checkpointdir
It is actually pointing to an HDFS directory, but I will modify the recovery.zookeeper.path.root

> This is only relevant if you are using YARN. From your complete
Yes, I omitted to say that we will use YARN.

> Does this help?
Yes, a lot :-)

Thomas
________________________________________
From: Ufuk Celebi [uce@apache.org]
Sent: Thursday, 18 February 2016 19:19
To: user@flink.apache.org
Subject: Re: Flink HA

On Thu, Feb 18, 2016 at 6:59 PM, Thomas Lamirault
<th...@ericsson.com> wrote:
> We are trying flink in HA mode.

Great to hear!

> We set in the flink yaml :
>
> state.backend: filesystem
>
> recovery.mode: zookeeper
> recovery.zookeeper.quorum:<Our quorum>
>
> recovery.zookeeper.path.root: <path>
>
> recovery.zookeeper.storageDir: <storageDir>
>
> recovery.backend.fs.checkpointdir: <pathcheckpoint>

It should be state.backend.fs.checkpointdir.

Just to check: Both state.backend.fs.checkpointdir and
recovery.zookeeper.path.root should point to a file system path.

> yarn.application-attempts: 100

This is only relevant if you are using YARN. From your complete

> We want the pending window to be restored when the application restarts
> after a crash.
>
> Is pending data stored in the <storageDir>/blob directory ?
>
> Also, we are trying to write a script that restarts the application, with
> the last pending window, after the maximum number of attempts is exceeded.
>
> How can I do that ? Is a simple restart of the application enough, or do I
> have to "clean" the recovery.zookeeper.path.root ?

Restore happens automatically to the most recently checkpointed state.

Everything under <storageDir> contains the actual state (including
JARs and JobGraph). ZooKeeper contains pointers to this state.
Therefore, you must not delete the ZooKeeper root path.

For the automatic restart, I would recommend using YARN. If you want
to do it manually, you need to restart the JobManager/TaskManager
instances. The application will be recovered automatically from
ZooKeeper/state backend.

Does this help?

– Ufuk

RE:Flink HA

Posted by Thomas Lamirault <th...@ericsson.com>.
After setting this configuration, I get some exceptions:

java.lang.Exception: Could not restore checkpointed state to operators and functions
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:414)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: java.util.HashMap; invalid descriptor for field 
	at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:710)
	at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.<init>(WindowOperator.java:446)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.restoreState(WindowOperator.java:621)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:406)
	... 3 more
Caused by: java.lang.IllegalArgumentException: illegal signature
	at java.io.ObjectStreamField.<init>(ObjectStreamField.java:122)
	at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:708)
	... 13 more


If I run the application in non-HA mode, there is no problem.
What can cause this kind of error?

Thanks

Thomas
________________________________________
From: Thomas Lamirault [thomas.lamirault@ericsson.com]
Sent: Friday, 19 February 2016 09:39
To: user@flink.apache.org
Subject: RE:Flink HA

Thanks for the quick reply !

> state.backend.fs.checkpointdir
It is actually pointing to an HDFS directory, but I will modify the recovery.zookeeper.path.root

> This is only relevant if you are using YARN. From your complete
Yes, I omitted to say that we will use YARN.

> Does this help?
Yes, a lot :-)

Thomas
________________________________________
From: Ufuk Celebi [uce@apache.org]
Sent: Thursday, 18 February 2016 19:19
To: user@flink.apache.org
Subject: Re: Flink HA

On Thu, Feb 18, 2016 at 6:59 PM, Thomas Lamirault
<th...@ericsson.com> wrote:
> We are trying flink in HA mode.

Great to hear!

> We set in the flink yaml :
>
> state.backend: filesystem
>
> recovery.mode: zookeeper
> recovery.zookeeper.quorum:<Our quorum>
>
> recovery.zookeeper.path.root: <path>
>
> recovery.zookeeper.storageDir: <storageDir>
>
> recovery.backend.fs.checkpointdir: <pathcheckpoint>

It should be state.backend.fs.checkpointdir.

Just to check: Both state.backend.fs.checkpointdir and
recovery.zookeeper.path.root should point to a file system path.

> yarn.application-attempts: 100

This is only relevant if you are using YARN. From your complete

> We want the pending window to be restored when the application restarts
> after a crash.
>
> Is pending data stored in the <storageDir>/blob directory ?
>
> Also, we are trying to write a script that restarts the application, with
> the last pending window, after the maximum number of attempts is exceeded.
>
> How can I do that ? Is a simple restart of the application enough, or do I
> have to "clean" the recovery.zookeeper.path.root ?

Restore happens automatically to the most recently checkpointed state.

Everything under <storageDir> contains the actual state (including
JARs and JobGraph). ZooKeeper contains pointers to this state.
Therefore, you must not delete the ZooKeeper root path.

For the automatic restart, I would recommend using YARN. If you want
to do it manually, you need to restart the JobManager/TaskManager
instances. The application will be recovered automatically from
ZooKeeper/state backend.

Does this help?

– Ufuk

RE:Flink HA

Posted by Thomas Lamirault <th...@ericsson.com>.
Thanks for the quick reply !

> state.backend.fs.checkpointdir
It is actually pointing to an HDFS directory, but I will modify the recovery.zookeeper.path.root

> This is only relevant if you are using YARN. From your complete
Yes, I omitted to say that we will use YARN.

>Does this help?
Yes, a lot :-)

Thomas

________________________________________
From: Ufuk Celebi [uce@apache.org]
Sent: Thursday, 18 February 2016 19:19
To: user@flink.apache.org
Subject: Re: Flink HA

On Thu, Feb 18, 2016 at 6:59 PM, Thomas Lamirault
<th...@ericsson.com> wrote:
> We are trying flink in HA mode.

Great to hear!

> We set in the flink yaml :
>
> state.backend: filesystem
>
> recovery.mode: zookeeper
> recovery.zookeeper.quorum:<Our quorum>
>
> recovery.zookeeper.path.root: <path>
>
> recovery.zookeeper.storageDir: <storageDir>
>
> recovery.backend.fs.checkpointdir: <pathcheckpoint>

It should be state.backend.fs.checkpointdir.

Just to check: Both state.backend.fs.checkpointdir and
recovery.zookeeper.path.root should point to a file system path.

> yarn.application-attempts: 100

This is only relevant if you are using YARN. From your complete


> We want the pending window to be restored when the application restarts
> after a crash.
>
> Is pending data stored in the <storageDir>/blob directory ?
>
> Also, we are trying to write a script that restarts the application, with
> the last pending window, after the maximum number of attempts is exceeded.
>
> How can I do that ? Is a simple restart of the application enough, or do I
> have to "clean" the recovery.zookeeper.path.root ?

Restore happens automatically to the most recently checkpointed state.

Everything under <storageDir> contains the actual state (including
JARs and JobGraph). ZooKeeper contains pointers to this state.
Therefore, you must not delete the ZooKeeper root path.

For the automatic restart, I would recommend using YARN. If you want
to do it manually, you need to restart the JobManager/TaskManager
instances. The application will be recovered automatically from
ZooKeeper/state backend.


Does this help?

– Ufuk

Re: Flink HA

Posted by Ufuk Celebi <uc...@apache.org>.
On Thu, Feb 18, 2016 at 6:59 PM, Thomas Lamirault
<th...@ericsson.com> wrote:
> We are trying flink in HA mode.

Great to hear!

> We set in the flink yaml :
>
> state.backend: filesystem
>
> recovery.mode: zookeeper
> recovery.zookeeper.quorum:<Our quorum>
>
> recovery.zookeeper.path.root: <path>
>
> recovery.zookeeper.storageDir: <storageDir>
>
> recovery.backend.fs.checkpointdir: <pathcheckpoint>

It should be state.backend.fs.checkpointdir.

Just to check: Both state.backend.fs.checkpointdir and
recovery.zookeeper.path.root should point to a file system path.
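
For example, the state backend section of flink-conf.yaml would then read as
follows (same values as in your mail, only the key name changes):

  state.backend: filesystem
  state.backend.fs.checkpointdir: <pathcheckpoint>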

> yarn.application-attempts: 100

This is only relevant if you are using YARN. From your complete


> We want the pending window to be restored when the application restarts
> after a crash.
>
> Is pending data stored in the <storageDir>/blob directory ?
>
> Also, we are trying to write a script that restarts the application, with
> the last pending window, after the maximum number of attempts is exceeded.
>
> How can I do that ? Is a simple restart of the application enough, or do I
> have to "clean" the recovery.zookeeper.path.root ?

Restore happens automatically to the most recently checkpointed state.

Everything under <storageDir> contains the actual state (including
JARs and JobGraph). ZooKeeper contains pointers to this state.
Therefore, you must not delete the ZooKeeper root path.

For the automatic restart, I would recommend using YARN. If you want
to do it manually, you need to restart the JobManager/TaskManager
instances. The application will be recovered automatically from
ZooKeeper/state backend.
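
For the manual route, the standalone scripts shipped in bin/ are the usual way
to bring the processes back (a sketch only; the exact arguments depend on your
setup):

  bin/jobmanager.sh start cluster
  bin/taskmanager.sh start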


Does this help?

– Ufuk