You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/15 10:54:12 UTC

flink git commit: [FLINK-9304] Timer service shutdown should not stop if interrupted

Repository: flink
Updated Branches:
  refs/heads/release-1.4 4375be1ad -> eff3628e5


[FLINK-9304] Timer service shutdown should not stop if interrupted

This closes #5962.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eff3628e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eff3628e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eff3628e

Branch: refs/heads/release-1.4
Commit: eff3628e5917298fcf7372d8837877ad888335ec
Parents: 4375be1
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon May 7 11:55:35 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 10:08:07 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/api/common/time/Deadline.java  |  82 ++++++++++++
 .../runtime/tasks/ProcessingTimeService.java    |  11 ++
 .../streaming/runtime/tasks/StreamTask.java     |  44 +++---
 .../tasks/SystemProcessingTimeService.java      |  32 +++++
 .../tasks/TestProcessingTimeService.java        |   6 +
 .../tasks/SystemProcessingTimeServiceTest.java  | 133 ++++++++++++++-----
 6 files changed, 250 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eff3628e/flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java b/flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java
new file mode 100644
index 0000000..1c3a82e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.time;
+
+import org.apache.flink.annotation.Internal;
+
+import java.time.Duration;
+
+/**
+ * A deadline relative to {@link System#nanoTime()}, as obtained via {@link #now()} or
+ * {@link #fromNow(Duration)} and optionally shifted via {@link #plus(Duration)}.
+ * Instances are immutable; {@link #plus(Duration)} returns a new instance.
+ */
+@Internal
+public class Deadline {
+
+	/** The deadline, relative to {@link System#nanoTime()}. */
+	private final long timeNanos;
+
+	private Deadline(long timeNanos) {
+		this.timeNanos = timeNanos;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Creating Deadlines
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Constructs a {@link Deadline} that has now as the deadline. Use this and then extend via
+	 * {@link #plus(Duration)} to specify a deadline in the future.
+	 */
+	public static Deadline now() {
+		return new Deadline(System.nanoTime());
+	}
+
+	/** Constructs a Deadline that is a given duration after now; equivalent to {@code now().plus(duration)}. */
+	public static Deadline fromNow(Duration duration) {
+		return now().plus(duration);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Querying Deadlines
+	// ------------------------------------------------------------------------
+
+	/** Returns a new deadline shifted by the given duration; throws {@link ArithmeticException} on overflow. */
+	public Deadline plus(Duration other) {
+		final long shiftedNanos = Math.addExact(timeNanos, other.toNanos());
+		return new Deadline(shiftedNanos);
+	}
+
+	/** Returns the time left until the deadline; the result is negative once the deadline has passed. */
+	public Duration timeLeft() {
+		final long remainingNanos = Math.subtractExact(timeNanos, System.nanoTime());
+		return Duration.ofNanos(remainingNanos);
+	}
+
+	/** Returns whether the deadline has not passed yet, i.e. the exact opposite of {@link #isOverdue()}. */
+	public boolean hasTimeLeft() {
+		return timeNanos >= System.nanoTime();
+	}
+
+	/** Determines whether the deadline lies strictly in the past, i.e. whether the time left is negative. */
+	public boolean isOverdue() {
+		return Long.compare(timeNanos, System.nanoTime()) < 0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/eff3628e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
index 2516299..4515ce2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -96,6 +96,17 @@ public abstract class ProcessingTimeService {
 	public abstract void shutdownService();
 
 	/**
+	 * Shuts down and cleans up the timer service provider hard, then blocks until all pending timers have
+	 * completed or the timeout is exceeded. Any further call to {@link #registerTimer(long, ProcessingTimeCallback)}
+	 * will result in a hard exception. This call cannot be interrupted: interrupts received while waiting do not
+	 * cut the wait short, and the calling thread's interrupt status is restored before this method returns.
+	 *
+	 * @param timeoutMs timeout for blocking on the service shutdown in milliseconds.
+	 * @return returns true iff the shutdown was completed.
+	 */
+	public abstract boolean shutdownServiceUninterruptible(long timeoutMs);
+
+	/**
 	 * Shuts down and clean up the timer service provider hard and immediately. This does wait
 	 * for all timers to complete or until the time limit is exceeded. Any call to
 	 * {@link #registerTimer(long, ProcessingTimeCallback)} will result in a hard exception after calling this method.

http://git-wip-us.apache.org/repos/asf/flink/blob/eff3628e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index e18ee18..dcf6ec0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -70,7 +70,6 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -305,30 +304,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			// clean up everything we initialized
 			isRunning = false;
 
-			// clear the interrupted status so that we can wait for the following resource shutdowns to complete
-			Thread.interrupted();
-
 			// stop all timers and threads
-			if (timerService != null && !timerService.isTerminated()) {
-				try {
-
-					final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().
-						getLong(TimerServiceOptions.TIMER_SERVICE_TERMINATION_AWAIT_MS);
-
-					// wait for a reasonable time for all pending timer threads to finish
-					boolean timerShutdownComplete =
-						timerService.shutdownAndAwaitPending(timeoutMs, TimeUnit.MILLISECONDS);
-
-					if (!timerShutdownComplete) {
-						LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
-							"timers. Will continue with shutdown procedure.", timeoutMs);
-					}
-				}
-				catch (Throwable t) {
-					// catch and log the exception to not replace the original exception
-					LOG.error("Could not shut down timer service", t);
-				}
-			}
+			tryShutdownTimerService();
 
 			// stop all asynchronous checkpoint threads
 			try {
@@ -660,6 +637,25 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 	}
 
+	/** Best-effort shutdown of the timer service, bounded by the configured timeout; logs instead of rethrowing. */
+	private void tryShutdownTimerService() {
+		if (timerService == null || timerService.isTerminated()) {
+			return; // no timer service, or it is already shut down
+		}
+		try {
+			final long terminationTimeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration()
+				.getLong(TimerServiceOptions.TIMER_SERVICE_TERMINATION_AWAIT_MS);
+			final boolean shutdownInTime = timerService.shutdownServiceUninterruptible(terminationTimeoutMs);
+			if (!shutdownInTime) {
+				LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
+					"timers. Will continue with shutdown procedure.", terminationTimeoutMs);
+			}
+		} catch (Throwable t) {
+			// only log: this runs during cleanup and must not replace the original exception
+			LOG.error("Could not shut down timer service", t);
+		}
+	}
+
 	private void checkpointState(
 			CheckpointMetaData checkpointMetaData,
 			CheckpointOptions checkpointOptions,

http://git-wip-us.apache.org/repos/asf/flink/blob/eff3628e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index be8b23c..4e4208f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -18,10 +18,15 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nonnull;
 
+import java.time.Duration;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.Delayed;
@@ -41,6 +46,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class SystemProcessingTimeService extends ProcessingTimeService {
 
+	private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class);
+
 	private static final int STATUS_ALIVE = 0;
 	private static final int STATUS_QUIESCED = 1;
 	private static final int STATUS_SHUTDOWN = 2;
@@ -197,6 +204,31 @@ public class SystemProcessingTimeService extends ProcessingTimeService {
 		return timerService.awaitTermination(time, timeUnit);
 	}
 
+	/** Shuts down and keeps awaiting termination despite interrupts until done or timed out; restores interrupts. */
+	@Override
+	public boolean shutdownServiceUninterruptible(long timeoutMs) {
+		final Deadline deadline = Deadline.fromNow(Duration.ofMillis(timeoutMs));
+		boolean shutdownComplete = false;
+		// interrupts are remembered instead of aborting the wait, and re-raised once we are done waiting
+		boolean receivedInterrupt = false;
+
+		do {
+			try {
+				// nanosecond precision: truncating the remaining time to millis would busy-spin with a
+				// zero timeout during the last (sub-)millisecond before the deadline
+				shutdownComplete = shutdownAndAwaitPending(deadline.timeLeft().toNanos(), TimeUnit.NANOSECONDS);
+			} catch (InterruptedException iex) {
+				receivedInterrupt = true;
+				LOG.trace("Intercepted attempt to interrupt timer service shutdown.", iex);
+			}
+		} while (deadline.hasTimeLeft() && !shutdownComplete);
+
+		if (receivedInterrupt) {
+			Thread.currentThread().interrupt();
+		}
+		return shutdownComplete;
+	}
+
 	// safety net to destroy the thread pool
 	@Override
 	protected void finalize() throws Throwable {

http://git-wip-us.apache.org/repos/asf/flink/blob/eff3628e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index 2081f19..f4a5f37 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -135,6 +135,12 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 	}
 
 	@Override
+	public boolean shutdownServiceUninterruptible(long timeoutMs) {
+		shutdownService(); // same as shutdownAndAwaitPending in this class: no waiting, so timeoutMs is unused
+		return true; // always reports the shutdown as completed
+	}
+
+	@Override
 	public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException {
 		shutdownService();
 		return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/eff3628e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index 01fd778..cfcaf72 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
@@ -449,41 +450,11 @@ public class SystemProcessingTimeServiceTest extends TestLogger {
 	public void testShutdownAndWaitPending() {
 
 		final Object lock = new Object();
-		final OneShotLatch waitUntilTimerStarted = new OneShotLatch();
-		final OneShotLatch blockUntilTerminationInterrupts = new OneShotLatch();
 		final OneShotLatch blockUntilTriggered = new OneShotLatch();
-		final AtomicBoolean check = new AtomicBoolean(true);
-
-		final SystemProcessingTimeService timeService = new SystemProcessingTimeService(
-			(message, exception) -> {
-			},
-			lock);
-
-		timeService.scheduleAtFixedRate(
-			timestamp -> {
-
-				waitUntilTimerStarted.trigger();
-
-				try {
-					blockUntilTerminationInterrupts.await();
-					check.set(false);
-				} catch (InterruptedException ignore) {
-				}
-
-				try {
-					blockUntilTriggered.await();
-				} catch (InterruptedException ignore) {
-					check.set(false);
-				}
-			},
-			0L,
-			10L);
+		final AtomicBoolean timerExecutionFinished = new AtomicBoolean(false);
 
-		try {
-			waitUntilTimerStarted.await();
-		} catch (InterruptedException e) {
-			Assert.fail();
-		}
+		final SystemProcessingTimeService timeService =
+			createBlockingSystemProcessingTimeService(lock, blockUntilTriggered, timerExecutionFinished);
 
 		Assert.assertFalse(timeService.isTerminated());
 
@@ -504,7 +475,101 @@ public class SystemProcessingTimeServiceTest extends TestLogger {
 			Assert.fail("Unexpected interruption.");
 		}
 
-		Assert.assertTrue(check.get());
+		Assert.assertTrue(timerExecutionFinished.get());
+		Assert.assertTrue(timeService.isTerminated());
+	}
+
+	@Test
+	public void testShutdownServiceUninterruptible() {
+		final Object lock = new Object();
+		final OneShotLatch blockUntilTriggered = new OneShotLatch();
+		final AtomicBoolean timerFinished = new AtomicBoolean(false);
+
+		final SystemProcessingTimeService timeService =
+			createBlockingSystemProcessingTimeService(lock, blockUntilTriggered, timerFinished);
+		Assert.assertFalse(timeService.isTerminated());
+
+		// bombard the test thread with interrupts, which the shutdown must not give in to
+		final Thread interruptTarget = Thread.currentThread();
+		final AtomicBoolean runInterrupts = new AtomicBoolean(true);
+		final Thread interruptCallerThread = new Thread(() -> {
+			while (runInterrupts.get()) {
+				interruptTarget.interrupt();
+				try {
+					Thread.sleep(1);
+				} catch (InterruptedException ignore) {
+				}
+			}
+		});
+		interruptCallerThread.start();
+
+		final long timeoutMs = 50L;
+		try {
+			final long startTime = System.nanoTime();
+			// termination cannot complete while the timer blocks, but the termination flag must be set
+			Assert.assertFalse(timeService.shutdownServiceUninterruptible(timeoutMs));
+			Assert.assertTrue(timeService.isTerminated());
+			Assert.assertFalse(timerFinished.get());
+			// check that we waited until timeout despite the interrupts
+			Assert.assertTrue((System.nanoTime() - startTime) >= (1_000_000L * timeoutMs));
+		} finally {
+			// always stop the interrupter and release the timer, so neither leaks into other tests
+			runInterrupts.set(false);
+			do {
+				try {
+					interruptCallerThread.join();
+				} catch (InterruptedException ignore) {
+				}
+			} while (interruptCallerThread.isAlive());
+			Thread.interrupted(); // clear the last interrupt the (now dead) interrupter may have delivered
+			blockUntilTriggered.trigger();
+		}
+
+		// generous timeout: the call returns as soon as the released timer has finished
+		Assert.assertTrue(timeService.shutdownServiceUninterruptible(10_000L));
+		Assert.assertTrue(timerFinished.get());
+	}
+
+	/**
+	 * Creates a time service whose fixed-rate timer blocks, ignoring interrupts, until {@code blockUntilTriggered}
+	 * fires and then sets {@code check} to true. Returns only once that timer has started executing.
+	 */
+	private static SystemProcessingTimeService createBlockingSystemProcessingTimeService(
+		final Object lock,
+		final OneShotLatch blockUntilTriggered,
+		final AtomicBoolean check) {
+
+		Preconditions.checkState(!check.get());
+
+		final OneShotLatch timerStarted = new OneShotLatch();
+		final SystemProcessingTimeService service =
+			new SystemProcessingTimeService((message, exception) -> {}, lock);
+
+		service.scheduleAtFixedRate(
+			timestamp -> {
+				timerStarted.trigger();
+
+				// only the latch may release this timer, so keep waiting through interrupts
+				while (true) {
+					try {
+						blockUntilTriggered.await();
+						break;
+					} catch (InterruptedException ignore) {
+						// deliberately ignored, retry the wait
+					}
+				}
+
+				check.set(true);
+			},
+			0L,
+			10L);
+
+		// do not hand out the service before its timer is actually running
+		try {
+			timerStarted.await();
+		} catch (InterruptedException e) {
+			Assert.fail("Problem while starting up service.");
+		}
+		return service;
+	}
 }