You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/21 06:14:29 UTC

[flink] branch release-1.9 updated (d8d609f -> 97d1107)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from d8d609f  [FLINK-13380][docs] Improve docs on how to use Flink with Kubernetes
     new 6f7d625  [FLINK-13588] Report exception message when failing StreamTask#handleAsyncException
     new c439519  [hotfix] Extend MockEnvironment to provide better testing tools
     new 97d1107  [hotfix] Encapsulate async exception handling into StreamTask#StreamTaskAsyncExceptionHandler

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../operators/testutils/MockEnvironment.java       | 13 ++--
 .../runtime/tasks/AsynchronousException.java       |  5 --
 .../flink/streaming/runtime/tasks/StreamTask.java  | 20 ++++++-
 .../streaming/runtime/tasks/StreamTaskTest.java    | 27 +++++++++
 .../streaming/runtime/StreamTaskTimerITCase.java   | 69 +++++++---------------
 5 files changed, 72 insertions(+), 62 deletions(-)


[flink] 01/03: [FLINK-13588] Report exception message when failing StreamTask#handleAsyncException

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6f7d62528eba6387dae20f8e1678af2fd36a46c6
Author: John Lonergan <jo...@gmail.com>
AuthorDate: Thu Aug 15 23:38:01 2019 +0100

    [FLINK-13588] Report exception message when failing StreamTask#handleAsyncException
    
    Don't throw away exception info in logging as it makes diagnosis extremely hard
---
 .../runtime/tasks/AsynchronousException.java       |  5 --
 .../flink/streaming/runtime/tasks/StreamTask.java  |  2 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    | 50 ++++++++++++++--
 .../streaming/runtime/StreamTaskTimerITCase.java   | 69 +++++++---------------
 4 files changed, 66 insertions(+), 60 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
