You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/09 20:28:24 UTC
flink git commit: [FLINK-5038] [streaming runtime] Make sure
Canceleables are canceled even them "cancelTask" throws an exception
Repository: flink
Updated Branches:
refs/heads/master ddba618d9 -> 616c4f5e4
[FLINK-5038] [streaming runtime] Make sure Canceleables are canceled even them "cancelTask" throws an exception
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/616c4f5e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/616c4f5e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/616c4f5e
Branch: refs/heads/master
Commit: 616c4f5e483f0fd81ce2db05e911f01f15a0b583
Parents: ddba618
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 9 13:09:37 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 9 17:04:35 2016 +0100
----------------------------------------------------------------------
.../streaming/runtime/tasks/StreamTask.java | 11 +-
.../streaming/runtime/tasks/StreamTaskTest.java | 127 +++++++++++++++----
2 files changed, 113 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/616c4f5e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 83d72e7..4aaad71 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -351,8 +351,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
public final void cancel() throws Exception {
isRunning = false;
canceled = true;
- cancelTask();
- cancelables.close();
+
+ // the "cancel task" call must come first, but the cancelables must be
+ // closed no matter what
+ try {
+ cancelTask();
+ }
+ finally {
+ cancelables.close();
+ }
}
public final boolean isRunning() {
http://git-wip-us.apache.org/repos/asf/flink/blob/616c4f5e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 603bdd2..73316eb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -72,6 +72,7 @@ import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;
+import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.URL;
@@ -178,6 +179,26 @@ public class StreamTaskTest {
assertEquals(ExecutionState.CANCELED, task.getExecutionState());
}
+ @Test
+ public void testCancellationFailsWithBlockingLock() throws Exception {
+ SYNC_LATCH = new OneShotLatch();
+
+ StreamConfig cfg = new StreamConfig(new Configuration());
+ Task task = createTask(CancelFailingTask.class, cfg, new Configuration());
+
+ // start the task and wait until it runs
+ // execution state RUNNING is not enough, we need to wait until the stream task's run() method
+ // is entered
+ task.startTaskThread();
+ SYNC_LATCH.await();
+
+ // cancel the execution - this should lead to smooth shutdown
+ task.cancelExecution();
+ task.getExecutingThread().join();
+
+ assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+ }
+
// ------------------------------------------------------------------------
// Test Utilities
// ------------------------------------------------------------------------
@@ -368,8 +389,7 @@ public class StreamTaskTest {
@Override
protected void cleanup() {
- holder.cancel();
- holder.interrupt();
+ holder.close();
}
@Override
@@ -378,37 +398,98 @@ public class StreamTaskTest {
// do not interrupt the lock holder here, to simulate spawned threads that
// we cannot properly interrupt on cancellation
}
+
+ }
+ /**
+ * A task that locks if cancellation attempts to cleanly shut down
+ */
+ public static class CancelFailingTask extends StreamTask<String, AbstractStreamOperator<String>> {
- private static final class LockHolder extends Thread {
+ @Override
+ protected void init() {}
- private final OneShotLatch trigger;
- private final Object lock;
- private volatile boolean canceled;
+ @Override
+ protected void run() throws Exception {
+ final OneShotLatch latch = new OneShotLatch();
+ final Object lock = new Object();
- private LockHolder(Object lock, OneShotLatch trigger) {
- this.lock = lock;
- this.trigger = trigger;
- }
+ LockHolder holder = new LockHolder(lock, latch);
+ holder.start();
+ try {
+ // cancellation should try and cancel this
+ getCancelables().registerClosable(holder);
- @Override
- public void run() {
+ // wait till the lock holder has the lock
+ latch.await();
+
+ // we are at the point where cancelling can happen
+ SYNC_LATCH.trigger();
+
+ // try to acquire the lock - this is not possible as long as the lock holder
+ // thread lives
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (lock) {
- while (!canceled) {
- // signal that we grabbed the lock
- trigger.trigger();
-
- // basically freeze this thread
- try {
- Thread.sleep(1000000000);
- } catch (InterruptedException ignored) {}
- }
+ // nothing
}
}
+ finally {
+ holder.close();
+ }
+
+ }
+
+ @Override
+ protected void cleanup() {}
+
+ @Override
+ protected void cancelTask() throws Exception {
+ throw new Exception("test exception");
+ }
+
+ }
- public void cancel() {
- canceled = true;
+ // ------------------------------------------------------------------------
+ // ------------------------------------------------------------------------
+
+ /**
+ * A thread that holds a lock as long as it lives
+ */
+ private static final class LockHolder extends Thread implements Closeable {
+
+ private final OneShotLatch trigger;
+ private final Object lock;
+ private volatile boolean canceled;
+
+ private LockHolder(Object lock, OneShotLatch trigger) {
+ this.lock = lock;
+ this.trigger = trigger;
+ }
+
+ @Override
+ public void run() {
+ synchronized (lock) {
+ while (!canceled) {
+ // signal that we grabbed the lock
+ trigger.trigger();
+
+ // basically freeze this thread
+ try {
+ //noinspection SleepWhileHoldingLock
+ Thread.sleep(1000000000);
+ } catch (InterruptedException ignored) {}
+ }
}
}
+
+ public void cancel() {
+ canceled = true;
+ }
+
+ @Override
+ public void close() {
+ canceled = true;
+ interrupt();
+ }
}
}