You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/11/23 14:24:20 UTC

flink git commit: [FLINK-5465] [streaming] Wait for pending timer threads to finish or to exceed a time limit in exceptional stream task shutdown.

Repository: flink
Updated Branches:
  refs/heads/master fda2c9ff6 -> d86c6b6bb


[FLINK-5465] [streaming] Wait for pending timer threads to finish or to exceed a time limit in exceptional stream task shutdown.

This closes #5058.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d86c6b6b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d86c6b6b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d86c6b6b

Branch: refs/heads/master
Commit: d86c6b6bb32adee9d4b5c9098340a34e8a8a7f1d
Parents: fda2c9f
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Nov 22 17:52:35 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Nov 23 15:23:43 2017 +0100

----------------------------------------------------------------------
 .../configuration/TimerServiceOptions.java      | 38 ++++++++++++
 .../runtime/tasks/ProcessingTimeService.java    | 12 ++++
 .../streaming/runtime/tasks/StreamTask.java     | 18 +++++-
 .../tasks/SystemProcessingTimeService.java      |  6 ++
 .../tasks/TestProcessingTimeService.java        |  6 ++
 .../tasks/SystemProcessingTimeServiceTest.java  | 65 ++++++++++++++++++++
 6 files changed, 142 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d86c6b6b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
new file mode 100644
index 0000000..835adce
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Timer service configuration options.
+ */
+@PublicEvolving
+public class TimerServiceOptions {
+
+	/**
+	 * This configures how long (in milliseconds) we wait for the {@link org.apache.flink.streaming.runtime.tasks.ProcessingTimeService}
+	 * to finish all pending timer threads when the stream task performs a failover shutdown. See FLINK-5465.
+	 */
+	public static final ConfigOption<Long> TIMER_SERVICE_TERMINATION_AWAIT_MS = ConfigOptions
+		.key("timerservice.exceptional.shutdown.timeout")
+		.defaultValue(7500L);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d86c6b6b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
index b238252..2516299 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Defines the current processing time and handles all related actions,
@@ -93,4 +94,15 @@ public abstract class ProcessingTimeService {
 	 * will result in a hard exception.
 	 */
 	public abstract void shutdownService();
+
+	/**
+	 * Shuts down and cleans up the timer service provider hard and immediately. This waits
+	 * until all timers have completed or until the time limit is exceeded. Any call to
+	 * {@link #registerTimer(long, ProcessingTimeCallback)} will result in a hard exception after calling this method.
+	 * @param time time to wait for termination.
+	 * @param timeUnit time unit of parameter time.
+	 * @return {@code true} if this timer service and all pending timers are terminated and
+	 *         {@code false} if the timeout elapsed before this happened.
+	 */
+	public abstract boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d86c6b6b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 36e6748..eff8a29 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -51,6 +51,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.configuration.TimerServiceOptions;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
@@ -69,6 +70,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -218,7 +220,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			LOG.debug("Initializing {}.", getName());
 
 			asyncOperationsThreadPool = Executors.newCachedThreadPool();
-
 			configuration = new StreamConfig(getTaskConfiguration());
 
 			CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();
@@ -319,9 +320,20 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			isRunning = false;
 
 			// stop all timers and threads
-			if (timerService != null) {
+			if (timerService != null && !timerService.isTerminated()) {
 				try {
-					timerService.shutdownService();
+
+					final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().
+						getLong(TimerServiceOptions.TIMER_SERVICE_TERMINATION_AWAIT_MS);
+
+					// wait for a reasonable time for all pending timer threads to finish
+					boolean timerShutdownComplete =
+						timerService.shutdownAndAwaitPending(timeoutMs, TimeUnit.MILLISECONDS);
+
+					if (!timerShutdownComplete) {
+						LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
+							"timers. Will continue with shutdown procedure.", timeoutMs);
+					}
 				}
 				catch (Throwable t) {
 					// catch and log the exception to not replace the original exception

http://git-wip-us.apache.org/repos/asf/flink/blob/d86c6b6b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index 71bfdf6..be8b23c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -191,6 +191,12 @@ public class SystemProcessingTimeService extends ProcessingTimeService {
 		}
 	}
 
+	@Override
+	public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException {
+		shutdownService();
+		return timerService.awaitTermination(time, timeUnit);
+	}
+
 	// safety net to destroy the thread pool
 	@Override
 	protected void finalize() throws Throwable {

http://git-wip-us.apache.org/repos/asf/flink/blob/d86c6b6b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index 080eeb5..2081f19 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -134,6 +134,12 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 		this.isTerminated = true;
 	}
 
+	@Override
+	public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException {
+		shutdownService();
+		return true;
+	}
+
 	public int getNumActiveTimers() {
 		int count = 0;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d86c6b6b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index 4c105d3..01fd778 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -22,11 +22,13 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
@@ -442,4 +444,67 @@ public class SystemProcessingTimeServiceTest extends TestLogger {
 		latch.await();
 		assertTrue(exceptionWasThrown.get());
 	}
+
+	@Test
+	public void testShutdownAndWaitPending() {
+
+		final Object lock = new Object();
+		final OneShotLatch waitUntilTimerStarted = new OneShotLatch();
+		final OneShotLatch blockUntilTerminationInterrupts = new OneShotLatch();
+		final OneShotLatch blockUntilTriggered = new OneShotLatch();
+		final AtomicBoolean check = new AtomicBoolean(true);
+
+		final SystemProcessingTimeService timeService = new SystemProcessingTimeService(
+			(message, exception) -> {
+			},
+			lock);
+
+		timeService.scheduleAtFixedRate(
+			timestamp -> {
+
+				waitUntilTimerStarted.trigger();
+
+				try {
+					blockUntilTerminationInterrupts.await();
+					check.set(false);
+				} catch (InterruptedException ignore) {
+				}
+
+				try {
+					blockUntilTriggered.await();
+				} catch (InterruptedException ignore) {
+					check.set(false);
+				}
+			},
+			0L,
+			10L);
+
+		try {
+			waitUntilTimerStarted.await();
+		} catch (InterruptedException e) {
+			Assert.fail();
+		}
+
+		Assert.assertFalse(timeService.isTerminated());
+
+		// Check that we wait for the timer to terminate. As the timer blocks on the second latch, this should time out.
+		try {
+			Assert.assertFalse(timeService.shutdownAndAwaitPending(1, TimeUnit.SECONDS));
+		} catch (InterruptedException e) {
+			Assert.fail("Unexpected interruption.");
+		}
+
+		// Let the timer proceed.
+		blockUntilTriggered.trigger();
+
+		// Now we should succeed in terminating the timer.
+		try {
+			Assert.assertTrue(timeService.shutdownAndAwaitPending(60, TimeUnit.SECONDS));
+		} catch (InterruptedException e) {
+			Assert.fail("Unexpected interruption.");
+		}
+
+		Assert.assertTrue(check.get());
+		Assert.assertTrue(timeService.isTerminated());
+	}
 }