Posted to commits@flink.apache.org by al...@apache.org on 2019/08/30 08:01:46 UTC

[flink] branch master updated: [FLINK-13514] Fix instability in StreamTaskTest.testAsyncCheckpointingConcurrentCloseAfterAcknowledge

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 60b65c4  [FLINK-13514] Fix instability in StreamTaskTest.testAsyncCheckpointingConcurrentCloseAfterAcknowledge
60b65c4 is described below

commit 60b65c42ca2709d20cb59f1617d96a80ec870b6c
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Aug 29 16:33:33 2019 +0200

    [FLINK-13514] Fix instability in StreamTaskTest.testAsyncCheckpointingConcurrentCloseAfterAcknowledge
    
    Previously, thread pool shutdown would interrupt the blocking await()
    call in the test's mocked checkpoint acknowledgement. Production code
    cannot throw an InterruptedException here, and it would not be correct
    if one were thrown.
    
    We now swallow InterruptedExceptions and retry until await() returns
    normally.
---
 .../flink/streaming/runtime/tasks/StreamTaskTest.java    | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)
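
For readers who want to try the retry-until-await-succeeds pattern outside
of Flink, here is a minimal standalone sketch. It is an illustration under
assumptions, not the test code itself: java.util.concurrent.CountDownLatch
stands in for the test's completeAcknowledge latch (whose type is not shown
in this hunk), and the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical standalone sketch; names are not taken from the Flink code base.
class UninterruptibleAwaitSketch {

    /**
     * Blocks until the latch reaches zero, swallowing any interrupts.
     * Returns the number of InterruptedExceptions that were swallowed.
     */
    static int awaitIgnoringInterrupts(CountDownLatch latch) {
        int swallowed = 0;
        while (true) {
            try {
                latch.await();
                // await() returned normally, so we are done waiting
                return swallowed;
            } catch (InterruptedException e) {
                // await() clears the interrupt flag when it throws,
                // so the next loop iteration can wait again
                swallowed++;
            }
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();

        // Simulate an interrupt arriving before the wait starts,
        // similar to what a thread pool shutdown can cause.
        Thread.currentThread().interrupt();

        int swallowed = awaitIgnoringInterrupts(latch);
        System.out.println("swallowed interrupts: " + swallowed);
        System.out.println("interrupt flag still set: "
            + Thread.currentThread().isInterrupted());
    }
}
```

The sketch deliberately leaves the interrupt flag cleared, mirroring the
test change. General-purpose code usually re-asserts the flag with
Thread.currentThread().interrupt() once the wait has finished, which is what
Guava's Uninterruptibles.awaitUninterruptibly does.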

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 946895a..ca6af8c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -488,11 +488,23 @@ public class StreamTaskTest extends TestLogger {
 		CheckpointResponder checkpointResponder = mock(CheckpointResponder.class);
 		doAnswer(new Answer() {
 			@Override
-			public Object answer(InvocationOnMock invocation) throws Throwable {
+			public Object answer(InvocationOnMock invocation) {
 				acknowledgeCheckpointLatch.trigger();
 
 				// block here so that we can issue the concurrent cancel call
-				completeAcknowledge.await();
+				while (true) {
+					try {
+						// wait until we successfully await (no pun intended)
+						completeAcknowledge.await();
+
+						// when await() returns normally, we break out of the loop
+						break;
+					} catch (InterruptedException e) {
+						// survive interruptions that arise from thread pool shutdown
+						// production code cannot actually throw InterruptedException from
+						// checkpoint acknowledgement
+					}
+				}
 
 				return null;
 			}