You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/11/23 14:24:20 UTC
flink git commit: [FLINK-5465] [streaming] Wait for pending timer threads to finish or to exceed a time limit in exceptional stream task shutdown.
Repository: flink
Updated Branches:
refs/heads/master fda2c9ff6 -> d86c6b6bb
[FLINK-5465] [streaming] Wait for pending timer threads to finish or to exceed a time limit in exceptional stream task shutdown.
This closes #5058.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d86c6b6b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d86c6b6b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d86c6b6b
Branch: refs/heads/master
Commit: d86c6b6bb32adee9d4b5c9098340a34e8a8a7f1d
Parents: fda2c9f
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Nov 22 17:52:35 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Nov 23 15:23:43 2017 +0100
----------------------------------------------------------------------
.../configuration/TimerServiceOptions.java | 38 ++++++++++++
.../runtime/tasks/ProcessingTimeService.java | 12 ++++
.../streaming/runtime/tasks/StreamTask.java | 18 +++++-
.../tasks/SystemProcessingTimeService.java | 6 ++
.../tasks/TestProcessingTimeService.java | 6 ++
.../tasks/SystemProcessingTimeServiceTest.java | 65 ++++++++++++++++++++
6 files changed, 142 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d86c6b6b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
new file mode 100644
index 0000000..835adce
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Configuration options for the processing-time timer service used by stream tasks.
+ */
+@PublicEvolving
+public class TimerServiceOptions {
+
+ /**
+ * Maximum time, in milliseconds, that a stream task waits for its
+ * {@link org.apache.flink.streaming.runtime.tasks.ProcessingTimeService} to finish all pending
+ * timer threads when it performs an exceptional (failover) shutdown. If this limit is exceeded,
+ * a warning is logged and the shutdown procedure continues. Defaults to 7500 ms. See FLINK-5465.
+ */
+ public static final ConfigOption<Long> TIMER_SERVICE_TERMINATION_AWAIT_MS = ConfigOptions
+ .key("timerservice.exceptional.shutdown.timeout")
+ .defaultValue(7500L);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d86c6b6b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
index b238252..2516299 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.tasks;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
/**
* Defines the current processing time and handles all related actions,
@@ -93,4 +94,15 @@ public abstract class ProcessingTimeService {
* will result in a hard exception.
*/
public abstract void shutdownService();
+
+ /**
+ * Shuts down and cleans up the timer service provider like {@link #shutdownService()}, and then
+ * blocks until all pending timers have completed or the given time limit is exceeded, whichever
+ * happens first. Any call to {@link #registerTimer(long, ProcessingTimeCallback)} after calling this method will result in a hard exception.
+ * @param time maximum time to wait for termination.
+ * @param timeUnit time unit of parameter {@code time}.
+ * @return {@code true} if this timer service and all pending timers are terminated and
+ * {@code false} if the timeout elapsed before this happened.
+ * @throws InterruptedException if the calling thread is interrupted while waiting.
+ */
+ public abstract boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d86c6b6b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 36e6748..eff8a29 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -51,6 +51,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.configuration.TimerServiceOptions;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
@@ -69,6 +70,7 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -218,7 +220,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
LOG.debug("Initializing {}.", getName());
asyncOperationsThreadPool = Executors.newCachedThreadPool();
-
configuration = new StreamConfig(getTaskConfiguration());
CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();
@@ -319,9 +320,20 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
isRunning = false;
// stop all timers and threads
- if (timerService != null) {
+ if (timerService != null && !timerService.isTerminated()) {
try {
- timerService.shutdownService();
+
+ final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().
+ getLong(TimerServiceOptions.TIMER_SERVICE_TERMINATION_AWAIT_MS);
+
+ // wait for a reasonable time for all pending timer threads to finish
+ boolean timerShutdownComplete =
+ timerService.shutdownAndAwaitPending(timeoutMs, TimeUnit.MILLISECONDS);
+
+ if (!timerShutdownComplete) {
+ LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
+ "timers. Will continue with shutdown procedure.", timeoutMs);
+ }
}
catch (Throwable t) {
// catch and log the exception to not replace the original exception
http://git-wip-us.apache.org/repos/asf/flink/blob/d86c6b6b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index 71bfdf6..be8b23c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -191,6 +191,12 @@ public class SystemProcessingTimeService extends ProcessingTimeService {
}
}
+ /**
+ * Shuts this service down via {@link #shutdownService()} and then waits, for at most the given
+ * time, on {@code awaitTermination} of the internal {@code timerService}.
+ */
+ @Override
+ public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException {
+ shutdownService();
+ // false means timer threads were still running when the timeout elapsed
+ return timerService.awaitTermination(time, timeUnit);
+ }
+
// safety net to destroy the thread pool
@Override
protected void finalize() throws Throwable {
http://git-wip-us.apache.org/repos/asf/flink/blob/d86c6b6b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index 080eeb5..2081f19 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -134,6 +134,12 @@ public class TestProcessingTimeService extends ProcessingTimeService {
this.isTerminated = true;
}
+ /**
+ * Delegates to {@link #shutdownService()} and reports termination immediately. This
+ * implementation never waits, so {@code time} and {@code timeUnit} are ignored.
+ */
+ @Override
+ public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException {
+ shutdownService();
+ return true;
+ }
+
public int getNumActiveTimers() {
int count = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/d86c6b6b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index 4c105d3..01fd778 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -22,11 +22,13 @@ import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
@@ -442,4 +444,67 @@ public class SystemProcessingTimeServiceTest extends TestLogger {
latch.await();
assertTrue(exceptionWasThrown.get());
}
+
+ /**
+ * Tests that {@link SystemProcessingTimeService#shutdownAndAwaitPending(long, TimeUnit)} interrupts
+ * a running timer, returns {@code false} while that timer is still blocked past the timeout, and
+ * returns {@code true} once the timer has been allowed to finish.
+ */
+ @Test
+ public void testShutdownAndWaitPending() {
+
+ final Object lock = new Object();
+ final OneShotLatch waitUntilTimerStarted = new OneShotLatch();
+ // never triggered: the timer can only leave this latch by being interrupted during shutdown
+ final OneShotLatch blockUntilTerminationInterrupts = new OneShotLatch();
+ // triggered by the test thread once the first (timing-out) shutdown call has returned
+ final OneShotLatch blockUntilTriggered = new OneShotLatch();
+ // flipped to false if the timer observes an unexpected (non-)interruption
+ final AtomicBoolean check = new AtomicBoolean(true);
+
+ final SystemProcessingTimeService timeService = new SystemProcessingTimeService(
+ (message, exception) -> {
+ },
+ lock);
+
+ timeService.scheduleAtFixedRate(
+ timestamp -> {
+
+ waitUntilTimerStarted.trigger();
+
+ // expected to be interrupted by the shutdown; returning normally is a failure
+ try {
+ blockUntilTerminationInterrupts.await();
+ check.set(false);
+ } catch (InterruptedException ignore) {
+ }
+
+ // keep the timer thread busy after the interrupt; this wait must not be interrupted again
+ try {
+ blockUntilTriggered.await();
+ } catch (InterruptedException ignore) {
+ check.set(false);
+ }
+ },
+ 0L,
+ 10L);
+
+ try {
+ waitUntilTimerStarted.await();
+ } catch (InterruptedException e) {
+ Assert.fail();
+ }
+
+ Assert.assertFalse(timeService.isTerminated());
+
+ // Check that we wait for the timer to terminate. As the timer blocks on the second latch, this should time out.
+ try {
+ Assert.assertFalse(timeService.shutdownAndAwaitPending(1, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ Assert.fail("Unexpected interruption.");
+ }
+
+ // Let the timer proceed.
+ blockUntilTriggered.trigger();
+
+ // Now we should succeed in terminating the timer.
+ try {
+ Assert.assertTrue(timeService.shutdownAndAwaitPending(60, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ Assert.fail("Unexpected interruption.");
+ }
+
+ Assert.assertTrue(check.get());
+ Assert.assertTrue(timeService.isTerminated());
+ }
}