You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/15 11:03:06 UTC

[flink] branch master updated: [FLINK-13124] Don't forward exceptions when finishing SourceStreamTask

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e2f67e  [FLINK-13124] Don't forward exceptions when finishing SourceStreamTask
2e2f67e is described below

commit 2e2f67ed348c334402a5d0af76b0fd47cedcf5a7
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Jul 11 15:40:06 2019 +0200

    [FLINK-13124] Don't forward exceptions when finishing SourceStreamTask
    
    Previously, exceptions thrown by a source after it was cancelled (as
    the KafkaConsumer does, for example) would make a job fail when
    attempting a "stop-with-savepoint". Now we ignore those exceptions.
---
 .../streaming/runtime/tasks/SourceStreamTask.java  |  4 ++
 .../runtime/tasks/SourceStreamTaskTest.java        | 54 ++++++++++++++++++++++
 2 files changed, 58 insertions(+)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 50fdca1..608d972 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -127,6 +127,10 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 
 	@Override
 	protected void finishTask() throws Exception {
+		// We tell the mailbox to finish, to prevent any exceptions that might occur during
+		// finishing from leading to a FAILED state. This could happen, for example, when cancelling
+		// sources as part of a "stop-with-savepoint".
+		mailboxProcessor.allActionsCompleted();
 		cancelTask();
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 9019d88..a07fdad 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -231,6 +231,27 @@ public class SourceStreamTaskTest {
 			testHarness.getOutput());
 	}
 
+	/**
+	 * If finishing a task didn't swallow exceptions, this test would fail with an exception.
+	 */
+	@Test
+	public void finishingIgnoresExceptions() throws Exception {
+		final StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<>(
+				SourceStreamTask::new,
+				BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setupOutputForSingletonOperatorChain();
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		streamConfig.setStreamOperator(new StreamSource<>(new ExceptionThrowingSource()));
+		streamConfig.setOperatorID(new OperatorID());
+
+		testHarness.invoke();
+		ExceptionThrowingSource.isInRunLoop.get();
+		testHarness.getTask().finishTask();
+
+		testHarness.waitForTaskCompletion();
+	}
+
 	private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, ListCheckpointed<Serializable> {
 		private static final long serialVersionUID = 1;
 
@@ -406,5 +427,38 @@ public class SourceStreamTaskTest {
 			return dataProcessing;
 		}
 	}
+
+	/**
+	 * A {@link SourceFunction} that throws an exception from {@link #run(SourceContext)} when it is
+	 * cancelled via {@link #cancel()}.
+	 */
+	private static class ExceptionThrowingSource implements SourceFunction<String> {
+
+		private volatile boolean running = true;
+		static CompletableFuture<Void> isInRunLoop = new CompletableFuture<>();
+
+		public static class TestException extends RuntimeException {
+			public TestException(String message) {
+				super(message);
+			}
+		}
+
+		@Override
+		public void run(SourceContext<String> ctx) throws TestException {
+			while (running) {
+				if (!isInRunLoop.isDone()) {
+					isInRunLoop.complete(null);
+				}
+				ctx.collect("hello");
+			}
+
+			throw new TestException("Oh no, we're failing.");
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
 }