You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/01/31 14:09:27 UTC

[GitHub] flink pull request #5394: [FLINK-6571][tests] Catch InterruptedException in ...

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/5394

    [FLINK-6571][tests] Catch InterruptedException in StreamSourceOperato…

    ## What is the purpose of the change
    
    This PR resolves a test instability in the StreamSourceOperatorTest, where the `InfiniteSource` could fail due to an `InterruptedException`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 6571

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5394.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5394
    
----
commit 453ba011b6b6beaf5102f3f376beb7c3a7260892
Author: zentol <ch...@...>
Date:   2018-01-31T14:07:55Z

    [FLINK-6571][tests] Catch InterruptedException in StreamSourceOperatorTest

----


---

[GitHub] flink issue #5394: [FLINK-6571][tests] Catch InterruptedException in StreamS...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5394
  
    It may not be a problem in this test, but I wanted to raise that this pattern is a bit dangerous.
    If the thread ever gets interrupted while 'running' is still true, this goes into a hot loop constantly throwing exceptions: Every time it falls through the loop and attempts to sleep again, it will immediately throw an Interrupted Exception.


---

[GitHub] flink issue #5394: [FLINK-6571][tests] Catch InterruptedException in StreamS...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5394
  
    I have often handled it like one of the below variants. What do you think about that pattern?
    
    ### Variant 1: Handle interruption if still running
    ```java
    public void run(SourceContext<T> ctx) throws Exception {
        while (running) {
            try {
                // do stuff
                Thread.sleep(20);
            } catch (InterruptedException e) {
                // restore interruption flag
                Thread.currentThread().interrupt();
                if (running) {
                    throw new FlinkException("interrupted while still running", e);
                }
                // else fall through the loop
    	}
    }
    ```
    
    ### Variant 2: Simple let InterruptedException bubble out
    
    This variant is also fine, because the Task status is set to CANCELED before the interruption, so any exception bubbling out be suppresses.
    
    ```java
    public void run(SourceContext<T> ctx) throws Exception {
        while (running) {
            // do stuff
    
            // the InterruptedException from here simply fails the execution
            Thread.sleep(20);
        }
    }
    ```


---

[GitHub] flink issue #5394: [FLINK-6571][tests] Catch InterruptedException in StreamS...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5394
  
    What's the state @zentol? Would Stephan's proposal work?


---

[GitHub] flink issue #5394: [FLINK-6571][tests] Catch InterruptedException in StreamS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5394
  
    I think neither solve the problem.
    
    Variant 2 looks identical to what we have in master.
    
    Variant 1 only allows interrupts after the task was canceled.
    According to what @StephanEwen said, if the UDF throws an exception after the task was canceled the exception will be suppressed and should not lead to a test failure. Since the test did fail it thus must've been thrown _before_ the task was cancelled. Given that variant 1 still throws an exception in this case we aren't solving the stability issue.


---

[GitHub] flink issue #5394: [FLINK-6571][tests] Catch InterruptedException in StreamS...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5394
  
    How about calling `Thread.currentThread().interrupt();` only after having left the loop?
    ```
    public void run(SourceContext<T> ctx) throws Exception {
    	boolean setInterruptFlag = false;
    	while (running) {
    		try {
    			Thread.sleep(20);
    		} catch (InterruptedException ignored) {
    			setInterruptFlag = true;
    		}
    	}
    	if (setInterruptFlag) {
    		Thread.currentThread().interrupt();
    	}
    }
    ```
    
    This should behave like the original proposal, without the hot loop.


---