index 1af81d0..a187aad 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
@@ -35,9 +35,4 @@ public class AsynchronousException extends Exception {
 	public AsynchronousException(String message, Throwable cause) {
 		super(message, cause);
 	}
-
-	@Override
-	public String toString() {
-		return "AsynchronousException{" + getCause() + "}";
-	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index f9fe110..1dadee5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -926,7 +926,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	public void handleAsyncException(String message, Throwable exception) {
 		if (isRunning) {
 			// only fail if the task is still running
-			getEnvironment().failExternally(exception);
+			getEnvironment().failExternally(new AsynchronousException(message, exception));
 		}
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 601d72b..94cb0a0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -128,11 +128,7 @@ import org.mockito.stubbing.Answer;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.ObjectInputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -178,6 +174,50 @@ public class StreamTaskTest extends TestLogger {
 	public final Timeout timeoutPerTest = Timeout.seconds(30);
 
 	/**
+	 * This test checks the async exceptions handling wraps the message and cause as an AsynchronousException
+	 * and propagates this to the environment.
+	 */
+	@Test
+	public void handleAsyncException() throws Throwable {
+		MockEnvironment e = MockEnvironment.builder().build();
+		RuntimeException expectedException = new RuntimeException("RUNTIME EXCEPTION");
+
+		BlockingCloseStreamOperator.resetLatches();
+		Configuration taskConfiguration = new Configuration();
+		StreamConfig streamConfig = new StreamConfig(taskConfiguration);
+		streamConfig.setStreamOperator(new BlockingCloseStreamOperator());
+		streamConfig.setOperatorID(new OperatorID());
+
+		try (MockEnvironment mockEnvironment =
+				 new MockEnvironmentBuilder()
+					 .setTaskName("Test Task")
+					 .setMemorySize(32L * 1024L)
+					 .setInputSplitProvider(new MockInputSplitProvider())
+					 .setBufferSize(1)
+					 .setTaskConfiguration(taskConfiguration)
+					 .build()) {
+
+			RunningTask<StreamTask<Void, BlockingCloseStreamOperator>> task = runTask(() -> new NoOpStreamTask<>(mockEnvironment));
+
+			BlockingCloseStreamOperator.inClose.await();
+
+			// check that the StreamTask is not yet in isRunning == false
+			assertTrue(task.streamTask.isRunning());
+
+
+			// generate an error report and expect it to be caught by the Environment
+			mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
+			task.streamTask.handleAsyncException("EXPECTED_ERROR MESSAGE", expectedException);
+
+			// expect an AsynchronousException containing the supplied error details
+			Optional<Throwable> actualExternalFailureCause = mockEnvironment.getActualExternalFailureCause();
+			AsynchronousException cause = (AsynchronousException)actualExternalFailureCause.get();
+			assertEquals(cause.getMessage(), "EXPECTED_ERROR MESSAGE");
+			assertEquals(cause.getCause(), expectedException);
+		}
+	}
+
+	/**
 	 * This test checks that cancel calls that are issued before the operator is
 	 * instantiated still lead to proper canceling.
 	 */
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
index b87f486..c6da16c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.TimerException;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -41,8 +42,11 @@ import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Optional;
 import java.util.concurrent.Semaphore;
 
+import static org.junit.Assert.assertTrue;
+
 /**
  * Tests for the timer service of {@code StreamTask}.
  *
@@ -73,27 +77,26 @@ public class StreamTaskTimerITCase extends AbstractTestBase {
 
 		source.transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(ChainingStrategy.ALWAYS));
 
-		boolean testSuccess = false;
 		try {
 			env.execute("Timer test");
 		} catch (JobExecutionException e) {
-			if (e.getCause() instanceof TimerException) {
-				TimerException te = (TimerException) e.getCause();
-				if (te.getCause() instanceof RuntimeException) {
-					RuntimeException re = (RuntimeException) te.getCause();
-					if (re.getMessage().equals("TEST SUCCESS")) {
-						testSuccess = true;
-					} else {
-						throw e;
-					}
-				} else {
-					throw e;
-				}
-			} else {
+			verifyJobExecutionException(e);
+		}
+	}
+
+	private void verifyJobExecutionException(JobExecutionException e) throws JobExecutionException {
+		final Optional<TimerException> optionalTimerException = ExceptionUtils.findThrowable(e, TimerException.class);
+		assertTrue(optionalTimerException.isPresent());
+
+		TimerException te = optionalTimerException.get();
+		if (te.getCause() instanceof RuntimeException) {
+			RuntimeException re = (RuntimeException) te.getCause();
+			if (!re.getMessage().equals("TEST SUCCESS")) {
 				throw e;
 			}
+		} else {
+			throw e;
 		}
-		Assert.assertTrue(testSuccess);
 	}
 
 	/**
@@ -110,27 +113,11 @@ public class StreamTaskTimerITCase extends AbstractTestBase {
 
 		source.transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(ChainingStrategy.NEVER));
 
-		boolean testSuccess = false;
 		try {
 			env.execute("Timer test");
 		} catch (JobExecutionException e) {
-			if (e.getCause() instanceof TimerException) {
-				TimerException te = (TimerException) e.getCause();
-				if (te.getCause() instanceof RuntimeException) {
-					RuntimeException re = (RuntimeException) te.getCause();
-					if (re.getMessage().equals("TEST SUCCESS")) {
-						testSuccess = true;
-					} else {
-						throw e;
-					}
-				} else {
-					throw e;
-				}
-			} else {
-				throw e;
-			}
+			verifyJobExecutionException(e);
 		}
-		Assert.assertTrue(testSuccess);
 	}
 
 	@Test
@@ -146,27 +133,11 @@ public class StreamTaskTimerITCase extends AbstractTestBase {
 				BasicTypeInfo.STRING_TYPE_INFO,
 				new TwoInputTimerOperator(ChainingStrategy.NEVER));
 
-		boolean testSuccess = false;
 		try {
 			env.execute("Timer test");
 		} catch (JobExecutionException e) {
-			if (e.getCause() instanceof TimerException) {
-				TimerException te = (TimerException) e.getCause();
-				if (te.getCause() instanceof RuntimeException) {
-					RuntimeException re = (RuntimeException) te.getCause();
-					if (re.getMessage().equals("TEST SUCCESS")) {
-						testSuccess = true;
-					} else {
-						throw e;
-					}
-				} else {
-					throw e;
-				}
-			} else {
-				throw e;
-			}
+			verifyJobExecutionException(e);
 		}
-		Assert.assertTrue(testSuccess);
 	}
 
 	private static class TimerOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String>, ProcessingTimeCallback {


[flink] 03/03: [hotfix] Encapsulate async exception handling into StreamTask#StreamTaskAsyncExceptionHandler

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 97d11074deb8e4eecda9f870cfd3d069eaae0f5d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Aug 20 12:11:08 2019 +0200

    [hotfix] Encapsulate async exception handling into StreamTask#StreamTaskAsyncExceptionHandler
---
 .../flink/streaming/runtime/tasks/StreamTask.java  | 20 +++++++++-
 .../streaming/runtime/tasks/StreamTaskTest.java    | 45 +++++++---------------
 2 files changed, 32 insertions(+), 33 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1dadee5..1b1cfc4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -181,6 +181,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	/** The currently active background materialization threads. */
 	private final CloseableRegistry cancelables = new CloseableRegistry();
 
+	private final StreamTaskAsyncExceptionHandler asyncExceptionHandler;
+
 	/**
 	 * Flag to mark the task "in operation", in which case check needs to be initialized to true,
 	 * so that early cancel() before invoke() behaves correctly.
@@ -248,6 +250,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		this.recordWriters = createRecordWriters(configuration, environment);
 		this.syncSavepointLatch = new SynchronousSavepointLatch();
 		this.mailbox = new MailboxImpl();
+		this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment);
 	}
 
 	// ------------------------------------------------------------------------
@@ -926,7 +929,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	public void handleAsyncException(String message, Throwable exception) {
 		if (isRunning) {
 			// only fail if the task is still running
-			getEnvironment().failExternally(new AsynchronousException(message, exception));
+			asyncExceptionHandler.handleAsyncException(message, exception);
 		}
 	}
 
@@ -942,6 +945,21 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	// ------------------------------------------------------------------------
 
 	/**
+	 * Utility class to encapsulate the handling of asynchronous exceptions.
+	 */
+	static class StreamTaskAsyncExceptionHandler {
+		private final Environment environment;
+
+		StreamTaskAsyncExceptionHandler(Environment environment) {
+			this.environment = environment;
+		}
+
+		void handleAsyncException(String message, Throwable exception) {
+			environment.failExternally(new AsynchronousException(message, exception));
+		}
+	}
+
+	/**
 	 * This runnable executes the asynchronous parts of all involved backend snapshots for the subtask.
 	 */
 	@VisibleForTesting
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 87b7b5e..2259501 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -183,44 +183,25 @@ public class StreamTaskTest extends TestLogger {
 	 * and propagates this to the environment.
 	 */
 	@Test
-	public void handleAsyncException() throws Throwable {
-		MockEnvironment e = MockEnvironment.builder().build();
+	public void streamTaskAsyncExceptionHandler_handleException_forwardsMessageProperly() {
+		MockEnvironment mockEnvironment = MockEnvironment.builder().build();
 		RuntimeException expectedException = new RuntimeException("RUNTIME EXCEPTION");
 
-		BlockingCloseStreamOperator.resetLatches();
-		Configuration taskConfiguration = new Configuration();
-		StreamConfig streamConfig = new StreamConfig(taskConfiguration);
-		streamConfig.setStreamOperator(new BlockingCloseStreamOperator());
-		streamConfig.setOperatorID(new OperatorID());
-
-		try (MockEnvironment mockEnvironment =
-				new MockEnvironmentBuilder()
-					.setTaskName("Test Task")
-					.setMemorySize(32L * 1024L)
-					.setBufferSize(1)
-					.setTaskConfiguration(taskConfiguration)
-					.build()) {
-
-			RunningTask<StreamTask<Void, BlockingCloseStreamOperator>> task = runTask(() -> new NoOpStreamTask<>(mockEnvironment));
+		final StreamTask.StreamTaskAsyncExceptionHandler asyncExceptionHandler = new StreamTask.StreamTaskAsyncExceptionHandler(mockEnvironment);
 
-			BlockingCloseStreamOperator.inClose.await();
+		mockEnvironment.setExpectedExternalFailureCause(AsynchronousException.class);
+		final String expectedErrorMessage = "EXPECTED_ERROR MESSAGE";
 
-			// check that the StreamTask is not yet in isRunning == false
-			assertTrue(task.streamTask.isRunning());
+		asyncExceptionHandler.handleAsyncException(expectedErrorMessage, expectedException);
 
-			// generate an error report and expect it to be caught by the Environment
-			mockEnvironment.setExpectedExternalFailureCause(AsynchronousException.class);
-			task.streamTask.handleAsyncException("EXPECTED_ERROR MESSAGE", expectedException);
+		// expect an AsynchronousException containing the supplied error details
+		Optional<? extends Throwable> actualExternalFailureCause = mockEnvironment.getActualExternalFailureCause();
+		final Throwable actualException = actualExternalFailureCause
+			.orElseThrow(() -> new AssertionError("Expected exceptional completion"));
 
-			// expect an AsynchronousException containing the supplied error details
-			Optional<? extends Throwable> actualExternalFailureCause = mockEnvironment.getActualExternalFailureCause();
-			final Throwable actualException = actualExternalFailureCause
-				.orElseThrow(() -> new AssertionError("Expected exceptional completion"));
-
-			assertThat(actualException, instanceOf(AsynchronousException.class));
-			assertThat(actualException.getMessage(), is("EXPECTED_ERROR MESSAGE"));
-			assertThat(actualException.getCause(), is(expectedException));
-		}
+		assertThat(actualException, instanceOf(AsynchronousException.class));
+		assertThat(actualException.getMessage(), is("EXPECTED_ERROR MESSAGE"));
+		assertThat(actualException.getCause(), is(expectedException));
 	}
 
 	/**


[flink] 02/03: [hotfix] Extend MockEnvironment to provide better testing tools

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c439519cbd2fbefc8542023cf69de1dbc0879116
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Aug 20 11:59:12 2019 +0200

    [hotfix] Extend MockEnvironment to provide better testing tools
---
 .../operators/testutils/MockEnvironment.java       | 13 ++++-----
 .../streaming/runtime/tasks/StreamTaskTest.java    | 34 +++++++++++++---------
 2 files changed, 26 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index d3d462f..3948dee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -66,9 +66,9 @@ import static org.junit.Assert.fail;
  * IMPORTANT! Remember to close environment after usage!
  */
 public class MockEnvironment implements Environment, AutoCloseable {
-	
+
 	private final TaskInfo taskInfo;
-	
+
 	private final ExecutionConfig executionConfig;
 
 	private final MemoryManager memManager;
@@ -107,9 +107,9 @@ public class MockEnvironment implements Environment, AutoCloseable {
 
 	private final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
 
-	private Optional<Class<Throwable>> expectedExternalFailureCause = Optional.empty();
+	private Optional<Class<? extends Throwable>> expectedExternalFailureCause = Optional.empty();
 
-	private Optional<Throwable> actualExternalFailureCause = Optional.empty();
+	private Optional<? extends Throwable> actualExternalFailureCause = Optional.empty();
 
 	private final TaskMetricGroup taskMetricGroup;
 
@@ -164,7 +164,6 @@ public class MockEnvironment implements Environment, AutoCloseable {
 		this.taskMetricGroup = taskMetricGroup;
 	}
 
-
 	public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> inputIterator) {
 		try {
 			final IteratorWrappingTestSingleInputGate<Record> reader = new IteratorWrappingTestSingleInputGate<Record>(bufferSize, Record.class, inputIterator);
@@ -346,11 +345,11 @@ public class MockEnvironment implements Environment, AutoCloseable {
 		ioManager.close();
 	}
 
-	public void setExpectedExternalFailureCause(Class<Throwable> expectedThrowableClass) {
+	public void setExpectedExternalFailureCause(Class<? extends Throwable> expectedThrowableClass) {
 		this.expectedExternalFailureCause = Optional.of(expectedThrowableClass);
 	}
 
-	public Optional<Throwable> getActualExternalFailureCause() {
+	public Optional<? extends Throwable> getActualExternalFailureCause() {
 		return actualExternalFailureCause;
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 94cb0a0..87b7b5e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -128,7 +128,12 @@ import org.mockito.stubbing.Answer;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.ObjectInputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -189,13 +194,12 @@ public class StreamTaskTest extends TestLogger {
 		streamConfig.setOperatorID(new OperatorID());
 
 		try (MockEnvironment mockEnvironment =
-				 new MockEnvironmentBuilder()
-					 .setTaskName("Test Task")
-					 .setMemorySize(32L * 1024L)
-					 .setInputSplitProvider(new MockInputSplitProvider())
-					 .setBufferSize(1)
-					 .setTaskConfiguration(taskConfiguration)
-					 .build()) {
+				new MockEnvironmentBuilder()
+					.setTaskName("Test Task")
+					.setMemorySize(32L * 1024L)
+					.setBufferSize(1)
+					.setTaskConfiguration(taskConfiguration)
+					.build()) {
 
 			RunningTask<StreamTask<Void, BlockingCloseStreamOperator>> task = runTask(() -> new NoOpStreamTask<>(mockEnvironment));
 
@@ -204,16 +208,18 @@ public class StreamTaskTest extends TestLogger {
 			// check that the StreamTask is not yet in isRunning == false
 			assertTrue(task.streamTask.isRunning());
 
-
 			// generate an error report and expect it to be caught by the Environment
-			mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
+			mockEnvironment.setExpectedExternalFailureCause(AsynchronousException.class);
 			task.streamTask.handleAsyncException("EXPECTED_ERROR MESSAGE", expectedException);
 
 			// expect an AsynchronousException containing the supplied error details
-			Optional<Throwable> actualExternalFailureCause = mockEnvironment.getActualExternalFailureCause();
-			AsynchronousException cause = (AsynchronousException)actualExternalFailureCause.get();
-			assertEquals(cause.getMessage(), "EXPECTED_ERROR MESSAGE");
-			assertEquals(cause.getCause(), expectedException);
+			Optional<? extends Throwable> actualExternalFailureCause = mockEnvironment.getActualExternalFailureCause();
+			final Throwable actualException = actualExternalFailureCause
+				.orElseThrow(() -> new AssertionError("Expected exceptional completion"));
+
+			assertThat(actualException, instanceOf(AsynchronousException.class));
+			assertThat(actualException.getMessage(), is("EXPECTED_ERROR MESSAGE"));
+			assertThat(actualException.getCause(), is(expectedException));
 		}
 	}