Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/01/31 14:09:27 UTC
[GitHub] flink pull request #5394: [FLINK-6571][tests] Catch InterruptedException in ...
GitHub user zentol opened a pull request:
https://github.com/apache/flink/pull/5394
[FLINK-6571][tests] Catch InterruptedException in StreamSourceOperato…
## What is the purpose of the change
This PR resolves a test instability in the StreamSourceOperatorTest, where the `InfiniteSource` could fail due to an `InterruptedException`.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zentol/flink 6571
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5394.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5394
----
commit 453ba011b6b6beaf5102f3f376beb7c3a7260892
Author: zentol <ch...@...>
Date: 2018-01-31T14:07:55Z
[FLINK-6571][tests] Catch InterruptedException in StreamSourceOperatorTest
----
---
[GitHub] flink issue #5394: [FLINK-6571][tests] Catch InterruptedException in StreamS...
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/5394
It may not be a problem in this test, but I wanted to raise that this pattern is a bit dangerous.
If the thread ever gets interrupted while `running` is still true, this goes into a hot loop that constantly throws exceptions: every time it falls through the loop and attempts to sleep again, it immediately throws an `InterruptedException`.
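To make the hot loop concrete, here is a minimal standalone sketch. It assumes the pattern under discussion restores the interrupt flag inside the `catch` block while the loop keeps going; the class and method names (`InterruptHotLoopDemo`, `countInterruptedSleeps`) are made up for illustration and are not part of the PR or of Flink.

```java
final class InterruptHotLoopDemo {

    /**
     * Hypothetical helper: runs a bounded loop that restores the interrupt
     * flag in the catch block and counts how many sleep attempts fail.
     */
    static int countInterruptedSleeps(int iterations) {
        int interrupted = 0;
        // Simulate an interrupt that arrives while the loop is still running.
        Thread.currentThread().interrupt();
        for (int i = 0; i < iterations; i++) {
            try {
                // With the flag set, sleep throws right away and clears the flag.
                Thread.sleep(20);
            } catch (InterruptedException e) {
                interrupted++;
                // Restoring the flag here makes the next sleep fail as well.
                Thread.currentThread().interrupt();
            }
        }
        // Clear the flag so the caller is not left in an interrupted state.
        Thread.interrupted();
        return interrupted;
    }

    public static void main(String[] args) {
        System.out.println(countInterruptedSleeps(5));
    }
}
```

Every iteration ends in the `catch` block without ever sleeping, so a loop guarded only by `running` would spin at full speed until `running` is cleared.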
---
[GitHub] flink issue #5394: [FLINK-6571][tests] Catch InterruptedException in StreamS...
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/5394
I have often handled it like one of the variants below. What do you think about that pattern?
### Variant 1: Handle interruption if still running
```java
public void run(SourceContext<T> ctx) throws Exception {
    while (running) {
        try {
            // do stuff
            Thread.sleep(20);
        } catch (InterruptedException e) {
            // restore interruption flag
            Thread.currentThread().interrupt();
            if (running) {
                throw new FlinkException("interrupted while still running", e);
            }
            // else fall through the loop
        }
    }
}
```
### Variant 2: Simply let InterruptedException bubble out
This variant is also fine, because the Task status is set to CANCELED before the interruption, so any exception bubbling out is suppressed.
```java
public void run(SourceContext<T> ctx) throws Exception {
    while (running) {
        // do stuff
        // the InterruptedException from here simply fails the execution
        Thread.sleep(20);
    }
}
```
---
[GitHub] flink issue #5394: [FLINK-6571][tests] Catch InterruptedException in StreamS...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/5394
What's the state @zentol? Would Stephan's proposal work?
---
[GitHub] flink issue #5394: [FLINK-6571][tests] Catch InterruptedException in StreamS...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/5394
I think neither solves the problem.
Variant 2 looks identical to what we have in master.
Variant 1 only allows interrupts after the task was canceled.
According to what @StephanEwen said, if the UDF throws an exception after the task was canceled, the exception will be suppressed and should not lead to a test failure. Since the test did fail, the exception must have been thrown _before_ the task was canceled. Given that variant 1 still throws an exception in this case, we aren't solving the stability issue.
---
[GitHub] flink issue #5394: [FLINK-6571][tests] Catch InterruptedException in StreamS...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/5394
How about calling `Thread.currentThread().interrupt();` only after having left the loop?
```java
public void run(SourceContext<T> ctx) throws Exception {
    boolean setInterruptFlag = false;
    while (running) {
        try {
            Thread.sleep(20);
        } catch (InterruptedException ignored) {
            // remember the interrupt, but do not restore the flag yet
            setInterruptFlag = true;
        }
    }
    if (setInterruptFlag) {
        // restore the interruption flag only after leaving the loop
        Thread.currentThread().interrupt();
    }
}
```
This should behave like the original proposal, without the hot loop.
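As a rough check of that claim, the deferred-interrupt loop can be exercised outside of Flink. The sketch below is only an approximation: `DeferredInterruptDemo`, `runOnce`, `caughtCount` and `flagRestored` are hypothetical names, and a plain thread stands in for the task thread that would run the source.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class DeferredInterruptDemo {
    private volatile boolean running = true;
    private volatile boolean flagRestored;
    private final AtomicInteger caught = new AtomicInteger();

    /** Same loop shape as the proposal, plus bookkeeping for the demo. */
    void run() {
        boolean setInterruptFlag = false;
        while (running) {
            try {
                Thread.sleep(20);
            } catch (InterruptedException ignored) {
                caught.incrementAndGet();
                setInterruptFlag = true;
            }
        }
        if (setInterruptFlag) {
            Thread.currentThread().interrupt();
        }
        // Record whether the flag is visible once the loop has been left.
        flagRestored = Thread.currentThread().isInterrupted();
    }

    void cancel() {
        running = false;
    }

    int caughtCount() {
        return caught.get();
    }

    boolean flagRestored() {
        return flagRestored;
    }

    /** Interrupts the loop once while it is running, then cancels it. */
    static DeferredInterruptDemo runOnce() {
        DeferredInterruptDemo source = new DeferredInterruptDemo();
        Thread worker = new Thread(source::run);
        worker.start();
        worker.interrupt();
        try {
            Thread.sleep(100); // give the loop time for several iterations
            source.cancel();
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException("demo thread was interrupted", e);
        }
        return source;
    }

    public static void main(String[] args) {
        DeferredInterruptDemo result = runOnce();
        System.out.println(result.caughtCount() + " " + result.flagRestored());
    }
}
```

The single interrupt is caught once, later `sleep` calls proceed normally because the flag stays cleared inside the loop, and the flag is set again only after `running` becomes false.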
---