You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/15 11:27:41 UTC

[flink] branch release-1.9 updated: [FLINK-13124] Don't forward exceptions when finishing SourceStreamTask

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 42a475d  [FLINK-13124] Don't forward exceptions when finishing SourceStreamTask
42a475d is described below

commit 42a475d42ec136eecdc8eb972fc3d2d555e48a67
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Jul 11 15:40:06 2019 +0200

    [FLINK-13124] Don't forward exceptions when finishing SourceStreamTask
    
    Before, exceptions that occurred after cancelling a source (as the
    KafkaConsumer did, for example) would make a job fail when attempting a
    "stop-with-savepoint". Now we ignore those exceptions.
---
 .../streaming/runtime/tasks/SourceStreamTask.java  | 11 ++-
 .../runtime/tasks/SourceStreamTaskTest.java        | 89 ++++++++++++++++++++++
 2 files changed, 99 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 73f6bc4..d7b467d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -49,6 +49,12 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 
 	private volatile boolean externallyInducedCheckpoints;
 
+	/**
+	 * Indicates whether this Task was purposefully finished (by finishTask()), in this case we
+	 * want to ignore exceptions thrown after finishing, to ensure shutdown works smoothly.
+	 */
+	private volatile boolean isFinished = false;
+
 	public SourceStreamTask(Environment env) {
 		super(env);
 	}
@@ -118,7 +124,9 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 		}
 
 		sourceThread.join();
-		sourceThread.checkThrowSourceExecutionException();
+		if (!isFinished) {
+			sourceThread.checkThrowSourceExecutionException();
+		}
 
 		context.allActionsCompleted();
 	}
@@ -147,6 +155,7 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 
 	@Override
 	protected void finishTask() throws Exception {
+		isFinished = true;
 		cancelTask();
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 9019d88..9662720 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -46,6 +46,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
@@ -56,6 +57,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.hamcrest.core.Is.isA;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -231,6 +234,59 @@ public class SourceStreamTaskTest {
 			testHarness.getOutput());
 	}
 
+	/**
+	 * Cancelling should not swallow exceptions in the Invokable. They will eventually be ignored
+	 * because the bubble up into the Task thread, where they go nowhere.
+	 */
+	@Test
+	public void cancellingForwardsExceptions() throws Exception {
+		final StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<>(
+				SourceStreamTask::new,
+				BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setupOutputForSingletonOperatorChain();
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		streamConfig.setStreamOperator(new StreamSource<>(new ExceptionThrowingSource()));
+		streamConfig.setOperatorID(new OperatorID());
+
+		testHarness.invoke();
+		ExceptionThrowingSource.isInRunLoop.get();
+		testHarness.getTask().cancel();
+
+		Optional<ExceptionThrowingSource.TestException> testException = Optional.empty();
+		try {
+			testHarness.waitForTaskCompletion();
+		} catch (Throwable t) {
+			testException = ExceptionUtils.findThrowable(
+					t,
+					ExceptionThrowingSource.TestException.class);
+		}
+
+		assertTrue(testException.isPresent());
+		assertThat(testException.get(), isA(ExceptionThrowingSource.TestException.class));
+	}
+
+	/**
+	 * If finishing a task doesn't swallow exceptions this test would fail with an exception.
+	 */
+	@Test
+	public void finishingIgnoresExceptions() throws Exception {
+		final StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<>(
+				SourceStreamTask::new,
+				BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setupOutputForSingletonOperatorChain();
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		streamConfig.setStreamOperator(new StreamSource<>(new ExceptionThrowingSource()));
+		streamConfig.setOperatorID(new OperatorID());
+
+		testHarness.invoke();
+		ExceptionThrowingSource.isInRunLoop.get();
+		testHarness.getTask().finishTask();
+
+		testHarness.waitForTaskCompletion();
+	}
+
 	private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, ListCheckpointed<Serializable> {
 		private static final long serialVersionUID = 1;
 
@@ -406,5 +462,38 @@ public class SourceStreamTaskTest {
 			return dataProcessing;
 		}
 	}
+
+	/**
+	 * A {@link SourceFunction} that throws an exception from {@link #run(SourceContext)} when it is
+	 * cancelled via {@link #cancel()}.
+	 */
+	private static class ExceptionThrowingSource implements SourceFunction<String> {
+
+		private volatile boolean running = true;
+		static CompletableFuture<Void> isInRunLoop = new CompletableFuture<>();
+
+		public static class TestException extends RuntimeException {
+			public TestException(String message) {
+				super(message);
+			}
+		}
+
+		@Override
+		public void run(SourceContext<String> ctx) throws TestException {
+			while (running) {
+				if (!isInRunLoop.isDone()) {
+					isInRunLoop.complete(null);
+				}
+				ctx.collect("hello");
+			}
+
+			throw new TestException("Oh no, we're failing.");
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
 }