Posted to user@flink.apache.org by Thomas Lamirault <th...@ericsson.com> on 2016/02/18 18:59:45 UTC
Flink HA
Hi!
We are trying Flink in HA mode.
Our application is a streaming application with a windowing mechanism.
We set the following in the Flink YAML config:
state.backend: filesystem
recovery.mode: zookeeper
recovery.zookeeper.quorum: <Our quorum>
recovery.zookeeper.path.root: <path>
recovery.zookeeper.storageDir: <storageDir>
recovery.backend.fs.checkpointdir: <pathcheckpoint>
yarn.application-attempts: 100
In case of an application crash, we want the pending window to be restored when the application restarts.
Is the pending data stored in the <storageDir>/blob directory?
Also, we are trying to write a script that restarts the application with the last pending window after the max attempts are exceeded.
How can I do that? Is a simple restart of the application enough, or do I have to "clean" the recovery.zookeeper.path.root?
Thanks!
Thomas Lamirault
Re: Flink HA
Posted by Robert Metzger <rm...@apache.org>.
Hi Thomas,
To avoid having jobs forever restarting, you have to cancel them manually
(from the web interface or the /bin/flink client).
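For example, bin/flink list -r shows the running jobs with their JobIDs, and
bin/flink cancel <JobID> cancels one of them.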
Also, you can set an appropriate restart strategy (in 1.0-SNAPSHOT), which
limits the number of retries. This way the retrying will eventually stop.
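As a minimal sketch of how that looks with the 1.0-SNAPSHOT API (the attempt
count and delay below are just example values):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Retry the job at most 3 times, waiting 10 seconds between
        // attempts; after that the job goes to FAILED instead of
        // restarting forever.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
        // ... build and execute your streaming job here ...
    }
}

There are also restart-strategy.* keys in the configuration if you prefer to
set a cluster-wide default instead of setting it per job.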
On Fri, Feb 19, 2016 at 4:05 PM, Thomas Lamirault <thomas.lamirault@ericsson.com> wrote:
> I have resolved my issues.
> It seems that Avro had difficulties with my POJO. I changed the handling
> of null values and it works fine.
>
> But is there a way to cancel the old JobGraphs that are stuck in
> restarting status, keeping only the last one to restart? Other than
> cancelling the JobIDs manually?
>
> Thanks
>
> Thomas
> ________________________________________
> From: Thomas Lamirault [thomas.lamirault@ericsson.com]
> Sent: Friday, 19 February 2016 10:56
> To: user@flink.apache.org
> Subject: RE: Flink HA
>
> After setting this configuration, I get some exceptions:
>
> java.lang.Exception: Could not restore checkpointed state to operators and functions
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:414)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.InvalidClassException: java.util.HashMap; invalid descriptor for field
>     at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:710)
>     at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.<init>(WindowOperator.java:446)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.restoreState(WindowOperator.java:621)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:406)
>     ... 3 more
> Caused by: java.lang.IllegalArgumentException: illegal signature
>     at java.io.ObjectStreamField.<init>(ObjectStreamField.java:122)
>     at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:708)
>     ... 13 more
>
>
> If I run the application in non-HA mode, there is no problem.
> What can cause this kind of error?
>
> Thanks
>
> Thomas
RE: Flink HA
Posted by Thomas Lamirault <th...@ericsson.com>.
I have resolved my issues.
It seems that Avro had difficulties with my POJO. I changed the handling of null values and it works fine.
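Roughly, it was this kind of change (a simplified sketch with made-up names, not my real classes):

import org.apache.avro.reflect.Nullable;

// Before, some fields could be null and the serializer could not encode that.
// Giving them non-null defaults, or declaring them @Nullable for Avro, fixed it.
public class MyEvent {
    public long timestamp;                // primitive, can never be null
    public String key = "";              // non-null default instead of null
    @Nullable
    public String optionalTag;           // explicitly nullable for Avro's reflect serializer
    public MyEvent() {}                  // no-arg constructor keeps it a valid Flink POJO
}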
But is there a way to cancel the old JobGraphs that are stuck in restarting status, keeping only the last one to restart? Other than cancelling the JobIDs manually?
Thanks
Thomas
________________________________________
From: Thomas Lamirault [thomas.lamirault@ericsson.com]
Sent: Friday, 19 February 2016 10:56
To: user@flink.apache.org
Subject: RE: Flink HA
After setting this configuration, I get some exceptions:
java.lang.Exception: Could not restore checkpointed state to operators and functions
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:414)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: java.util.HashMap; invalid descriptor for field
    at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:710)
    at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.<init>(WindowOperator.java:446)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.restoreState(WindowOperator.java:621)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:406)
    ... 3 more
Caused by: java.lang.IllegalArgumentException: illegal signature
    at java.io.ObjectStreamField.<init>(ObjectStreamField.java:122)
    at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:708)
    ... 13 more
If I run the application in non-HA mode, there is no problem.
What can cause this kind of error?
Thanks
Thomas
RE: Flink HA
Posted by Thomas Lamirault <th...@ericsson.com>.
After setting this configuration, I get some exceptions:
java.lang.Exception: Could not restore checkpointed state to operators and functions
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:414)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: java.util.HashMap; invalid descriptor for field
    at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:710)
    at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.<init>(WindowOperator.java:446)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.restoreState(WindowOperator.java:621)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:406)
    ... 3 more
Caused by: java.lang.IllegalArgumentException: illegal signature
    at java.io.ObjectStreamField.<init>(ObjectStreamField.java:122)
    at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:708)
    ... 13 more
If I run the application in non-HA mode, there is no problem.
What can cause this kind of error?
Thanks
Thomas
RE: Flink HA
Posted by Thomas Lamirault <th...@ericsson.com>.
Thanks for the quick reply!
> state.backend.fs.checkpointdir
It is actually pointing to an HDFS directory, but I will modify the recovery.zookeeper.path.root.
> This is only relevant if you are using YARN. From your complete
Yes, I omitted to say that we will use YARN.
> Does this help?
Yes, a lot :-)
Thomas
________________________________________
From: Ufuk Celebi [uce@apache.org]
Sent: Thursday, 18 February 2016 19:19
To: user@flink.apache.org
Subject: Re: Flink HA
On Thu, Feb 18, 2016 at 6:59 PM, Thomas Lamirault <th...@ericsson.com> wrote:
> We are trying Flink in HA mode.
Great to hear!
> We set the following in the Flink YAML config:
>
> state.backend: filesystem
>
> recovery.mode: zookeeper
> recovery.zookeeper.quorum: <Our quorum>
>
> recovery.zookeeper.path.root: <path>
>
> recovery.zookeeper.storageDir: <storageDir>
>
> recovery.backend.fs.checkpointdir: <pathcheckpoint>
It should be state.backend.fs.checkpointdir.
Just to check: Both state.backend.fs.checkpointdir and
recovery.zookeeper.path.root should point to a file system path.
> yarn.application-attempts: 100
This is only relevant if you are using YARN. From your complete
> In case of an application crash, we want the pending window to be restored
> when the application restarts.
>
> Is the pending data stored in the <storageDir>/blob directory?
>
> Also, we are trying to write a script that restarts the application with the
> last pending window after the max attempts are exceeded.
>
> How can I do that? Is a simple restart of the application enough, or do I
> have to "clean" the recovery.zookeeper.path.root?
Restore happens automatically to the most recently checkpointed state.
Everything under <storageDir> contains the actual state (including
JARs and JobGraph). ZooKeeper contains pointers to this state.
Therefore, you must not delete the ZooKeeper root path.
For the automatic restart, I would recommend using YARN. If you want
to do it manually, you need to restart the JobManager/TaskManager
instances. The application will be recovered automatically from
ZooKeeper/state backend.
Does this help?
– Ufuk
Re: Flink HA
Posted by Ufuk Celebi <uc...@apache.org>.
On Thu, Feb 18, 2016 at 6:59 PM, Thomas Lamirault <th...@ericsson.com> wrote:
> We are trying Flink in HA mode.
Great to hear!
> We set the following in the Flink YAML config:
>
> state.backend: filesystem
>
> recovery.mode: zookeeper
> recovery.zookeeper.quorum: <Our quorum>
>
> recovery.zookeeper.path.root: <path>
>
> recovery.zookeeper.storageDir: <storageDir>
>
> recovery.backend.fs.checkpointdir: <pathcheckpoint>
It should be state.backend.fs.checkpointdir.
Just to check: Both state.backend.fs.checkpointdir and
recovery.zookeeper.path.root should point to a file system path.
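For reference, a minimal HA setup could look roughly like this (all hosts and
paths below are placeholders; note that recovery.zookeeper.path.root is a path
inside ZooKeeper, whereas recovery.zookeeper.storageDir and the checkpoint
directory are file system paths, e.g. on HDFS):

state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
recovery.mode: zookeeper
recovery.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
recovery.zookeeper.path.root: /flink
recovery.zookeeper.storageDir: hdfs:///flink/recovery
yarn.application-attempts: 10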
> yarn.application-attempts: 100
This is only relevant if you are using YARN. From your complete
> In case of an application crash, we want the pending window to be restored
> when the application restarts.
>
> Is the pending data stored in the <storageDir>/blob directory?
>
> Also, we are trying to write a script that restarts the application with the
> last pending window after the max attempts are exceeded.
>
> How can I do that? Is a simple restart of the application enough, or do I
> have to "clean" the recovery.zookeeper.path.root?
Restore happens automatically to the most recently checkpointed state.
Everything under <storageDir> contains the actual state (including
JARs and JobGraph). ZooKeeper contains pointers to this state.
Therefore, you must not delete the ZooKeeper root path.
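As a rough sketch (file names abbreviated, not exact):

<storageDir>/
    blob/                     <- the job's JAR files, served by the BlobServer
    submittedJobGraph-...     <- serialized JobGraphs for recovery
    completedCheckpoint-...   <- metadata of completed checkpoints

The pending window contents themselves are written to
state.backend.fs.checkpointdir; under <path root>, ZooKeeper only keeps small
handles that point to these files.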
For the automatic restart, I would recommend using YARN. If you want
to do it manually, you need to restart the JobManager/TaskManager
instances. The application will be recovered automatically from
ZooKeeper/state backend.
Does this help?
– Ufuk