You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/12/03 14:52:10 UTC

[flink] 02/02: [hotfix][tests] Remove Mocking from TaskLocalStateStoreImplTest

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 59141399bb2eff1e40eacf80decffc743f60cb36
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Nov 18 10:01:13 2021 +0100

    [hotfix][tests] Remove Mocking from TaskLocalStateStoreImplTest
    
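    The Mockito spy used in TaskLocalStateStoreImplTest caused the test to fail because
    a mock cannot be serialized. Replace the spy with a hand-written
    TestingTaskStateSnapshot subclass that records whether discardState() was called.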
---
 .../runtime/state/TaskLocalStateStoreImplTest.java | 55 ++++++++++++++--------
 1 file changed, 36 insertions(+), 19 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
index 9af844b..ce57073 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
 
 import org.junit.After;
@@ -31,7 +32,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.mockito.Mockito;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -39,9 +39,11 @@ import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
-import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
-public class TaskLocalStateStoreImplTest {
+/** Test for the {@link TaskLocalStateStoreImpl}. */
+public class TaskLocalStateStoreImplTest extends TestLogger {
 
     private SortedMap<Long, TaskStateSnapshot> internalSnapshotMap;
     private Object internalLock;
@@ -113,7 +115,7 @@ public class TaskLocalStateStoreImplTest {
             Assert.assertNull(taskLocalStateStore.retrieveLocalState(i));
         }
 
-        List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
+        List<TestingTaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
 
         checkStoredAsExpected(taskStateSnapshots, 0, chkCount);
 
@@ -126,7 +128,7 @@ public class TaskLocalStateStoreImplTest {
 
         final int chkCount = 3;
 
-        List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
+        List<TestingTaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
 
         // test retrieve with pruning
         taskLocalStateStore.pruneMatchingCheckpoints((long chk) -> chk != chkCount - 1);
@@ -144,7 +146,7 @@ public class TaskLocalStateStoreImplTest {
 
         final int chkCount = 3;
         final int confirmed = chkCount - 1;
-        List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
+        List<TestingTaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
         taskLocalStateStore.confirmCheckpoint(confirmed);
         checkPrunedAndDiscarded(taskStateSnapshots, 0, confirmed);
         checkStoredAsExpected(taskStateSnapshots, confirmed, chkCount);
@@ -156,7 +158,7 @@ public class TaskLocalStateStoreImplTest {
 
         final int chkCount = 4;
         final int aborted = chkCount - 2;
-        List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
+        List<TestingTaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
         taskLocalStateStore.abortCheckpoint(aborted);
         checkPrunedAndDiscarded(taskStateSnapshots, aborted, aborted + 1);
         checkStoredAsExpected(taskStateSnapshots, 0, aborted);
@@ -170,35 +172,34 @@ public class TaskLocalStateStoreImplTest {
     public void dispose() throws Exception {
         final int chkCount = 3;
         final int confirmed = chkCount - 1;
-        List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
+        List<TestingTaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
         taskLocalStateStore.confirmCheckpoint(confirmed);
         taskLocalStateStore.dispose();
 
         checkPrunedAndDiscarded(taskStateSnapshots, 0, chkCount);
     }
 
-    private void checkStoredAsExpected(List<TaskStateSnapshot> history, int start, int end)
-            throws Exception {
+    private void checkStoredAsExpected(List<TestingTaskStateSnapshot> history, int start, int end) {
         for (int i = start; i < end; ++i) {
-            TaskStateSnapshot expected = history.get(i);
-            Assert.assertTrue(expected == taskLocalStateStore.retrieveLocalState(i));
-            Mockito.verify(expected, Mockito.never()).discardState();
+            TestingTaskStateSnapshot expected = history.get(i);
+            assertTrue(expected == taskLocalStateStore.retrieveLocalState(i));
+            assertFalse(expected.isDiscarded());
         }
     }
 
-    private void checkPrunedAndDiscarded(List<TaskStateSnapshot> history, int start, int end)
-            throws Exception {
+    private void checkPrunedAndDiscarded(
+            List<TestingTaskStateSnapshot> history, int start, int end) {
         for (int i = start; i < end; ++i) {
             Assert.assertNull(taskLocalStateStore.retrieveLocalState(i));
-            Mockito.verify(history.get(i)).discardState();
+            assertTrue(history.get(i).isDiscarded());
         }
     }
 
-    private List<TaskStateSnapshot> storeStates(int count) {
-        List<TaskStateSnapshot> taskStateSnapshots = new ArrayList<>(count);
+    private List<TestingTaskStateSnapshot> storeStates(int count) {
+        List<TestingTaskStateSnapshot> taskStateSnapshots = new ArrayList<>(count);
         for (int i = 0; i < count; ++i) {
             OperatorID operatorID = new OperatorID();
-            TaskStateSnapshot taskStateSnapshot = spy(new TaskStateSnapshot());
+            TestingTaskStateSnapshot taskStateSnapshot = new TestingTaskStateSnapshot();
             OperatorSubtaskState operatorSubtaskState = OperatorSubtaskState.builder().build();
             taskStateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
             taskLocalStateStore.storeLocalState(i, taskStateSnapshot);
@@ -206,4 +207,20 @@ public class TaskLocalStateStoreImplTest {
         }
         return taskStateSnapshots;
     }
+
+    private static final class TestingTaskStateSnapshot extends TaskStateSnapshot {
+        private static final long serialVersionUID = 2046321877379917040L;
+
+        private boolean isDiscarded = false;
+
+        @Override
+        public void discardState() throws Exception {
+            super.discardState();
+            isDiscarded = true;
+        }
+
+        boolean isDiscarded() {
+            return isDiscarded;
+        }
+    }
 }