You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/15 11:27:41 UTC
[flink] branch release-1.9 updated: [FLINK-13124] Don't forward
exceptions when finishing SourceStreamTask
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 42a475d [FLINK-13124] Don't forward exceptions when finishing SourceStreamTask
42a475d is described below
commit 42a475d42ec136eecdc8eb972fc3d2d555e48a67
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Jul 11 15:40:06 2019 +0200
[FLINK-13124] Don't forward exceptions when finishing SourceStreamTask
Before, exceptions thrown by a source after it had been cancelled (as the
KafkaConsumer does, for example) would make a job fail when attempting a
"stop-with-savepoint". Now we ignore those exceptions.
---
.../streaming/runtime/tasks/SourceStreamTask.java | 11 ++-
.../runtime/tasks/SourceStreamTaskTest.java | 89 ++++++++++++++++++++++
2 files changed, 99 insertions(+), 1 deletion(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 73f6bc4..d7b467d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -49,6 +49,12 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
private volatile boolean externallyInducedCheckpoints;
+ /**
+ * Indicates whether this Task was purposefully finished (by finishTask()). In that case we
+ * want to ignore exceptions thrown after finishing, to ensure shutdown works smoothly.
+ */
+ private volatile boolean isFinished = false;
+
public SourceStreamTask(Environment env) {
super(env);
}
@@ -118,7 +124,9 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
}
sourceThread.join();
- sourceThread.checkThrowSourceExecutionException();
+ if (!isFinished) {
+ sourceThread.checkThrowSourceExecutionException();
+ }
context.allActionsCompleted();
}
@@ -147,6 +155,7 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
@Override
protected void finishTask() throws Exception {
+ isFinished = true; // set before cancelling so the check after sourceThread.join() skips rethrowing source exceptions
cancelTask();
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 9019d88..9662720 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -46,6 +46,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
@@ -56,6 +57,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
+import static org.hamcrest.core.Is.isA;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
@@ -231,6 +234,59 @@ public class SourceStreamTaskTest {
testHarness.getOutput());
}
+ /**
+ * Verifies that cancelling does not swallow exceptions thrown in the Invokable. They will
+ * eventually be ignored because they bubble up into the Task thread, where they go nowhere.
+ */
+ @Test
+ public void cancellingForwardsExceptions() throws Exception {
+ final StreamTaskTestHarness<String> harness =
+ new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+ harness.setupOutputForSingletonOperatorChain();
+
+ final StreamConfig config = harness.getStreamConfig();
+ config.setStreamOperator(new StreamSource<>(new ExceptionThrowingSource()));
+ config.setOperatorID(new OperatorID());
+
+ harness.invoke();
+ // Wait on the future the source completes from inside its run loop, then cancel the task.
+ ExceptionThrowingSource.isInRunLoop.get();
+ harness.getTask().cancel();
+
+ // Search whatever the task completion throws for the exception raised by the source.
+ Optional<ExceptionThrowingSource.TestException> found = Optional.empty();
+ try {
+ harness.waitForTaskCompletion();
+ } catch (Throwable failure) {
+ found = ExceptionUtils.findThrowable(failure, ExceptionThrowingSource.TestException.class);
+ }
+
+ assertTrue(found.isPresent());
+ assertThat(found.get(), isA(ExceptionThrowingSource.TestException.class));
+ }
+
+ /**
+ * Finishing a task must swallow exceptions thrown by the source afterwards. If it did not,
+ * this test would fail with an exception.
+ */
+ @Test
+ public void finishingIgnoresExceptions() throws Exception {
+ final StreamTaskTestHarness<String> harness =
+ new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+ harness.setupOutputForSingletonOperatorChain();
+
+ final StreamConfig config = harness.getStreamConfig();
+ config.setStreamOperator(new StreamSource<>(new ExceptionThrowingSource()));
+ config.setOperatorID(new OperatorID());
+
+ harness.invoke();
+ // Wait on the future the source completes from inside its run loop, then finish the task.
+ ExceptionThrowingSource.isInRunLoop.get();
+ harness.getTask().finishTask();
+
+ // No assertion needed: an exception propagating out of this call fails the test.
+ harness.waitForTaskCompletion();
+ }
+
private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, ListCheckpointed<Serializable> {
private static final long serialVersionUID = 1;
@@ -406,5 +462,38 @@ public class SourceStreamTaskTest {
return dataProcessing;
}
}
+
+ /**
+ * A {@link SourceFunction} that emits records until {@link #cancel()} is called and then
+ * throws a {@link TestException} from {@link #run(SourceContext)}.
+ *
+ * <p>Tests wait on {@link #isInRunLoop} to learn that the source has entered its run loop.
+ * Every newly constructed source installs a fresh future; otherwise a future completed by an
+ * earlier test would make the wait in a later test return immediately, before that test's
+ * source is actually running.
+ */
+ private static class ExceptionThrowingSource implements SourceFunction<String> {
+
+ /** Completed by {@link #run(SourceContext)} once the run loop has been entered. */
+ static volatile CompletableFuture<Void> isInRunLoop = new CompletableFuture<>();
+
+ private volatile boolean running = true;
+
+ /**
+ * Replaces {@link #isInRunLoop} with a fresh, uncompleted future for the test that creates
+ * this source.
+ *
+ * <p>NOTE(review): assumes the task obtains its copy of the source without re-running this
+ * constructor (e.g. via Java serialization) - confirm against the test harness.
+ */
+ ExceptionThrowingSource() {
+ isInRunLoop = new CompletableFuture<>();
+ }
+
+ /** Exception thrown by the source after it has been cancelled. */
+ public static class TestException extends RuntimeException {
+ public TestException(String message) {
+ super(message);
+ }
+ }
+
+ @Override
+ public void run(SourceContext<String> ctx) throws TestException {
+ // Read the static field once so this run always signals the same future.
+ final CompletableFuture<Void> inRunLoop = isInRunLoop;
+ while (running) {
+ if (!inRunLoop.isDone()) {
+ inRunLoop.complete(null);
+ }
+ ctx.collect("hello");
+ }
+
+ // Reached only after cancel() flipped the flag: fail on purpose.
+ throw new TestException("Oh no, we're failing.");
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
}