You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/21 06:14:32 UTC

[flink] 03/03: [hotfix] Encapsulate async exception handling into StreamTask#StreamTaskAsyncExceptionHandler

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 97d11074deb8e4eecda9f870cfd3d069eaae0f5d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Aug 20 12:11:08 2019 +0200

    [hotfix] Encapsulate async exception handling into StreamTask#StreamTaskAsyncExceptionHandler
---
 .../flink/streaming/runtime/tasks/StreamTask.java  | 20 +++++++++-
 .../streaming/runtime/tasks/StreamTaskTest.java    | 45 +++++++---------------
 2 files changed, 32 insertions(+), 33 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1dadee5..1b1cfc4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -181,6 +181,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	/** The currently active background materialization threads. */
 	private final CloseableRegistry cancelables = new CloseableRegistry();
 
+	private final StreamTaskAsyncExceptionHandler asyncExceptionHandler;
+
 	/**
 	 * Flag to mark the task "in operation", in which case check needs to be initialized to true,
 	 * so that early cancel() before invoke() behaves correctly.
@@ -248,6 +250,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		this.recordWriters = createRecordWriters(configuration, environment);
 		this.syncSavepointLatch = new SynchronousSavepointLatch();
 		this.mailbox = new MailboxImpl();
+		this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment);
 	}
 
 	// ------------------------------------------------------------------------
@@ -926,7 +929,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	public void handleAsyncException(String message, Throwable exception) {
 		if (isRunning) {
 			// only fail if the task is still running
-			getEnvironment().failExternally(new AsynchronousException(message, exception));
+			asyncExceptionHandler.handleAsyncException(message, exception);
 		}
 	}
 
@@ -942,6 +945,21 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	// ------------------------------------------------------------------------
 
 	/**
+	 * Utility class to encapsulate the handling of asynchronous exceptions: wraps the cause in an {@link AsynchronousException} and reports it via {@link Environment#failExternally}.
+	 */
+	static class StreamTaskAsyncExceptionHandler {
+		private final Environment environment;
+
+		StreamTaskAsyncExceptionHandler(Environment environment) {
+			this.environment = environment;
+		}
+
+		void handleAsyncException(String message, Throwable exception) {
+			environment.failExternally(new AsynchronousException(message, exception));
+		}
+	}
+
+	/**
 	 * This runnable executes the asynchronous parts of all involved backend snapshots for the subtask.
 	 */
 	@VisibleForTesting
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 87b7b5e..2259501 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -183,44 +183,25 @@ public class StreamTaskTest extends TestLogger {
 	 * and propagates this to the environment.
 	 */
 	@Test
-	public void handleAsyncException() throws Throwable {
-		MockEnvironment e = MockEnvironment.builder().build();
+	public void streamTaskAsyncExceptionHandler_handleException_forwardsMessageProperly() {
+		MockEnvironment mockEnvironment = MockEnvironment.builder().build();
 		RuntimeException expectedException = new RuntimeException("RUNTIME EXCEPTION");
 
-		BlockingCloseStreamOperator.resetLatches();
-		Configuration taskConfiguration = new Configuration();
-		StreamConfig streamConfig = new StreamConfig(taskConfiguration);
-		streamConfig.setStreamOperator(new BlockingCloseStreamOperator());
-		streamConfig.setOperatorID(new OperatorID());
-
-		try (MockEnvironment mockEnvironment =
-				new MockEnvironmentBuilder()
-					.setTaskName("Test Task")
-					.setMemorySize(32L * 1024L)
-					.setBufferSize(1)
-					.setTaskConfiguration(taskConfiguration)
-					.build()) {
-
-			RunningTask<StreamTask<Void, BlockingCloseStreamOperator>> task = runTask(() -> new NoOpStreamTask<>(mockEnvironment));
+		final StreamTask.StreamTaskAsyncExceptionHandler asyncExceptionHandler = new StreamTask.StreamTaskAsyncExceptionHandler(mockEnvironment);
 
-			BlockingCloseStreamOperator.inClose.await();
+		mockEnvironment.setExpectedExternalFailureCause(AsynchronousException.class);
+		final String expectedErrorMessage = "EXPECTED_ERROR MESSAGE";
 
-			// check that the StreamTask is not yet in isRunning == false
-			assertTrue(task.streamTask.isRunning());
+		asyncExceptionHandler.handleAsyncException(expectedErrorMessage, expectedException);
 
-			// generate an error report and expect it to be caught by the Environment
-			mockEnvironment.setExpectedExternalFailureCause(AsynchronousException.class);
-			task.streamTask.handleAsyncException("EXPECTED_ERROR MESSAGE", expectedException);
+		// expect an AsynchronousException containing the supplied error details
+		Optional<? extends Throwable> actualExternalFailureCause = mockEnvironment.getActualExternalFailureCause();
+		final Throwable actualException = actualExternalFailureCause
+			.orElseThrow(() -> new AssertionError("Expected exceptional completion"));
 
-			// expect an AsynchronousException containing the supplied error details
-			Optional<? extends Throwable> actualExternalFailureCause = mockEnvironment.getActualExternalFailureCause();
-			final Throwable actualException = actualExternalFailureCause
-				.orElseThrow(() -> new AssertionError("Expected exceptional completion"));
-
-			assertThat(actualException, instanceOf(AsynchronousException.class));
-			assertThat(actualException.getMessage(), is("EXPECTED_ERROR MESSAGE"));
-			assertThat(actualException.getCause(), is(expectedException));
-		}
+		assertThat(actualException, instanceOf(AsynchronousException.class));
+		assertThat(actualException.getMessage(), is(expectedErrorMessage));
+		assertThat(actualException.getCause(), is(expectedException));
 	}
 
 	/**