You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/12/03 14:52:10 UTC
[flink] 02/02: [hotfix][tests] Remove Mocking from TaskLocalStateStoreImplTest
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 59141399bb2eff1e40eacf80decffc743f60cb36
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Nov 18 10:01:13 2021 +0100
[hotfix][tests] Remove Mocking from TaskLocalStateStoreImplTest
Mockito's spy prevented TaskLocalStateStoreImplTest from succeeding because a mock
cannot be serialized.
---
.../runtime/state/TaskLocalStateStoreImplTest.java | 55 ++++++++++++++--------
1 file changed, 36 insertions(+), 19 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
index 9af844b..ce57073 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.junit.After;
@@ -31,7 +32,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import org.mockito.Mockito;
import java.io.File;
import java.util.ArrayList;
@@ -39,9 +39,11 @@ import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
-import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
-public class TaskLocalStateStoreImplTest {
+/** Test for the {@link TaskLocalStateStoreImpl}. */
+public class TaskLocalStateStoreImplTest extends TestLogger {
private SortedMap<Long, TaskStateSnapshot> internalSnapshotMap;
private Object internalLock;
@@ -113,7 +115,7 @@ public class TaskLocalStateStoreImplTest {
Assert.assertNull(taskLocalStateStore.retrieveLocalState(i));
}
- List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
+ List<TestingTaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
checkStoredAsExpected(taskStateSnapshots, 0, chkCount);
@@ -126,7 +128,7 @@ public class TaskLocalStateStoreImplTest {
final int chkCount = 3;
- List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
+ List<TestingTaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
// test retrieve with pruning
taskLocalStateStore.pruneMatchingCheckpoints((long chk) -> chk != chkCount - 1);
@@ -144,7 +146,7 @@ public class TaskLocalStateStoreImplTest {
final int chkCount = 3;
final int confirmed = chkCount - 1;
- List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
+ List<TestingTaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
taskLocalStateStore.confirmCheckpoint(confirmed);
checkPrunedAndDiscarded(taskStateSnapshots, 0, confirmed);
checkStoredAsExpected(taskStateSnapshots, confirmed, chkCount);
@@ -156,7 +158,7 @@ public class TaskLocalStateStoreImplTest {
final int chkCount = 4;
final int aborted = chkCount - 2;
- List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
+ List<TestingTaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
taskLocalStateStore.abortCheckpoint(aborted);
checkPrunedAndDiscarded(taskStateSnapshots, aborted, aborted + 1);
checkStoredAsExpected(taskStateSnapshots, 0, aborted);
@@ -170,35 +172,34 @@ public class TaskLocalStateStoreImplTest {
public void dispose() throws Exception {
final int chkCount = 3;
final int confirmed = chkCount - 1;
- List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
+ List<TestingTaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
taskLocalStateStore.confirmCheckpoint(confirmed);
taskLocalStateStore.dispose();
checkPrunedAndDiscarded(taskStateSnapshots, 0, chkCount);
}
- private void checkStoredAsExpected(List<TaskStateSnapshot> history, int start, int end)
- throws Exception {
+ private void checkStoredAsExpected(List<TestingTaskStateSnapshot> history, int start, int end) {
for (int i = start; i < end; ++i) {
- TaskStateSnapshot expected = history.get(i);
- Assert.assertTrue(expected == taskLocalStateStore.retrieveLocalState(i));
- Mockito.verify(expected, Mockito.never()).discardState();
+ TestingTaskStateSnapshot expected = history.get(i);
+ assertTrue(expected == taskLocalStateStore.retrieveLocalState(i));
+ assertFalse(expected.isDiscarded());
}
}
- private void checkPrunedAndDiscarded(List<TaskStateSnapshot> history, int start, int end)
- throws Exception {
+ private void checkPrunedAndDiscarded(
+ List<TestingTaskStateSnapshot> history, int start, int end) {
for (int i = start; i < end; ++i) {
Assert.assertNull(taskLocalStateStore.retrieveLocalState(i));
- Mockito.verify(history.get(i)).discardState();
+ assertTrue(history.get(i).isDiscarded());
}
}
- private List<TaskStateSnapshot> storeStates(int count) {
- List<TaskStateSnapshot> taskStateSnapshots = new ArrayList<>(count);
+ private List<TestingTaskStateSnapshot> storeStates(int count) {
+ List<TestingTaskStateSnapshot> taskStateSnapshots = new ArrayList<>(count);
for (int i = 0; i < count; ++i) {
OperatorID operatorID = new OperatorID();
- TaskStateSnapshot taskStateSnapshot = spy(new TaskStateSnapshot());
+ TestingTaskStateSnapshot taskStateSnapshot = new TestingTaskStateSnapshot();
OperatorSubtaskState operatorSubtaskState = OperatorSubtaskState.builder().build();
taskStateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
taskLocalStateStore.storeLocalState(i, taskStateSnapshot);
@@ -206,4 +207,20 @@ public class TaskLocalStateStoreImplTest {
}
return taskStateSnapshots;
}
+
+ private static final class TestingTaskStateSnapshot extends TaskStateSnapshot {
+ private static final long serialVersionUID = 2046321877379917040L;
+
+ private boolean isDiscarded = false;
+
+ @Override
+ public void discardState() throws Exception {
+ super.discardState();
+ isDiscarded = true;
+ }
+
+ boolean isDiscarded() {
+ return isDiscarded;
+ }
+ }
}