You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/06/30 12:08:33 UTC

[flink] branch master updated: [FLINK-27031][runtime] Assign even empty old state to the task if the upstream has output states since the task should be prepared to filter these old incoming states from upstream

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d98a34be120 [FLINK-27031][runtime] Assign even empty old state to the task if the upstream has output states since the task should be prepared to filter these old incoming states from upstream
d98a34be120 is described below

commit d98a34be120c1906dc3f01e9c48ab847a75b5822
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed May 11 18:21:27 2022 +0200

    [FLINK-27031][runtime] Assign even empty old state to the task if the upstream has output states since the task should be prepared to filter these old incoming states from upstream
---
 .../checkpoint/StateAssignmentOperation.java       |  7 +++-
 .../runtime/checkpoint/TaskStateAssignment.java    |  5 +++
 .../checkpoint/StateAssignmentOperationTest.java   | 41 ++++++++++++++++++++++
 3 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index dc279727478..681e0b18df1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -143,7 +143,12 @@ public class StateAssignmentOperation {
 
         // actually assign the state
         for (TaskStateAssignment stateAssignment : vertexAssignments.values()) {
-            if (stateAssignment.hasNonFinishedState || stateAssignment.isFullyFinished) {
+            // If upstream has output states, even the empty task state should be assigned for the
+            // current task in order to notify this task that the old states will send to it which
+            // likely should be filtered.
+            if (stateAssignment.hasNonFinishedState
+                    || stateAssignment.isFullyFinished
+                    || stateAssignment.hasUpstreamOutputStates()) {
                 assignTaskStateToExecutionJobVertices(stateAssignment);
             }
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateAssignment.java
index 45aef7330f7..75ffc71d058 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateAssignment.java
@@ -201,6 +201,11 @@ class TaskStateAssignment {
                 .build();
     }
 
+    public boolean hasUpstreamOutputStates() {
+        return Arrays.stream(getUpstreamAssignments())
+                .anyMatch(assignment -> assignment.hasOutputState);
+    }
+
     private InflightDataGateOrPartitionRescalingDescriptor log(
             InflightDataGateOrPartitionRescalingDescriptor descriptor, int subtask, int partition) {
         LOG.debug(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
index 7aa742e211c..f9cb551c5ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
@@ -87,6 +87,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 /** Tests to verify state assignment operation. */
 public class StateAssignmentOperationTest extends TestLogger {
@@ -744,6 +745,46 @@ public class StateAssignmentOperationTest extends TestLogger {
                         .getInputRescalingDescriptor());
     }
 
+    @Test
+    public void testOnlyUpstreamChannelStateAssignment()
+            throws JobException, JobExecutionException {
+        // given: There is only input channel state for one subpartition.
+        List<OperatorID> operatorIds = buildOperatorIds(2);
+        Map<OperatorID, OperatorState> states = new HashMap<>();
+        Random random = new Random();
+        OperatorState upstreamState = new OperatorState(operatorIds.get(0), 2, MAX_P);
+        OperatorSubtaskState state =
+                OperatorSubtaskState.builder()
+                        .setResultSubpartitionState(
+                                new StateObjectCollection<>(
+                                        asList(
+                                                createNewResultSubpartitionStateHandle(10, random),
+                                                createNewResultSubpartitionStateHandle(
+                                                        10, random))))
+                        .build();
+        upstreamState.putState(0, state);
+
+        states.put(operatorIds.get(0), upstreamState);
+
+        Map<OperatorID, ExecutionJobVertex> vertices =
+                buildVertices(operatorIds, 3, RANGE, ROUND_ROBIN);
+
+        // when: States are assigned.
+        new StateAssignmentOperation(0, new HashSet<>(vertices.values()), states, false)
+                .assignStates();
+
+        // then: All subtask have not null TaskRestore information(even if it is empty).
+        ExecutionJobVertex jobVertexWithFinishedOperator = vertices.get(operatorIds.get(0));
+        for (ExecutionVertex task : jobVertexWithFinishedOperator.getTaskVertices()) {
+            assertNotNull(task.getCurrentExecutionAttempt().getTaskRestore());
+        }
+
+        ExecutionJobVertex jobVertexWithoutFinishedOperator = vertices.get(operatorIds.get(1));
+        for (ExecutionVertex task : jobVertexWithoutFinishedOperator.getTaskVertices()) {
+            assertNotNull(task.getCurrentExecutionAttempt().getTaskRestore());
+        }
+    }
+
     @Test
     public void testStateWithFullyFinishedOperators() throws JobException, JobExecutionException {
         List<OperatorID> operatorIds = buildOperatorIds(2);