You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/12/03 14:52:08 UTC

[flink] branch release-1.14 updated (bbfe652 -> 5914139)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from bbfe652  [FLINK-24813][table-planner] Improve ImplicitTypeConversionITCase
     new f6fa4ff  [hotfix] Make IncrementalLocalKeyedStateHandle serializable by copying sharedStateHandleIDs
     new 5914139  [hotfix][tests] Remove Mocking from TaskLocalStateStoreImplTest

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../state/IncrementalLocalKeyedStateHandle.java    |  3 +-
 .../runtime/state/TaskLocalStateStoreImplTest.java | 55 ++++++++++++++--------
 2 files changed, 38 insertions(+), 20 deletions(-)

[flink] 02/02: [hotfix][tests] Remove Mocking from TaskLocalStateStoreImplTest

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 59141399bb2eff1e40eacf80decffc743f60cb36
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Nov 18 10:01:13 2021 +0100

    [hotfix][tests] Remove Mocking from TaskLocalStateStoreImplTest
    
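    Replace the Mockito spy on TaskStateSnapshot with a hand-written TestingTaskStateSnapshot
    subclass that records whether discardState() was called. The spy prevented
    TaskLocalStateStoreImplTest from succeeding because a mock cannot be serialized.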
---
 .../runtime/state/TaskLocalStateStoreImplTest.java | 55 ++++++++++++++--------
 1 file changed, 36 insertions(+), 19 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
index 9af844b..ce57073 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
 
 import org.junit.After;
@@ -31,7 +32,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.mockito.Mockito;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -39,9 +39,11 @@ import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
-import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
-public class TaskLocalStateStoreImplTest {
+/** Test for the {@link TaskLocalStateStoreImpl}. */
+public class TaskLocalStateStoreImplTest extends TestLogger {
 
     private SortedMap<Long, TaskStateSnapshot> internalSnapshotMap;
     private Object internalLock;
@@ -113,7 +115,7 @@ public class TaskLocalStateStoreImplTest {
             Assert.assertNull(taskLocalStateStore.retrieveLocalState(i));
         }
 
-        List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
+        List<TestingTaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
 
         checkStoredAsExpected(taskStateSnapshots, 0, chkCount);
 
@@ -126,7 +128,7 @@ public class TaskLocalStateStoreImplTest {
 
         final int chkCount = 3;
 
-        List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
+        List<TestingTaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
 
         // test retrieve with pruning
         taskLocalStateStore.pruneMatchingCheckpoints((long chk) -> chk != chkCount - 1);
@@ -144,7 +146,7 @@ public class TaskLocalStateStoreImplTest {
 
         final int chkCount = 3;
         final int confirmed = chkCount - 1;
-        List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
+        List<TestingTaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
         taskLocalStateStore.confirmCheckpoint(confirmed);
         checkPrunedAndDiscarded(taskStateSnapshots, 0, confirmed);
         checkStoredAsExpected(taskStateSnapshots, confirmed, chkCount);
@@ -156,7 +158,7 @@ public class TaskLocalStateStoreImplTest {
 
         final int chkCount = 4;
         final int aborted = chkCount - 2;
-        List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
+        List<TestingTaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
         taskLocalStateStore.abortCheckpoint(aborted);
         checkPrunedAndDiscarded(taskStateSnapshots, aborted, aborted + 1);
         checkStoredAsExpected(taskStateSnapshots, 0, aborted);
@@ -170,35 +172,34 @@ public class TaskLocalStateStoreImplTest {
     public void dispose() throws Exception {
         final int chkCount = 3;
         final int confirmed = chkCount - 1;
-        List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
+        List<TestingTaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
         taskLocalStateStore.confirmCheckpoint(confirmed);
         taskLocalStateStore.dispose();
 
         checkPrunedAndDiscarded(taskStateSnapshots, 0, chkCount);
     }
 
-    private void checkStoredAsExpected(List<TaskStateSnapshot> history, int start, int end)
-            throws Exception {
+    private void checkStoredAsExpected(List<TestingTaskStateSnapshot> history, int start, int end) {
         for (int i = start; i < end; ++i) {
-            TaskStateSnapshot expected = history.get(i);
-            Assert.assertTrue(expected == taskLocalStateStore.retrieveLocalState(i));
-            Mockito.verify(expected, Mockito.never()).discardState();
+            TestingTaskStateSnapshot expected = history.get(i);
+            assertTrue(expected == taskLocalStateStore.retrieveLocalState(i));
+            assertFalse(expected.isDiscarded());
         }
     }
 
-    private void checkPrunedAndDiscarded(List<TaskStateSnapshot> history, int start, int end)
-            throws Exception {
+    private void checkPrunedAndDiscarded(
+            List<TestingTaskStateSnapshot> history, int start, int end) {
         for (int i = start; i < end; ++i) {
             Assert.assertNull(taskLocalStateStore.retrieveLocalState(i));
-            Mockito.verify(history.get(i)).discardState();
+            assertTrue(history.get(i).isDiscarded());
         }
     }
 
-    private List<TaskStateSnapshot> storeStates(int count) {
-        List<TaskStateSnapshot> taskStateSnapshots = new ArrayList<>(count);
+    private List<TestingTaskStateSnapshot> storeStates(int count) {
+        List<TestingTaskStateSnapshot> taskStateSnapshots = new ArrayList<>(count);
         for (int i = 0; i < count; ++i) {
             OperatorID operatorID = new OperatorID();
-            TaskStateSnapshot taskStateSnapshot = spy(new TaskStateSnapshot());
+            TestingTaskStateSnapshot taskStateSnapshot = new TestingTaskStateSnapshot();
             OperatorSubtaskState operatorSubtaskState = OperatorSubtaskState.builder().build();
             taskStateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
             taskLocalStateStore.storeLocalState(i, taskStateSnapshot);
@@ -206,4 +207,20 @@ public class TaskLocalStateStoreImplTest {
         }
         return taskStateSnapshots;
     }
+
+    private static final class TestingTaskStateSnapshot extends TaskStateSnapshot {
+        private static final long serialVersionUID = 2046321877379917040L;
+
+        private boolean isDiscarded = false;
+
+        @Override
+        public void discardState() throws Exception {
+            super.discardState();
+            isDiscarded = true;
+        }
+
+        boolean isDiscarded() {
+            return isDiscarded;
+        }
+    }
 }

[flink] 01/02: [hotfix] Make IncrementalLocalKeyedStateHandle serializable by copying sharedStateHandleIDs

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6fa4ff53a6d31430bddf89b34dfe2c7192ee55d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Oct 19 16:58:03 2021 +0200

    [hotfix] Make IncrementalLocalKeyedStateHandle serializable by copying sharedStateHandleIDs
---
 .../apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
index 615e348..e732394 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
@@ -23,6 +23,7 @@ import org.apache.flink.util.ExceptionUtils;
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 
+import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
@@ -60,7 +61,7 @@ public class IncrementalLocalKeyedStateHandle extends DirectoryKeyedStateHandle
         this.backendIdentifier = backendIdentifier;
         this.checkpointId = checkpointId;
         this.metaDataState = metaDataState;
-        this.sharedStateHandleIDs = sharedStateHandleIDs;
+        this.sharedStateHandleIDs = new HashSet<>(sharedStateHandleIDs);
     }
 
     @Nonnull