You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2023/01/27 11:00:56 UTC

[flink] branch master updated (952c7831e74 -> 17064128599)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 952c7831e74 [hotfix][runtime] Check style ServerTransportErrorHandlingTest and ClientTransportErrorHandlingTest fix
     new c9ce64aef00 [FLINK-15550][runtime] Revert "Debug logging for TaskTest"
     new 83d8e29f6d1 [FLINK-15550][runtime] The static latches in TaskTest were replaced by latches from invokable objects.
     new 17064128599 [hotfix][runtime] Check style TaskTest fix.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/runtime/taskmanager/TaskTest.java | 238 ++++++++++++---------
 1 file changed, 138 insertions(+), 100 deletions(-)


[flink] 01/03: [FLINK-15550][runtime] Revert "Debug logging for TaskTest"

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c9ce64aef00e3cc187937f9fce0d1c34a3dcd166
Author: Anton Kalashnikov <ak...@gmail.com>
AuthorDate: Thu Jan 26 14:35:20 2023 +0100

    [FLINK-15550][runtime] Revert "Debug logging for TaskTest"
    
    This reverts commit 16e9b7325881b45741f3591f73a7b7bcfd39322b.
---
 .../apache/flink/runtime/taskmanager/TaskTest.java   | 20 +-------------------
 1 file changed, 1 insertion(+), 19 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 95265af8f3c..fd925ec60b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -60,8 +60,6 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
