Posted to issues@flink.apache.org by bartektartanus <gi...@git.apache.org> on 2017/10/30 15:04:15 UTC

[GitHub] flink pull request #4924: [FLINK-7949] AsyncWaitOperator is not restarting w...

GitHub user bartektartanus opened a pull request:

    https://github.com/apache/flink/pull/4924

    [FLINK-7949] AsyncWaitOperator is not restarting when queue is full

    Change:
    The emitter thread is now started BEFORE the queue is filled with recovered elements.
    Issue description:
    During a process restart, if the queue was full (with N elements) and there was a pending element waiting to be added to the queue, the queue could not fit N+1 elements and the thread was blocked forever. As Till Rohrmann suggested here:
    http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html
    I've changed the order of this code to start the emitter thread earlier.
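
The ordering problem described above can be reproduced outside Flink with a plain bounded queue. The following is a minimal sketch, not the actual AsyncWaitOperator code: the class `RecoveryOrderSketch`, the method `restore`, and its parameters are hypothetical names chosen for illustration, and the "emitter" is simply a thread that drains the queue.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; none of these names come from Flink.
public class RecoveryOrderSketch {

    /** Returns true if every recovered element was re-queued before the timeout. */
    public static boolean restore(int capacity, List<Integer> recovered,
                                  boolean startEmitterFirst, long timeoutMs)
            throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);

        // Stand-in for the emitter: it only takes elements off the queue.
        Thread emitter = new Thread(() -> {
            try {
                while (true) {
                    queue.take();
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        emitter.setDaemon(true);

        CountDownLatch done = new CountDownLatch(1);
        Thread restorer = new Thread(() -> {
            try {
                if (startEmitterFirst) {
                    emitter.start(); // a consumer exists before the queue fills
                }
                for (Integer element : recovered) {
                    queue.put(element); // blocks while the queue is full
                }
                if (!startEmitterFirst) {
                    emitter.start(); // never reached when recovered.size() > capacity
                }
                done.countDown();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        restorer.setDaemon(true);
        restorer.start();

        boolean finished = done.await(timeoutMs, TimeUnit.MILLISECONDS);
        restorer.interrupt(); // release a restorer that is stuck in put()
        emitter.interrupt();
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> recovered = Arrays.asList(1, 2, 3); // N+1 elements for N = 2
        System.out.println("emitter first: " + restore(2, recovered, true, 2000));
        System.out.println("emitter last:  " + restore(2, recovered, false, 500));
    }
}
```

With a capacity of 2 and three recovered elements, starting the drain thread last leaves the third `put` blocked until the timeout, while starting it first lets the restore complete.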


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bartektartanus/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4924.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4924
    
----
commit 97620649ddfcf8f0320b20bdfdb69d9b44dd8f0c
Author: Bartłomiej Tartanus <ba...@gmail.com>
Date:   2017-10-30T14:39:43Z

    start emmiter thread BEFORE filling up the queue of recovered elements

----


---

[GitHub] flink pull request #4924: [FLINK-7949] AsyncWaitOperator is not restarting w...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4924


---

[GitHub] flink issue #4924: [FLINK-7949] AsyncWaitOperator is not restarting when que...

Posted by bartektartanus <gi...@git.apache.org>.
Github user bartektartanus commented on the issue:

    https://github.com/apache/flink/pull/4924
  
    Hi @tillrohrmann 
    I've finally managed to write a simple test that fails without my change in the AsyncWaitOperator class. Test steps:
    1. add enough records to fill up the AsyncWaitOperator queue
    2. add a record whose processing takes longer than the timeout, which causes a restart
    3. the data stream is restarted, and in the `open()` method it tries to add N+1 recovered stream elements to a queue of size N
    4. the test waits forever and fails due to the timeout.
    
    It works exactly as I mentioned before. But if the emitter is started earlier, the test eventually passes after two restarts (see the `timeoutCounter` field in `TimeoutableFunction`). I tried to make this test as short and quick as possible. I don't know if this is the right file for it; please fix this if it is not. I also hope this change will be in the next Flink version :)
    Happy New Year!
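
The "passes after two restarts" behaviour can be modelled with a counter that makes the first attempts exceed the timeout. This is only a sketch under assumptions: the real `TimeoutableFunction` from the PR is not shown in this thread, so the class `TimeoutRestartSketch`, the method `runWithRestarts`, and its parameters are hypothetical, and a "restart" here is just a retry of the task.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; this is not the test code from the PR.
public class TimeoutRestartSketch {

    /** Retries the task after each timeout and returns how many restarts were needed. */
    public static int runWithRestarts(int slowAttempts, long timeoutMs, int maxRestarts)
            throws Exception {
        // Counts how many attempts have deliberately run past the timeout.
        AtomicInteger timeoutCounter = new AtomicInteger();

        Callable<String> task = () -> {
            if (timeoutCounter.get() < slowAttempts) {
                timeoutCounter.incrementAndGet();
                Thread.sleep(timeoutMs * 10); // simulate a record that exceeds the timeout
            }
            return "done";
        };

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            for (int restarts = 0; restarts <= maxRestarts; restarts++) {
                Future<String> attempt = executor.submit(task);
                try {
                    attempt.get(timeoutMs, TimeUnit.MILLISECONDS);
                    return restarts; // the attempt finished in time
                } catch (TimeoutException ex) {
                    attempt.cancel(true); // interrupt the slow attempt, then retry
                }
            }
            throw new IllegalStateException(
                    "did not finish within " + maxRestarts + " restarts");
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("restarts needed: " + runWithRestarts(2, 300, 5));
    }
}
```

Bounding each attempt with `Future.get(timeout, unit)` is also what keeps a hanging restore from blocking a test run indefinitely: the failure surfaces as a timeout instead.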


---

[GitHub] flink issue #4924: [FLINK-7949] AsyncWaitOperator is not restarting when que...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4924
  
    Thanks a lot for this fix @bartektartanus. The changes look good to me. Merging your PR :-)


---

[GitHub] flink issue #4924: [FLINK-7949] AsyncWaitOperator is not restarting when que...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4924
  
    Thanks for the fix @bartektartanus. LGTM. 
    
    Did you verify that this actually fixes the problem? Would be great if we could also add a unit test to guard against future regressions.


---

[GitHub] flink issue #4924: [FLINK-7949] AsyncWaitOperator is not restarting when que...

Posted by bartektartanus <gi...@git.apache.org>.
Github user bartektartanus commented on the issue:

    https://github.com/apache/flink/pull/4924
  
    Yes, it fixes our issue - our test in [nussknacker](https://github.com/TouK/nussknacker) is now passing. The process restarts seamlessly and works fine even after further restarts, but I haven't managed to reproduce this error in a simple Flink unit test (yet). Maybe next week :)


---