You are viewing a plain text version of this content; the hyperlink to its canonical version is not preserved in this rendering.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/15 10:54:12 UTC
flink git commit: [FLINK-9304] Timer service shutdown should not stop
if interrupted
Repository: flink
Updated Branches:
refs/heads/release-1.4 4375be1ad -> eff3628e5
[FLINK-9304] Timer service shutdown should not stop if interrupted
This closes #5962.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eff3628e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eff3628e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eff3628e
Branch: refs/heads/release-1.4
Commit: eff3628e5917298fcf7372d8837877ad888335ec
Parents: 4375be1
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon May 7 11:55:35 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 10:08:07 2018 +0200
----------------------------------------------------------------------
.../apache/flink/api/common/time/Deadline.java | 82 ++++++++++++
.../runtime/tasks/ProcessingTimeService.java | 11 ++
.../streaming/runtime/tasks/StreamTask.java | 44 +++---
.../tasks/SystemProcessingTimeService.java | 32 +++++
.../tasks/TestProcessingTimeService.java | 6 +
.../tasks/SystemProcessingTimeServiceTest.java | 133 ++++++++++++++-----
6 files changed, 250 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/eff3628e/flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java b/flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java
new file mode 100644
index 0000000..1c3a82e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.time;
+
+import org.apache.flink.annotation.Internal;
+
+import java.time.Duration;
+
+/**
+ * An immutable deadline, as obtained via {@link #now()}, {@link #fromNow(Duration)} or {@link #plus(Duration)}.
+ */
+@Internal
+public class Deadline {
+
+ /** The deadline, as an absolute value on the {@link System#nanoTime()} time scale. */
+ private final long timeNanos;
+
+ private Deadline(long deadline) {
+ this.timeNanos = deadline;
+ }
+ /** Returns a new deadline shifted by {@code other}; throws {@link ArithmeticException} on overflow. */
+ public Deadline plus(Duration other) {
+ return new Deadline(Math.addExact(timeNanos, other.toNanos()));
+ }
+
+ /**
+ * Returns the time left between the deadline and now. The result is negative if the deadline
+ * has passed. Throws {@link ArithmeticException} if the difference overflows a {@code long}.
+ */
+ public Duration timeLeft() {
+ return Duration.ofNanos(Math.subtractExact(timeNanos, System.nanoTime()));
+ }
+
+ /**
+ * Returns whether the deadline has not passed yet, i.e. whether the time left is zero or positive.
+ */
+ public boolean hasTimeLeft() {
+ return !isOverdue();
+ }
+
+ /**
+ * Determines whether the deadline is in the past, i.e. whether the time left is negative.
+ */
+ public boolean isOverdue() {
+ return timeNanos < System.nanoTime();
+ }
+
+ // ------------------------------------------------------------------------
+ // Creating Deadlines
+ // ------------------------------------------------------------------------
+
+ /**
+ * Constructs a {@link Deadline} that has now as the deadline. Use this and then extend via
+ * {@link #plus(Duration)} to specify a deadline in the future.
+ */
+ public static Deadline now() {
+ return new Deadline(System.nanoTime());
+ }
+
+ /**
+ * Constructs a Deadline that is the given duration after now; throws {@link ArithmeticException} on overflow.
+ */
+ public static Deadline fromNow(Duration duration) {
+ return new Deadline(Math.addExact(System.nanoTime(), duration.toNanos()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/eff3628e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
index 2516299..4515ce2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -96,6 +96,17 @@ public abstract class ProcessingTimeService {
public abstract void shutdownService();
/**
+ * Shuts down and cleans up the timer service provider hard and immediately, then waits for timers that
+ * are still executing to finish. Any further call to {@link #registerTimer(long, ProcessingTimeCallback)}
+ * will result in a hard exception. Interrupts do not abort the wait: this call blocks until the shutdown is completed
+ * or the timeout is exceeded.
+ *
+ * @param timeoutMs timeout for blocking on the service shutdown in milliseconds.
+ * @return true iff the shutdown was completed.
+ */
+ public abstract boolean shutdownServiceUninterruptible(long timeoutMs);
+
+ /**
* Shuts down and clean up the timer service provider hard and immediately. This does wait
* for all timers to complete or until the time limit is exceeded. Any call to
* {@link #registerTimer(long, ProcessingTimeCallback)} will result in a hard exception after calling this method.
http://git-wip-us.apache.org/repos/asf/flink/blob/eff3628e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index e18ee18..dcf6ec0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -70,7 +70,6 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -305,30 +304,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// clean up everything we initialized
isRunning = false;
- // clear the interrupted status so that we can wait for the following resource shutdowns to complete
- Thread.interrupted();
-
// stop all timers and threads
- if (timerService != null && !timerService.isTerminated()) {
- try {
-
- final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().
- getLong(TimerServiceOptions.TIMER_SERVICE_TERMINATION_AWAIT_MS);
-
- // wait for a reasonable time for all pending timer threads to finish
- boolean timerShutdownComplete =
- timerService.shutdownAndAwaitPending(timeoutMs, TimeUnit.MILLISECONDS);
-
- if (!timerShutdownComplete) {
- LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
- "timers. Will continue with shutdown procedure.", timeoutMs);
- }
- }
- catch (Throwable t) {
- // catch and log the exception to not replace the original exception
- LOG.error("Could not shut down timer service", t);
- }
- }
+ tryShutdownTimerService();
// stop all asynchronous checkpoint threads
try {
@@ -660,6 +637,25 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
}
+ private void tryShutdownTimerService() {
+ // Best effort: a shutdown timeout is only logged as a warning, and any Throwable is logged rather than rethrown.
+ if (timerService != null && !timerService.isTerminated()) {
+
+ try {
+ final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().
+ getLong(TimerServiceOptions.TIMER_SERVICE_TERMINATION_AWAIT_MS);
+
+ if (!timerService.shutdownServiceUninterruptible(timeoutMs)) {
+ LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
+ "timers. Will continue with shutdown procedure.", timeoutMs);
+ }
+ } catch (Throwable t) {
+ // catch and log the exception to not replace the original exception
+ LOG.error("Could not shut down timer service", t);
+ }
+ }
+ }
+
private void checkpointState(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
http://git-wip-us.apache.org/repos/asf/flink/blob/eff3628e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index be8b23c..4e4208f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -18,10 +18,15 @@
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nonnull;
+import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Delayed;
@@ -41,6 +46,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class SystemProcessingTimeService extends ProcessingTimeService {
+ private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class);
+
private static final int STATUS_ALIVE = 0;
private static final int STATUS_QUIESCED = 1;
private static final int STATUS_SHUTDOWN = 2;
@@ -197,6 +204,31 @@ public class SystemProcessingTimeService extends ProcessingTimeService {
return timerService.awaitTermination(time, timeUnit);
}
+ @Override
+ public boolean shutdownServiceUninterruptible(long timeoutMs) {
+ // Keep waiting until the deadline even when interrupted; any interrupt is re-asserted before returning.
+ final Deadline deadline = Deadline.fromNow(Duration.ofMillis(timeoutMs));
+
+ boolean shutdownComplete = false;
+ boolean receivedInterrupt = false;
+
+ do {
+ try {
+ // wait for the remaining time in nanoseconds, so a final sub-millisecond slice blocks instead of spinning
+ shutdownComplete = shutdownAndAwaitPending(deadline.timeLeft().toNanos(), TimeUnit.NANOSECONDS);
+ } catch (InterruptedException iex) {
+ receivedInterrupt = true;
+ LOG.trace("Intercepted attempt to interrupt timer service shutdown.", iex);
+ }
+ } while (deadline.hasTimeLeft() && !shutdownComplete);
+ // restore the interrupt status swallowed above so that callers can still observe it
+ if (receivedInterrupt) {
+ Thread.currentThread().interrupt();
+ }
+
+ return shutdownComplete;
+ }
+
// safety net to destroy the thread pool
@Override
protected void finalize() throws Throwable {
http://git-wip-us.apache.org/repos/asf/flink/blob/eff3628e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index 2081f19..f4a5f37 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -135,6 +135,12 @@ public class TestProcessingTimeService extends ProcessingTimeService {
}
@Override
+ public boolean shutdownServiceUninterruptible(long timeoutMs) {
+ shutdownService();
+ return true; // timeoutMs is ignored; like shutdownAndAwaitPending, the shutdown is reported as complete
+ }
+
+ @Override
public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException {
shutdownService();
return true;
http://git-wip-us.apache.org/repos/asf/flink/blob/eff3628e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index 01fd778..cfcaf72 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
@@ -449,41 +450,11 @@ public class SystemProcessingTimeServiceTest extends TestLogger {
public void testShutdownAndWaitPending() {
final Object lock = new Object();
- final OneShotLatch waitUntilTimerStarted = new OneShotLatch();
- final OneShotLatch blockUntilTerminationInterrupts = new OneShotLatch();
final OneShotLatch blockUntilTriggered = new OneShotLatch();
- final AtomicBoolean check = new AtomicBoolean(true);
-
- final SystemProcessingTimeService timeService = new SystemProcessingTimeService(
- (message, exception) -> {
- },
- lock);
-
- timeService.scheduleAtFixedRate(
- timestamp -> {
-
- waitUntilTimerStarted.trigger();
-
- try {
- blockUntilTerminationInterrupts.await();
- check.set(false);
- } catch (InterruptedException ignore) {
- }
-
- try {
- blockUntilTriggered.await();
- } catch (InterruptedException ignore) {
- check.set(false);
- }
- },
- 0L,
- 10L);
+ final AtomicBoolean timerExecutionFinished = new AtomicBoolean(false);
- try {
- waitUntilTimerStarted.await();
- } catch (InterruptedException e) {
- Assert.fail();
- }
+ final SystemProcessingTimeService timeService =
+ createBlockingSystemProcessingTimeService(lock, blockUntilTriggered, timerExecutionFinished);
Assert.assertFalse(timeService.isTerminated());
@@ -504,7 +475,101 @@ public class SystemProcessingTimeServiceTest extends TestLogger {
Assert.fail("Unexpected interruption.");
}
- Assert.assertTrue(check.get());
+ Assert.assertTrue(timerExecutionFinished.get());
+ Assert.assertTrue(timeService.isTerminated());
+ }
+
+ @Test
+ public void testShutdownServiceUninterruptible() {
+ final Object lock = new Object();
+ final OneShotLatch blockUntilTriggered = new OneShotLatch();
+ final AtomicBoolean timerFinished = new AtomicBoolean(false);
+
+ final SystemProcessingTimeService timeService =
+ createBlockingSystemProcessingTimeService(lock, blockUntilTriggered, timerFinished);
+
+ Assert.assertFalse(timeService.isTerminated());
+ // keep interrupting this thread from a helper thread to verify that the shutdown wait is not aborted
+ final Thread interruptTarget = Thread.currentThread();
+ final AtomicBoolean runInterrupts = new AtomicBoolean(true);
+ final Thread interruptCallerThread = new Thread(() -> {
+ while (runInterrupts.get()) {
+ interruptTarget.interrupt();
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException ignore) {
+ }
+ }
+ });
+
+ interruptCallerThread.start();
+
+ final long timeoutMs = 50L;
+ final long startTime = System.nanoTime();
+ Assert.assertFalse(timeService.isTerminated());
+ // check that termination did not succeed (because of blocking timer execution)
+ Assert.assertFalse(timeService.shutdownServiceUninterruptible(timeoutMs));
+ // check that termination flag was set.
Assert.assertTrue(timeService.isTerminated());
+ // check that the blocked timer is still in flight.
+ Assert.assertFalse(timerFinished.get());
+ // check that we waited until timeout
+ Assert.assertTrue((System.nanoTime() - startTime) >= (1_000_000L * timeoutMs));
+
+ runInterrupts.set(false);
+
+ do {
+ try {
+ interruptCallerThread.join();
+ } catch (InterruptedException ignore) {
+ }
+ } while (interruptCallerThread.isAlive());
+ Thread.interrupted(); // the join loop may leave this thread's interrupt flag set; clear it so it does not leak
+ blockUntilTriggered.trigger();
+ Assert.assertTrue(timeService.shutdownServiceUninterruptible(timeoutMs));
+ Assert.assertTrue(timerFinished.get());
+ }
+
+ private static SystemProcessingTimeService createBlockingSystemProcessingTimeService(
+ final Object lock,
+ final OneShotLatch blockUntilTriggered,
+ final AtomicBoolean check) {
+ // Schedules a timer that blocks (ignoring interrupts) until blockUntilTriggered fires, then sets check to true.
+ final OneShotLatch waitUntilTimerStarted = new OneShotLatch();
+ // check must start out false, so that observing true later proves the blocked timer completed
+ Preconditions.checkState(!check.get());
+
+ final SystemProcessingTimeService timeService = new SystemProcessingTimeService(
+ (message, exception) -> {
+ },
+ lock);
+
+ timeService.scheduleAtFixedRate(
+ timestamp -> {
+
+ waitUntilTimerStarted.trigger();
+
+ boolean unblocked = false;
+
+ while (!unblocked) {
+ try {
+ blockUntilTriggered.await();
+ unblocked = true;
+ } catch (InterruptedException ignore) { // keep blocking: interrupts must not release the timer
+ }
+ }
+
+ check.set(true);
+ },
+ 0L,
+ 10L);
+ // return only once the timer is running, so callers can rely on it being in flight
+ try {
+ waitUntilTimerStarted.await();
+ } catch (InterruptedException e) {
+ Assert.fail("Problem while starting up service.");
+ }
+
+ return timeService;
}
}