You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/09 20:28:24 UTC

flink git commit: [FLINK-5038] [streaming runtime] Make sure Cancelables are canceled even when "cancelTask" throws an exception

Repository: flink
Updated Branches:
  refs/heads/master ddba618d9 -> 616c4f5e4


[FLINK-5038] [streaming runtime] Make sure Cancelables are canceled even when "cancelTask" throws an exception


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/616c4f5e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/616c4f5e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/616c4f5e

Branch: refs/heads/master
Commit: 616c4f5e483f0fd81ce2db05e911f01f15a0b583
Parents: ddba618
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 9 13:09:37 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 9 17:04:35 2016 +0100

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     |  11 +-
 .../streaming/runtime/tasks/StreamTaskTest.java | 127 +++++++++++++++----
 2 files changed, 113 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/616c4f5e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 83d72e7..4aaad71 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -351,8 +351,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	public final void cancel() throws Exception {
 		isRunning = false;
 		canceled = true;
-		cancelTask();
-		cancelables.close();
+
+		// the "cancel task" call must come first, but the cancelables must be
+		// closed no matter what
+		try {
+			cancelTask();
+		}
+		finally {
+			cancelables.close();
+		}
 	}
 
 	public final boolean isRunning() {

http://git-wip-us.apache.org/repos/asf/flink/blob/616c4f5e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 603bdd2..73316eb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -72,6 +72,7 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 import scala.concurrent.impl.Promise;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.net.URL;
@@ -178,6 +179,26 @@ public class StreamTaskTest {
 		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
 	}
 
+	@Test
+	public void testCancellationFailsWithBlockingLock() throws Exception {
+		SYNC_LATCH = new OneShotLatch();
+
+		StreamConfig cfg = new StreamConfig(new Configuration());
+		Task task = createTask(CancelFailingTask.class, cfg, new Configuration());
+
+		// start the task and wait until it runs
+		// execution state RUNNING is not enough, we need to wait until the stream task's run() method
+		// is entered
+		task.startTaskThread();
+		SYNC_LATCH.await();
+
+		// cancel the execution - this should lead to smooth shutdown
+		task.cancelExecution();
+		task.getExecutingThread().join();
+
+		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+	}
+
 	// ------------------------------------------------------------------------
 	//  Test Utilities
 	// ------------------------------------------------------------------------
@@ -368,8 +389,7 @@ public class StreamTaskTest {
 
 		@Override
 		protected void cleanup() {
-			holder.cancel();
-			holder.interrupt();
+			holder.close();
 		}
 
 		@Override
@@ -378,37 +398,98 @@ public class StreamTaskTest {
 			// do not interrupt the lock holder here, to simulate spawned threads that
 			// we cannot properly interrupt on cancellation
 		}
+		
+	}
 
+	/**
+	 * A task that locks if cancellation attempts to cleanly shut down 
+	 */
+	public static class CancelFailingTask extends StreamTask<String, AbstractStreamOperator<String>> {
 
-		private static final class LockHolder extends Thread {
+		@Override
+		protected void init() {}
 
-			private final OneShotLatch trigger;
-			private final Object lock;
-			private volatile boolean canceled;
+		@Override
+		protected void run() throws Exception {
+			final OneShotLatch latch = new OneShotLatch();
+			final Object lock = new Object();
 
-			private LockHolder(Object lock, OneShotLatch trigger) {
-				this.lock = lock;
-				this.trigger = trigger;
-			}
+			LockHolder holder = new LockHolder(lock, latch);
+			holder.start();
+			try {
+				// cancellation should try and cancel this
+				getCancelables().registerClosable(holder);
 
-			@Override
-			public void run() {
+				// wait till the lock holder has the lock
+				latch.await();
+
+				// we are at the point where cancelling can happen
+				SYNC_LATCH.trigger();
+	
+				// try to acquire the lock - this is not possible as long as the lock holder
+				// thread lives
+				//noinspection SynchronizationOnLocalVariableOrMethodParameter
 				synchronized (lock) {
-					while (!canceled) {
-						// signal that we grabbed the lock
-						trigger.trigger();
-
-						// basically freeze this thread
-						try {
-							Thread.sleep(1000000000);
-						} catch (InterruptedException ignored) {}
-					}
+					// nothing
 				}
 			}
+			finally {
+				holder.close();
+			}
+
+		}
+
+		@Override
+		protected void cleanup() {}
+
+		@Override
+		protected void cancelTask() throws Exception {
+			throw new Exception("test exception");
+		}
+
+	}
 
-			public void cancel() {
-				canceled = true;
+	// ------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A thread that holds a lock as long as it lives
+	 */
+	private static final class LockHolder extends Thread implements Closeable {
+
+		private final OneShotLatch trigger;
+		private final Object lock;
+		private volatile boolean canceled;
+
+		private LockHolder(Object lock, OneShotLatch trigger) {
+			this.lock = lock;
+			this.trigger = trigger;
+		}
+
+		@Override
+		public void run() {
+			synchronized (lock) {
+				while (!canceled) {
+					// signal that we grabbed the lock
+					trigger.trigger();
+
+					// basically freeze this thread
+					try {
+						//noinspection SleepWhileHoldingLock
+						Thread.sleep(1000000000);
+					} catch (InterruptedException ignored) {}
+				}
 			}
 		}
+
+		public void cancel() {
+			canceled = true;
+		}
+
+		@Override
+		public void close() {
+			canceled = true;
+			interrupt();
+		}
 	}
 }