You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/07 18:23:18 UTC
flink git commit: [FLINK-5028] [streaming] StreamTask skips clean
shutdown logic upon cancellation
Repository: flink
Updated Branches:
refs/heads/master 891950eab -> 1a578657d
[FLINK-5028] [streaming] StreamTask skips clean shutdown logic upon cancellation
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a578657
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a578657
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a578657
Branch: refs/heads/master
Commit: 1a578657d078dfb2d26a6f6e60876271d6f4c2ff
Parents: 891950e
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 7 15:47:03 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 7 17:04:18 2016 +0100
----------------------------------------------------------------------
.../streaming/runtime/tasks/StreamTask.java | 8 +-
.../streaming/runtime/tasks/StreamTaskTest.java | 132 ++++++++++++++++---
2 files changed, 124 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1a578657/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index fa7d1b0..c75458e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -240,7 +240,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// task specific initialization
init();
- // save the work of reloadig state, etc, if the task is already canceled
+ // save the work of reloading state, etc, if the task is already canceled
if (canceled) {
throw new CancelTaskException();
}
@@ -266,6 +266,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
isRunning = true;
run();
+ // if this left the run() method cleanly despite the fact that this was canceled,
+ // make sure the "clean shutdown" is not attempted
+ if (canceled) {
+ throw new CancelTaskException();
+ }
+
// make sure all timers finish and no new timers can come
timerService.quiesceAndAwaitPending();
http://git-wip-us.apache.org/repos/asf/flink/blob/1a578657/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 94f6d5a..603bdd2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -56,6 +57,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -89,6 +91,8 @@ import static org.mockito.Mockito.when;
public class StreamTaskTest {
+ private static OneShotLatch SYNC_LATCH; // set by testCancellationNotBlockedOnLock; CancelLockingTask triggers it once its run() is entered
+
/**
* This test checks that cancel calls that are issued before the operator is
* instantiated still lead to proper canceling.
@@ -154,6 +158,25 @@ public class StreamTaskTest {
assertEquals(ExecutionState.FINISHED, task.getExecutionState());
}
+ @Test(timeout = 60000) // a regression would otherwise hang forever in await()/join() below instead of failing
+ public void testCancellationNotBlockedOnLock() throws Exception {
+ SYNC_LATCH = new OneShotLatch();
+
+ StreamConfig cfg = new StreamConfig(new Configuration());
+ Task task = createTask(CancelLockingTask.class, cfg, new Configuration());
+
+ // start the task and wait until it runs
+ // execution state RUNNING is not enough, we need to wait until the stream task's run() method
+ // is entered
+ task.startTaskThread();
+ SYNC_LATCH.await();
+
+ // cancel the execution - this must not block on the checkpoint lock that the LockHolder still holds
+ task.cancelExecution();
+ task.getExecutingThread().join();
+
+ assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+ }
// ------------------------------------------------------------------------
// Test Utilities
@@ -233,21 +256,21 @@ public class StreamTaskTest {
0);
return new Task(
- tdd,
- mock(MemoryManager.class),
- mock(IOManager.class),
- network,
- mock(BroadcastVariableManager.class),
- mock(TaskManagerConnection.class),
- mock(InputSplitProvider.class),
- mock(CheckpointResponder.class),
- libCache,
- mock(FileCache.class),
- new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")),
- new UnregisteredTaskMetricsGroup(),
- consumableNotifier,
- partitionStateChecker,
- executor);
+ tdd,
+ mock(MemoryManager.class),
+ mock(IOManager.class),
+ network,
+ mock(BroadcastVariableManager.class),
+ mock(TaskManagerConnection.class),
+ mock(InputSplitProvider.class),
+ mock(CheckpointResponder.class),
+ libCache,
+ mock(FileCache.class),
+ new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")),
+ new UnregisteredTaskMetricsGroup(),
+ consumableNotifier,
+ partitionStateChecker,
+ executor);
}
// ------------------------------------------------------------------------
@@ -309,4 +332,83 @@ public class StreamTaskTest {
return mock(AbstractStateBackend.class);
}
}
+
+ // ------------------------------------------------------------------------
+ // ------------------------------------------------------------------------
+
+ /**
+ * A task that blocks forever if cancellation attempts a clean shutdown, since its helper thread keeps holding the checkpoint lock.
+ */
+ public static class CancelLockingTask extends StreamTask<String, AbstractStreamOperator<String>> {
+
+ private final OneShotLatch latch = new OneShotLatch(); // triggered once the LockHolder owns the lock
+ // null until run() starts the holder; also read by cancelTask(), possibly from another thread
+ private volatile LockHolder holder;
+
+ @Override
+ protected void init() {}
+
+ @Override
+ protected void run() throws Exception {
+ holder = new LockHolder(getCheckpointLock(), latch);
+ holder.start();
+ latch.await();
+
+ // we are at the point where cancelling can happen
+ SYNC_LATCH.trigger();
+
+ // just put this to sleep until it is interrupted
+ try {
+ Thread.sleep(100000000);
+ } catch (InterruptedException ignored) {
+ // restore interruption state
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ protected void cleanup() {
+ final LockHolder lockHolder = holder; // null if run() was never entered
+ if (lockHolder != null) { lockHolder.cancel(); lockHolder.interrupt(); }
+ }
+
+ @Override
+ protected void cancelTask() {
+ final LockHolder lockHolder = holder; // null if canceled before run() was entered
+ if (lockHolder != null) { lockHolder.cancel(); }
+ // deliberately no interrupt: simulates spawned threads that cannot be properly interrupted on cancellation
+ }
+
+ /** Grabs the given lock, triggers the latch, and keeps holding the lock until canceled and woken (e.g. interrupted). */
+ private static final class LockHolder extends Thread {
+
+ private final OneShotLatch trigger;
+ private final Object lock;
+ private volatile boolean canceled;
+
+ private LockHolder(Object lock, OneShotLatch trigger) {
+ this.lock = lock;
+ this.trigger = trigger;
+ }
+
+ @Override
+ public void run() {
+ synchronized (lock) {
+ while (!canceled) {
+ // signal that we grabbed the lock
+ trigger.trigger();
+
+ // basically freeze this thread
+ try {
+ Thread.sleep(1000000000);
+ } catch (InterruptedException ignored) { /* woken up: the loop re-checks 'canceled' */ }
+ }
+ }
+ }
+
+ public void cancel() {
+ canceled = true;
+ }
+ }
+ }
}