Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2018/05/07 12:05:05 UTC

[GitHub] flink pull request #5962: [FLINK-9304] Timer service shutdown should not sto...

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/5962

    [FLINK-9304] Timer service shutdown should not stop if interrupted

    ## What is the purpose of the change
    
    This PR prevents an interruption from prematurely stopping the shutdown of the timer service.
    
    
    ## Brief change log
    
    We catch `InterruptedException` around the shutdown call and repeat the attempt while there is still time left in the shutdown grace period.
    
    
    ## Verifying this change
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
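
The retry described under "Brief change log" can be sketched roughly as follows. This is a minimal illustration and not the code from the PR: it uses a plain `java.util.concurrent.ExecutorService` in place of Flink's timer service, the class and method names are hypothetical, and restoring the interrupt flag once the wait is over is a choice made for this sketch only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; not the actual Flink implementation.
public class UninterruptibleShutdownSketch {

    /**
     * Shuts the executor down and waits for it to terminate. If the waiting
     * thread is interrupted, the wait is retried until the grace period
     * (timeoutMs) has elapsed. Returns true if the executor terminated in time.
     */
    public static boolean shutdownUninterruptible(ExecutorService service, long timeoutMs) {
        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        boolean interrupted = false;
        boolean terminated = false;

        // Stop accepting new tasks; already submitted tasks still run.
        service.shutdown();

        do {
            long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
            try {
                terminated = service.awaitTermination(remainingNanos, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                // Remember the interruption, but keep waiting while time is left.
                interrupted = true;
            }
        } while (!terminated && System.nanoTime() - deadlineNanos < 0);

        if (interrupted) {
            // Assumption of this sketch: hand the interruption back to the caller.
            Thread.currentThread().interrupt();
        }
        return terminated;
    }
}
```

A caller would pass the configured grace period, for example `shutdownUninterruptible(executor, 5000L)`, and treat a `false` result as "did not terminate in time".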


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink FLINK-9304-timer-service

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5962.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5962
    
----
commit 962bf06a531ea484bc43e453c3c93a793765cad5
Author: Stefan Richter <s....@...>
Date:   2018-05-07T09:55:35Z

    [FLINK-9304] Timer service shutdown should not stop if interrupted

----


---

[GitHub] flink issue #5962: [FLINK-9304] Timer service shutdown should not stop if in...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5962
  
    The comment to make at least one attempt sounds good.
    
    If you refactor this a little bit, you can actually test it. Either make it a static method to which you pass the timer service, or better, make it a method `shutdownUninterruptible(Deadline, Logger)` on the TimerService itself.


---

[GitHub] flink pull request #5962: [FLINK-9304] Timer service shutdown should not sto...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5962#discussion_r188004638
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java ---
    @@ -504,7 +475,101 @@ public void testShutdownAndWaitPending() {
     			Assert.fail("Unexpected interruption.");
     		}
     
    -		Assert.assertTrue(check.get());
    +		Assert.assertTrue(timerExecutionFinished.get());
    +		Assert.assertTrue(timeService.isTerminated());
    +	}
    +
    +	@Test
    +	public void testShutdownServiceUninterruptible() {
    +		final Object lock = new Object();
    +		final OneShotLatch blockUntilTriggered = new OneShotLatch();
    +		final AtomicBoolean timerFinished = new AtomicBoolean(false);
    +
    +		final SystemProcessingTimeService timeService =
    +			createBlockingSystemProcessingTimeService(lock, blockUntilTriggered, timerFinished);
    +
    +		Assert.assertFalse(timeService.isTerminated());
    +
    +		final Thread interruptTarget = Thread.currentThread();
    +		final AtomicBoolean runInterrupts = new AtomicBoolean(true);
    +		final Thread interruptCallerThread = new Thread(() -> {
    +			while (runInterrupts.get()) {
    +				interruptTarget.interrupt();
    +				try {
    +					Thread.sleep(10);
    +				} catch (InterruptedException ignore) {
    +				}
    +			}
    +		});
    +
    +		interruptCallerThread.start();
    +
    +		final long timeoutMs = 1000L;
    --- End diff --
    
    Can we make this a bit faster?


---

[GitHub] flink pull request #5962: [FLINK-9304] Timer service shutdown should not sto...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5962


---

[GitHub] flink issue #5962: [FLINK-9304] Timer service shutdown should not stop if in...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5962
  
    CC @tillrohrmann 


---

[GitHub] flink pull request #5962: [FLINK-9304] Timer service shutdown should not sto...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5962#discussion_r188004528
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java ---
    @@ -197,6 +204,23 @@ public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws Inte
     		return timerService.awaitTermination(time, timeUnit);
     	}
     
    +	@Override
    +	public boolean shutdownServiceUninterruptible(long timeoutMs) {
    --- End diff --
    
    Do we want to make the shutdown of the timer service swallow the `InterruptedException` also for all subsequent calls happening after the shutdown of the timer service?


---

[GitHub] flink issue #5962: [FLINK-9304] Timer service shutdown should not stop if in...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5962
  
    Thanks for the comments, I have updated the PR with another commit. Please take another look to see whether this can be merged.


---

[GitHub] flink pull request #5962: [FLINK-9304] Timer service shutdown should not sto...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5962#discussion_r186775375
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
    @@ -706,6 +689,38 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     		}
     	}
     
    +	private void tryShutdownTimerService() {
    +		if (timerService != null && !timerService.isTerminated()) {
    +			try {
    +
    +				final long totalTimeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().
    +					getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
    +
    +				final Deadline deadline = Deadline.fromNow(Duration.ofMillis(totalTimeoutMs));
    +
    +				boolean timerServiceShutdownComplete = false;
    +
    +				while (!timerServiceShutdownComplete && deadline.hasTimeLeft()) {
    +					try {
    --- End diff --
    
    Shall we try to shut down the timer service at least once? If so, I think we should use `do {...} while()` here.
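
The difference raised in this comment can be shown with a small, self-contained sketch. The class, the method names, and the `BooleanSupplier` standing in for the shutdown attempt are hypothetical and not taken from the PR; the point is only that a `while` loop makes no attempt at all when the deadline has already expired, whereas a `do ... while` loop always makes one.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch; "attempt" stands in for one shutdown attempt.
public class AtLeastOnceSketch {

    /** Checks the deadline first, so an expired deadline means zero attempts. */
    public static int attemptsWithWhile(long deadlineNanos, BooleanSupplier attempt) {
        int attempts = 0;
        boolean done = false;
        while (!done && System.nanoTime() - deadlineNanos < 0) {
            attempts++;
            done = attempt.getAsBoolean();
        }
        return attempts;
    }

    /** Runs the attempt first, so at least one attempt is always made. */
    public static int attemptsWithDoWhile(long deadlineNanos, BooleanSupplier attempt) {
        int attempts = 0;
        boolean done;
        do {
            attempts++;
            done = attempt.getAsBoolean();
        } while (!done && System.nanoTime() - deadlineNanos < 0);
        return attempts;
    }
}
```

With a deadline that lies in the past, `attemptsWithWhile` returns 0 and `attemptsWithDoWhile` returns 1.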


---

[GitHub] flink pull request #5962: [FLINK-9304] Timer service shutdown should not sto...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5962#discussion_r188006850
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java ---
    @@ -504,7 +475,101 @@ public void testShutdownAndWaitPending() {
     			Assert.fail("Unexpected interruption.");
     		}
     
    -		Assert.assertTrue(check.get());
    +		Assert.assertTrue(timerExecutionFinished.get());
    +		Assert.assertTrue(timeService.isTerminated());
    +	}
    +
    +	@Test
    +	public void testShutdownServiceUninterruptible() {
    +		final Object lock = new Object();
    +		final OneShotLatch blockUntilTriggered = new OneShotLatch();
    +		final AtomicBoolean timerFinished = new AtomicBoolean(false);
    +
    +		final SystemProcessingTimeService timeService =
    +			createBlockingSystemProcessingTimeService(lock, blockUntilTriggered, timerFinished);
    +
    +		Assert.assertFalse(timeService.isTerminated());
    +
    +		final Thread interruptTarget = Thread.currentThread();
    +		final AtomicBoolean runInterrupts = new AtomicBoolean(true);
    +		final Thread interruptCallerThread = new Thread(() -> {
    +			while (runInterrupts.get()) {
    +				interruptTarget.interrupt();
    +				try {
    +					Thread.sleep(10);
    +				} catch (InterruptedException ignore) {
    +				}
    +			}
    +		});
    +
    +		interruptCallerThread.start();
    +
    +		final long timeoutMs = 1000L;
    --- End diff --
    
    👍 


---

[GitHub] flink pull request #5962: [FLINK-9304] Timer service shutdown should not sto...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5962#discussion_r188010236
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java ---
    @@ -197,6 +204,23 @@ public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws Inte
     		return timerService.awaitTermination(time, timeUnit);
     	}
     
    +	@Override
    +	public boolean shutdownServiceUninterruptible(long timeoutMs) {
    --- End diff --
    
    👍 


---