You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/21 06:14:32 UTC
[flink] 03/03: [hotfix] Encapsulate async exception handling into StreamTask#StreamTaskAsyncExceptionHandler
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 97d11074deb8e4eecda9f870cfd3d069eaae0f5d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Aug 20 12:11:08 2019 +0200
[hotfix] Encapsulate async exception handling into StreamTask#StreamTaskAsyncExceptionHandler
---
.../flink/streaming/runtime/tasks/StreamTask.java | 20 +++++++++-
.../streaming/runtime/tasks/StreamTaskTest.java | 45 +++++++---------------
2 files changed, 32 insertions(+), 33 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1dadee5..1b1cfc4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -181,6 +181,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
/** The currently active background materialization threads. */
private final CloseableRegistry cancelables = new CloseableRegistry();
+ private final StreamTaskAsyncExceptionHandler asyncExceptionHandler;
+
/**
* Flag to mark the task "in operation", in which case check needs to be initialized to true,
* so that early cancel() before invoke() behaves correctly.
@@ -248,6 +250,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
this.recordWriters = createRecordWriters(configuration, environment);
this.syncSavepointLatch = new SynchronousSavepointLatch();
this.mailbox = new MailboxImpl();
+ this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment);
}
// ------------------------------------------------------------------------
@@ -926,7 +929,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
public void handleAsyncException(String message, Throwable exception) {
if (isRunning) {
// only fail if the task is still running
- getEnvironment().failExternally(new AsynchronousException(message, exception));
+ asyncExceptionHandler.handleAsyncException(message, exception);
}
}
@@ -942,6 +945,21 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// ------------------------------------------------------------------------
/**
+ * Utility class to encapsulate the handling of asynchronous exceptions.
+ */
+ static class StreamTaskAsyncExceptionHandler {
+ private final Environment environment;
+
+ StreamTaskAsyncExceptionHandler(Environment environment) {
+ this.environment = environment;
+ }
+
+ void handleAsyncException(String message, Throwable exception) {
+ environment.failExternally(new AsynchronousException(message, exception));
+ }
+ }
+
+ /**
* This runnable executes the asynchronous parts of all involved backend snapshots for the subtask.
*/
@VisibleForTesting
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 87b7b5e..2259501 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -183,44 +183,25 @@ public class StreamTaskTest extends TestLogger {
* and propagates this to the environment.
*/
@Test
- public void handleAsyncException() throws Throwable {
- MockEnvironment e = MockEnvironment.builder().build();
+ public void streamTaskAsyncExceptionHandler_handleException_forwardsMessageProperly() {
+ MockEnvironment mockEnvironment = MockEnvironment.builder().build();
RuntimeException expectedException = new RuntimeException("RUNTIME EXCEPTION");
- BlockingCloseStreamOperator.resetLatches();
- Configuration taskConfiguration = new Configuration();
- StreamConfig streamConfig = new StreamConfig(taskConfiguration);
- streamConfig.setStreamOperator(new BlockingCloseStreamOperator());
- streamConfig.setOperatorID(new OperatorID());
-
- try (MockEnvironment mockEnvironment =
- new MockEnvironmentBuilder()
- .setTaskName("Test Task")
- .setMemorySize(32L * 1024L)
- .setBufferSize(1)
- .setTaskConfiguration(taskConfiguration)
- .build()) {
-
- RunningTask<StreamTask<Void, BlockingCloseStreamOperator>> task = runTask(() -> new NoOpStreamTask<>(mockEnvironment));
+ final StreamTask.StreamTaskAsyncExceptionHandler asyncExceptionHandler = new StreamTask.StreamTaskAsyncExceptionHandler(mockEnvironment);
- BlockingCloseStreamOperator.inClose.await();
+ mockEnvironment.setExpectedExternalFailureCause(AsynchronousException.class);
+ final String expectedErrorMessage = "EXPECTED_ERROR MESSAGE";
- // check that the StreamTask is not yet in isRunning == false
- assertTrue(task.streamTask.isRunning());
+ asyncExceptionHandler.handleAsyncException(expectedErrorMessage, expectedException);
- // generate an error report and expect it to be caught by the Environment
- mockEnvironment.setExpectedExternalFailureCause(AsynchronousException.class);
- task.streamTask.handleAsyncException("EXPECTED_ERROR MESSAGE", expectedException);
+ // expect an AsynchronousException containing the supplied error details
+ Optional<? extends Throwable> actualExternalFailureCause = mockEnvironment.getActualExternalFailureCause();
+ final Throwable actualException = actualExternalFailureCause
+ .orElseThrow(() -> new AssertionError("Expected exceptional completion"));
- // expect an AsynchronousException containing the supplied error details
- Optional<? extends Throwable> actualExternalFailureCause = mockEnvironment.getActualExternalFailureCause();
- final Throwable actualException = actualExternalFailureCause
- .orElseThrow(() -> new AssertionError("Expected exceptional completion"));
-
- assertThat(actualException, instanceOf(AsynchronousException.class));
- assertThat(actualException.getMessage(), is("EXPECTED_ERROR MESSAGE"));
- assertThat(actualException.getCause(), is(expectedException));
- }
+ assertThat(actualException, instanceOf(AsynchronousException.class));
+ assertThat(actualException.getMessage(), is("EXPECTED_ERROR MESSAGE"));
+ assertThat(actualException.getCause(), is(expectedException));
}
/**