You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/25 13:12:26 UTC
[flink] 03/04: [FLINK-30100][checkpoint][JUnit5 Migration] Migrate checkpoint coordinator related tests under flink-runtime module to junit5
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit cf78eb1630368e9f7b2b476f4714fbc65056261c
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Wed Nov 23 21:46:34 2022 +0800
[FLINK-30100][checkpoint][JUnit5 Migration] Migrate checkpoint coordinator related tests under flink-runtime module to junit5
---
.../CheckpointCoordinatorFailureTest.java | 45 +-
.../CheckpointCoordinatorMasterHooksTest.java | 82 +-
.../CheckpointCoordinatorRestoringTest.java | 181 ++--
.../checkpoint/CheckpointCoordinatorTest.java | 922 +++++++++++----------
.../CheckpointCoordinatorTriggeringTest.java | 233 +++---
.../checkpoint/CheckpointFailureManagerTest.java | 30 +-
.../checkpoint/CheckpointIDCounterTestBase.java | 2 +-
.../checkpoint/CheckpointStateRestoreTest.java | 59 +-
8 files changed, 779 insertions(+), 775 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index d9a4a57c691..2538072c516 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -40,14 +40,14 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.Collections;
import java.util.List;
@@ -57,30 +57,26 @@ import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.emptyList;
import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.assertStatsMetrics;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/** Tests for failure of checkpoint coordinator. */
-public class CheckpointCoordinatorFailureTest extends TestLogger {
+class CheckpointCoordinatorFailureTest extends TestLogger {
- @ClassRule
- public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
- TestingUtils.defaultExecutorResource();
+ @RegisterExtension
+ static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+ TestingUtils.defaultExecutorExtension();
/**
* Tests that a failure while storing a completed checkpoint in the completed checkpoint store
* will properly fail the originating pending checkpoint and clean upt the completed checkpoint.
*/
@Test
- public void testFailingCompletedCheckpointStoreAdd() throws Exception {
+ void testFailingCompletedCheckpointStoreAdd() throws Exception {
JobVertexID jobVertexId = new JobVertexID();
final ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor =
@@ -107,12 +103,12 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
manuallyTriggeredScheduledExecutor.triggerAll();
- assertEquals(1, coord.getNumberOfPendingCheckpoints());
+ assertThat(coord.getNumberOfPendingCheckpoints()).isOne();
PendingCheckpoint pendingCheckpoint =
coord.getPendingCheckpoints().values().iterator().next();
- assertFalse(pendingCheckpoint.isDisposed());
+ assertThat(pendingCheckpoint.isDisposed()).isFalse();
final long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();
@@ -172,7 +168,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
}
// make sure that the pending checkpoint has been discarded after we could not complete it
- assertTrue(pendingCheckpoint.isDisposed());
+ assertThat(pendingCheckpoint.isDisposed()).isTrue();
// make sure that the subtask state has been discarded after we could not complete it.
verify(operatorSubtaskState).discardState();
@@ -187,12 +183,12 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
}
@Test
- public void testCleanupForGenericFailure() throws Exception {
+ void testCleanupForGenericFailure() throws Exception {
testStoringFailureHandling(new FlinkRuntimeException("Expected exception"), 1);
}
@Test
- public void testCleanupOmissionForPossibleInconsistentStateException() throws Exception {
+ void testCleanupOmissionForPossibleInconsistentStateException() throws Exception {
testStoringFailureHandling(new PossibleInconsistentStateException(), 0);
}
@@ -264,9 +260,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
"unknown location");
fail("CheckpointException should have been thrown.");
} catch (CheckpointException e) {
- assertThat(
- e.getCheckpointFailureReason(),
- is(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE));
+ assertThat(e.getCheckpointFailureReason())
+ .isEqualTo(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE);
}
AbstractCheckpointStats actualStats =
@@ -275,11 +270,11 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
.getHistory()
.getCheckpointById(checkpointIDCounter.getLast());
- assertEquals(checkpointIDCounter.getLast(), actualStats.getCheckpointId());
- assertEquals(CheckpointStatsStatus.FAILED, actualStats.getStatus());
+ assertThat(actualStats.getCheckpointId()).isEqualTo(checkpointIDCounter.getLast());
+ assertThat(actualStats.getStatus()).isEqualTo(CheckpointStatsStatus.FAILED);
assertStatsMetrics(vertex.getJobvertexId(), 0, expectedReportedMetrics, actualStats);
- assertThat(cleanupCallCount.get(), is(expectedCleanupCalls));
+ assertThat(cleanupCallCount.get()).isEqualTo(expectedCleanupCalls);
}
private static final class FailingCompletedCheckpointStore
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 90e60d52eeb..d76208e0d90 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -31,13 +31,13 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -53,12 +53,8 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.StringSerializer;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.any;
@@ -69,11 +65,11 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/** Tests for the user-defined hooks that the checkpoint coordinator can call. */
-public class CheckpointCoordinatorMasterHooksTest {
+class CheckpointCoordinatorMasterHooksTest {
- @ClassRule
- public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
- TestingUtils.defaultExecutorResource();
+ @RegisterExtension
+ static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+ TestingUtils.defaultExecutorExtension();
// ------------------------------------------------------------------------
// hook registration
@@ -81,7 +77,7 @@ public class CheckpointCoordinatorMasterHooksTest {
/** This method tests that hooks with the same identifier are not registered multiple times. */
@Test
- public void testDeduplicateOnRegister() throws Exception {
+ void testDeduplicateOnRegister() throws Exception {
ExecutionGraph graph =
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
.addJobVertex(new JobVertexID())
@@ -97,14 +93,14 @@ public class CheckpointCoordinatorMasterHooksTest {
MasterTriggerRestoreHook<?> hook3 = mock(MasterTriggerRestoreHook.class);
when(hook3.getIdentifier()).thenReturn("anotherId");
- assertTrue(cc.addMasterHook(hook1));
- assertFalse(cc.addMasterHook(hook2));
- assertTrue(cc.addMasterHook(hook3));
+ assertThat(cc.addMasterHook(hook1)).isTrue();
+ assertThat(cc.addMasterHook(hook2)).isFalse();
+ assertThat(cc.addMasterHook(hook3)).isTrue();
}
/** Test that validates correct exceptions when supplying hooks with invalid IDs. */
@Test
- public void testNullOrInvalidId() throws Exception {
+ void testNullOrInvalidId() throws Exception {
ExecutionGraph graph =
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
.addJobVertex(new JobVertexID())
@@ -134,7 +130,7 @@ public class CheckpointCoordinatorMasterHooksTest {
}
@Test
- public void testHookReset() throws Exception {
+ void testHookReset() throws Exception {
final String id1 = "id1";
final String id2 = "id2";
@@ -169,7 +165,7 @@ public class CheckpointCoordinatorMasterHooksTest {
// ------------------------------------------------------------------------
@Test
- public void testHooksAreCalledOnTrigger() throws Exception {
+ void testHooksAreCalledOnTrigger() throws Exception {
final String id1 = "id1";
final String id2 = "id2";
@@ -215,8 +211,8 @@ public class CheckpointCoordinatorMasterHooksTest {
// trigger a checkpoint
final CompletableFuture<CompletedCheckpoint> checkpointFuture = cc.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkpointFuture.isCompletedExceptionally());
- assertEquals(1, cc.getNumberOfPendingCheckpoints());
+ assertThat(checkpointFuture).isNotCompletedExceptionally();
+ assertThat(cc.getNumberOfPendingCheckpoints()).isOne();
verify(statefulHook1, times(1))
.triggerCheckpoint(anyLong(), anyLong(), any(Executor.class));
@@ -236,21 +232,21 @@ public class CheckpointCoordinatorMasterHooksTest {
cc.receiveAcknowledgeMessage(
new AcknowledgeCheckpoint(graph.getJobID(), attemptID, checkpointId),
"Unknown location");
- assertEquals(0, cc.getNumberOfPendingCheckpoints());
+ assertThat(cc.getNumberOfPendingCheckpoints()).isZero();
- assertEquals(1, cc.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(cc.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
final CompletedCheckpoint chk = cc.getCheckpointStore().getLatestCheckpoint();
final Collection<MasterState> masterStates = chk.getMasterHookStates();
- assertEquals(2, masterStates.size());
+ assertThat(masterStates.size()).isEqualTo(2);
for (MasterState ms : masterStates) {
if (ms.name().equals(id1)) {
- assertArrayEquals(state1serialized, ms.bytes());
- assertEquals(StringSerializer.VERSION, ms.version());
+ assertThat(ms.bytes()).isEqualTo(state1serialized);
+ assertThat(ms.version()).isEqualTo(StringSerializer.VERSION);
} else if (ms.name().equals(id2)) {
- assertArrayEquals(state2serialized, ms.bytes());
- assertEquals(LongSerializer.VERSION, ms.version());
+ assertThat(ms.bytes()).isEqualTo(state2serialized);
+ assertThat(ms.version()).isEqualTo(LongSerializer.VERSION);
} else {
fail("unrecognized state name: " + ms.name());
}
@@ -258,7 +254,7 @@ public class CheckpointCoordinatorMasterHooksTest {
}
@Test
- public void testHooksAreCalledOnRestore() throws Exception {
+ void testHooksAreCalledOnRestore() throws Exception {
final String id1 = "id1";
final String id2 = "id2";
@@ -326,7 +322,7 @@ public class CheckpointCoordinatorMasterHooksTest {
}
@Test
- public void checkUnMatchedStateOnRestore() throws Exception {
+ void checkUnMatchedStateOnRestore() throws Exception {
final String id1 = "id1";
final String id2 = "id2";
@@ -403,7 +399,7 @@ public class CheckpointCoordinatorMasterHooksTest {
* are called
*/
@Test
- public void ensureRegisteredAtHookTime() throws Exception {
+ void ensureRegisteredAtHookTime() throws Exception {
final String id = "id";
// create the checkpoint coordinator
@@ -425,10 +421,10 @@ public class CheckpointCoordinatorMasterHooksTest {
@Override
public CompletableFuture<Void> answer(InvocationOnMock invocation)
throws Throwable {
- assertEquals(1, cc.getNumberOfPendingCheckpoints());
+ assertThat(cc.getNumberOfPendingCheckpoints()).isOne();
long checkpointId = (Long) invocation.getArguments()[0];
- assertNotNull(cc.getPendingCheckpoints().get(checkpointId));
+ assertThat(cc.getPendingCheckpoints()).containsKey(checkpointId);
return null;
}
});
@@ -438,7 +434,7 @@ public class CheckpointCoordinatorMasterHooksTest {
// trigger a checkpoint
final CompletableFuture<CompletedCheckpoint> checkpointFuture = cc.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkpointFuture.isCompletedExceptionally());
+ assertThat(checkpointFuture).isNotCompletedExceptionally();
}
// ------------------------------------------------------------------------
@@ -446,22 +442,22 @@ public class CheckpointCoordinatorMasterHooksTest {
// ------------------------------------------------------------------------
@Test
- public void testSerializationFailsOnTrigger() {}
+ void testSerializationFailsOnTrigger() {}
@Test
- public void testHookCallFailsOnTrigger() {}
+ void testHookCallFailsOnTrigger() {}
@Test
- public void testDeserializationFailsOnRestore() {}
+ void testDeserializationFailsOnRestore() {}
@Test
- public void testHookCallFailsOnRestore() {}
+ void testHookCallFailsOnRestore() {}
@Test
- public void testTypeIncompatibleWithSerializerOnStore() {}
+ void testTypeIncompatibleWithSerializerOnStore() {}
@Test
- public void testTypeIncompatibleWithHookOnRestore() {}
+ void testTypeIncompatibleWithHookOnRestore() {}
// ------------------------------------------------------------------------
// utilities
@@ -534,8 +530,8 @@ public class CheckpointCoordinatorMasterHooksTest {
@Override
public Long deserialize(int version, byte[] serialized) throws IOException {
- assertEquals(VERSION, version);
- assertEquals(8, serialized.length);
+ assertThat(version).isEqualTo(VERSION);
+ assertThat(serialized.length).isEqualTo(8);
return ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN).getLong(0);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index a26e7ea5059..acdcef2fccf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -41,7 +41,8 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.types.BooleanValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
@@ -49,12 +50,10 @@ import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.util.ArrayList;
@@ -84,10 +83,8 @@ import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUt
import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.verifyStateRestore;
import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewInputChannelStateHandle;
import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewResultSubpartitionStateHandle;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
@@ -95,11 +92,11 @@ import static org.mockito.Mockito.verify;
/** Tests for restoring checkpoint. */
@SuppressWarnings("checkstyle:EmptyLineSeparator")
-public class CheckpointCoordinatorRestoringTest extends TestLogger {
+class CheckpointCoordinatorRestoringTest extends TestLogger {
- @ClassRule
- public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
- TestingUtils.defaultExecutorResource();
+ @RegisterExtension
+ static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+ TestingUtils.defaultExecutorExtension();
private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
@@ -175,10 +172,10 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;
- @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+ @TempDir private java.nio.file.Path tmpFolder;
- @Before
- public void setUp() throws Exception {
+ @BeforeEach
+ void setUp() throws Exception {
manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
}
@@ -187,7 +184,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
* {@link Execution} upon recovery.
*/
@Test
- public void testRestoreLatestCheckpointedState() throws Exception {
+ void testRestoreLatestCheckpointedState() throws Exception {
final List<TestingVertex> vertices =
Arrays.asList(
new TestingVertex(new JobVertexID(), 3, 42),
@@ -215,7 +212,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
manuallyTriggeredScheduledExecutor.triggerAll();
// we should have a single pending checkpoint
- assertEquals(1, coordinator.getPendingCheckpoints().size());
+ assertThat(coordinator.getPendingCheckpoints().size()).isOne();
final long checkpointId =
Iterables.getOnlyElement(coordinator.getPendingCheckpoints().keySet());
@@ -228,7 +225,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
final List<CompletedCheckpoint> completedCheckpoints =
coordinator.getSuccessfulCheckpoints();
- assertEquals(1, completedCheckpoints.size());
+ assertThat(completedCheckpoints.size()).isOne();
// shutdown the store
store.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner());
@@ -256,7 +253,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
.map(TestingVertex::getId)
.map(executionGraph::getJobVertex)
.collect(Collectors.toSet());
- assertTrue(coordinator.restoreLatestCheckpointedStateToAll(executionVertices, false));
+ assertThat(coordinator.restoreLatestCheckpointedStateToAll(executionVertices, false))
+ .isTrue();
// validate that all shared states are registered again after the recovery.
for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
@@ -277,12 +275,12 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
}
@Test
- public void testRestoreLatestCheckpointedStateScaleIn() throws Exception {
+ void testRestoreLatestCheckpointedStateScaleIn() throws Exception {
testRestoreLatestCheckpointedStateWithChangingParallelism(false);
}
@Test
- public void testRestoreLatestCheckpointedStateScaleOut() throws Exception {
+ void testRestoreLatestCheckpointedStateScaleOut() throws Exception {
testRestoreLatestCheckpointedStateWithChangingParallelism(true);
}
@@ -324,7 +322,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
coord.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertEquals(1, coord.getPendingCheckpoints().size());
+ assertThat(coord.getPendingCheckpoints().size()).isOne();
long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
List<KeyGroupRange> keyGroupPartitions1 =
@@ -411,7 +409,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints();
- assertEquals(1, completedCheckpoints.size());
+ assertThat(completedCheckpoints.size()).isOne();
List<KeyGroupRange> newKeyGroupPartitions2 =
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, newParallelism2);
@@ -436,7 +434,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
Set<ExecutionJobVertex> tasks = new HashSet<>();
tasks.add(newJobVertex1);
tasks.add(newJobVertex2);
- assertTrue(newCoord.restoreLatestCheckpointedStateToAll(tasks, false));
+ assertThat(newCoord.restoreLatestCheckpointedStateToAll(tasks, false)).isTrue();
// verify the restored state
verifyStateRestore(jobVertexID1, newJobVertex1, keyGroupPartitions1);
@@ -458,7 +456,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
.getTaskVertices()[i]
.getCurrentExecutionAttempt()
.getTaskRestore();
- Assert.assertEquals(1L, taskRestore.getRestoreCheckpointId());
+ assertThat(taskRestore.getRestoreCheckpointId()).isOne();
TaskStateSnapshot taskStateHandles = taskRestore.getTaskStateSnapshot();
final int headOpIndex = operatorIDs.size() - 1;
@@ -496,8 +494,9 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
*
* @throws Exception
*/
- @Test(expected = IllegalStateException.class)
- public void testRestoreLatestCheckpointFailureWhenMaxParallelismChanges() throws Exception {
+ // @Test(expected = IllegalStateException.class)
+ @Test
+ void testRestoreLatestCheckpointFailureWhenMaxParallelismChanges() throws Exception {
final JobVertexID jobVertexID1 = new JobVertexID();
final JobVertexID jobVertexID2 = new JobVertexID();
int parallelism1 = 3;
@@ -526,7 +525,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
coord.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertEquals(1, coord.getPendingCheckpoints().size());
+ assertThat(coord.getPendingCheckpoints().size()).isOne();
long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
List<KeyGroupRange> keyGroupPartitions1 =
@@ -580,7 +579,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints();
- assertEquals(1, completedCheckpoints.size());
+ assertThat(completedCheckpoints.size()).isOne();
int newMaxParallelism1 = 20;
int newMaxParallelism2 = 42;
@@ -604,23 +603,23 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
Set<ExecutionJobVertex> tasks = new HashSet<>();
tasks.add(newJobVertex1);
tasks.add(newJobVertex2);
- assertTrue(newCoord.restoreLatestCheckpointedStateToAll(tasks, false));
-
- fail("The restoration should have failed because the max parallelism changed.");
+ assertThatThrownBy(() -> newCoord.restoreLatestCheckpointedStateToAll(tasks, false))
+ .as("The restoration should have failed because the max parallelism changed.")
+ .isInstanceOf(IllegalStateException.class);
}
@Test
- public void testStateRecoveryWhenTopologyChangeOut() throws Exception {
+ void testStateRecoveryWhenTopologyChangeOut() throws Exception {
testStateRecoveryWithTopologyChange(TestScaleType.INCREASE_PARALLELISM);
}
@Test
- public void testStateRecoveryWhenTopologyChangeIn() throws Exception {
+ void testStateRecoveryWhenTopologyChangeIn() throws Exception {
testStateRecoveryWithTopologyChange(TestScaleType.DECREASE_PARALLELISM);
}
@Test
- public void testStateRecoveryWhenTopologyChange() throws Exception {
+ void testStateRecoveryWhenTopologyChange() throws Exception {
testStateRecoveryWithTopologyChange(TestScaleType.SAME_PARALLELISM);
}
@@ -801,14 +800,14 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
.getTaskVertices()[i]
.getCurrentExecutionAttempt()
.getTaskRestore();
- Assert.assertEquals(2L, taskRestore.getRestoreCheckpointId());
+ assertThat(taskRestore.getRestoreCheckpointId()).isEqualTo(2L);
TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
OperatorSubtaskState headOpState =
stateSnapshot.getSubtaskStateByOperatorID(
operatorIDs.get(operatorIDs.size() - 1).getGeneratedOperatorID());
- assertTrue(headOpState.getManagedKeyedState().isEmpty());
- assertTrue(headOpState.getRawKeyedState().isEmpty());
+ assertThat(headOpState.getManagedKeyedState()).isEmpty();
+ assertThat(headOpState.getRawKeyedState()).isEmpty();
// operator5
{
@@ -817,8 +816,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
stateSnapshot.getSubtaskStateByOperatorID(
operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
- assertTrue(opState.getManagedOperatorState().isEmpty());
- assertTrue(opState.getRawOperatorState().isEmpty());
+ assertThat(opState.getManagedOperatorState()).isEmpty();
+ assertThat(opState.getRawOperatorState()).isEmpty();
}
// operator1
{
@@ -834,18 +833,20 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
Collection<OperatorStateHandle> managedOperatorState =
opState.getManagedOperatorState();
- assertEquals(1, managedOperatorState.size());
- assertTrue(
- CommonTestUtils.isStreamContentEqual(
- expectedManagedOpState.openInputStream(),
- managedOperatorState.iterator().next().openInputStream()));
+ assertThat(managedOperatorState.size()).isOne();
+ assertThat(
+ CommonTestUtils.isStreamContentEqual(
+ expectedManagedOpState.openInputStream(),
+ managedOperatorState.iterator().next().openInputStream()))
+ .isTrue();
Collection<OperatorStateHandle> rawOperatorState = opState.getRawOperatorState();
- assertEquals(1, rawOperatorState.size());
- assertTrue(
- CommonTestUtils.isStreamContentEqual(
- expectedRawOpState.openInputStream(),
- rawOperatorState.iterator().next().openInputStream()));
+ assertThat(rawOperatorState.size()).isOne();
+ assertThat(
+ CommonTestUtils.isStreamContentEqual(
+ expectedRawOpState.openInputStream(),
+ rawOperatorState.iterator().next().openInputStream()))
+ .isTrue();
}
// operator2
{
@@ -861,18 +862,20 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
Collection<OperatorStateHandle> managedOperatorState =
opState.getManagedOperatorState();
- assertEquals(1, managedOperatorState.size());
- assertTrue(
- CommonTestUtils.isStreamContentEqual(
- expectedManagedOpState.openInputStream(),
- managedOperatorState.iterator().next().openInputStream()));
+ assertThat(managedOperatorState.size()).isOne();
+ assertThat(
+ CommonTestUtils.isStreamContentEqual(
+ expectedManagedOpState.openInputStream(),
+ managedOperatorState.iterator().next().openInputStream()))
+ .isTrue();
Collection<OperatorStateHandle> rawOperatorState = opState.getRawOperatorState();
- assertEquals(1, rawOperatorState.size());
- assertTrue(
- CommonTestUtils.isStreamContentEqual(
- expectedRawOpState.openInputStream(),
- rawOperatorState.iterator().next().openInputStream()));
+ assertThat(rawOperatorState.size()).isOne();
+ assertThat(
+ CommonTestUtils.isStreamContentEqual(
+ expectedRawOpState.openInputStream(),
+ rawOperatorState.iterator().next().openInputStream()))
+ .isTrue();
}
}
@@ -890,7 +893,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
.getTaskVertices()[i]
.getCurrentExecutionAttempt()
.getTaskRestore();
- Assert.assertEquals(2L, taskRestore.getRestoreCheckpointId());
+ assertThat(taskRestore.getRestoreCheckpointId()).isEqualTo(2L);
TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
// operator 3
@@ -918,8 +921,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
OperatorSubtaskState opState =
stateSnapshot.getSubtaskStateByOperatorID(
operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
- assertTrue(opState.getManagedOperatorState().isEmpty());
- assertTrue(opState.getRawOperatorState().isEmpty());
+ assertThat(opState.getManagedOperatorState()).isEmpty();
+ assertThat(opState.getRawOperatorState()).isEmpty();
}
KeyGroupsStateHandle originalKeyedStateBackend =
@@ -959,7 +962,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
}
@Test
- public void testRestoreLatestCheckpointedStateWithoutInFlightData() throws Exception {
+ void testRestoreLatestCheckpointedStateWithoutInFlightData() throws Exception {
// given: Operator with not empty states.
final JobVertexID jobVertexID = new JobVertexID();
int parallelism1 = 3;
@@ -989,7 +992,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
coord.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertEquals(1, coord.getPendingCheckpoints().size());
+ assertThat(coord.getPendingCheckpoints().size()).isOne();
long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
List<KeyGroupRange> keyGroupPartitions1 =
@@ -1037,12 +1040,12 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
}
- assertEquals(1, coord.getSuccessfulCheckpoints().size());
+ assertThat(coord.getSuccessfulCheckpoints().size()).isOne();
// when: Restore latest checkpoint without in-flight data.
Set<ExecutionJobVertex> tasks = new HashSet<>();
tasks.add(jobVertex);
- assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
+ assertThat(coord.restoreLatestCheckpointedStateToAll(tasks, false)).isTrue();
// then: All states should be restored successfully except InputChannel and
// ResultSubpartition which should be ignored.
@@ -1050,25 +1053,25 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
for (int i = 0; i < jobVertex.getParallelism(); i++) {
JobManagerTaskRestore taskRestore =
jobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
- Assert.assertEquals(1L, taskRestore.getRestoreCheckpointId());
+ assertThat(taskRestore.getRestoreCheckpointId()).isOne();
TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
OperatorSubtaskState operatorState =
stateSnapshot.getSubtaskStateByOperatorID(
OperatorID.fromJobVertexID(jobVertexID));
- assertTrue(operatorState.getInputChannelState().isEmpty());
- assertTrue(operatorState.getResultSubpartitionState().isEmpty());
+ assertThat(operatorState.getInputChannelState()).isEmpty();
+ assertThat(operatorState.getResultSubpartitionState()).isEmpty();
- assertFalse(operatorState.getRawOperatorState().isEmpty());
- assertFalse(operatorState.getManagedOperatorState().isEmpty());
- assertFalse(operatorState.getRawKeyedState().isEmpty());
- assertFalse(operatorState.getManagedOperatorState().isEmpty());
+ assertThat(operatorState.getRawOperatorState()).isNotEmpty();
+ assertThat(operatorState.getManagedOperatorState()).isNotEmpty();
+ assertThat(operatorState.getRawKeyedState()).isNotEmpty();
+ assertThat(operatorState.getManagedOperatorState()).isNotEmpty();
}
}
@Test
- public void testRestoreFinishedStateWithoutInFlightData() throws Exception {
+ void testRestoreFinishedStateWithoutInFlightData() throws Exception {
// given: Operator with not empty states.
OperatorIDPair op1 = OperatorIDPair.generatedIDOnly(new OperatorID());
final JobVertexID jobVertexID = new JobVertexID();
@@ -1113,11 +1116,11 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
.getCurrentExecutionAttempt()
.getTaskRestore()
.getTaskStateSnapshot();
- assertTrue(restoredState.isTaskDeployedAsFinished());
+ assertThat(restoredState.isTaskDeployedAsFinished()).isTrue();
}
@Test
- public void testJobGraphModificationsAreCheckedForInitialCheckpoint() throws Exception {
+ void testJobGraphModificationsAreCheckedForInitialCheckpoint() throws Exception {
final JobVertexID jobVertexID = new JobVertexID();
ExecutionGraph graph =
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -1154,13 +1157,13 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
.build(graph);
restoreCoordinator.restoreInitialCheckpointIfPresent(
new HashSet<>(graph.getAllVertices().values()));
- assertTrue(
- "The finished states should be checked when job is restored on startup",
- checked.get());
+ assertThat(checked.get())
+ .as("The finished states should be checked when job is restored on startup")
+ .isTrue();
}
@Test
- public void testJobGraphModificationsAreCheckedForSavepoint() throws Exception {
+ void testJobGraphModificationsAreCheckedForSavepoint() throws Exception {
final JobVertexID jobVertexID = new JobVertexID();
ExecutionGraph graph =
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -1170,7 +1173,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
new CheckpointCoordinatorBuilder()
.setTimer(manuallyTriggeredScheduledExecutor)
.build(graph);
- File savepointPath = tmpFolder.newFolder();
+ File savepointPath = TempDirUtils.newFolder(tmpFolder);
CompletableFuture<CompletedCheckpoint> savepointFuture =
coordinator.triggerSavepoint(
"file://" + savepointPath.getAbsolutePath(), SavepointFormatType.CANONICAL);
@@ -1186,7 +1189,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
.getAttemptId(),
pendingSavepointId),
"localhost");
- assertTrue(savepointFuture.isDone());
+ assertThat(savepointFuture).isDone();
BooleanValue checked = new BooleanValue(false);
CheckpointCoordinator restoreCoordinator =
@@ -1204,8 +1207,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
SavepointRestoreSettings.forPath(savepointFuture.get().getExternalPointer()),
graph.getAllVertices(),
getClass().getClassLoader());
- assertTrue(
- "The finished states should be checked when job is restored on startup",
- checked.get());
+ assertThat(checked.get())
+ .as("The finished states should be checked when job is restored on startup")
+ .isTrue();
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 31767837f12..660671dfa8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -71,7 +71,8 @@ import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
@@ -82,12 +83,10 @@ import org.apache.flink.util.function.TriFunctionWithException;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
import org.mockito.verification.VerificationMode;
import javax.annotation.Nullable;
@@ -103,6 +102,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
@@ -130,13 +130,7 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.cr
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Matchers.any;
@@ -151,14 +145,14 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/** Tests for the checkpoint coordinator. */
-public class CheckpointCoordinatorTest extends TestLogger {
+class CheckpointCoordinatorTest extends TestLogger {
- @ClassRule
- public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
- TestingUtils.defaultExecutorResource();
+ @RegisterExtension
+ static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+ TestingUtils.defaultExecutorExtension();
@Test
- public void testSharedStateNotDiscaredOnAbort() throws Exception {
+ void testSharedStateNotDiscaredOnAbort() throws Exception {
JobVertexID v1 = new JobVertexID(), v2 = new JobVertexID();
ExecutionGraph graph =
@@ -184,9 +178,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
ackCheckpoint(1L, coordinator, v1, graph, metaState, privateState, sharedState);
declineCheckpoint(1L, coordinator, v2, graph);
- assertTrue(privateState.isDisposed());
- assertTrue(metaState.isDisposed());
- assertFalse(sharedState.isDisposed());
+ assertThat(privateState.isDisposed()).isTrue();
+ assertThat(metaState.isDisposed()).isTrue();
+ assertThat(sharedState.isDisposed()).isFalse();
cpFuture = coordinator.triggerCheckpoint(true);
manuallyTriggeredScheduledExecutor.triggerAll();
@@ -196,11 +190,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
ackCheckpoint(2L, coordinator, v2, graph, handle(), handle(), handle());
cpFuture.get();
- assertTrue(sharedState.isDisposed());
+ assertThat(sharedState.isDisposed()).isTrue();
}
@Test
- public void testAbortedCheckpointStatsUpdatedAfterFailure() throws Exception {
+ void testAbortedCheckpointStatsUpdatedAfterFailure() throws Exception {
testReportStatsAfterFailure(
1L,
(coordinator, execution, metrics) -> {
@@ -210,7 +204,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
@Test
- public void testCheckpointStatsUpdatedAfterFailure() throws Exception {
+ void testCheckpointStatsUpdatedAfterFailure() throws Exception {
testReportStatsAfterFailure(
1L,
(coordinator, execution, metrics) ->
@@ -305,9 +299,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
int subtasIdx,
CheckpointMetrics expected,
AbstractCheckpointStats actual) {
- assertEquals(checkpointId, actual.getCheckpointId());
- assertEquals(CheckpointStatsStatus.FAILED, actual.getStatus());
- assertEquals(0, actual.getNumberOfAcknowledgedSubtasks());
+ assertThat(actual.getCheckpointId()).isEqualTo(checkpointId);
+ assertThat(actual.getStatus()).isEqualTo(CheckpointStatsStatus.FAILED);
+ assertThat(actual.getNumberOfAcknowledgedSubtasks()).isZero();
assertStatsMetrics(jobVertexID, subtasIdx, expected, actual);
}
@@ -316,37 +310,37 @@ public class CheckpointCoordinatorTest extends TestLogger {
int subtasIdx,
CheckpointMetrics expected,
AbstractCheckpointStats actual) {
- assertEquals(expected.getTotalBytesPersisted(), actual.getStateSize());
+ assertThat(actual.getStateSize()).isEqualTo(expected.getTotalBytesPersisted());
SubtaskStateStats taskStats =
actual.getAllTaskStateStats().stream()
.filter(s -> s.getJobVertexId().equals(jobVertexID))
.findAny()
.get()
.getSubtaskStats()[subtasIdx];
- assertEquals(
- expected.getAlignmentDurationNanos() / 1_000_000, taskStats.getAlignmentDuration());
- assertEquals(expected.getUnalignedCheckpoint(), taskStats.getUnalignedCheckpoint());
- assertEquals(expected.getAsyncDurationMillis(), taskStats.getAsyncCheckpointDuration());
- assertEquals(
- expected.getAlignmentDurationNanos() / 1_000_000, taskStats.getAlignmentDuration());
- assertEquals(
- expected.getCheckpointStartDelayNanos() / 1_000_000,
- taskStats.getCheckpointStartDelay());
+ assertThat(taskStats.getAlignmentDuration())
+ .isEqualTo(expected.getAlignmentDurationNanos() / 1_000_000);
+ assertThat(taskStats.getUnalignedCheckpoint()).isEqualTo(expected.getUnalignedCheckpoint());
+ assertThat(taskStats.getAsyncCheckpointDuration())
+ .isEqualTo(expected.getAsyncDurationMillis());
+ assertThat(taskStats.getAlignmentDuration())
+ .isEqualTo(expected.getAlignmentDurationNanos() / 1_000_000);
+ assertThat(taskStats.getCheckpointStartDelay())
+ .isEqualTo(expected.getCheckpointStartDelayNanos() / 1_000_000);
}
private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;
- @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+ @TempDir private java.nio.file.Path tmpFolder;
- @Before
- public void setUp() throws Exception {
+ @BeforeEach
+ void setUp() {
manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
}
@Test
- public void testScheduleTriggerRequestDuringShutdown() throws Exception {
+ void testScheduleTriggerRequestDuringShutdown() throws Exception {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
CheckpointCoordinator coordinator =
getCheckpointCoordinator(new ScheduledExecutorServiceAdapter(executor));
@@ -356,7 +350,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
@Test
- public void testMinCheckpointPause() throws Exception {
+ void testMinCheckpointPause() throws Exception {
// will use a different thread to allow checkpoint triggering before exiting from
// receiveAcknowledgeMessage
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
@@ -407,7 +401,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new AcknowledgeCheckpoint(graph.getJobID(), attemptId, 1L),
TASK_MANAGER_LOCATION_INFO);
Thread.sleep(pause / 2);
- assertEquals(0, coordinator.getNumberOfPendingCheckpoints());
+ assertThat(coordinator.getNumberOfPendingCheckpoints()).isZero();
// make sure that the 2nd request is eventually processed
while (coordinator.getNumberOfPendingCheckpoints() == 0) {
Thread.sleep(1);
@@ -421,7 +415,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
@Test
- public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() throws Exception {
+ void testCheckpointAbortsIfTriggerTasksAreNotExecuted() throws Exception {
// set up the coordinator and validate the initial state
ExecutionGraph graph =
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -433,24 +427,24 @@ public class CheckpointCoordinatorTest extends TestLogger {
CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(graph);
// nothing should be happening
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
// trigger the first checkpoint. this should not succeed
final CompletableFuture<CompletedCheckpoint> checkpointFuture =
checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertTrue(checkpointFuture.isCompletedExceptionally());
+ assertThat(checkpointFuture).isCompletedExceptionally();
// still, nothing should be happening
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
checkpointCoordinator.shutdown();
}
@Test
- public void testCheckpointAbortsIfTriggerTasksAreFinished() throws Exception {
+ void testCheckpointAbortsIfTriggerTasksAreFinished() throws Exception {
JobVertexID jobVertexID1 = new JobVertexID();
JobVertexID jobVertexID2 = new JobVertexID();
@@ -465,24 +459,24 @@ public class CheckpointCoordinatorTest extends TestLogger {
.forEach(task -> task.getCurrentExecutionAttempt().markFinished());
// nothing should be happening
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
// trigger the first checkpoint. this should not succeed
final CompletableFuture<CompletedCheckpoint> checkpointFuture =
checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertTrue(checkpointFuture.isCompletedExceptionally());
+ assertThat(checkpointFuture).isCompletedExceptionally();
// still, nothing should be happening
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
checkpointCoordinator.shutdown();
}
@Test
- public void testCheckpointTriggeredAfterSomeTasksFinishedIfAllowed() throws Exception {
+ void testCheckpointTriggeredAfterSomeTasksFinishedIfAllowed() throws Exception {
JobVertexID jobVertexID1 = new JobVertexID();
JobVertexID jobVertexID2 = new JobVertexID();
@@ -508,19 +502,19 @@ public class CheckpointCoordinatorTest extends TestLogger {
.build(graph);
// nothing should be happening
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
// trigger the first checkpoint. this will not fail because we allow checkpointing even with
// finished tasks
final CompletableFuture<CompletedCheckpoint> checkpointFuture =
checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkpointFuture.isDone());
- assertFalse(checkpointFuture.isCompletedExceptionally());
+ assertThat(checkpointFuture.isDone()).isFalse();
+ assertThat(checkpointFuture.isCompletedExceptionally()).isFalse();
// Triggering should succeed
- assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
PendingCheckpoint pendingCheckpoint =
checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
AbstractCheckpointStats checkpointStats =
@@ -528,7 +522,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
.createSnapshot()
.getHistory()
.getCheckpointById(pendingCheckpoint.getCheckpointID());
- assertEquals(3, checkpointStats.getNumberOfAcknowledgedSubtasks());
+ assertThat(checkpointStats.getNumberOfAcknowledgedSubtasks()).isEqualTo(3);
for (ExecutionVertex task :
Arrays.asList(
jobVertex1.getTaskVertices()[0],
@@ -536,14 +530,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
jobVertex2.getTaskVertices()[1])) {
// those tasks that are already finished are automatically marked as acknowledged
- assertNotNull(
- checkpointStats.getTaskStateStats(task.getJobvertexId())
- .getSubtaskStats()[task.getParallelSubtaskIndex()]);
+ assertThat(
+ checkpointStats.getTaskStateStats(task.getJobvertexId())
+ .getSubtaskStats()[task.getParallelSubtaskIndex()])
+ .isNotNull();
}
}
@Test
- public void testTasksFinishDuringTriggering() throws Exception {
+ void testTasksFinishDuringTriggering() throws Exception {
JobVertexID jobVertexID1 = new JobVertexID();
JobVertexID jobVertexID2 = new JobVertexID();
@@ -603,21 +598,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
.build(graph);
// nothing should be happening
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
// trigger the first checkpoint. this will not fail because we allow checkpointing even with
// finished tasks
final CompletableFuture<CompletedCheckpoint> checkpointFuture =
checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertTrue(checkpointFuture.isCompletedExceptionally());
- assertTrue(checkpointAborted.get());
+ assertThat(checkpointFuture).isCompletedExceptionally();
+ assertThat(checkpointAborted.get()).isTrue();
}
@Test
- public void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException()
- throws Exception {
+ void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException() throws Exception {
JobVertexID jobVertexID1 = new JobVertexID();
JobVertexID jobVertexID2 = new JobVertexID();
ExecutionGraph graph =
@@ -659,8 +653,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
checkpointCoordinator.receiveAcknowledgeMessage(
new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointId),
TASK_MANAGER_LOCATION_INFO);
- assertFalse(checkpoint.isDisposed());
- assertFalse(checkpoint.areTasksFullyAcknowledged());
+ assertThat(checkpoint.isDisposed()).isFalse();
+ assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
// decline checkpoint from the other task
checkpointCoordinator.receiveDeclineMessage(
@@ -680,7 +674,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
@Test
- public void testIOExceptionCheckpointExceedsTolerableFailureNumber() throws Exception {
+ void testIOExceptionCheckpointExceedsTolerableFailureNumber() throws Exception {
// create some mock Execution vertices that receive the checkpoint trigger messages
ExecutionGraph graph =
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -709,7 +703,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
@Test
- public void testIOExceptionForPeriodicSchedulingWithInactiveTasks() throws Exception {
+ void testIOExceptionForPeriodicSchedulingWithInactiveTasks() throws Exception {
CheckpointCoordinator checkpointCoordinator =
setupCheckpointCoordinatorWithInactiveTasks(new IOExceptionCheckpointStorage());
@@ -737,7 +731,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
/** Tests that do not trigger checkpoint when IOException occurred. */
@Test
- public void testTriggerCheckpointAfterCheckpointStorageIOException() throws Exception {
+ void testTriggerCheckpointAfterCheckpointStorageIOException() throws Exception {
// given: Checkpoint coordinator which fails on initializeCheckpointLocation.
TestFailJobCallback failureCallback = new TestFailJobCallback();
CheckpointStatsTracker statsTracker =
@@ -753,14 +747,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
testTriggerCheckpoint(checkpointCoordinator, IO_EXCEPTION);
// then: Failure manager should fail the job.
- assertEquals(1, failureCallback.getInvokeCounter());
+ assertThat(failureCallback.getInvokeCounter()).isOne();
// then: Should created PendingCheckpoint
- assertNotNull(statsTracker.getPendingCheckpointStats(1));
+ assertThat(statsTracker.getPendingCheckpointStats(1)).isNotNull();
}
@Test
- public void testCheckpointAbortsIfTriggerTasksAreFinishedAndIOException() throws Exception {
+ void testCheckpointAbortsIfTriggerTasksAreFinishedAndIOException() throws Exception {
JobVertexID jobVertexID1 = new JobVertexID();
JobVertexID jobVertexID2 = new JobVertexID();
@@ -781,8 +775,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
.forEach(task -> task.getCurrentExecutionAttempt().markFinished());
// nothing should be happening
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
checkpointCoordinator.startCheckpointScheduler();
// trigger the first checkpoint. this should not succeed
@@ -790,17 +784,17 @@ public class CheckpointCoordinatorTest extends TestLogger {
checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertTrue(checkpointFuture.isCompletedExceptionally());
+ assertThat(checkpointFuture).isCompletedExceptionally();
// still, nothing should be happening
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
checkpointCoordinator.shutdown();
}
@Test
- public void testExpiredCheckpointExceedsTolerableFailureNumber() throws Exception {
+ void testExpiredCheckpointExceedsTolerableFailureNumber() throws Exception {
// create some mock Execution vertices that receive the checkpoint trigger messages
ExecutionGraph graph =
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -829,12 +823,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
@Test
- public void testTriggerAndDeclineSyncCheckpointFailureSimple() throws Exception {
+ void testTriggerAndDeclineSyncCheckpointFailureSimple() throws Exception {
testTriggerAndDeclineCheckpointSimple(CHECKPOINT_DECLINED);
}
@Test
- public void testTriggerAndDeclineAsyncCheckpointFailureSimple() throws Exception {
+ void testTriggerAndDeclineAsyncCheckpointFailureSimple() throws Exception {
testTriggerAndDeclineCheckpointSimple(CHECKPOINT_ASYNC_EXCEPTION);
}
@@ -881,8 +875,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new CheckpointFailureManager(0, failJobCallback))
.build(graph);
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
// trigger the first checkpoint. this should succeed
final CompletableFuture<CompletedCheckpoint> checkpointFuture =
@@ -891,53 +885,53 @@ public class CheckpointCoordinatorTest extends TestLogger {
FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
// validate that we have a pending checkpoint
- assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
// we have one task scheduled that will cancel after timeout
- assertEquals(1, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+ assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).hasSize(1);
long checkpointId =
checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
PendingCheckpoint checkpoint =
checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
- assertNotNull(checkpoint);
- assertEquals(checkpointId, checkpoint.getCheckpointID());
- assertEquals(graph.getJobID(), checkpoint.getJobId());
- assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
- assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
- assertEquals(0, checkpoint.getOperatorStates().size());
- assertFalse(checkpoint.isDisposed());
- assertFalse(checkpoint.areTasksFullyAcknowledged());
+ assertThat(checkpoint).isNotNull();
+ assertThat(checkpoint.getCheckpointID()).isEqualTo(checkpointId);
+ assertThat(checkpoint.getJobId()).isEqualTo(graph.getJobID());
+ assertThat(checkpoint.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
+ assertThat(checkpoint.getNumberOfAcknowledgedTasks()).isZero();
+ assertThat(checkpoint.getOperatorStates().size()).isZero();
+ assertThat(checkpoint.isDisposed()).isFalse();
+ assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
// check that the vertices received the trigger checkpoint message
for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
CheckpointCoordinatorTestingUtils.TriggeredCheckpoint triggeredCheckpoint =
gateway.getOnlyTriggeredCheckpoint(
vertex.getCurrentExecutionAttempt().getAttemptId());
- assertEquals(checkpointId, triggeredCheckpoint.checkpointId);
- assertEquals(checkpoint.getCheckpointTimestamp(), triggeredCheckpoint.timestamp);
- assertEquals(
- CheckpointOptions.forCheckpointWithDefaultLocation(),
- triggeredCheckpoint.checkpointOptions);
+ assertThat(triggeredCheckpoint.checkpointId).isEqualTo(checkpointId);
+ assertThat(triggeredCheckpoint.timestamp)
+ .isEqualTo(checkpoint.getCheckpointTimestamp());
+ assertThat(triggeredCheckpoint.checkpointOptions)
+ .isEqualTo(CheckpointOptions.forCheckpointWithDefaultLocation());
}
// acknowledge from one of the tasks
checkpointCoordinator.receiveAcknowledgeMessage(
new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointId),
"Unknown location");
- assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
- assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
- assertFalse(checkpoint.isDisposed());
- assertFalse(checkpoint.areTasksFullyAcknowledged());
+ assertThat(checkpoint.getNumberOfAcknowledgedTasks()).isOne();
+ assertThat(checkpoint.getNumberOfNonAcknowledgedTasks()).isOne();
+ assertThat(checkpoint.isDisposed()).isFalse();
+ assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
// acknowledge the same task again (should not matter)
checkpointCoordinator.receiveAcknowledgeMessage(
new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointId),
"Unknown location");
- assertFalse(checkpoint.isDisposed());
- assertFalse(checkpoint.areTasksFullyAcknowledged());
+ assertThat(checkpoint.isDisposed()).isFalse();
+ assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
// decline checkpoint from the other task, this should cancel the checkpoint
// and trigger a new one
@@ -945,14 +939,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
new DeclineCheckpoint(
graph.getJobID(), attemptID1, checkpointId, checkpointException),
TASK_MANAGER_LOCATION_INFO);
- assertTrue(checkpoint.isDisposed());
+ assertThat(checkpoint.isDisposed()).isTrue();
// the canceler is also removed
- assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+ assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size()).isZero();
// validate that we have no new pending checkpoint
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
// decline again, nothing should happen
// decline from the other task, nothing should happen
@@ -964,8 +958,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new DeclineCheckpoint(
graph.getJobID(), attemptID2, checkpointId, checkpointException),
TASK_MANAGER_LOCATION_INFO);
- assertTrue(checkpoint.isDisposed());
- assertEquals(1, failJobCallback.getInvokeCounter());
+ assertThat(checkpoint.isDisposed()).isTrue();
+ assertThat(failJobCallback.getInvokeCounter()).isOne();
checkpointCoordinator.shutdown();
}
@@ -976,7 +970,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
* checkpoint because a later checkpoint is already in progress.
*/
@Test
- public void testTriggerAndDeclineCheckpointComplex() throws Exception {
+ void testTriggerAndDeclineCheckpointComplex() throws Exception {
JobVertexID jobVertexID1 = new JobVertexID();
JobVertexID jobVertexID2 = new JobVertexID();
@@ -997,9 +991,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(graph);
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
- assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+ assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).isEmpty();
// trigger the first checkpoint. this should succeed
final CompletableFuture<CompletedCheckpoint> checkpointFuture1 =
@@ -1014,9 +1008,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
// validate that we have a pending checkpoint
- assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
- assertEquals(2, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+ assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).hasSize(2);
Iterator<Map.Entry<Long, PendingCheckpoint>> it =
checkpointCoordinator.getPendingCheckpoints().entrySet().iterator();
@@ -1027,32 +1021,32 @@ public class CheckpointCoordinatorTest extends TestLogger {
PendingCheckpoint checkpoint2 =
checkpointCoordinator.getPendingCheckpoints().get(checkpoint2Id);
- assertNotNull(checkpoint1);
- assertEquals(checkpoint1Id, checkpoint1.getCheckpointID());
- assertEquals(graph.getJobID(), checkpoint1.getJobId());
- assertEquals(2, checkpoint1.getNumberOfNonAcknowledgedTasks());
- assertEquals(0, checkpoint1.getNumberOfAcknowledgedTasks());
- assertEquals(0, checkpoint1.getOperatorStates().size());
- assertFalse(checkpoint1.isDisposed());
- assertFalse(checkpoint1.areTasksFullyAcknowledged());
-
- assertNotNull(checkpoint2);
- assertEquals(checkpoint2Id, checkpoint2.getCheckpointID());
- assertEquals(graph.getJobID(), checkpoint2.getJobId());
- assertEquals(2, checkpoint2.getNumberOfNonAcknowledgedTasks());
- assertEquals(0, checkpoint2.getNumberOfAcknowledgedTasks());
- assertEquals(0, checkpoint2.getOperatorStates().size());
- assertFalse(checkpoint2.isDisposed());
- assertFalse(checkpoint2.areTasksFullyAcknowledged());
+ assertThat(checkpoint1).isNotNull();
+ assertThat(checkpoint1.getCheckpointID()).isEqualTo(checkpoint1Id);
+ assertThat(checkpoint1.getJobId()).isEqualTo(graph.getJobID());
+ assertThat(checkpoint1.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
+ assertThat(checkpoint1.getNumberOfAcknowledgedTasks()).isZero();
+ assertThat(checkpoint1.getOperatorStates()).isEmpty();
+ assertThat(checkpoint1.isDisposed()).isFalse();
+ assertThat(checkpoint1.areTasksFullyAcknowledged()).isFalse();
+
+ assertThat(checkpoint2).isNotNull();
+ assertThat(checkpoint2.getCheckpointID()).isEqualTo(checkpoint2Id);
+ assertThat(checkpoint2.getJobId()).isEqualTo(graph.getJobID());
+ assertThat(checkpoint2.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
+ assertThat(checkpoint2.getNumberOfAcknowledgedTasks()).isZero();
+ assertThat(checkpoint2.getOperatorStates()).isEmpty();
+ assertThat(checkpoint2.isDisposed()).isFalse();
+ assertThat(checkpoint2.areTasksFullyAcknowledged()).isFalse();
// check that the vertices received the trigger checkpoint message
for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
List<CheckpointCoordinatorTestingUtils.TriggeredCheckpoint> triggeredCheckpoints =
gateway.getTriggeredCheckpoints(
vertex.getCurrentExecutionAttempt().getAttemptId());
- assertEquals(2, triggeredCheckpoints.size());
- assertEquals(checkpoint1Id, triggeredCheckpoints.get(0).checkpointId);
- assertEquals(checkpoint2Id, triggeredCheckpoints.get(1).checkpointId);
+ assertThat(triggeredCheckpoints).hasSize(2);
+ assertThat(triggeredCheckpoints.get(0).checkpointId).isEqualTo(checkpoint1Id);
+ assertThat(triggeredCheckpoints.get(1).checkpointId).isEqualTo(checkpoint2Id);
}
// decline checkpoint from one of the tasks, this should cancel the checkpoint
@@ -1064,36 +1058,36 @@ public class CheckpointCoordinatorTest extends TestLogger {
new CheckpointException(CHECKPOINT_DECLINED)),
TASK_MANAGER_LOCATION_INFO);
for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
- assertEquals(
- checkpoint1Id,
- gateway.getOnlyNotifiedAbortedCheckpoint(
- vertex.getCurrentExecutionAttempt().getAttemptId())
- .checkpointId);
+ assertThat(
+ gateway.getOnlyNotifiedAbortedCheckpoint(
+ vertex.getCurrentExecutionAttempt().getAttemptId())
+ .checkpointId)
+ .isEqualTo(checkpoint1Id);
}
- assertTrue(checkpoint1.isDisposed());
+ assertThat(checkpoint1.isDisposed()).isTrue();
// validate that we have only one pending checkpoint left
- assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
- assertEquals(1, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+ assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).hasSize(1);
// validate that it is the same second checkpoint from earlier
long checkpointIdNew =
checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
PendingCheckpoint checkpointNew =
checkpointCoordinator.getPendingCheckpoints().get(checkpointIdNew);
- assertEquals(checkpoint2Id, checkpointIdNew);
-
- assertNotNull(checkpointNew);
- assertEquals(checkpointIdNew, checkpointNew.getCheckpointID());
- assertEquals(graph.getJobID(), checkpointNew.getJobId());
- assertEquals(2, checkpointNew.getNumberOfNonAcknowledgedTasks());
- assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks());
- assertEquals(0, checkpointNew.getOperatorStates().size());
- assertFalse(checkpointNew.isDisposed());
- assertFalse(checkpointNew.areTasksFullyAcknowledged());
- assertNotEquals(checkpoint1.getCheckpointID(), checkpointNew.getCheckpointID());
+ assertThat(checkpointIdNew).isEqualTo(checkpoint2Id);
+
+ assertThat(checkpointNew).isNotNull();
+ assertThat(checkpointNew.getCheckpointID()).isEqualTo(checkpointIdNew);
+ assertThat(checkpointNew.getJobId()).isEqualTo(graph.getJobID());
+ assertThat(checkpointNew.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
+ assertThat(checkpointNew.getNumberOfAcknowledgedTasks()).isZero();
+ assertThat(checkpointNew.getOperatorStates()).isEmpty();
+ assertThat(checkpointNew.isDisposed()).isFalse();
+ assertThat(checkpointNew.areTasksFullyAcknowledged()).isFalse();
+ assertThat(checkpointNew.getCheckpointID()).isNotEqualTo(checkpoint1.getCheckpointID());
// decline again, nothing should happen
// decline from the other task, nothing should happen
@@ -1111,22 +1105,21 @@ public class CheckpointCoordinatorTest extends TestLogger {
checkpoint1Id,
new CheckpointException(CHECKPOINT_DECLINED)),
TASK_MANAGER_LOCATION_INFO);
- assertTrue(checkpoint1.isDisposed());
+ assertThat(checkpoint1.isDisposed()).isTrue();
// will not notify abort message again
for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
- assertEquals(
- 1,
- gateway.getNotifiedAbortedCheckpoints(
- vertex.getCurrentExecutionAttempt().getAttemptId())
- .size());
+ assertThat(
+ gateway.getNotifiedAbortedCheckpoints(
+ vertex.getCurrentExecutionAttempt().getAttemptId()))
+ .hasSize(1);
}
checkpointCoordinator.shutdown();
}
@Test
- public void testTriggerAndConfirmSimpleCheckpoint() throws Exception {
+ void testTriggerAndConfirmSimpleCheckpoint() throws Exception {
JobVertexID jobVertexID1 = new JobVertexID();
JobVertexID jobVertexID2 = new JobVertexID();
@@ -1147,9 +1140,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(graph);
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
- assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+ assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).isEmpty();
// trigger the first checkpoint. this should succeed
final CompletableFuture<CompletedCheckpoint> checkpointFuture =
@@ -1158,28 +1151,29 @@ public class CheckpointCoordinatorTest extends TestLogger {
FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
// validate that we have a pending checkpoint
- assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
- assertEquals(1, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+ assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).hasSize(1);
long checkpointId =
checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
PendingCheckpoint checkpoint =
checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
- assertNotNull(checkpoint);
- assertEquals(checkpointId, checkpoint.getCheckpointID());
- assertEquals(graph.getJobID(), checkpoint.getJobId());
- assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
- assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
- assertEquals(0, checkpoint.getOperatorStates().size());
- assertFalse(checkpoint.isDisposed());
- assertFalse(checkpoint.areTasksFullyAcknowledged());
+ assertThat(checkpoint).isNotNull();
+ assertThat(checkpoint.getCheckpointID()).isEqualTo(checkpointId);
+ assertThat(checkpoint.getJobId()).isEqualTo(graph.getJobID());
+ assertThat(checkpoint.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
+ assertThat(checkpoint.getNumberOfAcknowledgedTasks()).isZero();
+ assertThat(checkpoint.getOperatorStates()).isEmpty();
+ assertThat(checkpoint.isDisposed()).isFalse();
+ assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
// check that the vertices received the trigger checkpoint message
for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
- assertEquals(checkpointId, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+ assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+ .isEqualTo(checkpointId);
}
OperatorID opID1 = vertex1.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();
@@ -1201,18 +1195,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
taskOperatorSubtaskStates2);
checkpointCoordinator.receiveAcknowledgeMessage(
acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
- assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
- assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
- assertFalse(checkpoint.isDisposed());
- assertFalse(checkpoint.areTasksFullyAcknowledged());
+ assertThat(checkpoint.getNumberOfAcknowledgedTasks()).isOne();
+ assertThat(checkpoint.getNumberOfNonAcknowledgedTasks()).isOne();
+ assertThat(checkpoint.isDisposed()).isFalse();
+ assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
verify(subtaskState2, times(1))
.registerSharedStates(any(SharedStateRegistry.class), eq(checkpointId));
// acknowledge the same task again (should not matter)
checkpointCoordinator.receiveAcknowledgeMessage(
acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
- assertFalse(checkpoint.isDisposed());
- assertFalse(checkpoint.areTasksFullyAcknowledged());
+ assertThat(checkpoint.isDisposed()).isFalse();
+ assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
verify(subtaskState2, times(2))
.registerSharedStates(any(SharedStateRegistry.class), eq(checkpointId));
@@ -1228,14 +1222,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
// the checkpoint is internally converted to a successful checkpoint and the
// pending checkpoint object is disposed
- assertTrue(checkpoint.isDisposed());
+ assertThat(checkpoint.isDisposed()).isTrue();
// the now we should have a completed checkpoint
- assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
// the canceler should be removed now
- assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+ assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).isEmpty();
// validate that the subtasks states have registered their shared states.
{
@@ -1248,15 +1242,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
// validate that the relevant tasks got a confirmation message
for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
- assertEquals(
- checkpointId,
- gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId);
+ assertThat(gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId)
+ .isEqualTo(checkpointId);
}
CompletedCheckpoint success = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
- assertEquals(graph.getJobID(), success.getJobId());
- assertEquals(checkpoint.getCheckpointID(), success.getCheckpointID());
- assertEquals(2, success.getOperatorStates().size());
+ assertThat(success.getJobId()).isEqualTo(graph.getJobID());
+ assertThat(success.getCheckpointID()).isEqualTo(checkpoint.getCheckpointID());
+ assertThat(success.getOperatorStates()).hasSize(2);
// ---------------
// trigger another checkpoint and see that this one replaces the other checkpoint
@@ -1274,31 +1267,31 @@ public class CheckpointCoordinatorTest extends TestLogger {
new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointIdNew),
TASK_MANAGER_LOCATION_INFO);
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
- assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
+ assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).isEmpty();
CompletedCheckpoint successNew = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
- assertEquals(graph.getJobID(), successNew.getJobId());
- assertEquals(checkpointIdNew, successNew.getCheckpointID());
- assertEquals(2, successNew.getOperatorStates().size());
- assertTrue(successNew.getOperatorStates().values().stream().allMatch(this::hasNoSubState));
+ assertThat(successNew.getJobId()).isEqualTo(graph.getJobID());
+ assertThat(successNew.getCheckpointID()).isEqualTo(checkpointIdNew);
+ assertThat(successNew.getOperatorStates()).hasSize(2);
+ assertThat(successNew.getOperatorStates().values().stream().allMatch(this::hasNoSubState))
+ .isTrue();
// validate that the relevant tasks got a confirmation message
for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
- assertEquals(
- checkpointIdNew, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
- assertEquals(
- checkpointIdNew,
- gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId);
+ assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+ .isEqualTo(checkpointIdNew);
+ assertThat(gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId)
+ .isEqualTo(checkpointIdNew);
}
checkpointCoordinator.shutdown();
}
@Test
- public void testMultipleConcurrentCheckpoints() throws Exception {
+ void testMultipleConcurrentCheckpoints() throws Exception {
JobVertexID jobVertexID1 = new JobVertexID();
JobVertexID jobVertexID2 = new JobVertexID();
JobVertexID jobVertexID3 = new JobVertexID();
@@ -1333,8 +1326,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
.setTimer(manuallyTriggeredScheduledExecutor)
.build(graph);
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
// trigger the first checkpoint. this should succeed
final CompletableFuture<CompletedCheckpoint> checkpointFuture1 =
@@ -1342,8 +1335,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
manuallyTriggeredScheduledExecutor.triggerAll();
FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
- assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
PendingCheckpoint pending1 =
checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
@@ -1352,7 +1345,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
// trigger messages should have been sent
for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
- assertEquals(checkpointId1, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+ assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+ .isEqualTo(checkpointId1);
}
// acknowledge one of the three tasks
@@ -1368,8 +1362,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
manuallyTriggeredScheduledExecutor.triggerAll();
FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
- assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
PendingCheckpoint pending2;
{
@@ -1384,7 +1378,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
// trigger messages should have been sent
for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
- assertEquals(checkpointId2, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+ assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+ .isEqualTo(checkpointId2);
}
// we acknowledge the remaining two tasks from the first
@@ -1403,16 +1398,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
TASK_MANAGER_LOCATION_INFO);
// now, the first checkpoint should be confirmed
- assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
- assertTrue(pending1.isDisposed());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
+ assertThat(pending1.isDisposed()).isTrue();
// the first confirm message should be out
for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2, vertex3)) {
ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
- assertEquals(
- checkpointId1,
- gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId);
+ assertThat(gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId)
+ .isEqualTo(checkpointId1);
}
// send the last remaining ack for the second checkpoint
@@ -1422,38 +1416,37 @@ public class CheckpointCoordinatorTest extends TestLogger {
TASK_MANAGER_LOCATION_INFO);
// now, the second checkpoint should be confirmed
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(2, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
- assertTrue(pending2.isDisposed());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isEqualTo(2);
+ assertThat(pending2.isDisposed()).isTrue();
// the second commit message should be out
for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2, vertex3)) {
ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
- assertEquals(
- checkpointId2,
- gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId);
+ assertThat(gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId)
+ .isEqualTo(checkpointId2);
}
// validate the committed checkpoints
List<CompletedCheckpoint> scs = checkpointCoordinator.getSuccessfulCheckpoints();
CompletedCheckpoint sc1 = scs.get(0);
- assertEquals(checkpointId1, sc1.getCheckpointID());
- assertEquals(graph.getJobID(), sc1.getJobId());
- assertEquals(3, sc1.getOperatorStates().size());
- assertTrue(sc1.getOperatorStates().values().stream().allMatch(this::hasNoSubState));
+ assertThat(sc1.getCheckpointID()).isEqualTo(checkpointId1);
+ assertThat(sc1.getJobId()).isEqualTo(graph.getJobID());
+ assertThat(sc1.getOperatorStates()).hasSize(3);
+ assertThat(sc1.getOperatorStates().values()).allMatch(this::hasNoSubState);
CompletedCheckpoint sc2 = scs.get(1);
- assertEquals(checkpointId2, sc2.getCheckpointID());
- assertEquals(graph.getJobID(), sc2.getJobId());
- assertEquals(3, sc2.getOperatorStates().size());
- assertTrue(sc2.getOperatorStates().values().stream().allMatch(this::hasNoSubState));
+ assertThat(sc2.getCheckpointID()).isEqualTo(checkpointId2);
+ assertThat(sc2.getJobId()).isEqualTo(graph.getJobID());
+ assertThat(sc2.getOperatorStates()).hasSize(3);
+ assertThat(sc2.getOperatorStates().values()).allMatch(this::hasNoSubState);
checkpointCoordinator.shutdown();
}
@Test
- public void testSuccessfulCheckpointSubsumesUnsuccessful() throws Exception {
+ void testSuccessfulCheckpointSubsumesUnsuccessful() throws Exception {
JobVertexID jobVertexID1 = new JobVertexID();
JobVertexID jobVertexID2 = new JobVertexID();
JobVertexID jobVertexID3 = new JobVertexID();
@@ -1490,8 +1483,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
.setTimer(manuallyTriggeredScheduledExecutor)
.build(graph);
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
// trigger the first checkpoint. this should succeed
final CompletableFuture<CompletedCheckpoint> checkpointFuture1 =
@@ -1499,8 +1492,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
manuallyTriggeredScheduledExecutor.triggerAll();
FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
- assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
PendingCheckpoint pending1 =
checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
@@ -1509,7 +1502,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
// trigger messages should have been sent
for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
- assertEquals(checkpointId1, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+ assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+ .isEqualTo(checkpointId1);
}
OperatorID opID1 = vertex1.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();
@@ -1544,8 +1538,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
manuallyTriggeredScheduledExecutor.triggerAll();
FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
- assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
PendingCheckpoint pending2;
{
@@ -1572,7 +1566,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
// trigger messages should have been sent
for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
- assertEquals(checkpointId2, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+ assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+ .isEqualTo(checkpointId2);
}
// we acknowledge one more task from the first checkpoint and the second
@@ -1617,11 +1612,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
// now, the second checkpoint should be confirmed, and the first discarded
// actually both pending checkpoints are discarded, and the second has been transformed
// into a successful checkpoint
- assertTrue(pending1.isDisposed());
- assertTrue(pending2.isDisposed());
+ assertThat(pending1.isDisposed()).isTrue();
+ assertThat(pending2.isDisposed()).isTrue();
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
// validate that all received subtask states in the first checkpoint have been discarded
verify(subtaskState11, times(1)).discardState();
@@ -1635,16 +1630,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
// validate the committed checkpoints
List<CompletedCheckpoint> scs = checkpointCoordinator.getSuccessfulCheckpoints();
CompletedCheckpoint success = scs.get(0);
- assertEquals(checkpointId2, success.getCheckpointID());
- assertEquals(graph.getJobID(), success.getJobId());
- assertEquals(3, success.getOperatorStates().size());
+ assertThat(success.getCheckpointID()).isEqualTo(checkpointId2);
+ assertThat(success.getJobId()).isEqualTo(graph.getJobID());
+ assertThat(success.getOperatorStates()).hasSize(3);
// the first confirm message should be out
for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2, vertex3)) {
ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
- assertEquals(
- checkpointId2,
- gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId);
+ assertThat(gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId)
+ .isEqualTo(checkpointId2);
}
// send the last remaining ack for the first checkpoint. This should not do anything
@@ -1668,7 +1662,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
@Test
- public void testCheckpointTimeoutIsolated() throws Exception {
+ void testCheckpointTimeoutIsolated() throws Exception {
JobVertexID jobVertexID1 = new JobVertexID();
JobVertexID jobVertexID2 = new JobVertexID();
@@ -1699,11 +1693,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
- assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
PendingCheckpoint checkpoint =
checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
- assertFalse(checkpoint.isDisposed());
+ assertThat(checkpoint.isDisposed()).isFalse();
OperatorID opID1 = vertex1.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();
@@ -1722,9 +1716,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
// triggers cancelling
manuallyTriggeredScheduledExecutor.triggerScheduledTasks();
- assertTrue("Checkpoint was not canceled by the timeout", checkpoint.isDisposed());
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpoint.isDisposed())
+ .as("Checkpoint was not canceled by the timeout")
+ .isTrue();
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
// validate that the received states have been discarded
verify(subtaskState1, times(1)).discardState();
@@ -1732,14 +1728,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
// no confirm message must have been sent
for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
- assertEquals(0, gateway.getNotifiedCompletedCheckpoints(attemptId).size());
+ assertThat(gateway.getNotifiedCompletedCheckpoints(attemptId)).isEmpty();
}
checkpointCoordinator.shutdown();
}
@Test
- public void testHandleMessagesForNonExistingCheckpoints() throws Exception {
+ void testHandleMessagesForNonExistingCheckpoints() throws Exception {
// create some mock execution vertices and trigger some checkpoint
JobVertexID jobVertexID1 = new JobVertexID();
JobVertexID jobVertexID2 = new JobVertexID();
@@ -1804,7 +1800,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
* @throws Exception
*/
@Test
- public void testStateCleanupForLateOrUnknownMessages() throws Exception {
+ void testStateCleanupForLateOrUnknownMessages() throws Exception {
JobVertexID jobVertexID1 = new JobVertexID();
JobVertexID jobVertexID2 = new JobVertexID();
@@ -1839,7 +1835,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
manuallyTriggeredScheduledExecutor.triggerAll();
FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
- assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
PendingCheckpoint pendingCheckpoint =
checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
@@ -1921,7 +1917,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
new CheckpointException(CHECKPOINT_DECLINED)),
TASK_MANAGER_LOCATION_INFO);
- assertTrue(pendingCheckpoint.isDisposed());
+ assertThat(pendingCheckpoint.isDisposed()).isTrue();
// check that we've cleaned up the already acknowledged state
verify(subtaskStateTrigger, times(1)).discardState();
@@ -1972,22 +1968,22 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
@Test
- public void testMaxConcurrentAttempts1() {
+ void testMaxConcurrentAttempts1() {
testMaxConcurrentAttempts(1);
}
@Test
- public void testMaxConcurrentAttempts2() {
+ void testMaxConcurrentAttempts2() {
testMaxConcurrentAttempts(2);
}
@Test
- public void testMaxConcurrentAttempts5() {
+ void testMaxConcurrentAttempts5() {
testMaxConcurrentAttempts(5);
}
@Test
- public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
+ void testTriggerAndConfirmSimpleSavepoint() throws Exception {
JobVertexID jobVertexID1 = new JobVertexID();
JobVertexID jobVertexID2 = new JobVertexID();
@@ -2020,32 +2016,32 @@ public class CheckpointCoordinatorTest extends TestLogger {
.setCheckpointStatsTracker(statsTracker)
.build(graph);
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
// trigger the first checkpoint. this should succeed
- String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+ String savepointDir = TempDirUtils.newFolder(tmpFolder).getAbsolutePath();
CompletableFuture<CompletedCheckpoint> savepointFuture =
checkpointCoordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(savepointFuture.isDone());
+ assertThat(savepointFuture).isNotDone();
// validate that we have a pending savepoint
- assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
long checkpointId =
checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
PendingCheckpoint pending = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
- assertNotNull(pending);
- assertEquals(checkpointId, pending.getCheckpointID());
- assertEquals(graph.getJobID(), pending.getJobId());
- assertEquals(2, pending.getNumberOfNonAcknowledgedTasks());
- assertEquals(0, pending.getNumberOfAcknowledgedTasks());
- assertEquals(0, pending.getOperatorStates().size());
- assertFalse(pending.isDisposed());
- assertFalse(pending.areTasksFullyAcknowledged());
- assertFalse(pending.canBeSubsumed());
+ assertThat(pending).isNotNull();
+ assertThat(pending.getCheckpointID()).isEqualTo(checkpointId);
+ assertThat(pending.getJobId()).isEqualTo(graph.getJobID());
+ assertThat(pending.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
+ assertThat(pending.getNumberOfAcknowledgedTasks()).isZero();
+ assertThat(pending.getOperatorStates()).isEmpty();
+ assertThat(pending.isDisposed()).isFalse();
+ assertThat(pending.areTasksFullyAcknowledged()).isFalse();
+ assertThat(pending.canBeSubsumed()).isFalse();
OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
@@ -2066,18 +2062,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
taskOperatorSubtaskStates2);
checkpointCoordinator.receiveAcknowledgeMessage(
acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
- assertEquals(1, pending.getNumberOfAcknowledgedTasks());
- assertEquals(1, pending.getNumberOfNonAcknowledgedTasks());
- assertFalse(pending.isDisposed());
- assertFalse(pending.areTasksFullyAcknowledged());
- assertFalse(savepointFuture.isDone());
+ assertThat(pending.getNumberOfAcknowledgedTasks()).isOne();
+ assertThat(pending.getNumberOfNonAcknowledgedTasks()).isOne();
+ assertThat(pending.isDisposed()).isFalse();
+ assertThat(pending.areTasksFullyAcknowledged()).isFalse();
+ assertThat(savepointFuture.isDone()).isFalse();
// acknowledge the same task again (should not matter)
checkpointCoordinator.receiveAcknowledgeMessage(
acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
- assertFalse(pending.isDisposed());
- assertFalse(pending.areTasksFullyAcknowledged());
- assertFalse(savepointFuture.isDone());
+ assertThat(pending.isDisposed()).isFalse();
+ assertThat(pending.areTasksFullyAcknowledged()).isFalse();
+ assertThat(savepointFuture).isNotDone();
// acknowledge the other task.
checkpointCoordinator.receiveAcknowledgeMessage(
@@ -2091,31 +2087,32 @@ public class CheckpointCoordinatorTest extends TestLogger {
// the checkpoint is internally converted to a successful checkpoint and the
// pending checkpoint object is disposed
- assertTrue(pending.isDisposed());
- assertNotNull(savepointFuture.get());
+ assertThat(pending.isDisposed()).isTrue();
+ assertThat(savepointFuture.get()).isNotNull();
// the now we should have a completed checkpoint
// savepoints should not registered as retained checkpoints
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
// validate that the relevant tasks got a confirmation message
for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
- assertEquals(checkpointId, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+ assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+ .isEqualTo(checkpointId);
assertThat(gateway.getNotifiedCompletedCheckpoints(attemptId)).isEmpty();
}
CompletedCheckpoint success = savepointFuture.get();
- assertEquals(graph.getJobID(), success.getJobId());
- assertEquals(pending.getCheckpointID(), success.getCheckpointID());
- assertEquals(2, success.getOperatorStates().size());
+ assertThat(success.getJobId()).isEqualTo(graph.getJobID());
+ assertThat(success.getCheckpointID()).isEqualTo(pending.getCheckpointID());
+ assertThat(success.getOperatorStates()).hasSize(2);
AbstractCheckpointStats actualStats =
statsTracker.createSnapshot().getHistory().getCheckpointById(checkpointId);
- assertEquals(checkpointId, actualStats.getCheckpointId());
- assertEquals(CheckpointStatsStatus.COMPLETED, actualStats.getStatus());
+ assertThat(actualStats.getCheckpointId()).isEqualTo(checkpointId);
+ assertThat(actualStats.getStatus()).isEqualTo(CheckpointStatsStatus.COMPLETED);
checkpointCoordinator.shutdown();
}
@@ -2127,7 +2124,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
* savepoint.
*/
@Test
- public void testSavepointsAreNotSubsumed() throws Exception {
+ void testSavepointsAreNotSubsumed() throws Exception {
JobVertexID jobVertexID1 = new JobVertexID();
JobVertexID jobVertexID2 = new JobVertexID();
@@ -2159,7 +2156,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
.setTimer(manuallyTriggeredScheduledExecutor)
.build(graph));
- String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+ String savepointDir = TempDirUtils.newFolder(tmpFolder).getAbsolutePath();
// Trigger savepoint and checkpoint
CompletableFuture<CompletedCheckpoint> savepointFuture1 =
@@ -2167,12 +2164,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
manuallyTriggeredScheduledExecutor.triggerAll();
long savepointId1 = counter.getLast();
- assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
CompletableFuture<CompletedCheckpoint> checkpointFuture1 =
checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
CompletableFuture<CompletedCheckpoint> checkpointFuture2 =
@@ -2180,7 +2177,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
manuallyTriggeredScheduledExecutor.triggerAll();
FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
long checkpointId2 = counter.getLast();
- assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(3);
// 2nd checkpoint should subsume the 1st checkpoint, but not the savepoint
checkpointCoordinator.receiveAcknowledgeMessage(
@@ -2194,24 +2191,25 @@ public class CheckpointCoordinatorTest extends TestLogger {
.sendAcknowledgeMessages(
anyList(), eq(checkpointId2), anyLong(), eq(INVALID_CHECKPOINT_ID));
- assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
- assertFalse(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDisposed());
- assertFalse(savepointFuture1.isDone());
+ assertThat(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDisposed())
+ .isFalse();
+ assertThat(savepointFuture1).isNotDone();
CompletableFuture<CompletedCheckpoint> checkpointFuture3 =
checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
FutureUtils.throwIfCompletedExceptionally(checkpointFuture3);
- assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
CompletableFuture<CompletedCheckpoint> savepointFuture2 =
checkpointCoordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL);
manuallyTriggeredScheduledExecutor.triggerAll();
long savepointId2 = counter.getLast();
FutureUtils.throwIfCompletedExceptionally(savepointFuture2);
- assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(3);
// savepoints should not subsume checkpoints
checkpointCoordinator.receiveAcknowledgeMessage(
@@ -2225,13 +2223,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
verify(checkpointCoordinator, times(0))
.sendAcknowledgeMessages(anyList(), eq(savepointId2), anyLong(), anyLong());
- assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
- assertFalse(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDisposed());
+ assertThat(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDisposed())
+ .isFalse();
- assertFalse(savepointFuture1.isDone());
- assertNotNull(savepointFuture2.get());
+ assertThat(savepointFuture1).isNotDone();
+ assertThat(savepointFuture2).isCompletedWithValueMatching(Objects::nonNull);
// Ack first savepoint
checkpointCoordinator.receiveAcknowledgeMessage(
@@ -2245,9 +2244,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
verify(checkpointCoordinator, times(0))
.sendAcknowledgeMessages(anyList(), eq(savepointId1), anyLong(), anyLong());
- assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
- assertNotNull(savepointFuture1.get());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
+ assertThat(savepointFuture1).isCompletedWithValueMatching(Objects::nonNull);
CompletableFuture<CompletedCheckpoint> checkpointFuture4 =
checkpointCoordinator.triggerCheckpoint(false);
@@ -2307,8 +2306,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
manuallyTriggeredScheduledExecutor.triggerAll();
}
- assertEquals(maxConcurrentAttempts, gateway.getTriggeredCheckpoints(attemptID1).size());
- assertEquals(0, gateway.getNotifiedCompletedCheckpoints(attemptID1).size());
+ assertThat(gateway.getTriggeredCheckpoints(attemptID1).size())
+ .isEqualTo(maxConcurrentAttempts);
+ assertThat(gateway.getNotifiedCompletedCheckpoints(attemptID1).size()).isZero();
// now, once we acknowledge one checkpoint, it should trigger the next one
checkpointCoordinator.receiveAcknowledgeMessage(
@@ -2317,20 +2317,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
final Collection<ScheduledFuture<?>> periodicScheduledTasks =
manuallyTriggeredScheduledExecutor.getActivePeriodicScheduledTask();
- assertEquals(1, periodicScheduledTasks.size());
+ assertThat(periodicScheduledTasks.size()).isOne();
manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
manuallyTriggeredScheduledExecutor.triggerAll();
- assertEquals(
- maxConcurrentAttempts + 1, gateway.getTriggeredCheckpoints(attemptID1).size());
+ assertThat(gateway.getTriggeredCheckpoints(attemptID1))
+ .hasSize(maxConcurrentAttempts + 1);
// no further checkpoints should happen
manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
manuallyTriggeredScheduledExecutor.triggerAll();
- assertEquals(
- maxConcurrentAttempts + 1, gateway.getTriggeredCheckpoints(attemptID1).size());
+ assertThat(gateway.getTriggeredCheckpoints(attemptID1))
+ .hasSize(maxConcurrentAttempts + 1);
checkpointCoordinator.shutdown();
} catch (Exception e) {
@@ -2340,7 +2340,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
@Test
- public void testMaxConcurrentAttemptsWithSubsumption() throws Exception {
+ void testMaxConcurrentAttemptsWithSubsumption() throws Exception {
final int maxConcurrentAttempts = 2;
JobVertexID jobVertexID1 = new JobVertexID();
@@ -2375,9 +2375,10 @@ public class CheckpointCoordinatorTest extends TestLogger {
} while (checkpointCoordinator.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);
// validate that the pending checkpoints are there
- assertEquals(maxConcurrentAttempts, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(1L));
- assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(2L));
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints())
+ .isEqualTo(maxConcurrentAttempts);
+ assertThat(checkpointCoordinator.getPendingCheckpoints()).containsKey(1L);
+ assertThat(checkpointCoordinator.getPendingCheckpoints()).containsKey(2L);
// now we acknowledge the second checkpoint, which should subsume the first checkpoint
// and allow two more checkpoints to be triggered
@@ -2393,15 +2394,16 @@ public class CheckpointCoordinatorTest extends TestLogger {
} while (checkpointCoordinator.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);
// do the final check
- assertEquals(maxConcurrentAttempts, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(3L));
- assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(4L));
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints())
+ .isEqualTo(maxConcurrentAttempts);
+ assertThat(checkpointCoordinator.getPendingCheckpoints()).containsKey(3L);
+ assertThat(checkpointCoordinator.getPendingCheckpoints()).containsKey(4L);
checkpointCoordinator.shutdown();
}
@Test
- public void testPeriodicSchedulingWithInactiveTasks() throws Exception {
+ void testPeriodicSchedulingWithInactiveTasks() throws Exception {
CheckpointCoordinator checkpointCoordinator =
setupCheckpointCoordinatorWithInactiveTasks(new MemoryStateBackend());
@@ -2409,7 +2411,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
manuallyTriggeredScheduledExecutor.triggerAll();
- assertTrue(checkpointCoordinator.getNumberOfPendingCheckpoints() > 0);
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isGreaterThan(0);
}
private CheckpointCoordinator setupCheckpointCoordinatorWithInactiveTasks(
@@ -2447,7 +2449,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
manuallyTriggeredScheduledExecutor.triggerAll();
// no checkpoint should have started so far
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
// now move the state to RUNNING
vertex1.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);
@@ -2461,7 +2463,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
/** Tests that the savepoints can be triggered concurrently. */
@Test
- public void testConcurrentSavepoints() throws Exception {
+ void testConcurrentSavepoints() throws Exception {
int numSavepoints = 5;
JobVertexID jobVertexID1 = new JobVertexID();
@@ -2492,7 +2494,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
List<CompletableFuture<CompletedCheckpoint>> savepointFutures = new ArrayList<>();
- String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+ String savepointDir = TempDirUtils.newFolder(tmpFolder).getAbsolutePath();
// Trigger savepoints
for (int i = 0; i < numSavepoints; i++) {
@@ -2503,7 +2505,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
// After triggering multiple savepoints, all should in progress
for (CompletableFuture<CompletedCheckpoint> savepointFuture : savepointFutures) {
- assertFalse(savepointFuture.isDone());
+ assertThat(savepointFuture).isNotDone();
}
manuallyTriggeredScheduledExecutor.triggerAll();
@@ -2518,13 +2520,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
// After ACKs, all should be completed
for (CompletableFuture<CompletedCheckpoint> savepointFuture : savepointFutures) {
- assertNotNull(savepointFuture.get());
+ assertThat(savepointFuture).isCompletedWithValueMatching(Objects::nonNull);
}
}
/** Tests that no minimum delay between savepoints is enforced. */
@Test
- public void testMinDelayBetweenSavepoints() throws Exception {
+ void testMinDelayBetweenSavepoints() throws Exception {
CheckpointCoordinatorConfiguration chkConfig =
new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
.setMinPauseBetweenCheckpoints(
@@ -2538,20 +2540,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
.setTimer(manuallyTriggeredScheduledExecutor)
.build(EXECUTOR_RESOURCE.getExecutor());
- String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+ String savepointDir = TempDirUtils.newFolder(tmpFolder).getAbsolutePath();
CompletableFuture<CompletedCheckpoint> savepoint0 =
checkpointCoordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL);
- assertFalse("Did not trigger savepoint", savepoint0.isDone());
+ assertThat(savepoint0).as("Did not trigger savepoint").isNotDone();
CompletableFuture<CompletedCheckpoint> savepoint1 =
checkpointCoordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL);
- assertFalse("Did not trigger savepoint", savepoint1.isDone());
+ assertThat(savepoint1).as("Did not trigger savepoint").isNotDone();
}
/** Tests that the externalized checkpoint configuration is respected. */
@Test
- public void testExternalizedCheckpoints() throws Exception {
+ void testExternalizedCheckpoints() throws Exception {
ExecutionGraph graph =
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
.addJobVertex(new JobVertexID())
@@ -2579,7 +2581,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
CheckpointProperties expected =
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE);
- assertEquals(expected, props);
+ assertThat(props).isEqualTo(expected);
}
// the now we should have a completed checkpoint
@@ -2587,7 +2589,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
@Test
- public void testCreateKeyGroupPartitions() {
+ void testCreateKeyGroupPartitions() {
testCreateKeyGroupPartitions(1, 1);
testCreateKeyGroupPartitions(13, 1);
testCreateKeyGroupPartitions(13, 2);
@@ -2612,13 +2614,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
maxParallelism, parallelism, i));
if (!range.contains(i)) {
- Assert.fail("Could not find expected key-group " + i + " in range " + range);
+ fail("Could not find expected key-group " + i + " in range " + range);
}
}
}
@Test
- public void testPartitionableStateRepartitioning() {
+ void testPartitionableStateRepartitioning() {
Random r = new Random(42);
for (int run = 0; run < 10000; ++run) {
@@ -2685,7 +2687,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
int expectedTotalPartitions = 0;
for (List<OperatorStateHandle> previousParallelOpInstanceState :
previousParallelOpInstanceStates) {
- Assert.assertEquals(1, previousParallelOpInstanceState.size());
+ assertThat(previousParallelOpInstanceState.size()).isOne();
for (OperatorStateHandle psh : previousParallelOpInstanceState) {
Map<String, OperatorStateHandle.StateMetaInfo> offsMap =
@@ -2783,16 +2785,17 @@ public class CheckpointCoordinatorTest extends TestLogger {
// possible.
if (oldParallelism != newParallelism) {
int maxLoadDiff = maxCount - minCount;
- Assert.assertTrue(
- "Difference in partition load is > 1 : " + maxLoadDiff, maxLoadDiff <= 1);
+ assertThat(maxLoadDiff <= 1)
+ .as("Difference in partition load is > 1 : " + maxLoadDiff)
+ .isTrue();
}
- Assert.assertEquals(expectedTotalPartitions, actualTotalPartitions);
- Assert.assertEquals(expected, actual);
+ assertThat(actualTotalPartitions).isEqualTo(expectedTotalPartitions);
+ assertThat(actual).isEqualTo(expected);
}
/** Tests that the pending checkpoint stats callbacks are created. */
@Test
- public void testCheckpointStatsTrackerPendingCheckpointCallback() throws Exception {
+ void testCheckpointStatsTrackerPendingCheckpointCallback() throws Exception {
// set up the coordinator and validate the initial state
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
CheckpointCoordinator checkpointCoordinator =
@@ -2823,7 +2826,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
/** Tests that the restore callbacks are called if registered. */
@Test
- public void testCheckpointStatsTrackerRestoreCallback() throws Exception {
+ void testCheckpointStatsTrackerRestoreCallback() throws Exception {
StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(1);
// set up the coordinator and validate the initial state
@@ -2850,15 +2853,16 @@ public class CheckpointCoordinatorTest extends TestLogger {
new CheckpointsCleaner(),
() -> {});
- assertTrue(
- checkpointCoordinator.restoreLatestCheckpointedStateToAll(
- Collections.emptySet(), true));
+ assertThat(
+ checkpointCoordinator.restoreLatestCheckpointedStateToAll(
+ Collections.emptySet(), true))
+ .isTrue();
verify(tracker, times(1)).reportRestoredCheckpoint(any(RestoredCheckpointStats.class));
}
@Test
- public void testSharedStateRegistrationOnRestore() throws Exception {
+ void testSharedStateRegistrationOnRestore() throws Exception {
for (RestoreMode restoreMode : RestoreMode.values()) {
JobVertexID jobVertexID1 = new JobVertexID();
@@ -2899,7 +2903,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
List<CompletedCheckpoint> completedCheckpoints = coordinator.getSuccessfulCheckpoints();
- assertEquals(numCheckpoints, completedCheckpoints.size());
+ assertThat(completedCheckpoints.size()).isEqualTo(numCheckpoints);
int sharedHandleCount = 0;
@@ -2929,8 +2933,10 @@ public class CheckpointCoordinatorTest extends TestLogger {
for (StreamStateHandle streamStateHandle :
incrementalKeyedStateHandle.getSharedState().values()) {
- assertFalse(
- streamStateHandle instanceof PlaceholderStreamStateHandle);
+ assertThat(
+ streamStateHandle
+ instanceof PlaceholderStreamStateHandle)
+ .isFalse();
verify(streamStateHandle, never()).discardState();
++sharedHandleCount;
}
@@ -2951,7 +2957,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
// 2 (parallelism) x (1 (CP0) + 2 (CP1) + 2 (CP2)) = 10
- assertEquals(10, sharedHandleCount);
+ assertThat(sharedHandleCount).isEqualTo(10);
// discard CP0
store.removeOldestCheckpoint();
@@ -2972,7 +2978,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
Set<ExecutionJobVertex> tasks = new HashSet<>();
tasks.add(jobVertex1);
- assertEquals(JobStatus.SUSPENDED, store.getShutdownStatus().orElse(null));
+ assertThat(store.getShutdownStatus().orElse(null)).isEqualTo(JobStatus.SUSPENDED);
SharedStateRegistry secondInstance =
SharedStateRegistry.DEFAULT_FACTORY.create(
org.apache.flink.util.concurrent.Executors.directExecutor(),
@@ -2983,7 +2989,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
10, store.getAllCheckpoints(), secondInstance);
final CheckpointCoordinator secondCoordinator =
coordinatorBuilder.setCompletedCheckpointStore(secondStore).build(graph);
- assertTrue(secondCoordinator.restoreLatestCheckpointedStateToAll(tasks, false));
+ assertThat(secondCoordinator.restoreLatestCheckpointedStateToAll(tasks, false))
+ .isTrue();
// validate that all shared states are registered again after the recovery.
cp = 0;
@@ -3031,7 +3038,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
@Test
- public void jobFailsIfInFlightSynchronousSavepointIsDiscarded() throws Exception {
+ void jobFailsIfInFlightSynchronousSavepointIsDiscarded() throws Exception {
final Tuple2<Integer, Throwable> invocationCounterAndException = Tuple2.of(0, null);
final Throwable expectedRootCause = new IOException("Custom-Exception");
@@ -3080,7 +3087,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
declineSynchronousSavepoint(
graph.getJobID(), coordinator, attemptID1, expectedRootCause);
- assertTrue(syncSavepoint.isDisposed());
+ assertThat(syncSavepoint.isDisposed()).isTrue();
String expectedRootCauseMessage =
String.format(
"%s: %s",
@@ -3090,26 +3097,28 @@ public class CheckpointCoordinatorTest extends TestLogger {
fail("Expected Exception not found.");
} catch (ExecutionException e) {
final Throwable cause = ExceptionUtils.stripExecutionException(e);
- assertTrue(cause instanceof CheckpointException);
- assertEquals(expectedRootCauseMessage, cause.getCause().getCause().getMessage());
+ assertThat(cause instanceof CheckpointException).isTrue();
+ assertThat(cause.getCause().getCause().getMessage())
+ .isEqualTo(expectedRootCauseMessage);
}
- assertEquals(1L, invocationCounterAndException.f0.intValue());
- assertTrue(
- invocationCounterAndException.f1 instanceof CheckpointException
- && invocationCounterAndException
- .f1
- .getCause()
- .getCause()
- .getMessage()
- .equals(expectedRootCauseMessage));
+ assertThat(invocationCounterAndException.f0.intValue()).isEqualTo(1L);
+ assertThat(
+ invocationCounterAndException.f1 instanceof CheckpointException
+ && invocationCounterAndException
+ .f1
+ .getCause()
+ .getCause()
+ .getMessage()
+ .equals(expectedRootCauseMessage))
+ .isTrue();
coordinator.shutdown();
}
/** Tests that do not trigger checkpoint when stop the coordinator after the eager pre-check. */
@Test
- public void testTriggerCheckpointAfterStopping() throws Exception {
+ void testTriggerCheckpointAfterStopping() throws Exception {
StoppingCheckpointIDCounter testingCounter = new StoppingCheckpointIDCounter();
CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
@@ -3123,7 +3132,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
/** Tests that do not trigger checkpoint when CheckpointIDCounter IOException occurred. */
@Test
- public void testTriggerCheckpointWithCounterIOException() throws Exception {
+ void testTriggerCheckpointWithCounterIOException() throws Exception {
// given: Checkpoint coordinator which fails on getCheckpointId.
IOExceptionCheckpointIDCounter testingCounter = new IOExceptionCheckpointIDCounter();
TestFailJobCallback failureCallback = new TestFailJobCallback();
@@ -3144,18 +3153,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
testTriggerCheckpoint(checkpointCoordinator, IO_EXCEPTION);
// then: Failure manager should fail the job.
- assertEquals(1, failureCallback.getInvokeCounter());
+ assertThat(failureCallback.getInvokeCounter()).isOne();
// then: The NumberOfFailedCheckpoints and TotalNumberOfCheckpoints should be 1.
CheckpointStatsCounts counts = statsTracker.createSnapshot().getCounts();
- assertEquals(0, counts.getNumberOfRestoredCheckpoints());
- assertEquals(1, counts.getTotalNumberOfCheckpoints());
- assertEquals(0, counts.getNumberOfInProgressCheckpoints());
- assertEquals(0, counts.getNumberOfCompletedCheckpoints());
- assertEquals(1, counts.getNumberOfFailedCheckpoints());
+ assertThat(counts.getNumberOfRestoredCheckpoints()).isZero();
+ assertThat(counts.getTotalNumberOfCheckpoints()).isOne();
+ assertThat(counts.getNumberOfInProgressCheckpoints()).isZero();
+ assertThat(counts.getNumberOfCompletedCheckpoints()).isZero();
+ assertThat(counts.getNumberOfFailedCheckpoints()).isOne();
// then: The PendingCheckpoint shouldn't be created.
- assertNull(statsTracker.getPendingCheckpointStats(1));
+ assertThat(statsTracker.getPendingCheckpointStats(1)).isNull();
}
private void testTriggerCheckpoint(
@@ -3190,7 +3199,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
@Test
- public void testSavepointScheduledInUnalignedMode() throws Exception {
+ void testSavepointScheduledInUnalignedMode() throws Exception {
int maxConcurrentCheckpoints = 1;
int checkpointRequestsToSend = 10;
int activeRequests = 0;
@@ -3216,15 +3225,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
activeRequests++;
}
manuallyTriggeredScheduledExecutor.triggerAll();
- assertEquals(
- activeRequests - maxConcurrentCheckpoints, coordinator.getNumQueuedRequests());
+ assertThat(coordinator.getNumQueuedRequests())
+ .isEqualTo(activeRequests - maxConcurrentCheckpoints);
Future<?> savepointFuture =
coordinator.triggerSavepoint("/tmp", SavepointFormatType.CANONICAL);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertEquals(
- ++activeRequests - maxConcurrentCheckpoints,
- coordinator.getNumQueuedRequests());
+ assertThat(coordinator.getNumQueuedRequests())
+ .isEqualTo(++activeRequests - maxConcurrentCheckpoints);
coordinator.receiveDeclineMessage(
new DeclineCheckpoint(
@@ -3236,16 +3244,17 @@ public class CheckpointCoordinatorTest extends TestLogger {
manuallyTriggeredScheduledExecutor.triggerAll();
activeRequests--; // savepoint triggered
- assertEquals(
- activeRequests - maxConcurrentCheckpoints, coordinator.getNumQueuedRequests());
- assertEquals(1, checkpointFutures.stream().filter(Future::isDone).count());
+ assertThat(coordinator.getNumQueuedRequests())
+ .isEqualTo(activeRequests - maxConcurrentCheckpoints);
+ assertThat(checkpointFutures.stream().filter(Future::isDone).count()).isOne();
- assertFalse(savepointFuture.isDone());
- assertEquals(maxConcurrentCheckpoints, coordinator.getNumberOfPendingCheckpoints());
+ assertThat(savepointFuture.isDone()).isFalse();
+ assertThat(coordinator.getNumberOfPendingCheckpoints())
+ .isEqualTo(maxConcurrentCheckpoints);
CheckpointProperties props =
coordinator.getPendingCheckpoints().values().iterator().next().getProps();
- assertTrue(props.isSavepoint());
- assertFalse(props.forceCheckpoint());
+ assertThat(props.isSavepoint()).isTrue();
+ assertThat(props.forceCheckpoint()).isFalse();
} finally {
coordinator.shutdown();
}
@@ -3257,7 +3266,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
* coordinators are checkpointed before starting the task checkpoint.
*/
@Test
- public void testExternallyInducedSourceWithOperatorCoordinator() throws Exception {
+ void testExternallyInducedSourceWithOperatorCoordinator() throws Exception {
JobVertexID jobVertexID1 = new JobVertexID();
JobVertexID jobVertexID2 = new JobVertexID();
@@ -3327,9 +3336,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
@Nullable
public CompletableFuture<Integer> triggerCheckpoint(
long checkpointId, long timestamp, Executor executor) throws Exception {
- assertTrue(
- "The coordinator checkpoint should have finished.",
- coordCheckpointDone.get());
+ assertThat(coordCheckpointDone.get())
+ .as("The coordinator checkpoint should have finished.")
+ .isTrue();
// Acknowledge the checkpoint in the master hooks so the task snapshots
// complete before
// the master state snapshot completes.
@@ -3380,9 +3389,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
});
// Verify initial state.
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
- assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+ assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size()).isZero();
// trigger the first checkpoint. this should succeed
final CompletableFuture<CompletedCheckpoint> checkpointFuture =
@@ -3391,28 +3400,29 @@ public class CheckpointCoordinatorTest extends TestLogger {
FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
// now we should have a completed checkpoint
- assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
- assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
+ assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
// the canceler should be removed now
- assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+ assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).isEmpty();
// validate that the relevant tasks got a confirmation message
long checkpointId = checkpointIdRef.get();
for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
- assertEquals(checkpointId, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+ assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+ .isEqualTo(checkpointId);
}
CompletedCheckpoint success = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
- assertEquals(graph.getJobID(), success.getJobId());
- assertEquals(2, success.getOperatorStates().size());
+ assertThat(success.getJobId()).isEqualTo(graph.getJobID());
+ assertThat(success.getOperatorStates().size()).isEqualTo(2);
checkpointCoordinator.shutdown();
}
@Test
- public void testCompleteCheckpointFailureWithExternallyInducedSource() throws Exception {
+ void testCompleteCheckpointFailureWithExternallyInducedSource() throws Exception {
JobVertexID jobVertexID1 = new JobVertexID();
JobVertexID jobVertexID2 = new JobVertexID();
@@ -3507,9 +3517,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
@Nullable
public CompletableFuture<Integer> triggerCheckpoint(
long checkpointId, long timestamp, Executor executor) throws Exception {
- assertTrue(
- "The coordinator checkpoint should have finished.",
- coordCheckpointDone.get());
+ assertThat(coordCheckpointDone.get())
+ .as("The coordinator checkpoint should have finished.")
+ .isTrue();
// Acknowledge the checkpoint in the master hooks so the task snapshots
// complete before
// the master state snapshot completes.
@@ -3565,12 +3575,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertTrue(checkpointFuture.isCompletedExceptionally());
- assertTrue(checkpointCoordinator.getSuccessfulCheckpoints().isEmpty());
+ assertThat(checkpointFuture).isCompletedExceptionally();
+ assertThat(checkpointCoordinator.getSuccessfulCheckpoints()).isEmpty();
}
@Test
- public void testResetCalledInRegionRecovery() throws Exception {
+ void testResetCalledInRegionRecovery() throws Exception {
CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
.setTimer(manuallyTriggeredScheduledExecutor)
@@ -3579,13 +3589,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
TestResetHook hook = new TestResetHook("id");
checkpointCoordinator.addMasterHook(hook);
- assertFalse(hook.resetCalled);
+ assertThat(hook.resetCalled).isFalse();
checkpointCoordinator.restoreLatestCheckpointedStateToSubtasks(Collections.emptySet());
- assertTrue(hook.resetCalled);
+ assertThat(hook.resetCalled).isTrue();
}
@Test
- public void testNotifyCheckpointAbortionInOperatorCoordinator() throws Exception {
+ void testNotifyCheckpointAbortionInOperatorCoordinator() throws Exception {
JobVertexID jobVertexID = new JobVertexID();
ExecutionGraph graph =
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -3636,16 +3646,17 @@ public class CheckpointCoordinatorTest extends TestLogger {
checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, "");
// OperatorCoordinator should have been notified of the abortion of checkpoint 1.
- assertEquals(Collections.singletonList(checkpointId1), context.getAbortedCheckpoints());
- assertEquals(
- Collections.singletonList(checkpointId2), context.getCompletedCheckpoints());
+ assertThat(context.getAbortedCheckpoints())
+ .isEqualTo(Collections.singletonList(checkpointId1));
+ assertThat(context.getCompletedCheckpoints())
+ .isEqualTo(Collections.singletonList(checkpointId2));
} finally {
checkpointCoordinator.shutdown();
}
}
@Test
- public void testTimeoutWhileCheckpointOperatorCoordinatorNotFinishing() throws Exception {
+ void testTimeoutWhileCheckpointOperatorCoordinatorNotFinishing() throws Exception {
JobVertexID jobVertexID = new JobVertexID();
ExecutionGraph graph =
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -3675,12 +3686,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
try {
checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- Assert.assertTrue(checkpointCoordinator.isTriggering());
+ assertThat(checkpointCoordinator.isTriggering()).isTrue();
manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks();
manuallyTriggeredScheduledExecutor.triggerAll();
- Assert.assertFalse(checkpointCoordinator.isTriggering());
+ assertThat(checkpointCoordinator.isTriggering()).isFalse();
} finally {
checkpointCoordinator.shutdown();
executorService.shutdownNow();
@@ -3688,7 +3699,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
@Test
- public void testAbortingBeforeTriggeringCheckpointOperatorCoordinator() throws Exception {
+ void testAbortingBeforeTriggeringCheckpointOperatorCoordinator() throws Exception {
// Warn: The case is fragile since a specific order of executing the tasks is required to
// reproduce the issue.
JobVertexID jobVertexID = new JobVertexID();
@@ -3743,17 +3754,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- Assert.assertTrue(
- !notificationSequence.contains(trigger + "1")
- || notificationSequence.indexOf(trigger + "1")
- < notificationSequence.indexOf(abort));
+ assertThat(
+ !notificationSequence.contains(trigger + "1")
+ || notificationSequence.indexOf(trigger + "1")
+ < notificationSequence.indexOf(abort))
+ .isTrue();
} finally {
checkpointCoordinator.shutdown();
}
}
@Test
- public void testReportLatestCompletedCheckpointIdWithAbort() throws Exception {
+ void testReportLatestCompletedCheckpointIdWithAbort() throws Exception {
JobVertexID jobVertexID = new JobVertexID();
ExecutionGraph graph =
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -3802,8 +3814,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new CheckpointMetrics(),
new TaskStateSnapshot()),
"localhost");
- assertTrue(result.isDone());
- assertFalse(result.isCompletedExceptionally());
+ assertThat(result).isDone();
+ assertThat(result).isNotCompletedExceptionally();
result = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
@@ -3816,14 +3828,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
abortedCheckpointId,
new CheckpointException(CHECKPOINT_EXPIRED)),
"localhost");
- assertTrue(result.isCompletedExceptionally());
+ assertThat(result).isCompletedExceptionally();
- assertEquals(completedCheckpointId, reportedCheckpointId.get());
+ assertThat(reportedCheckpointId).hasValue(completedCheckpointId);
}
@Test
- public void testBaseLocationsNotInitialized() throws Exception {
- File checkpointDir = tmpFolder.newFolder();
+ void testBaseLocationsNotInitialized() throws Exception {
+ File checkpointDir = TempDirUtils.newFolder(tmpFolder);
JobVertexID jobVertexID = new JobVertexID();
ExecutionGraph graph =
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -3842,7 +3854,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
FileSystem fs = FileSystem.get(checkpointDir.toURI());
// directory will not be created if checkpointing is disabled
- Assert.assertFalse(fs.exists(jobCheckpointPath));
+ assertThat(fs.exists(jobCheckpointPath)).isFalse();
}
private CheckpointCoordinator getCheckpointCoordinator(ExecutionGraph graph) throws Exception {
@@ -3925,7 +3937,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertEquals(1, checkpointCoordinator.getPendingCheckpoints().size());
+ assertThat(checkpointCoordinator.getPendingCheckpoints()).hasSize(1);
long checkpointId =
Iterables.getOnlyElement(checkpointCoordinator.getPendingCheckpoints().keySet());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
index 36ae8f3defe..0cac9429a9f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
@@ -35,7 +35,8 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.TestLogger;
@@ -44,11 +45,10 @@ import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.assertj.core.api.Assertions;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
import javax.annotation.Nullable;
@@ -64,34 +64,29 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
/** Tests for checkpoint coordinator triggering. */
-public class CheckpointCoordinatorTriggeringTest extends TestLogger {
+class CheckpointCoordinatorTriggeringTest extends TestLogger {
- @ClassRule
- public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
- TestingUtils.defaultExecutorResource();
+ @RegisterExtension
+ static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+ TestingUtils.defaultExecutorExtension();
private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;
- @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @TempDir private java.nio.file.Path temporaryFolder;
- @Before
- public void setUp() throws Exception {
+ @BeforeEach
+ void setUp() {
manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
}
@Test
- public void testPeriodicTriggering() {
+ void testPeriodicTriggering() {
try {
final long start = System.currentTimeMillis();
@@ -135,7 +130,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
// no further calls may come.
manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
manuallyTriggeredScheduledExecutor.triggerAll();
- assertEquals(5, gateway.getTriggeredCheckpoints(attemptID).size());
+ assertThat(gateway.getTriggeredCheckpoints(attemptID).size()).isEqualTo(5);
// start another sequence of periodic scheduling
gateway.resetCount();
@@ -152,7 +147,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
// no further calls may come
manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
manuallyTriggeredScheduledExecutor.triggerAll();
- assertEquals(5, gateway.getTriggeredCheckpoints(attemptID).size());
+ assertThat(gateway.getTriggeredCheckpoints(attemptID).size()).isEqualTo(5);
checkpointCoordinator.shutdown();
} catch (Exception e) {
@@ -165,21 +160,21 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
int numTrigger,
long start,
List<CheckpointCoordinatorTestingUtils.TriggeredCheckpoint> checkpoints) {
- assertEquals(numTrigger, checkpoints.size());
+ assertThat(checkpoints).hasSize(numTrigger);
long lastId = -1;
long lastTs = -1;
for (CheckpointCoordinatorTestingUtils.TriggeredCheckpoint checkpoint : checkpoints) {
- assertTrue(
- "Trigger checkpoint id should be in increase order",
- checkpoint.checkpointId > lastId);
- assertTrue(
- "Trigger checkpoint timestamp should be in increase order",
- checkpoint.timestamp >= lastTs);
- assertTrue(
- "Trigger checkpoint timestamp should be larger than the start time",
- checkpoint.timestamp >= start);
+ assertThat(checkpoint.checkpointId)
+ .as("Trigger checkpoint id should be in increase order")
+ .isGreaterThan(lastId);
+ assertThat(checkpoint.timestamp)
+ .as("Trigger checkpoint timestamp should be in increase order")
+ .isGreaterThanOrEqualTo(lastTs);
+ assertThat(checkpoint.timestamp)
+ .as("Trigger checkpoint timestamp should be larger than the start time")
+ .isGreaterThanOrEqualTo(start);
lastId = checkpoint.checkpointId;
lastTs = checkpoint.timestamp;
@@ -187,7 +182,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
}
@Test
- public void testTriggeringFullSnapshotAfterJobmasterFailover() throws Exception {
+ void testTriggeringFullSnapshotAfterJobmasterFailover() throws Exception {
CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
@@ -234,12 +229,14 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
checkpoint.get();
assertThat(
- gateway.getOnlyTriggeredCheckpoint(attemptID).checkpointOptions.getCheckpointType(),
- is(CheckpointType.FULL_CHECKPOINT));
+ gateway.getOnlyTriggeredCheckpoint(attemptID)
+ .checkpointOptions
+ .getCheckpointType())
+ .isEqualTo(CheckpointType.FULL_CHECKPOINT);
}
@Test
- public void testTriggeringFullCheckpoints() throws Exception {
+ void testTriggeringFullCheckpoints() throws Exception {
CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
@@ -284,12 +281,14 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
checkpoint.get();
assertThat(
- gateway.getOnlyTriggeredCheckpoint(attemptID).checkpointOptions.getCheckpointType(),
- is(CheckpointType.FULL_CHECKPOINT));
+ gateway.getOnlyTriggeredCheckpoint(attemptID)
+ .checkpointOptions
+ .getCheckpointType())
+ .isEqualTo(CheckpointType.FULL_CHECKPOINT);
}
@Test
- public void testTriggeringCheckpointsWithNullCheckpointType() throws Exception {
+ void testTriggeringCheckpointsWithNullCheckpointType() throws Exception {
CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
@@ -316,7 +315,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
}
@Test
- public void testTriggeringCheckpointsWithIncrementalCheckpointType() throws Exception {
+ void testTriggeringCheckpointsWithIncrementalCheckpointType() throws Exception {
CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
@@ -358,7 +357,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
}
@Test
- public void testTriggeringCheckpointsWithFullCheckpointType() throws Exception {
+ void testTriggeringCheckpointsWithFullCheckpointType() throws Exception {
CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
@@ -400,8 +399,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
}
@Test
- public void testTriggeringCheckpointsWithCheckpointTypeAfterNoClaimSavepoint()
- throws Exception {
+ void testTriggeringCheckpointsWithCheckpointTypeAfterNoClaimSavepoint() throws Exception {
CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
@@ -474,7 +472,8 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
throws Exception {
final CompletableFuture<CompletedCheckpoint> savepointFuture =
checkpointCoordinator.triggerSavepoint(
- temporaryFolder.newFolder().getPath(), SavepointFormatType.CANONICAL);
+ TempDirUtils.newFolder(temporaryFolder).getPath(),
+ SavepointFormatType.CANONICAL);
manuallyTriggeredScheduledExecutor.triggerAll();
checkpointCoordinator.receiveAcknowledgeMessage(
new AcknowledgeCheckpoint(graph.getJobID(), attemptID, savepointId),
@@ -507,7 +506,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
* is triggered.
*/
@Test
- public void testMinTimeBetweenCheckpointsInterval() throws Exception {
+ void testMinTimeBetweenCheckpointsInterval() throws Exception {
JobVertexID jobVertexID = new JobVertexID();
CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
@@ -546,7 +545,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
// wait until the first checkpoint was triggered
Long firstCallId = gateway.getTriggeredCheckpoints(attemptID).get(0).checkpointId;
- assertEquals(1L, firstCallId.longValue());
+ assertThat(firstCallId).isEqualTo(1L);
AcknowledgeCheckpoint ackMsg =
new AcknowledgeCheckpoint(graph.getJobID(), attemptID, 1L);
@@ -567,7 +566,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
// wait until the next checkpoint is triggered
Long nextCallId = gateway.getTriggeredCheckpoints(attemptID).get(0).checkpointId;
final long nextCheckpointTime = System.nanoTime();
- assertEquals(2L, nextCallId.longValue());
+ assertThat(nextCallId).isEqualTo(2L);
final long delayMillis = (nextCheckpointTime - ackTime) / 1_000_000;
@@ -586,7 +585,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
}
@Test
- public void testStopPeriodicScheduler() throws Exception {
+ void testStopPeriodicScheduler() throws Exception {
// set up the coordinator and validate the initial state
CheckpointCoordinator checkpointCoordinator = createCheckpointCoordinator();
@@ -599,10 +598,11 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
} catch (ExecutionException e) {
final Optional<CheckpointException> checkpointExceptionOptional =
ExceptionUtils.findThrowable(e, CheckpointException.class);
- assertTrue(checkpointExceptionOptional.isPresent());
- assertEquals(
- CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN,
- checkpointExceptionOptional.get().getCheckpointFailureReason());
+ assertThat(checkpointExceptionOptional)
+ .isPresent()
+ .map(CheckpointException::getCheckpointFailureReason)
+ .get()
+ .isEqualTo(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN);
}
// Not periodic
@@ -613,11 +613,11 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
null,
false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(onCompletionPromise2.isCompletedExceptionally());
+ assertThat(onCompletionPromise2).isNotCompletedExceptionally();
}
@Test
- public void testTriggerCheckpointWithShuttingDownCoordinator() throws Exception {
+ void testTriggerCheckpointWithShuttingDownCoordinator() throws Exception {
// set up the coordinator and validate the initial state
CheckpointCoordinator checkpointCoordinator = createCheckpointCoordinator();
checkpointCoordinator.startCheckpointScheduler();
@@ -632,15 +632,16 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
} catch (ExecutionException e) {
final Optional<CheckpointException> checkpointExceptionOptional =
ExceptionUtils.findThrowable(e, CheckpointException.class);
- assertTrue(checkpointExceptionOptional.isPresent());
- assertEquals(
- CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN,
- checkpointExceptionOptional.get().getCheckpointFailureReason());
+ assertThat(checkpointExceptionOptional)
+ .isPresent()
+ .map(CheckpointException::getCheckpointFailureReason)
+ .get()
+ .isEqualTo(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
}
}
@Test
- public void testTriggerCheckpointBeforePreviousOneCompleted() throws Exception {
+ void testTriggerCheckpointBeforePreviousOneCompleted() throws Exception {
JobVertexID jobVertexID = new JobVertexID();
CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
@@ -662,25 +663,25 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
// start a periodic checkpoint first
final CompletableFuture<CompletedCheckpoint> onCompletionPromise1 =
triggerPeriodicCheckpoint(checkpointCoordinator);
- assertTrue(checkpointCoordinator.isTriggering());
- assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+ assertThat(checkpointCoordinator.isTriggering()).isTrue();
+ assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
final CompletableFuture<CompletedCheckpoint> onCompletionPromise2 =
triggerPeriodicCheckpoint(checkpointCoordinator);
// another trigger before the prior one finished
- assertTrue(checkpointCoordinator.isTriggering());
- assertEquals(1, checkpointCoordinator.getTriggerRequestQueue().size());
+ assertThat(checkpointCoordinator.isTriggering()).isTrue();
+ assertThat(checkpointCoordinator.getTriggerRequestQueue()).hasSize(1);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(onCompletionPromise1.isCompletedExceptionally());
- assertFalse(onCompletionPromise2.isCompletedExceptionally());
- assertFalse(checkpointCoordinator.isTriggering());
- assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
- assertEquals(2, gateway.getTriggeredCheckpoints(attemptID).size());
+ assertThat(onCompletionPromise1).isNotCompletedExceptionally();
+ assertThat(onCompletionPromise2).isNotCompletedExceptionally();
+ assertThat(checkpointCoordinator.isTriggering()).isFalse();
+ assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
+ assertThat(gateway.getTriggeredCheckpoints(attemptID)).hasSize(2);
}
@Test
- public void testTriggerCheckpointRequestQueuedWithFailure() throws Exception {
+ void testTriggerCheckpointRequestQueuedWithFailure() throws Exception {
JobVertexID jobVertexID = new JobVertexID();
CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
@@ -706,8 +707,8 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
// start a periodic checkpoint first
final CompletableFuture<CompletedCheckpoint> onCompletionPromise1 =
triggerNonPeriodicCheckpoint(checkpointCoordinator);
- assertTrue(checkpointCoordinator.isTriggering());
- assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+ assertThat(checkpointCoordinator.isTriggering()).isTrue();
+ assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
// another trigger before the prior one finished
final CompletableFuture<CompletedCheckpoint> onCompletionPromise2 =
@@ -716,21 +717,21 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
// another trigger before the first one finished
final CompletableFuture<CompletedCheckpoint> onCompletionPromise3 =
triggerNonPeriodicCheckpoint(checkpointCoordinator);
- assertTrue(checkpointCoordinator.isTriggering());
- assertEquals(2, checkpointCoordinator.getTriggerRequestQueue().size());
+ assertThat(checkpointCoordinator.isTriggering()).isTrue();
+ assertThat(checkpointCoordinator.getTriggerRequestQueue()).hasSize(2);
manuallyTriggeredScheduledExecutor.triggerAll();
// the first triggered checkpoint fails by design through UnstableCheckpointIDCounter
- assertTrue(onCompletionPromise1.isCompletedExceptionally());
- assertFalse(onCompletionPromise2.isCompletedExceptionally());
- assertFalse(onCompletionPromise3.isCompletedExceptionally());
- assertFalse(checkpointCoordinator.isTriggering());
- assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
- assertEquals(2, gateway.getTriggeredCheckpoints(attemptID).size());
+ assertThat(onCompletionPromise1).isCompletedExceptionally();
+ assertThat(onCompletionPromise2).isNotCompletedExceptionally();
+ assertThat(onCompletionPromise3).isNotCompletedExceptionally();
+ assertThat(checkpointCoordinator.isTriggering()).isFalse();
+ assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
+ assertThat(gateway.getTriggeredCheckpoints(attemptID).size()).isEqualTo(2);
}
@Test
- public void testTriggerCheckpointRequestCancelled() throws Exception {
+ void testTriggerCheckpointRequestCancelled() throws Exception {
JobVertexID jobVertexID = new JobVertexID();
CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
@@ -756,11 +757,11 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
// checkpoint trigger will not finish since master hook checkpoint is not finished yet
manuallyTriggeredScheduledExecutor.triggerAll();
- assertTrue(checkpointCoordinator.isTriggering());
+ assertThat(checkpointCoordinator.isTriggering()).isTrue();
// trigger cancellation
manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks();
- assertTrue(checkpointCoordinator.isTriggering());
+ assertThat(checkpointCoordinator.isTriggering()).isTrue();
try {
onCompletionPromise.get();
@@ -768,24 +769,25 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
} catch (ExecutionException e) {
final Optional<CheckpointException> checkpointExceptionOptional =
ExceptionUtils.findThrowable(e, CheckpointException.class);
- assertTrue(checkpointExceptionOptional.isPresent());
- assertEquals(
- CheckpointFailureReason.CHECKPOINT_EXPIRED,
- checkpointExceptionOptional.get().getCheckpointFailureReason());
+ assertThat(checkpointExceptionOptional)
+ .isPresent()
+ .map(CheckpointException::getCheckpointFailureReason)
+ .get()
+ .isEqualTo(CheckpointFailureReason.CHECKPOINT_EXPIRED);
}
// continue triggering
masterHookCheckpointFuture.complete("finish master hook");
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkpointCoordinator.isTriggering());
+ assertThat(checkpointCoordinator.isTriggering()).isFalse();
// it doesn't really trigger task manager to do checkpoint
- assertEquals(0, gateway.getTriggeredCheckpoints(attemptID).size());
- assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+ assertThat(gateway.getTriggeredCheckpoints(attemptID)).isEmpty();
+ assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
}
@Test
- public void testTriggerCheckpointInitializationFailed() throws Exception {
+ void testTriggerCheckpointInitializationFailed() throws Exception {
// set up the coordinator and validate the initial state
CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
@@ -796,8 +798,8 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
checkpointCoordinator.startCheckpointScheduler();
final CompletableFuture<CompletedCheckpoint> onCompletionPromise1 =
triggerPeriodicCheckpoint(checkpointCoordinator);
- assertTrue(checkpointCoordinator.isTriggering());
- assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+ assertThat(checkpointCoordinator.isTriggering()).isTrue();
+ assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
manuallyTriggeredScheduledExecutor.triggerAll();
try {
@@ -806,25 +808,24 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
} catch (ExecutionException e) {
final Optional<CheckpointException> checkpointExceptionOptional =
ExceptionUtils.findThrowable(e, CheckpointException.class);
- assertTrue(checkpointExceptionOptional.isPresent());
- assertEquals(
- CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
- checkpointExceptionOptional.get().getCheckpointFailureReason());
+ assertThat(checkpointExceptionOptional.isPresent()).isTrue();
+ assertThat(checkpointExceptionOptional.get().getCheckpointFailureReason())
+ .isEqualTo(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE);
}
- assertFalse(checkpointCoordinator.isTriggering());
- assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+ assertThat(checkpointCoordinator.isTriggering()).isFalse();
+ assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
final CompletableFuture<CompletedCheckpoint> onCompletionPromise2 =
triggerPeriodicCheckpoint(checkpointCoordinator);
- assertTrue(checkpointCoordinator.isTriggering());
+ assertThat(checkpointCoordinator.isTriggering()).isTrue();
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(onCompletionPromise2.isCompletedExceptionally());
- assertFalse(checkpointCoordinator.isTriggering());
- assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+ assertThat(onCompletionPromise2).isNotCompletedExceptionally();
+ assertThat(checkpointCoordinator.isTriggering()).isFalse();
+ assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
}
@Test
- public void testTriggerCheckpointSnapshotMasterHookFailed() throws Exception {
+ void testTriggerCheckpointSnapshotMasterHookFailed() throws Exception {
JobVertexID jobVertexID = new JobVertexID();
CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
@@ -850,13 +851,13 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
// checkpoint trigger will not finish since master hook checkpoint is not finished yet
manuallyTriggeredScheduledExecutor.triggerAll();
- assertTrue(checkpointCoordinator.isTriggering());
+ assertThat(checkpointCoordinator.isTriggering()).isTrue();
// continue triggering
masterHookCheckpointFuture.completeExceptionally(new Exception("by design"));
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkpointCoordinator.isTriggering());
+ assertThat(checkpointCoordinator.isTriggering()).isFalse();
try {
onCompletionPromise.get();
@@ -864,19 +865,20 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
} catch (ExecutionException e) {
final Optional<CheckpointException> checkpointExceptionOptional =
ExceptionUtils.findThrowable(e, CheckpointException.class);
- assertTrue(checkpointExceptionOptional.isPresent());
- assertEquals(
- CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
- checkpointExceptionOptional.get().getCheckpointFailureReason());
+ assertThat(checkpointExceptionOptional)
+ .isPresent()
+ .map(CheckpointException::getCheckpointFailureReason)
+ .get()
+ .isEqualTo(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE);
}
// it doesn't really trigger task manager to do checkpoint
- assertEquals(0, gateway.getTriggeredCheckpoints(attemptID).size());
- assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+ assertThat(gateway.getTriggeredCheckpoints(attemptID)).isEmpty();
+ assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
}
/** This test only fails eventually. */
@Test
- public void discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws Exception {
+ void discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws Exception {
final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();
final CheckpointCoordinator checkpointCoordinator =
@@ -920,9 +922,8 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
secondCheckpoint.get();
fail("Expected the second checkpoint to fail.");
} catch (ExecutionException ee) {
- assertThat(
- ExceptionUtils.stripExecutionException(ee),
- instanceOf(CheckpointException.class));
+ assertThat(ExceptionUtils.stripExecutionException(ee))
+ .isInstanceOf(CheckpointException.class);
}
} finally {
checkpointCoordinator.shutdown();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
index a2084c6616a..2297c51881c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
@@ -17,24 +17,21 @@
package org.apache.flink.runtime.checkpoint;
-import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import java.io.IOException;
+import org.junit.jupiter.api.Test;
import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_EXPIRED;
import static org.apache.flink.runtime.checkpoint.CheckpointProperties.forCheckpoint;
import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the checkpoint failure manager. */
-public class CheckpointFailureManagerTest extends TestLogger {
+class CheckpointFailureManagerTest extends TestLogger {
@Test
- public void testIgnoresPastCheckpoints() throws IOException, JobException {
+ void testIgnoresPastCheckpoints() {
TestFailJobCallback callback = new TestFailJobCallback();
CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback);
CheckpointProperties checkpointProperties = forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION);
@@ -50,11 +47,11 @@ public class CheckpointFailureManagerTest extends TestLogger {
checkpointProperties, new CheckpointException(CHECKPOINT_EXPIRED), 3L);
failureManager.handleJobLevelCheckpointException(
checkpointProperties, new CheckpointException(CHECKPOINT_EXPIRED), 4L);
- assertEquals(0, callback.getInvokeCounter());
+ assertThat(callback.getInvokeCounter()).isZero();
}
@Test
- public void testContinuousFailure() throws IOException, JobException {
+ void testContinuousFailure() {
TestFailJobCallback callback = new TestFailJobCallback();
CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback);
CheckpointProperties checkpointProperties = forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION);
@@ -77,11 +74,11 @@ public class CheckpointFailureManagerTest extends TestLogger {
checkpointProperties,
new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED),
4);
- assertEquals(1, callback.getInvokeCounter());
+ assertThat(callback.getInvokeCounter()).isOne();
}
@Test
- public void testBreakContinuousFailure() throws IOException, JobException {
+ void testBreakContinuousFailure() {
TestFailJobCallback callback = new TestFailJobCallback();
CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback);
CheckpointProperties checkpointProperties = forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION);
@@ -106,11 +103,11 @@ public class CheckpointFailureManagerTest extends TestLogger {
failureManager.handleJobLevelCheckpointException(
checkpointProperties, new CheckpointException(CHECKPOINT_EXPIRED), 5);
- assertEquals(0, callback.getInvokeCounter());
+ assertThat(callback.getInvokeCounter()).isZero();
}
@Test
- public void testTotalCountValue() throws IOException, JobException {
+ void testTotalCountValue() {
TestFailJobCallback callback = new TestFailJobCallback();
CheckpointProperties checkpointProperties = forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION);
CheckpointFailureManager failureManager = new CheckpointFailureManager(0, callback);
@@ -121,12 +118,11 @@ public class CheckpointFailureManagerTest extends TestLogger {
// IO_EXCEPTION, CHECKPOINT_DECLINED, FINALIZE_CHECKPOINT_FAILURE, CHECKPOINT_EXPIRED and
// CHECKPOINT_ASYNC_EXCEPTION
- assertEquals(5, callback.getInvokeCounter());
+ assertThat(callback.getInvokeCounter()).isEqualTo(5);
}
@Test
- public void testIgnoreOneCheckpointRepeatedlyCountMultiTimes()
- throws IOException, JobException {
+ void testIgnoreOneCheckpointRepeatedlyCountMultiTimes() {
TestFailJobCallback callback = new TestFailJobCallback();
CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback);
CheckpointProperties checkpointProperties = forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION);
@@ -151,7 +147,7 @@ public class CheckpointFailureManagerTest extends TestLogger {
checkpointProperties,
new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED),
2);
- assertEquals(0, callback.getInvokeCounter());
+ assertThat(callback.getInvokeCounter()).isZero();
}
/** A failure handler callback for testing. */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java
index 59933e856b2..97cfa726682 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java
@@ -70,7 +70,7 @@ abstract class CheckpointIDCounterTestBase {
try {
counter.start();
- assertThat(counter.getAndIncrement()).isEqualTo(1);
+ assertThat(counter.getAndIncrement()).isOne();
assertThat(counter.get()).isEqualTo(2);
assertThat(counter.getAndIncrement()).isEqualTo(2);
assertThat(counter.get()).isEqualTo(3);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index fdc92585e1d..ebcfc0f5afe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -34,12 +34,12 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.SerializableObject;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.Arrays;
import java.util.Collections;
@@ -51,26 +51,23 @@ import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/** Tests concerning the restoring of state from a checkpoint to the task executions. */
-public class CheckpointStateRestoreTest {
+class CheckpointStateRestoreTest {
- @ClassRule
- public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
- TestingUtils.defaultExecutorResource();
+ @RegisterExtension
+ static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+ TestingUtils.defaultExecutorExtension();
private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
/** Tests that on restore the task state is reset for each stateful task. */
@Test
- public void testSetState() {
+ void testSetState() {
try {
KeyGroupRange keyGroupRange = KeyGroupRange.of(0, 0);
List<SerializableObject> testStates =
@@ -159,20 +156,24 @@ public class CheckpointStateRestoreTest {
graph.getJobID(), statelessExec2.getAttemptId(), checkpointId),
TASK_MANAGER_LOCATION_INFO);
- assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
- assertEquals(0, coord.getNumberOfPendingCheckpoints());
+ assertThat(coord.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
+ assertThat(coord.getNumberOfPendingCheckpoints()).isZero();
// let the coordinator inject the state
- assertTrue(
- coord.restoreLatestCheckpointedStateToAll(
- new HashSet<>(Arrays.asList(stateful, stateless)), false));
+ assertThat(
+ coord.restoreLatestCheckpointedStateToAll(
+ new HashSet<>(Arrays.asList(stateful, stateless)), false))
+ .isTrue();
// verify that each stateful vertex got the state
- assertEquals(subtaskStates, statefulExec1.getTaskRestore().getTaskStateSnapshot());
- assertEquals(subtaskStates, statefulExec2.getTaskRestore().getTaskStateSnapshot());
- assertEquals(subtaskStates, statefulExec3.getTaskRestore().getTaskStateSnapshot());
- assertNull(statelessExec1.getTaskRestore());
- assertNull(statelessExec2.getTaskRestore());
+ assertThat(statefulExec1.getTaskRestore().getTaskStateSnapshot())
+ .isEqualTo(subtaskStates);
+ assertThat(statefulExec2.getTaskRestore().getTaskStateSnapshot())
+ .isEqualTo(subtaskStates);
+ assertThat(statefulExec3.getTaskRestore().getTaskStateSnapshot())
+ .isEqualTo(subtaskStates);
+ assertThat(statelessExec1.getTaskRestore()).isNull();
+ assertThat(statelessExec2.getTaskRestore()).isNull();
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -180,14 +181,14 @@ public class CheckpointStateRestoreTest {
}
@Test
- public void testNoCheckpointAvailable() {
+ void testNoCheckpointAvailable() {
try {
CheckpointCoordinator coord =
new CheckpointCoordinatorBuilder().build(EXECUTOR_RESOURCE.getExecutor());
final boolean restored =
coord.restoreLatestCheckpointedStateToAll(Collections.emptySet(), false);
- assertFalse(restored);
+ assertThat(restored).isFalse();
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -200,7 +201,7 @@ public class CheckpointStateRestoreTest {
* <p>The flag only applies for state that is part of the checkpoint.
*/
@Test
- public void testNonRestoredState() throws Exception {
+ void testNonRestoredState() throws Exception {
// --- (1) Create tasks to restore checkpoint with ---
JobVertexID jobVertexId1 = new JobVertexID();
JobVertexID jobVertexId2 = new JobVertexID();
@@ -254,8 +255,8 @@ public class CheckpointStateRestoreTest {
coord.getCheckpointStore()
.addCheckpointAndSubsumeOldestOne(checkpoint, new CheckpointsCleaner(), () -> {});
- assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
- assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, true));
+ assertThat(coord.restoreLatestCheckpointedStateToAll(tasks, false)).isTrue();
+ assertThat(coord.restoreLatestCheckpointedStateToAll(tasks, true)).isTrue();
// --- (3) JobVertex missing for task state that is part of the checkpoint ---
JobVertexID newJobVertexID = new JobVertexID();
@@ -287,7 +288,7 @@ public class CheckpointStateRestoreTest {
// (i) Allow non restored state (should succeed)
final boolean restored = coord.restoreLatestCheckpointedStateToAll(tasks, true);
- assertTrue(restored);
+ assertThat(restored).isTrue();
// (ii) Don't allow non restored state (should fail)
try {