You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/15 11:03:06 UTC
[flink] branch master updated: [FLINK-13124] Don't forward
exceptions when finishing SourceStreamTask
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2e2f67e [FLINK-13124] Don't forward exceptions when finishing SourceStreamTask
2e2f67e is described below
commit 2e2f67ed348c334402a5d0af76b0fd47cedcf5a7
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Jul 11 15:40:06 2019 +0200
[FLINK-13124] Don't forward exceptions when finishing SourceStreamTask
Before, exceptions that occurred after cancelling a source (as the
KafkaConsumer did, for example) would make a job fail when attempting a
"stop-with-savepoint". Now we ignore those exceptions.
---
.../streaming/runtime/tasks/SourceStreamTask.java | 4 ++
.../runtime/tasks/SourceStreamTaskTest.java | 54 ++++++++++++++++++++++
2 files changed, 58 insertions(+)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 50fdca1..608d972 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -127,6 +127,10 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
@Override
protected void finishTask() throws Exception {
+ // We tell the mailbox to finish, to prevent any exceptions that might occur during
+ // finishing from leading to a FAILED state. This could happen, for example, when cancelling
+ // sources as part of a "stop-with-savepoint".
+ mailboxProcessor.allActionsCompleted();
cancelTask();
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 9019d88..a07fdad 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -231,6 +231,27 @@ public class SourceStreamTaskTest {
testHarness.getOutput());
}
+ /**
+ * If finishing a task doesn't swallow exceptions this test would fail with an exception.
+ */
+ @Test
+ public void finishingIgnoresExceptions() throws Exception {
+ final StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<>(
+ SourceStreamTask::new,
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setupOutputForSingletonOperatorChain();
+ StreamConfig streamConfig = testHarness.getStreamConfig();
+ streamConfig.setStreamOperator(new StreamSource<>(new ExceptionThrowingSource()));
+ streamConfig.setOperatorID(new OperatorID());
+
+ testHarness.invoke();
+ ExceptionThrowingSource.isInRunLoop.get();
+ testHarness.getTask().finishTask();
+
+ testHarness.waitForTaskCompletion();
+ }
+
private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, ListCheckpointed<Serializable> {
private static final long serialVersionUID = 1;
@@ -406,5 +427,38 @@ public class SourceStreamTaskTest {
return dataProcessing;
}
}
+
+ /**
+ * A {@link SourceFunction} that throws an exception from {@link #run(SourceContext)} when it is
+ * cancelled via {@link #cancel()}.
+ */
+ private static class ExceptionThrowingSource implements SourceFunction<String> {
+
+ private volatile boolean running = true;
+ static CompletableFuture<Void> isInRunLoop = new CompletableFuture<>();
+
+ public static class TestException extends RuntimeException {
+ public TestException(String message) {
+ super(message);
+ }
+ }
+
+ @Override
+ public void run(SourceContext<String> ctx) throws TestException {
+ while (running) {
+ if (!isInRunLoop.isDone()) {
+ isInRunLoop.complete(null);
+ }
+ ctx.collect("hello");
+ }
+
+ throw new TestException("Oh no, we're failing.");
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
}