You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink of the original page) is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/07 18:23:18 UTC

flink git commit: [FLINK-5028] [streaming] StreamTask skips clean shutdown logic upon cancellation

Repository: flink
Updated Branches:
  refs/heads/master 891950eab -> 1a578657d


[FLINK-5028] [streaming] StreamTask skips clean shutdown logic upon cancellation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a578657
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a578657
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a578657

Branch: refs/heads/master
Commit: 1a578657d078dfb2d26a6f6e60876271d6f4c2ff
Parents: 891950e
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 7 15:47:03 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 7 17:04:18 2016 +0100

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     |   8 +-
 .../streaming/runtime/tasks/StreamTaskTest.java | 132 ++++++++++++++++---
 2 files changed, 124 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1a578657/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index fa7d1b0..c75458e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -240,7 +240,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			// task specific initialization
 			init();
 
-			// save the work of reloadig state, etc, if the task is already canceled
+			// save the work of reloading state, etc, if the task is already canceled
 			if (canceled) {
 				throw new CancelTaskException();
 			}
@@ -266,6 +266,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			isRunning = true;
 			run();
 
+			// if this left the run() method cleanly despite the fact that this was canceled,
+			// make sure the "clean shutdown" is not attempted
+			if (canceled) {
+				throw new CancelTaskException();
+			}
+
 			// make sure all timers finish and no new timers can come
 			timerService.quiesceAndAwaitPending();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a578657/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 94f6d5a..603bdd2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -56,6 +57,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -89,6 +91,8 @@ import static org.mockito.Mockito.when;
 
 public class StreamTaskTest {
 
+	private static OneShotLatch SYNC_LATCH;
+
 	/**
 	 * This test checks that cancel calls that are issued before the operator is
 	 * instantiated still lead to proper canceling.
@@ -154,6 +158,25 @@ public class StreamTaskTest {
 		assertEquals(ExecutionState.FINISHED, task.getExecutionState());
 	}
 
+	@Test(timeout = 60000)
+	public void testCancellationNotBlockedOnLock() throws Exception {
+		// if cancellation blocks on the checkpoint lock, the timeout turns the hang into a failure
+		SYNC_LATCH = new OneShotLatch();
+		StreamConfig cfg = new StreamConfig(new Configuration());
+		Task task = createTask(CancelLockingTask.class, cfg, new Configuration());
+
+		// start the task and wait until it runs; execution state RUNNING is not enough,
+		// we need to wait until the stream task's run() method is entered and its lock
+		// holder thread owns the checkpoint lock (CancelLockingTask triggers SYNC_LATCH then)
+		task.startTaskThread();
+		SYNC_LATCH.await();
+
+		// cancel the execution - this should lead to a smooth shutdown, not block on the lock
+		task.cancelExecution();
+		task.getExecutingThread().join();
+
+		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+	}
 
 	// ------------------------------------------------------------------------
 	//  Test Utilities
@@ -233,21 +256,21 @@ public class StreamTaskTest {
 				0);
 
 		return new Task(
-			tdd,
-			mock(MemoryManager.class),
-			mock(IOManager.class),
-			network,
-			mock(BroadcastVariableManager.class),
-			mock(TaskManagerConnection.class),
-			mock(InputSplitProvider.class),
-			mock(CheckpointResponder.class),
-			libCache,
-			mock(FileCache.class),
-			new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")),
-			new UnregisteredTaskMetricsGroup(),
-			consumableNotifier,
-			partitionStateChecker,
-			executor);
+				tdd,
+				mock(MemoryManager.class),
+				mock(IOManager.class),
+				network,
+				mock(BroadcastVariableManager.class),
+				mock(TaskManagerConnection.class),
+				mock(InputSplitProvider.class),
+				mock(CheckpointResponder.class),
+				libCache,
+				mock(FileCache.class),
+				new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")),
+				new UnregisteredTaskMetricsGroup(),
+				consumableNotifier,
+				partitionStateChecker,
+				executor);
 	}
 	
 	// ------------------------------------------------------------------------
@@ -309,4 +332,111 @@ public class StreamTaskTest {
 			return mock(AbstractStateBackend.class);
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A task that locks up if cancellation attempts to cleanly shut down.
+	 *
+	 * <p>{@code run()} starts a {@code LockHolder} thread that grabs the checkpoint lock and keeps
+	 * it. Cancellation ({@code cancelTask()}) only flags that thread as canceled and deliberately
+	 * does not interrupt it, so the thread keeps the lock until {@code cleanup()} interrupts it.
+	 * Any shutdown step that needs the checkpoint lock and runs before {@code cleanup()} therefore
+	 * blocks.
+	 */
+	public static class CancelLockingTask extends StreamTask<String, AbstractStreamOperator<String>> {
+
+		/** Triggered by the lock holder once it owns the checkpoint lock. */
+		private final OneShotLatch latch = new OneShotLatch();
+
+		/**
+		 * The thread holding the checkpoint lock; {@code null} until {@code run()} creates it.
+		 * Volatile because cancellation may come from a thread other than the one executing
+		 * {@code run()}.
+		 */
+		private volatile LockHolder holder;
+
+		@Override
+		protected void init() {}
+
+		@Override
+		protected void run() throws Exception {
+			holder = new LockHolder(getCheckpointLock(), latch);
+			holder.start();
+			// wait until the lock holder owns the checkpoint lock
+			latch.await();
+
+			// we are at the point where cancelling can happen
+			SYNC_LATCH.trigger();
+
+			// just put this to sleep until it is interrupted
+			try {
+				Thread.sleep(100000000);
+			} catch (InterruptedException ignored) {
+				// restore interruption state
+				Thread.currentThread().interrupt();
+			}
+		}
+
+		@Override
+		protected void cleanup() {
+			// stop the holder's loop and wake it up so that it releases the checkpoint lock;
+			// there is no holder if run() was never entered
+			LockHolder lockHolder = holder;
+			if (lockHolder != null) {
+				lockHolder.cancel();
+				lockHolder.interrupt();
+			}
+		}
+
+		@Override
+		protected void cancelTask() {
+			// do not interrupt the lock holder here, to simulate spawned threads that
+			// we cannot properly interrupt on cancellation
+			LockHolder lockHolder = holder;
+			if (lockHolder != null) {
+				lockHolder.cancel();
+			}
+		}
+
+		/**
+		 * Thread that grabs the given lock and holds it until it has been canceled via
+		 * {@link #cancel()} and woken from its (very long) sleep, e.g. by an interrupt.
+		 * The given latch is triggered while the lock is held.
+		 */
+		private static final class LockHolder extends Thread {
+
+			private final OneShotLatch trigger;
+			private final Object lock;
+			private volatile boolean canceled;
+
+			private LockHolder(Object lock, OneShotLatch trigger) {
+				this.lock = lock;
+				this.trigger = trigger;
+			}
+
+			@Override
+			public void run() {
+				synchronized (lock) {
+					while (!canceled) {
+						// signal that we grabbed the lock
+						trigger.trigger();
+
+						// basically freeze this thread
+						try {
+							Thread.sleep(1000000000);
+						} catch (InterruptedException ignored) {
+							// woken up early: loop and re-check the canceled flag
+						}
+					}
+				}
+			}
+
+			/** Makes the holder leave its loop, releasing the lock, once it next checks the flag. */
+			public void cancel() {
+				canceled = true;
+			}
+		}
+	}
 }