Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2018/05/07 12:05:05 UTC
[GitHub] flink pull request #5962: [FLINK-9304] Timer service shutdown should not sto...
GitHub user StefanRRichter opened a pull request:
https://github.com/apache/flink/pull/5962
[FLINK-9304] Timer service shutdown should not stop if interrupted
## What is the purpose of the change
This PR prevents an interruption from prematurely stopping the shutdown of the timer service.
## Brief change log
We catch `InterruptedException` around the shutdown call and repeat the attempt as long as time remains within the shutdown grace period.
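A minimal sketch of the mechanism described in the change log. It uses a plain JDK `ScheduledThreadPoolExecutor` as a stand-in for the timer service (an assumption; the PR itself changes Flink's `SystemProcessingTimeService` and `StreamTask`, which are not reproduced here), and the method name `shutdownUninterruptible` is hypothetical. Each `InterruptedException` is caught and the wait is retried with whatever remains of the grace period. Note that this simple version does not restore the interrupt flag afterwards.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UninterruptibleShutdownSketch {

    /**
     * Hypothetical helper: shuts the executor down and waits for pending tasks.
     * If the wait is interrupted, the exception is caught and the wait is retried
     * with the time left in the grace period. Returns true if the executor terminated.
     */
    static boolean shutdownUninterruptible(ScheduledThreadPoolExecutor executor, long timeoutMs) {
        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        executor.shutdown(); // no new tasks; already-submitted tasks still run
        long remainingNanos = deadlineNanos - System.nanoTime();
        while (remainingNanos > 0L) {
            try {
                return executor.awaitTermination(remainingNanos, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                // Throwing InterruptedException clears the interrupt flag,
                // so the next awaitTermination call can block normally.
            }
            remainingNanos = deadlineNanos - System.nanoTime();
        }
        return executor.isTerminated();
    }

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        // Keep the executor busy for a moment, like a timer that is still firing.
        executor.execute(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Simulate an interrupt hitting the thread that performs the shutdown.
        Thread.currentThread().interrupt();
        System.out.println("terminated = " + shutdownUninterruptible(executor, 5_000L));
    }
}
```

Without the `catch`, the pre-set interrupt would make the first `awaitTermination` call throw immediately, and the shutdown would be abandoned while the task is still running.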
## Verifying this change
This change is a trivial rework / code cleanup without any test coverage.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StefanRRichter/flink FLINK-9304-timer-service
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5962.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5962
----
commit 962bf06a531ea484bc43e453c3c93a793765cad5
Author: Stefan Richter <s....@...>
Date: 2018-05-07T09:55:35Z
[FLINK-9304] Timer service shutdown should not stop if interrupted
----
---
[GitHub] flink issue #5962: [FLINK-9304] Timer service shutdown should not stop if in...
Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/5962
The comment to make at least one attempt sounds good.
If you refactor this a little bit, you can actually test it. Either make it a static method to which you pass the timer service, or better, make it a method `shutdownUninterruptible(Deadline, Logger)` on the TimerService itself.
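A sketch of the suggested refactoring, assuming a hypothetical functional interface `InterruptibleShutdown` that mirrors the `shutdownAndAwaitPending(long, TimeUnit)` signature visible in the `SystemProcessingTimeService` diff further down this thread. Because the shutdown step is passed in as a parameter, a test can use a fake that throws `InterruptedException` on demand instead of a real interrupting thread. The `Deadline` and `Logger` parameters of the proposed method are simplified here to a millisecond timeout with no logging, and the loop is a `do`/`while` so that at least one attempt is made.

```java
import java.util.concurrent.TimeUnit;

public class ShutdownHelperSketch {

    /** Hypothetical stand-in for the timer service's "shut down and wait for pending timers" call. */
    @FunctionalInterface
    interface InterruptibleShutdown {
        boolean shutdownAndAwaitPending(long time, TimeUnit unit) throws InterruptedException;
    }

    /**
     * Calls the shutdown at least once and retries after every interruption
     * until it reports a result or the timeout has elapsed.
     */
    static boolean shutdownUninterruptible(InterruptibleShutdown service, long timeoutMs) {
        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        do {
            try {
                return service.shutdownAndAwaitPending(remainingNanos, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                // Interrupted: fall through and retry with the time that is left.
            }
            remainingNanos = deadlineNanos - System.nanoTime();
        } while (remainingNanos > 0L);
        return false;
    }

    public static void main(String[] args) {
        // A fake service that is interrupted twice before it finishes; no real threads or sleeps needed.
        final int[] calls = {0};
        InterruptibleShutdown flaky = (time, unit) -> {
            calls[0]++;
            if (calls[0] <= 2) {
                throw new InterruptedException("simulated interrupt");
            }
            return true;
        };
        boolean result = shutdownUninterruptible(flaky, 1_000L);
        System.out.println(result + " after " + calls[0] + " calls"); // prints "true after 3 calls"
    }
}
```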
---
[GitHub] flink pull request #5962: [FLINK-9304] Timer service shutdown should not sto...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5962#discussion_r188004638
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java ---
@@ -504,7 +475,101 @@ public void testShutdownAndWaitPending() {
 			Assert.fail("Unexpected interruption.");
 		}
-		Assert.assertTrue(check.get());
+		Assert.assertTrue(timerExecutionFinished.get());
+		Assert.assertTrue(timeService.isTerminated());
+	}
+
+	@Test
+	public void testShutdownServiceUninterruptible() {
+		final Object lock = new Object();
+		final OneShotLatch blockUntilTriggered = new OneShotLatch();
+		final AtomicBoolean timerFinished = new AtomicBoolean(false);
+
+		final SystemProcessingTimeService timeService =
+			createBlockingSystemProcessingTimeService(lock, blockUntilTriggered, timerFinished);
+
+		Assert.assertFalse(timeService.isTerminated());
+
+		final Thread interruptTarget = Thread.currentThread();
+		final AtomicBoolean runInterrupts = new AtomicBoolean(true);
+		final Thread interruptCallerThread = new Thread(() -> {
+			while (runInterrupts.get()) {
+				interruptTarget.interrupt();
+				try {
+					Thread.sleep(10);
+				} catch (InterruptedException ignore) {
+				}
+			}
+		});
+
+		interruptCallerThread.start();
+
+		final long timeoutMs = 1000L;
--- End diff --
Can we make this a bit faster?
---
[GitHub] flink pull request #5962: [FLINK-9304] Timer service shutdown should not sto...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5962
---
[GitHub] flink issue #5962: [FLINK-9304] Timer service shutdown should not stop if in...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/5962
CC @tillrohrmann
---
[GitHub] flink pull request #5962: [FLINK-9304] Timer service shutdown should not sto...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5962#discussion_r188004528
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java ---
@@ -197,6 +204,23 @@ public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws Inte
 		return timerService.awaitTermination(time, timeUnit);
 	}
+	@Override
+	public boolean shutdownServiceUninterruptible(long timeoutMs) {
--- End diff --
Do we want to make the shutdown of the timer service swallow the `InterruptedException` also for all subsequent calls happening after the shutdown of the timer service?
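One common way to address this concern, sketched here with a JDK `ScheduledThreadPoolExecutor` as an assumed stand-in for the timer service: keep retrying, but remember that an interrupt was swallowed and re-set the thread's interrupt flag in a `finally` block, the same idiom Guava's `Uninterruptibles` helpers follow. The method name is hypothetical, and this is not necessarily what the PR ended up doing.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RestoreInterruptSketch {

    /**
     * Retries the wait after interruptions, but remembers whether an interrupt
     * was swallowed and re-sets the thread's interrupt flag before returning.
     */
    static boolean shutdownUninterruptible(ScheduledThreadPoolExecutor executor, long timeoutMs) {
        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        boolean interrupted = false;
        executor.shutdown();
        try {
            long remainingNanos = deadlineNanos - System.nanoTime();
            while (remainingNanos > 0L) {
                try {
                    return executor.awaitTermination(remainingNanos, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    interrupted = true; // remember it, but keep waiting
                }
                remainingNanos = deadlineNanos - System.nanoTime();
            }
            return executor.isTerminated();
        } finally {
            if (interrupted) {
                // Restore the flag so code running after the shutdown still sees the interrupt.
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.execute(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread.currentThread().interrupt();
        boolean terminated = shutdownUninterruptible(executor, 5_000L);
        // Thread.interrupted() returns the restored flag (and clears it again).
        System.out.println(terminated + " " + Thread.interrupted()); // prints "true true"
    }
}
```

The shutdown itself is not cut short by the interrupt, yet a caller that checks `Thread.interrupted()` afterwards still learns that one occurred.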
---
[GitHub] flink issue #5962: [FLINK-9304] Timer service shutdown should not stop if in...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/5962
Thanks for the comments. I have updated the PR with another commit; please take another look and check whether this can be merged.
---
[GitHub] flink pull request #5962: [FLINK-9304] Timer service shutdown should not sto...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/5962#discussion_r186775375
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -706,6 +689,38 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
 		}
 	}
+	private void tryShutdownTimerService() {
+		if (timerService != null && !timerService.isTerminated()) {
+			try {
+
+				final long totalTimeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().
+					getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
+
+				final Deadline deadline = Deadline.fromNow(Duration.ofMillis(totalTimeoutMs));
+
+				boolean timerServiceShutdownComplete = false;
+
+				while (!timerServiceShutdownComplete && deadline.hasTimeLeft()) {
+					try {
--- End diff --
Shall we at least try to shut down the timer service once? If so, I think we should use `do {...} while()` here.
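A small illustration of why the loop shape matters when the grace period is already used up (for example, a configured timeout of 0 ms): the `while` form in the diff above checks the deadline first and never attempts the shutdown, whereas a `do`/`while` always makes one attempt. The shutdown attempt is faked as immediately successful, and the method names are hypothetical.

```java
import java.util.concurrent.TimeUnit;

public class AtLeastOnceSketch {

    /** Mirrors the loop shape in the diff: the deadline is checked before the first attempt. */
    static int whileLoopAttempts(long timeoutMs) {
        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        int attempts = 0;
        boolean shutdownComplete = false;
        while (!shutdownComplete && deadlineNanos - System.nanoTime() > 0L) {
            attempts++;
            shutdownComplete = true; // pretend the shutdown attempt succeeded
        }
        return attempts;
    }

    /** Suggested shape: the body runs once before the deadline is consulted. */
    static int doWhileLoopAttempts(long timeoutMs) {
        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        int attempts = 0;
        boolean shutdownComplete;
        do {
            attempts++;
            shutdownComplete = true; // pretend the shutdown attempt succeeded
        } while (!shutdownComplete && deadlineNanos - System.nanoTime() > 0L);
        return attempts;
    }

    public static void main(String[] args) {
        // With a grace period of 0 ms the deadline has already passed.
        System.out.println(whileLoopAttempts(0L) + " vs " + doWhileLoopAttempts(0L)); // prints "0 vs 1"
    }
}
```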
---
[GitHub] flink pull request #5962: [FLINK-9304] Timer service shutdown should not sto...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5962#discussion_r188006850
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java ---
+ final long timeoutMs = 1000L;
--- End diff --
👍
---
[GitHub] flink pull request #5962: [FLINK-9304] Timer service shutdown should not sto...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5962#discussion_r188010236
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java ---
+ public boolean shutdownServiceUninterruptible(long timeoutMs) {
--- End diff --
👍
---