Posted to issues@flink.apache.org by bartektartanus <gi...@git.apache.org> on 2017/10/30 15:04:15 UTC
[GitHub] flink pull request #4924: [FLINK-7949] AsyncWaitOperator is not restarting w...
GitHub user bartektartanus opened a pull request:
https://github.com/apache/flink/pull/4924
[FLINK-7949] AsyncWaitOperator is not restarting when queue is full
Change:
The emitter thread is now started BEFORE the queue is filled with recovered elements.
Issue description:
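During a restart, if the queue had been full (N elements) and another element was still waiting to be added, then N+1 elements were recovered. The queue cannot fit N+1 elements, and because the emitter thread, which drains the queue, was only started after all recovered elements had been inserted, the inserting thread blocked forever. As Till Rohrmann suggested here: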
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html
I've changed the order of this code so that the emitter thread is started earlier.
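The hang is a general producer/consumer ordering problem with a bounded queue. The Java sketch below is not Flink's AsyncWaitOperator code. It uses a plain `java.util.concurrent.ArrayBlockingQueue` and a hypothetical `restore()` helper to illustrate why starting the draining ("emitter") thread before re-inserting N+1 recovered elements into a queue of capacity N avoids the stall. To report the problem instead of hanging, the sketch uses `offer()` with a timeout where a real blocking `put()` would wait forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class RestoreOrderSketch {

    // Hypothetical stand-in for restoring checkpointed elements into a bounded queue that a
    // separate "emitter" thread drains. Returns the emitted elements, or null if restoring
    // stalled because the queue was full and nothing was draining it.
    static List<Integer> restore(int capacity, List<Integer> recovered, boolean startEmitterFirst) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        List<Integer> emitted = new ArrayList<>();

        // The emitter takes exactly as many elements as were recovered, then exits.
        Thread emitter = new Thread(() -> {
            try {
                for (int i = 0; i < recovered.size(); i++) {
                    emitted.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "emitter");
        emitter.setDaemon(true);

        try {
            if (startEmitterFirst) {
                emitter.start(); // new order: a consumer exists before the queue can fill up
            }
            for (Integer element : recovered) {
                // offer() with a timeout instead of put(), so the sketch reports the stall
                // rather than hanging the way a blocking put() would.
                if (!queue.offer(element, 500, TimeUnit.MILLISECONDS)) {
                    return null;
                }
            }
            if (!startEmitterFirst) {
                emitter.start(); // old order: never reached when recovered.size() > capacity
            }
            emitter.join(); // join() also makes the emitter's writes to 'emitted' visible here
            return emitted;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        List<Integer> recovered = List.of(1, 2, 3, 4); // N + 1 elements for a queue of size N = 3
        System.out.println(restore(3, recovered, false)); // null: restore stalls on the 4th element
        System.out.println(restore(3, recovered, true));  // [1, 2, 3, 4]
    }
}
```

With the emitter started first, the fourth insert simply waits until the emitter has taken an element, so the restore completes regardless of how many elements were recovered.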
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/bartektartanus/flink master
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4924.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4924
----
commit 97620649ddfcf8f0320b20bdfdb69d9b44dd8f0c
Author: Bartłomiej Tartanus <ba...@gmail.com>
Date: 2017-10-30T14:39:43Z
start emmiter thread BEFORE filling up the queue of recovered elements
----
---
[GitHub] flink pull request #4924: [FLINK-7949] AsyncWaitOperator is not restarting w...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/4924
---
[GitHub] flink issue #4924: [FLINK-7949] AsyncWaitOperator is not restarting when que...
Posted by bartektartanus <gi...@git.apache.org>.
Github user bartektartanus commented on the issue:
https://github.com/apache/flink/pull/4924
Hi @tillrohrmann
I've finally managed to write a simple test that fails without my change to the AsyncWaitOperator class. Test steps:
1. Add enough records to fill up the AsyncWaitOperator queue.
2. Add a record whose processing takes longer than the timeout, which causes a restart.
3. The data stream is restarted, and the operator's `open()` method tries to add N+1 recovered stream elements to a queue of size N.
4. The test waits forever and eventually fails due to a timeout.
It behaves exactly as I described before. If the emitter is started earlier, however, the test eventually passes after two restarts (see the `timeoutCounter` field in `TimeoutableFunction`). I tried to make this test as short and quick as possible. I'm not sure this is the right file for it, so please move it if it isn't. I also hope this change makes it into the next Flink version :)
Happy New Year!
---
[GitHub] flink issue #4924: [FLINK-7949] AsyncWaitOperator is not restarting when que...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/4924
Thanks a lot for this fix @bartektartanus. The changes look good to me. Merging your PR :-)
---
[GitHub] flink issue #4924: [FLINK-7949] AsyncWaitOperator is not restarting when que...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/4924
Thanks for the fix @bartektartanus. LGTM.
Did you verify that this actually fixes the problem? It would be great if we could also add a unit test to guard against future regressions.
---
[GitHub] flink issue #4924: [FLINK-7949] AsyncWaitOperator is not restarting when que...
Posted by bartektartanus <gi...@git.apache.org>.
Github user bartektartanus commented on the issue:
https://github.com/apache/flink/pull/4924
Yes, it fixes our issue: our test in [nussknacker](https://github.com/TouK/nussknacker) now passes. The process restarts seamlessly and keeps working after subsequent restarts, but I haven't managed to reproduce this error in a simple Flink unit test (yet). Maybe next week :)
---