@@ -119,11 +117,6 @@ public class TaskTest extends TestLogger {
     public void setup() {
         awaitLatch = new OneShotLatch();
         triggerLatch = new OneShotLatch();
-        // Logging is only for debugging FLINK-15550.
-        log.info(
-                "Preparing trigger latch, {}, isTriggered = {}",
-                triggerLatch,
-                triggerLatch.isTriggered());
 
         shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
         wasCleanedUp = false;
@@ -675,7 +668,6 @@ public class TaskTest extends TestLogger {
         awaitLatch.await();
 
         task.failExternally(new Exception("external"));
-        assertFalse(triggerLatch.isTriggered());
         assertEquals(ExecutionState.FAILED, task.getExecutionState());
 
         // Either we cause the CancelTaskException or the TaskCanceler
@@ -1469,23 +1461,13 @@ public class TaskTest extends TestLogger {
 
     /** {@link AbstractInvokable} which throws {@link CancelTaskException} on invoke. */
     public static final class InvokableWithCancelTaskExceptionInInvoke extends AbstractInvokable {
-        static final Logger LOG = LoggerFactory.getLogger(InvokableWithExceptionOnTrigger.class);
-
         public InvokableWithCancelTaskExceptionInInvoke(Environment environment) {
             super(environment);
         }
 
         @Override
         public void invoke() {
-            // Logging and try-catch block are only for debugging FLINK-15550.
-            LOG.info("Await for {}, isTriggered = {}", triggerLatch, triggerLatch.isTriggered());
-            try {
-                awaitTriggerLatch();
-            } catch (Throwable ex) {
-                LOG.error("Fail on awaiting trigger latch", ex);
-
-                throw ex;
-            }
+            awaitTriggerLatch();
 
             throw new CancelTaskException();
         }


[flink] 02/03: [FLINK-15550][runtime] The static latches in TaskTest were replaced by latches from invokable objects.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 83d8e29f6d11354eafcfd5428535a65efffc3a80
Author: Anton Kalashnikov <ak...@gmail.com>
AuthorDate: Thu Jan 26 17:26:59 2023 +0100

    [FLINK-15550][runtime] The static latches in TaskTest were replaced by latches from invokable objects.
---
 .../apache/flink/runtime/taskmanager/TaskTest.java | 192 +++++++++++++--------
 1 file changed, 123 insertions(+), 69 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index fd925ec60b2..8e728bb87d7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteChannelStateChecker;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
@@ -76,6 +77,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
@@ -100,9 +102,6 @@ import static org.mockito.Mockito.when;
 public class TaskTest extends TestLogger {
     private static final String RESTORE_EXCEPTION_MSG = "TestExceptionInRestore";
 
-    private static OneShotLatch awaitLatch;
-    private static OneShotLatch triggerLatch;
-
     private ShuffleEnvironment<?, ?> shuffleEnvironment;
 
     @ClassRule
@@ -115,9 +114,6 @@ public class TaskTest extends TestLogger {
 
     @Before
     public void setup() {
-        awaitLatch = new OneShotLatch();
-        triggerLatch = new OneShotLatch();
-
         shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
         wasCleanedUp = false;
     }
@@ -154,7 +150,7 @@ public class TaskTest extends TestLogger {
                         .setInvokable(InvokableBlockingInRestore.class)
                         .build(Executors.directExecutor());
         task.startTaskThread();
-        awaitLatch.await();
+        awaitInvokableLatch(task);
         task.cancelExecution();
         task.getExecutingThread().join();
         assertTrue(wasCleanedUp);
@@ -445,7 +441,7 @@ public class TaskTest extends TestLogger {
         task.startTaskThread();
 
         // wait till the task is in restore
-        awaitLatch.await();
+        awaitInvokableLatch(task);
 
         task.cancelExecution();
         assertTrue(
@@ -475,7 +471,7 @@ public class TaskTest extends TestLogger {
         task.startTaskThread();
 
         // wait till the task is in invoke
-        awaitLatch.await();
+        awaitInvokableLatch(task);
 
         task.cancelExecution();
         assertTrue(
@@ -506,7 +502,7 @@ public class TaskTest extends TestLogger {
         task.startTaskThread();
 
         // wait till the task is in invoke
-        awaitLatch.await();
+        awaitInvokableLatch(task);
 
         task.failExternally(new Exception(RESTORE_EXCEPTION_MSG));
 
@@ -534,7 +530,7 @@ public class TaskTest extends TestLogger {
         task.startTaskThread();
 
         // wait till the task is in invoke
-        awaitLatch.await();
+        awaitInvokableLatch(task);
 
         task.failExternally(new Exception("test"));
 
@@ -587,13 +583,13 @@ public class TaskTest extends TestLogger {
         task.startTaskThread();
 
         // wait till the task is in invoke
-        awaitLatch.await();
+        awaitInvokableLatch(task);
 
         task.cancelExecution();
         assertEquals(ExecutionState.CANCELING, task.getExecutionState());
 
         // this causes an exception
-        triggerLatch.trigger();
+        triggerInvokableLatch(task);
 
         task.getExecutingThread().join();
 
@@ -620,13 +616,13 @@ public class TaskTest extends TestLogger {
         task.startTaskThread();
 
         // wait till the task is in invoke
-        awaitLatch.await();
+        awaitInvokableLatch(task);
 
         task.failExternally(new Exception("external"));
         assertEquals(ExecutionState.FAILED, task.getExecutionState());
 
         // this causes an exception
-        triggerLatch.trigger();
+        triggerInvokableLatch(task);
 
         task.getExecutingThread().join();
 
@@ -647,11 +643,12 @@ public class TaskTest extends TestLogger {
                         .setInvokable(InvokableWithCancelTaskExceptionInInvoke.class)
                         .build(Executors.directExecutor());
 
-        // Cause CancelTaskException.
-        triggerLatch.trigger();
+        task.startTaskThread();
 
-        task.run();
+        // Cause CancelTaskException.
+        triggerInvokableLatch(task);
 
+        task.getExecutingThread().join();
         assertEquals(ExecutionState.CANCELED, task.getExecutionState());
     }
 
@@ -665,14 +662,14 @@ public class TaskTest extends TestLogger {
         task.startTaskThread();
 
         // Wait till the task is in invoke.
-        awaitLatch.await();
+        awaitInvokableLatch(task);
 
         task.failExternally(new Exception("external"));
         assertEquals(ExecutionState.FAILED, task.getExecutionState());
 
         // Either we cause the CancelTaskException or the TaskCanceler
         // by interrupting the invokable.
-        triggerLatch.trigger();
+        triggerInvokableLatch(task);
 
         task.getExecutingThread().join();
 
@@ -834,7 +831,7 @@ public class TaskTest extends TestLogger {
 
             try {
                 task.startTaskThread();
-                awaitLatch.await();
+                awaitInvokableLatch(task);
 
                 CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
                 when(partitionChecker.requestPartitionProducerState(
@@ -877,7 +874,7 @@ public class TaskTest extends TestLogger {
 
             try {
                 task.startTaskThread();
-                awaitLatch.await();
+                awaitInvokableLatch(task);
 
                 CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
                 when(partitionChecker.requestPartitionProducerState(
@@ -927,7 +924,7 @@ public class TaskTest extends TestLogger {
 
         task.startTaskThread();
 
-        awaitLatch.await();
+        awaitInvokableLatch(task);
 
         task.cancelExecution();
         task.getExecutingThread().join();
@@ -955,7 +952,7 @@ public class TaskTest extends TestLogger {
 
         task.startTaskThread();
 
-        awaitLatch.await();
+        awaitInvokableLatch(task);
 
         task.cancelExecution();
         task.getExecutingThread().join();
@@ -987,7 +984,7 @@ public class TaskTest extends TestLogger {
         try {
             task.startTaskThread();
 
-            awaitLatch.await();
+            awaitInvokableLatch(task);
 
             task.cancelExecution();
 
@@ -996,7 +993,7 @@ public class TaskTest extends TestLogger {
             assertThat(fatalError, is(notNullValue()));
         } finally {
             // Interrupt again to clean up Thread
-            triggerLatch.trigger();
+            triggerInvokableLatch(task);
             task.getExecutingThread().interrupt();
             task.getExecutingThread().join();
         }
@@ -1016,31 +1013,33 @@ public class TaskTest extends TestLogger {
         config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 5);
         config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 50);
 
-        final Task task =
-                spy(
-                        createTaskBuilder()
-                                .setInvokable(InvokableBlockingWithTrigger.class)
-                                .setTaskManagerConfig(config)
-                                .setTaskManagerActions(taskManagerActions)
-                                .build(Executors.directExecutor()));
+        // We need to remember the original object since all changes in  `startTaskThread` applies
+        // to it rather than to spy object.
+        Task task =
+                createTaskBuilder()
+                        .setInvokable(InvokableBlockingWithTrigger.class)
+                        .setTaskManagerConfig(config)
+                        .setTaskManagerActions(taskManagerActions)
+                        .build(Executors.directExecutor());
+        final Task spyTask = spy(task);
 
         final Class<OutOfMemoryError> fatalErrorType = OutOfMemoryError.class;
         doThrow(fatalErrorType)
-                .when(task)
+                .when(spyTask)
                 .cancelOrFailAndCancelInvokableInternal(eq(ExecutionState.CANCELING), eq(null));
 
         try {
-            task.startTaskThread();
+            spyTask.startTaskThread();
 
-            awaitLatch.await();
+            awaitInvokableLatch(task);
 
-            task.cancelExecution();
+            spyTask.cancelExecution();
 
             // wait for the notification of notifyFatalError
             final Throwable fatalError = fatalErrorFuture.join();
             assertThat(fatalError, instanceOf(fatalErrorType));
         } finally {
-            triggerLatch.trigger();
+            triggerInvokableLatch(task);
         }
     }
 
@@ -1070,7 +1069,7 @@ public class TaskTest extends TestLogger {
 
         task.startTaskThread();
 
-        awaitLatch.await();
+        awaitInvokableLatch(task);
 
         assertEquals(
                 executionConfig.getTaskCancellationInterval(), task.getTaskCancellationInterval());
@@ -1093,11 +1092,11 @@ public class TaskTest extends TestLogger {
         task.startTaskThread();
 
         // wait till the task is in invoke
-        awaitLatch.await();
+        awaitInvokableLatch(task);
 
         assertFalse(task.getTerminationFuture().isDone());
 
-        triggerLatch.trigger();
+        triggerInvokableLatch(task);
 
         task.getExecutingThread().join();
 
@@ -1162,7 +1161,7 @@ public class TaskTest extends TestLogger {
 
         task.startTaskThread();
         try {
-            awaitLatch.await();
+            awaitInvokableLatch(task);
             assertEquals(ExecutionState.RUNNING, task.getExecutionState());
 
             assertCheckpointDeclined(
@@ -1181,7 +1180,7 @@ public class TaskTest extends TestLogger {
                     InvokableDeclingingCheckpoints.TRIGGERING_FAILED_CHECKPOINT_ID,
                     CheckpointFailureReason.TASK_FAILURE);
         } finally {
-            triggerLatch.trigger();
+            triggerInvokableLatch(task);
             task.getExecutingThread().join();
         }
         assertEquals(ExecutionState.FINISHED, task.getTerminationFuture().getNow(null));
@@ -1211,6 +1210,32 @@ public class TaskTest extends TestLogger {
         testCheckpointResponder.clear();
     }
 
+    private TaskInvokable waitForInvokable(Task task) throws Exception {
+        waitUntilCondition(() -> task.getInvokable() != null, 10L);
+
+        return task.getInvokable();
+    }
+
+    private void awaitInvokableLatch(Task task) throws Exception {
+        TaskInvokable taskInvokable = waitForInvokable(task);
+        if (!(taskInvokable instanceof AwaitLatchInvokable)) {
+            throw new Exception(
+                    "Invokable doesn't implement class - " + AwaitLatchInvokable.class.getName());
+        }
+
+        ((AwaitLatchInvokable) taskInvokable).await();
+    }
+
+    private void triggerInvokableLatch(Task task) throws Exception {
+        TaskInvokable taskInvokable = waitForInvokable(task);
+        if (!(taskInvokable instanceof TriggerLatchInvokable)) {
+            throw new Exception(
+                    "Invokable doesn't implement class - " + TriggerLatchInvokable.class.getName());
+        }
+
+        ((TriggerLatchInvokable) taskInvokable).trigger();
+    }
+
     // ------------------------------------------------------------------------
     //  customized TaskManagerActions
     // ------------------------------------------------------------------------
@@ -1343,7 +1368,7 @@ public class TaskTest extends TestLogger {
         public void cancel() {}
     }
 
-    private static class InvokableBlockingWithTrigger extends AbstractInvokable {
+    private static class InvokableBlockingWithTrigger extends TriggerLatchInvokable {
         public InvokableBlockingWithTrigger(Environment environment) {
             super(environment);
         }
@@ -1385,7 +1410,7 @@ public class TaskTest extends TestLogger {
         }
     }
 
-    private static final class InvokableBlockingInInvoke extends AbstractInvokable {
+    private static final class InvokableBlockingInInvoke extends AwaitLatchInvokable {
         public InvokableBlockingInInvoke(Environment environment) {
             super(environment);
         }
@@ -1403,7 +1428,7 @@ public class TaskTest extends TestLogger {
         }
     }
 
-    private static final class InvokableBlockingInRestore extends AbstractInvokable {
+    private static final class InvokableBlockingInRestore extends AwaitLatchInvokable {
         public InvokableBlockingInRestore(Environment environment) {
             super(environment);
         }
@@ -1431,7 +1456,7 @@ public class TaskTest extends TestLogger {
     }
 
     /** {@link AbstractInvokable} which throws {@link RuntimeException} on invoke. */
-    public static final class InvokableWithExceptionOnTrigger extends AbstractInvokable {
+    public static final class InvokableWithExceptionOnTrigger extends TriggerLatchInvokable {
         public InvokableWithExceptionOnTrigger(Environment environment) {
             super(environment);
         }
@@ -1444,23 +1469,9 @@ public class TaskTest extends TestLogger {
         }
     }
 
-    private static void awaitTriggerLatch() {
-        awaitLatch.trigger();
-
-        // make sure that the interrupt call does not
-        // grab us out of the lock early
-        while (true) {
-            try {
-                triggerLatch.await();
-                break;
-            } catch (InterruptedException e) {
-                // fall through the loop
-            }
-        }
-    }
-
     /** {@link AbstractInvokable} which throws {@link CancelTaskException} on invoke. */
-    public static final class InvokableWithCancelTaskExceptionInInvoke extends AbstractInvokable {
+    public static final class InvokableWithCancelTaskExceptionInInvoke
+            extends TriggerLatchInvokable {
         public InvokableWithCancelTaskExceptionInInvoke(Environment environment) {
             super(environment);
         }
@@ -1474,7 +1485,7 @@ public class TaskTest extends TestLogger {
     }
 
     /** {@link AbstractInvokable} which blocks in cancel. */
-    public static final class InvokableBlockingInCancel extends AbstractInvokable {
+    public static final class InvokableBlockingInCancel extends TriggerLatchInvokable {
         public InvokableBlockingInCancel(Environment environment) {
             super(environment);
         }
@@ -1506,7 +1517,7 @@ public class TaskTest extends TestLogger {
 
     /** {@link AbstractInvokable} which blocks in cancel and is interruptible. */
     public static final class InvokableInterruptibleSharedLockInInvokeAndCancel
-            extends AbstractInvokable {
+            extends TriggerLatchInvokable {
         private final Object lock = new Object();
 
         public InvokableInterruptibleSharedLockInInvokeAndCancel(Environment environment) {
@@ -1515,9 +1526,11 @@ public class TaskTest extends TestLogger {
 
         @Override
         public void invoke() throws Exception {
-            synchronized (lock) {
-                awaitLatch.trigger();
-                wait();
+            while (!triggerLatch.isTriggered()) {
+                synchronized (lock) {
+                    awaitLatch.trigger();
+                    lock.wait();
+                }
             }
         }
 
@@ -1531,7 +1544,7 @@ public class TaskTest extends TestLogger {
     }
 
     /** {@link AbstractInvokable} which blocks in cancel and is not interruptible. */
-    public static final class InvokableUnInterruptibleBlockingInvoke extends AbstractInvokable {
+    public static final class InvokableUnInterruptibleBlockingInvoke extends TriggerLatchInvokable {
         public InvokableUnInterruptibleBlockingInvoke(Environment environment) {
             super(environment);
         }
@@ -1564,4 +1577,45 @@ public class TaskTest extends TestLogger {
             super(cause);
         }
     }
+
+    private abstract static class AwaitLatchInvokable extends AbstractInvokable {
+
+        final OneShotLatch awaitLatch = new OneShotLatch();
+
+        public AwaitLatchInvokable(Environment environment) {
+            super(environment);
+        }
+
+        void await() throws InterruptedException {
+            awaitLatch.await();
+        }
+    }
+
+    private abstract static class TriggerLatchInvokable extends AwaitLatchInvokable {
+
+        final OneShotLatch triggerLatch = new OneShotLatch();
+
+        public TriggerLatchInvokable(Environment environment) {
+            super(environment);
+        }
+
+        void trigger() {
+            triggerLatch.trigger();
+        }
+
+        void awaitTriggerLatch() {
+            awaitLatch.trigger();
+
+            // make sure that the interrupt call does not
+            // grab us out of the lock early
+            while (true) {
+                try {
+                    triggerLatch.await();
+                    break;
+                } catch (InterruptedException e) {
+                    // fall through the loop
+                }
+            }
+        }
+    }
 }


[flink] 03/03: [hotfix][runtime] Check style TaskTest fix.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 170641285998795dd071cbf9e75a0ee83e5d1c40
Author: Anton Kalashnikov <ak...@gmail.com>
AuthorDate: Thu Jan 26 17:39:19 2023 +0100

    [hotfix][runtime] Check style TaskTest fix.
---
 .../apache/flink/runtime/taskmanager/TaskTest.java | 26 ++++++++++++----------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 8e728bb87d7..057d6f3dd49 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -82,11 +82,11 @@ import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.eq;
@@ -931,7 +931,7 @@ public class TaskTest extends TestLogger {
     }
 
     /**
-     * The invoke() method holds a lock (trigger awaitLatch after acquisition) and cancel cannot
+     * The 'invoke' method holds a lock (trigger awaitLatch after acquisition) and cancel cannot
      * complete because it also tries to acquire the same lock. This is resolved by the watch dog,
      * no fatal error.
      */
@@ -959,7 +959,7 @@ public class TaskTest extends TestLogger {
     }
 
     /**
-     * The invoke() method blocks infinitely, but cancel() does not block. Only resolved by a fatal
+     * The 'invoke' method blocks infinitely, but cancel() does not block. Only resolved by a fatal
      * error.
      */
     @Test
@@ -1150,7 +1150,7 @@ public class TaskTest extends TestLogger {
         TestCheckpointResponder testCheckpointResponder = new TestCheckpointResponder();
         final Task task =
                 createTaskBuilder()
-                        .setInvokable(InvokableDeclingingCheckpoints.class)
+                        .setInvokable(InvokableDecliningCheckpoints.class)
                         .setCheckpointResponder(testCheckpointResponder)
                         .build(Executors.directExecutor());
         assertCheckpointDeclined(
@@ -1167,17 +1167,17 @@ public class TaskTest extends TestLogger {
             assertCheckpointDeclined(
                     task,
                     testCheckpointResponder,
-                    InvokableDeclingingCheckpoints.REJECTED_EXECUTION_CHECKPOINT_ID,
+                    InvokableDecliningCheckpoints.REJECTED_EXECUTION_CHECKPOINT_ID,
                     CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING);
             assertCheckpointDeclined(
                     task,
                     testCheckpointResponder,
-                    InvokableDeclingingCheckpoints.THROWING_CHECKPOINT_ID,
+                    InvokableDecliningCheckpoints.THROWING_CHECKPOINT_ID,
                     CheckpointFailureReason.TASK_FAILURE);
             assertCheckpointDeclined(
                     task,
                     testCheckpointResponder,
-                    InvokableDeclingingCheckpoints.TRIGGERING_FAILED_CHECKPOINT_ID,
+                    InvokableDecliningCheckpoints.TRIGGERING_FAILED_CHECKPOINT_ID,
                     CheckpointFailureReason.TASK_FAILURE);
         } finally {
             triggerInvokableLatch(task);
@@ -1246,7 +1246,7 @@ public class TaskTest extends TestLogger {
 
         @Override
         public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
-            queue.offer(taskExecutionState);
+            assertTrue(queue.offer(taskExecutionState));
         }
 
         private void validateListenerMessage(ExecutionState state, Task task, Throwable error) {
@@ -1345,7 +1345,7 @@ public class TaskTest extends TestLogger {
         }
 
         @Override
-        public void invoke() throws Exception {}
+        public void invoke() {}
 
         @Override
         public void cleanUp(Throwable throwable) throws Exception {
@@ -1381,12 +1381,12 @@ public class TaskTest extends TestLogger {
         }
     }
 
-    private static class InvokableDeclingingCheckpoints extends InvokableBlockingWithTrigger {
+    private static class InvokableDecliningCheckpoints extends InvokableBlockingWithTrigger {
         public static final int REJECTED_EXECUTION_CHECKPOINT_ID = 2;
         public static final int THROWING_CHECKPOINT_ID = 3;
         public static final int TRIGGERING_FAILED_CHECKPOINT_ID = 4;
 
-        public InvokableDeclingingCheckpoints(Environment environment) {
+        public InvokableDecliningCheckpoints(Environment environment) {
             super(environment);
         }
 
@@ -1421,6 +1421,7 @@ public class TaskTest extends TestLogger {
 
             // block forever
             synchronized (this) {
+                //noinspection InfiniteLoopStatement
                 while (true) {
                     wait();
                 }
@@ -1439,6 +1440,7 @@ public class TaskTest extends TestLogger {
 
             // block forever
             synchronized (this) {
+                //noinspection InfiniteLoopStatement
                 while (true) {
                     wait();
                 }
@@ -1446,7 +1448,7 @@ public class TaskTest extends TestLogger {
         }
 
         @Override
-        public void invoke() throws Exception {}
+        public void invoke() {}
 
         @Override
         public void cleanUp(Throwable throwable) throws Exception {