Posted to user@flink.apache.org by Karthik Deivasigamani <ka...@gmail.com> on 2017/10/06 14:52:38 UTC

Checkpoint was declined (tasks not ready)

Hi,
    I'm noticing a weird issue with our Flink streaming job. We use the async
I/O operator, which makes an HTTP call, and in certain cases when the async
task times out, it throws an exception, causing the job to restart.

java.lang.Exception: An async function call terminated with an exception. Failing the AsyncWaitOperator.
	at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:136)
	at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:83)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Async function call has timed out.
	at org.apache.flink.runtime.concurrent.impl.FlinkFuture.get(FlinkFuture.java:110)


After the job restarts (we have a fixed restart strategy), we notice that the
checkpoints start failing continuously with this message:
Checkpoint was declined (tasks not ready)

[image: Inline image 1]

But we see that the job is running: it's processing data, the accumulators we
have are getting incremented, etc., but checkpointing fails with the "tasks not
ready" message.

Wanted to reach out to the community to see if anyone else has experienced
this issue before?
~
Karthik

Re: Checkpoint was declined (tasks not ready)

Posted by bartektartanus <ba...@gmail.com>.
I've created an issue on Jira and prepared a pull request. Here's the link:
https://github.com/apache/flink/pull/4924
The Travis CI check is not passing, but it looks like it's not my fault :)




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Checkpoint was declined (tasks not ready)

Posted by Till Rohrmann <tr...@apache.org>.
Yes, please open the PR against Flink's master branch. You can also ping me
once you've opened the PR. Then we can hopefully merge it quickly :-)

Cheers,
Till

On Thu, Oct 26, 2017 at 12:44 PM, bartektartanus <ba...@gmail.com>
wrote:

> I think we could try option number one, as it seems to be easier to
> implement. Currently I'm cloning the Flink repo to fix this and test that
> solution with our currently non-working code. Unfortunately, it takes
> forever to download all the dependencies. Anyway, I hope that I will
> eventually manage to create a pull request (today). To which branch? Is
> master OK?
>
> Bartek

Re: Checkpoint was declined (tasks not ready)

Posted by bartektartanus <ba...@gmail.com>.
I think we could try option number one, as it seems to be easier to
implement. Currently I'm cloning the Flink repo to fix this and test that
solution with our currently non-working code. Unfortunately, it takes
forever to download all the dependencies. Anyway, I hope that I will
eventually manage to create a pull request (today). To which branch? Is
master OK?
 
Bartek




Re: Checkpoint was declined (tasks not ready)

Posted by Till Rohrmann <tr...@apache.org>.
Hi Bartek,

I think your explanation of the problem is correct. Thanks a lot for your
investigation.

To solve the problem, we could do one of the following:

1) Start the emitter thread before we restore the elements in the open
method. That way the open method won't block forever, but only until the
first element has been emitted downstream.

2) Don't accept a pendingStreamElementQueueEntry in the first place: wait in
the processElement function until the queue has capacity again.

What do you think?

Do you want to contribute the fix for this problem?

Cheers,
Till
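
To make the first option concrete, here is a minimal sketch in plain Java. It
is not Flink code: a java.util.concurrent.ArrayBlockingQueue stands in for the
operator's bounded queue, and the names EmitterFirstSketch and
restoreWithEmitter are made up for this illustration. The stand-in "emitter"
simply takes elements as soon as they arrive, whereas the real emitter waits
for async results, so this only shows why starting the consumer before the
restore lets N+1 elements pass through a queue of capacity N.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class EmitterFirstSketch {

    /**
     * Starts a consumer ("emitter") thread first, then restores the snapshot
     * with blocking puts. Returns the elements in the order the emitter took
     * them from the queue.
     */
    static List<String> restoreWithEmitter(int capacity, List<String> snapshot)
            throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        List<String> emitted = new ArrayList<>();
        int expected = snapshot.size();

        Thread emitter = new Thread(() -> {
            try {
                for (int i = 0; i < expected; i++) {
                    emitted.add(queue.take()); // frees one slot per element
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "emitter");
        emitter.start(); // started BEFORE the restore loop below

        for (String element : snapshot) {
            // May block when the queue is full, but only until the emitter
            // has taken an element.
            queue.put(element);
        }

        emitter.join(); // after join, reading 'emitted' from this thread is safe
        return emitted;
    }

    public static void main(String[] args) throws InterruptedException {
        // Capacity N = 3, snapshot holds N + 1 = 4 elements.
        List<String> snapshot =
                Arrays.asList("queued-1", "queued-2", "queued-3", "pending");
        System.out.println(restoreWithEmitter(3, snapshot));
        // prints [queued-1, queued-2, queued-3, pending]
    }
}
```

If the emitter.start() call were moved below the restore loop, the fourth
put would block forever, which is the situation described in this thread.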

On Mon, Oct 23, 2017 at 4:30 PM, bartektartanus <ba...@gmail.com>
wrote:

> OK, it looks like we've found the cause of this issue. The scenario is as
> follows:
> 1. The queue is full (let's assume that its capacity is N elements).
> 2. There is a pending element waiting, so the
> pendingStreamElementQueueEntry field in AsyncWaitOperator is not null and
> the while-loop in the addAsyncBufferEntry method is trying to add this
> element to the queue (but the element is not added because the queue is
> full).
> 3. Now the snapshot is taken: the whole queue of N elements is written
> into the ListState in the snapshotState method and (more importantly)
> the pendingStreamElementQueueEntry is written to this list too.
> 4. The process is restarted, so it tries to recover all the elements
> and put them into the queue again, but the list of recovered elements holds
> N+1 elements and our queue capacity is only N. The process has not started
> yet, so it cannot process any element, and this one extra element waits
> endlessly. It is never added, and the process never processes anything.
> Deadlock.
> 5. The checkpoint trigger is fired and is indeed discarded because the
> process is not running yet.
>
> If something is unclear in my description, please let me know. We will
> also try to reproduce this bug in a unit test and then file a Jira issue.

Re: Checkpoint was declined (tasks not ready)

Posted by bartektartanus <ba...@gmail.com>.
OK, it looks like we've found the cause of this issue. The scenario is as
follows:
1. The queue is full (let's assume that its capacity is N elements).
2. There is a pending element waiting, so the
pendingStreamElementQueueEntry field in AsyncWaitOperator is not null and
the while-loop in the addAsyncBufferEntry method is trying to add this
element to the queue (but the element is not added because the queue is full).
3. Now the snapshot is taken: the whole queue of N elements is written
into the ListState in the snapshotState method and (more importantly)
the pendingStreamElementQueueEntry is written to this list too.
4. The process is restarted, so it tries to recover all the elements
and put them into the queue again, but the list of recovered elements holds
N+1 elements and our queue capacity is only N. The process has not started
yet, so it cannot process any element, and this one extra element waits
endlessly. It is never added, and the process never processes anything.
Deadlock.
5. The checkpoint trigger is fired and is indeed discarded because the
process is not running yet.

If something is unclear in my description, please let me know. We will also
try to reproduce this bug in a unit test and then file a Jira issue.
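
The scenario above can be reproduced in miniature without Flink. The sketch
below is only an illustration under stated assumptions: a plain
java.util.concurrent.ArrayBlockingQueue stands in for the operator's bounded
queue, and the names RestoreDeadlockSketch and restore are made up for this
example rather than taken from AsyncWaitOperator. It restores a snapshot of
N+1 elements into a queue of capacity N while nothing is consuming, and it
uses a timed offer so that the demo returns instead of waiting forever the way
the real operator does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class RestoreDeadlockSketch {

    /**
     * Tries to put every restored element back into the bounded queue.
     * Returns the number of elements that could NOT be re-added within
     * the timeout.
     */
    static int restore(BlockingQueue<String> queue, List<String> snapshot,
                       long timeoutMillis) throws InterruptedException {
        int stuck = 0;
        for (String element : snapshot) {
            // Nothing drains the queue during restore, so once it is full
            // every further offer times out.
            if (!queue.offer(element, timeoutMillis, TimeUnit.MILLISECONDS)) {
                stuck++;
            }
        }
        return stuck;
    }

    public static void main(String[] args) throws InterruptedException {
        int capacity = 3; // N

        // Snapshot = N queued elements plus the one pending element (step 3).
        List<String> snapshot = new ArrayList<>();
        for (int i = 1; i <= capacity; i++) {
            snapshot.add("queued-" + i);
        }
        snapshot.add("pending");

        // Restore into a fresh queue of capacity N with no consumer (step 4).
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        int stuck = restore(queue, snapshot, 100);

        System.out.println("restored=" + queue.size() + " stuck=" + stuck);
        // prints restored=3 stuck=1
    }
}
```

With an untimed put instead of the timed offer, the last element would block
the restoring thread indefinitely.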




Re: Checkpoint was declined (tasks not ready)

Posted by Maciek Próchniak <mp...@touk.pl>.
It seems that one of the operators is stuck during recovery:

prio=5 os_prio=0 tid=0x00007f634bb31000 nid=0xd5e in Object.wait() [0x00007f63f13cc000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:502)
	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.addAsyncBufferEntry(AsyncWaitOperator.java:406)
	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:228)
	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:174)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
	- locked <0x000000037ae51a38> (a java.lang.Object)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:745)


On 23/10/2017 13:54, Maciek Próchniak wrote:
>
> We also have a similar problem - it happens really often when we invoke
> async operators (the ordered one). But we also observe that the job is not
> starting properly - we don't process any data when such problems appear.
>
> We'll keep you posted if we manage to find the exact cause...
>
>
> thanks,
> maciek


Re: Checkpoint was declined (tasks not ready)

Posted by Maciek Próchniak <mp...@touk.pl>.
We also have a similar problem - it happens really often when we invoke
async operators (the ordered one). But we also observe that the job is not
starting properly - we don't process any data when such problems appear.

We'll keep you posted if we manage to find the exact cause...


thanks,
maciek

On 09/10/2017 12:10, Karthik Deivasigamani wrote:
> Hi Stephan,
>     Once the job restarts due to an async io operator timeout we 
> notice that its checkpoints never succeed again. But the job is 
> running fine and is processing data.
> ~
> Karthik


Re: Checkpoint was declined (tasks not ready)

Posted by Karthik Deivasigamani <ka...@gmail.com>.
Hi Stephan,
    Once the job restarts due to an async io operator timeout we notice
that its checkpoints never succeed again.  But the job is running fine and
is processing data.
~
Karthik


On Mon, Oct 9, 2017 at 3:19 PM, Stephan Ewen <se...@apache.org> wrote:

> As long as this does not appear all the time, but only once in a while, it
> should not be a problem.
> It simply means that this particular checkpoint could not be triggered,
> because some sources were not ready yet.
>
> It should try another checkpoint and then be okay.

Re: Checkpoint was declined (tasks not ready)

Posted by Stephan Ewen <se...@apache.org>.
As long as this does not appear all the time, but only once in a while, it
should not be a problem.
It simply means that this particular checkpoint could not be triggered,
because some sources were not ready yet.

It should try another checkpoint and then be okay.
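
As a rough mental model of the above (not Flink's actual coordinator code;
the names CheckpointTriggerSketch, TaskState and tryTrigger are hypothetical),
a checkpoint attempt can be thought of as declined whenever at least one task
is not yet running. A task that is merely slow to start causes a few declines
and then recovers, while a task that never reaches the running state causes
every attempt to be declined.

```java
import java.util.Arrays;
import java.util.List;

public class CheckpointTriggerSketch {

    // Simplified task lifecycle, assumed for this illustration only.
    enum TaskState { DEPLOYING, RUNNING }

    /** Returns true if the checkpoint may be triggered, false if it is declined. */
    static boolean tryTrigger(List<TaskState> tasks) {
        for (TaskState state : tasks) {
            if (state != TaskState.RUNNING) {
                return false; // corresponds to "tasks not ready"
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Transient case: one task is still starting, then becomes ready.
        List<TaskState> starting = Arrays.asList(TaskState.RUNNING, TaskState.DEPLOYING);
        List<TaskState> ready = Arrays.asList(TaskState.RUNNING, TaskState.RUNNING);
        System.out.println("attempt 1 triggered: " + tryTrigger(starting)); // false
        System.out.println("attempt 2 triggered: " + tryTrigger(ready));    // true

        // Stuck case: the task never leaves DEPLOYING, so every attempt is declined.
        for (int attempt = 1; attempt <= 3; attempt++) {
            System.out.println("stuck attempt " + attempt + " triggered: "
                    + tryTrigger(starting)); // always false
        }
    }
}
```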


On Fri, Oct 6, 2017 at 4:53 PM, Karthik Deivasigamani <ka...@gmail.com>
wrote:

> We are using Flink 1.3.1 in Standalone mode with a HA job manager setup.
> ~
> Karthik

Re: Checkpoint was declined (tasks not ready)

Posted by Karthik Deivasigamani <ka...@gmail.com>.
We are using Flink 1.3.1 in Standalone mode with a HA job manager setup.
~
Karthik
