You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/18 13:56:14 UTC

[GitHub] [flink] kristoffSC opened a new pull request, #21101: [Flink 29627][streaming] Fix duplicate key exception during recover more than 1 committable.

kristoffSC opened a new pull request, #21101:
URL: https://github.com/apache/flink/pull/21101

   ## What is the purpose of the change
   
   Fixing [FLINK-29627](https://issues.apache.org/jira/browse/FLINK-29627) where recovery more than one Committable causesed `IllegalStateException` and prevents cluster to start. 
   
   Implemented fix is based on merging committables from same subtaskId during recovery for Sink V2 architecture.
   
   ## Brief change log
   
     - Enhance `CheckpointSimpleVersionedSerializer::deserialize` by calling `SubtaskCommittableManager::merge` for committables for same subtaskId.
   
   ## Verifying this change
   
   - Add new test `Committablecollectorserializertest::testCommittablesForSameSubtaskIdV2SerDe` to verify deserialization of multiple committables for the same subtaskId.
   - Enhanced `CommitterOperatorTest::testStateRestore` to include committables for same subtaskId.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (**no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**)
     - The serializers: (**yes**)
     - The runtime per-record code paths (performance sensitive): (**no**)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (**no**)
     - The S3 file system connector: (**no**)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**no**)
     - If yes, how is the feature documented? (**not applicable**)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] fapaul merged pull request #21101: [Flink 29627][streaming] Fix duplicate key exception during recovery more than 1 committable.

Posted by GitBox <gi...@apache.org>.
fapaul merged PR #21101:
URL: https://github.com/apache/flink/pull/21101


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] kristoffSC commented on pull request #21101: [Flink 29627][streaming] Fix duplicate key exception during recover more than 1 committable.

Posted by GitBox <gi...@apache.org>.
kristoffSC commented on PR #21101:
URL: https://github.com/apache/flink/pull/21101#issuecomment-1283997870

   Backports
   1.15 - https://github.com/apache/flink/pull/21113
   1.16 - https://github.com/apache/flink/pull/21115


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #21101: [Flink 29627][streaming] Fix duplicate key exception during recover more than 1 committable.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21101:
URL: https://github.com/apache/flink/pull/21101#issuecomment-1282456691

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9cab991d2083342ec5a4702b4fe7a848ea747ca8",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "9cab991d2083342ec5a4702b4fe7a848ea747ca8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9cab991d2083342ec5a4702b4fe7a848ea747ca8 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] fapaul commented on a diff in pull request #21101: [Flink 29627][streaming] Fix duplicate key exception during recover more than 1 committable.

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #21101:
URL: https://github.com/apache/flink/pull/21101#discussion_r998408316


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java:
##########
@@ -134,55 +190,78 @@ void testAlignSubtaskCommittableManagerCheckpointWithCheckpointCommittableManage
                 .isEqualTo(committableManager.getCheckpointId());
     }
 
+    /**
+     * @param assertMessageHeading prefix used for assertion fail message.
+     * @param subtaskId subtaskId to get {@link SubtaskCommittableManager} from {@link
+     *     CheckpointCommittableManagerImpl}
+     * @param expectedNumberOfSubtasks expected number of subtasks for {@link CommittableSummary}
+     * @param committableCollector collector to get {@link CheckpointCommittableManager}s from.
+     * @param expectedCommittableSize expected number of {@link CheckpointCommittableManager}.
+     * @param expectedNumberOfPendingRequestsPerCommittable expected number of pending request per
+     *     {@link SubtaskCommittableManager}.
+     */
     private void assertCommittableCollector(
             String assertMessageHeading,
             int subtaskId,
-            int numberOfSubtasks,
-            CommittableCollector<Integer> committableCollector) {
+            int expectedNumberOfSubtasks,
+            CommittableCollector<Integer> committableCollector,
+            int expectedCommittableSize,
+            Object[] expectedNumberOfPendingRequestsPerCommittable) {
 
         assertAll(
                 assertMessageHeading,
                 () -> {
                     final Collection<CheckpointCommittableManagerImpl<Integer>>
                             checkpointCommittables =
                                     committableCollector.getCheckpointCommittables();
-                    assertThat(checkpointCommittables).hasSize(2);
+                    assertThat(checkpointCommittables).hasSize(expectedCommittableSize);
 
                     final Iterator<CheckpointCommittableManagerImpl<Integer>> committablesIterator =
                             checkpointCommittables.iterator();
-                    final CheckpointCommittableManagerImpl<Integer> checkpointCommittableManager1 =
-                            committablesIterator.next();
-                    final SubtaskCommittableManager<Integer> subtaskCommittableManagerCheckpoint1 =
-                            checkpointCommittableManager1.getSubtaskCommittableManager(subtaskId);
-
-                    SinkV2Assertions.assertThat(checkpointCommittableManager1.getSummary())
-                            .hasSubtaskId(subtaskId)
-                            .hasNumberOfSubtasks(numberOfSubtasks);
-                    assertThat(
-                                    subtaskCommittableManagerCheckpoint1
-                                            .getPendingRequests()
-                                            .map(CommitRequestImpl::getCommittable)
-                                            .collect(Collectors.toList()))
-                            .containsExactly(1);
-                    assertThat(subtaskCommittableManagerCheckpoint1.getSubtaskId())
-                            .isEqualTo(subtaskId);
-
-                    final CheckpointCommittableManagerImpl<Integer> checkpointCommittableManager2 =
-                            committablesIterator.next();
-                    final SubtaskCommittableManager<Integer> subtaskCommittableManagerCheckpoint2 =
-                            checkpointCommittableManager2.getSubtaskCommittableManager(subtaskId);
-
-                    SinkV2Assertions.assertThat(checkpointCommittableManager2.getSummary())
-                            .hasSubtaskId(subtaskId)
-                            .hasNumberOfSubtasks(numberOfSubtasks);
-                    assertThat(
-                                    subtaskCommittableManagerCheckpoint2
-                                            .getPendingRequests()
-                                            .map(CommitRequestImpl::getCommittable)
-                                            .collect(Collectors.toList()))
-                            .containsExactly(2);
-                    assertThat(subtaskCommittableManagerCheckpoint2.getSubtaskId())
-                            .isEqualTo(subtaskId);
+
+                    int i = 0;
+                    while (committablesIterator.hasNext()) {
+                        final CheckpointCommittableManagerImpl<Integer>
+                                checkpointCommittableManager = committablesIterator.next();
+
+                        final SubtaskCommittableManager<Integer> subtaskCommittableManager =
+                                checkpointCommittableManager.getSubtaskCommittableManager(
+                                        subtaskId);
+
+                        SinkV2Assertions.assertThat(checkpointCommittableManager.getSummary())
+                                .hasSubtaskId(subtaskId)
+                                .hasNumberOfSubtasks(expectedNumberOfSubtasks);
+
+                        assertPendingRequests(
+                                subtaskCommittableManager,
+                                expectedNumberOfPendingRequestsPerCommittable[i++]);
+
+                        assertThat(subtaskCommittableManager.getSubtaskId()).isEqualTo(subtaskId);
+                    }
                 });
     }
+
+    private void assertPendingRequests(
+            SubtaskCommittableManager<Integer> subtaskCommittableManagerCheckpoint,
+            Object expectedPendingRequestCount) {
+
+        if (expectedPendingRequestCount instanceof Integer) {
+            assertThat(
+                            subtaskCommittableManagerCheckpoint
+                                    .getPendingRequests()
+                                    .map(CommitRequestImpl::getCommittable)
+                                    .collect(Collectors.toList()))
+                    .containsExactly((Integer) expectedPendingRequestCount);
+        } else if (expectedPendingRequestCount instanceof Integer[]) {

Review Comment:
   Can't you use `containsExactlyElementsOf` and always pass a list of integers? 



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java:
##########
@@ -134,55 +190,78 @@ void testAlignSubtaskCommittableManagerCheckpointWithCheckpointCommittableManage
                 .isEqualTo(committableManager.getCheckpointId());
     }
 
+    /**
+     * @param assertMessageHeading prefix used for assertion fail message.
+     * @param subtaskId subtaskId to get {@link SubtaskCommittableManager} from {@link
+     *     CheckpointCommittableManagerImpl}
+     * @param expectedNumberOfSubtasks expected number of subtasks for {@link CommittableSummary}
+     * @param committableCollector collector to get {@link CheckpointCommittableManager}s from.
+     * @param expectedCommittableSize expected number of {@link CheckpointCommittableManager}.
+     * @param expectedNumberOfPendingRequestsPerCommittable expected number of pending request per
+     *     {@link SubtaskCommittableManager}.
+     */
     private void assertCommittableCollector(
             String assertMessageHeading,
             int subtaskId,
-            int numberOfSubtasks,
-            CommittableCollector<Integer> committableCollector) {
+            int expectedNumberOfSubtasks,
+            CommittableCollector<Integer> committableCollector,
+            int expectedCommittableSize,
+            Object[] expectedNumberOfPendingRequestsPerCommittable) {
 
         assertAll(
                 assertMessageHeading,
                 () -> {
                     final Collection<CheckpointCommittableManagerImpl<Integer>>
                             checkpointCommittables =
                                     committableCollector.getCheckpointCommittables();
-                    assertThat(checkpointCommittables).hasSize(2);
+                    assertThat(checkpointCommittables).hasSize(expectedCommittableSize);
 
                     final Iterator<CheckpointCommittableManagerImpl<Integer>> committablesIterator =
                             checkpointCommittables.iterator();
-                    final CheckpointCommittableManagerImpl<Integer> checkpointCommittableManager1 =
-                            committablesIterator.next();
-                    final SubtaskCommittableManager<Integer> subtaskCommittableManagerCheckpoint1 =
-                            checkpointCommittableManager1.getSubtaskCommittableManager(subtaskId);
-
-                    SinkV2Assertions.assertThat(checkpointCommittableManager1.getSummary())
-                            .hasSubtaskId(subtaskId)
-                            .hasNumberOfSubtasks(numberOfSubtasks);
-                    assertThat(
-                                    subtaskCommittableManagerCheckpoint1
-                                            .getPendingRequests()
-                                            .map(CommitRequestImpl::getCommittable)
-                                            .collect(Collectors.toList()))
-                            .containsExactly(1);
-                    assertThat(subtaskCommittableManagerCheckpoint1.getSubtaskId())
-                            .isEqualTo(subtaskId);
-
-                    final CheckpointCommittableManagerImpl<Integer> checkpointCommittableManager2 =
-                            committablesIterator.next();
-                    final SubtaskCommittableManager<Integer> subtaskCommittableManagerCheckpoint2 =
-                            checkpointCommittableManager2.getSubtaskCommittableManager(subtaskId);
-
-                    SinkV2Assertions.assertThat(checkpointCommittableManager2.getSummary())
-                            .hasSubtaskId(subtaskId)
-                            .hasNumberOfSubtasks(numberOfSubtasks);
-                    assertThat(
-                                    subtaskCommittableManagerCheckpoint2
-                                            .getPendingRequests()
-                                            .map(CommitRequestImpl::getCommittable)
-                                            .collect(Collectors.toList()))
-                            .containsExactly(2);
-                    assertThat(subtaskCommittableManagerCheckpoint2.getSubtaskId())
-                            .isEqualTo(subtaskId);
+
+                    int i = 0;

Review Comment:
   Since the `expectedNumberOfPendingRequestsPerCommittable`  and `committableIterator` always have same size you can zip (guavas `Streams.zip`) them it also makes the loop like nice  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] kristoffSC commented on pull request #21101: [Flink 29627][streaming] Fix duplicate key exception during recovery more than 1 committable.

Posted by GitBox <gi...@apache.org>.
kristoffSC commented on PR #21101:
URL: https://github.com/apache/flink/pull/21101#issuecomment-1284459875

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] kristoffSC commented on a diff in pull request #21101: [Flink 29627][streaming] Fix duplicate key exception during recover more than 1 committable.

Posted by GitBox <gi...@apache.org>.
kristoffSC commented on code in PR #21101:
URL: https://github.com/apache/flink/pull/21101#discussion_r999185850


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java:
##########
@@ -134,55 +190,78 @@ void testAlignSubtaskCommittableManagerCheckpointWithCheckpointCommittableManage
                 .isEqualTo(committableManager.getCheckpointId());
     }
 
+    /**
+     * @param assertMessageHeading prefix used for assertion fail message.
+     * @param subtaskId subtaskId to get {@link SubtaskCommittableManager} from {@link
+     *     CheckpointCommittableManagerImpl}
+     * @param expectedNumberOfSubtasks expected number of subtasks for {@link CommittableSummary}
+     * @param committableCollector collector to get {@link CheckpointCommittableManager}s from.
+     * @param expectedCommittableSize expected number of {@link CheckpointCommittableManager}.
+     * @param expectedNumberOfPendingRequestsPerCommittable expected number of pending request per
+     *     {@link SubtaskCommittableManager}.
+     */
     private void assertCommittableCollector(
             String assertMessageHeading,
             int subtaskId,
-            int numberOfSubtasks,
-            CommittableCollector<Integer> committableCollector) {
+            int expectedNumberOfSubtasks,
+            CommittableCollector<Integer> committableCollector,
+            int expectedCommittableSize,
+            Object[] expectedNumberOfPendingRequestsPerCommittable) {
 
         assertAll(
                 assertMessageHeading,
                 () -> {
                     final Collection<CheckpointCommittableManagerImpl<Integer>>
                             checkpointCommittables =
                                     committableCollector.getCheckpointCommittables();
-                    assertThat(checkpointCommittables).hasSize(2);
+                    assertThat(checkpointCommittables).hasSize(expectedCommittableSize);
 
                     final Iterator<CheckpointCommittableManagerImpl<Integer>> committablesIterator =
                             checkpointCommittables.iterator();
-                    final CheckpointCommittableManagerImpl<Integer> checkpointCommittableManager1 =
-                            committablesIterator.next();
-                    final SubtaskCommittableManager<Integer> subtaskCommittableManagerCheckpoint1 =
-                            checkpointCommittableManager1.getSubtaskCommittableManager(subtaskId);
-
-                    SinkV2Assertions.assertThat(checkpointCommittableManager1.getSummary())
-                            .hasSubtaskId(subtaskId)
-                            .hasNumberOfSubtasks(numberOfSubtasks);
-                    assertThat(
-                                    subtaskCommittableManagerCheckpoint1
-                                            .getPendingRequests()
-                                            .map(CommitRequestImpl::getCommittable)
-                                            .collect(Collectors.toList()))
-                            .containsExactly(1);
-                    assertThat(subtaskCommittableManagerCheckpoint1.getSubtaskId())
-                            .isEqualTo(subtaskId);
-
-                    final CheckpointCommittableManagerImpl<Integer> checkpointCommittableManager2 =
-                            committablesIterator.next();
-                    final SubtaskCommittableManager<Integer> subtaskCommittableManagerCheckpoint2 =
-                            checkpointCommittableManager2.getSubtaskCommittableManager(subtaskId);
-
-                    SinkV2Assertions.assertThat(checkpointCommittableManager2.getSummary())
-                            .hasSubtaskId(subtaskId)
-                            .hasNumberOfSubtasks(numberOfSubtasks);
-                    assertThat(
-                                    subtaskCommittableManagerCheckpoint2
-                                            .getPendingRequests()
-                                            .map(CommitRequestImpl::getCommittable)
-                                            .collect(Collectors.toList()))
-                            .containsExactly(2);
-                    assertThat(subtaskCommittableManagerCheckpoint2.getSubtaskId())
-                            .isEqualTo(subtaskId);
+
+                    int i = 0;

Review Comment:
   I have implemented your suggestion, 
   however Streams.zip requiters as a third argument a `BiFunction` that has to return something. 
   In my case it raterusn a Pair of `CheckpointCommittableManagerImpl` and `List<Integer>                                             expectedNumberOfPendingRequestsPerCommittable` elements from respective streams. 
   
   In my opinion it is slightly less readable then previous version with `while` loop but I'm ok with Streams.zip, since we don't have to maintain the index.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] kristoffSC commented on a diff in pull request #21101: [Flink 29627][streaming] Fix duplicate key exception during recover more than 1 committable.

Posted by GitBox <gi...@apache.org>.
kristoffSC commented on code in PR #21101:
URL: https://github.com/apache/flink/pull/21101#discussion_r999410845


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java:
##########
@@ -134,55 +189,73 @@ void testAlignSubtaskCommittableManagerCheckpointWithCheckpointCommittableManage
                 .isEqualTo(committableManager.getCheckpointId());
     }
 
+    /**
+     * @param assertMessageHeading prefix used for assertion fail message.
+     * @param subtaskId subtaskId to get {@link SubtaskCommittableManager} from {@link
+     *     CheckpointCommittableManagerImpl}
+     * @param expectedNumberOfSubtasks expected number of subtasks for {@link CommittableSummary}
+     * @param committableCollector collector to get {@link CheckpointCommittableManager}s from.
+     * @param expectedNumbersOfPendingRequestsPerCommittable every of the list element represents
+     *     expected number of pending request per {@link SubtaskCommittableManager}.
+     */
     private void assertCommittableCollector(
             String assertMessageHeading,
             int subtaskId,
-            int numberOfSubtasks,
-            CommittableCollector<Integer> committableCollector) {
+            int expectedNumberOfSubtasks,
+            CommittableCollector<Integer> committableCollector,
+            List<List<Integer>> expectedNumbersOfPendingRequestsPerCommittable) {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] fapaul commented on a diff in pull request #21101: [Flink 29627][streaming] Fix duplicate key exception during recover more than 1 committable.

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #21101:
URL: https://github.com/apache/flink/pull/21101#discussion_r999357433


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java:
##########
@@ -134,55 +189,73 @@ void testAlignSubtaskCommittableManagerCheckpointWithCheckpointCommittableManage
                 .isEqualTo(committableManager.getCheckpointId());
     }
 
+    /**
+     * @param assertMessageHeading prefix used for assertion fail message.
+     * @param subtaskId subtaskId to get {@link SubtaskCommittableManager} from {@link
+     *     CheckpointCommittableManagerImpl}
+     * @param expectedNumberOfSubtasks expected number of subtasks for {@link CommittableSummary}
+     * @param committableCollector collector to get {@link CheckpointCommittableManager}s from.
+     * @param expectedNumbersOfPendingRequestsPerCommittable every of the list element represents
+     *     expected number of pending request per {@link SubtaskCommittableManager}.
+     */
     private void assertCommittableCollector(
             String assertMessageHeading,
             int subtaskId,
-            int numberOfSubtasks,
-            CommittableCollector<Integer> committableCollector) {
+            int expectedNumberOfSubtasks,
+            CommittableCollector<Integer> committableCollector,
+            List<List<Integer>> expectedNumbersOfPendingRequestsPerCommittable) {

Review Comment:
   Nit: The variable name is misleading. Shouldn't this be the `committablesPerSubtaskPerCheckpoint` or something like this? `expectedNumbers` implies it is referring to the number but we are matching the content of the committable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] kristoffSC commented on a diff in pull request #21101: [Flink 29627][streaming] Fix duplicate key exception during recover more than 1 committable.

Posted by GitBox <gi...@apache.org>.
kristoffSC commented on code in PR #21101:
URL: https://github.com/apache/flink/pull/21101#discussion_r999174871


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java:
##########
@@ -134,55 +190,78 @@ void testAlignSubtaskCommittableManagerCheckpointWithCheckpointCommittableManage
                 .isEqualTo(committableManager.getCheckpointId());
     }
 
+    /**
+     * @param assertMessageHeading prefix used for assertion fail message.
+     * @param subtaskId subtaskId to get {@link SubtaskCommittableManager} from {@link
+     *     CheckpointCommittableManagerImpl}
+     * @param expectedNumberOfSubtasks expected number of subtasks for {@link CommittableSummary}
+     * @param committableCollector collector to get {@link CheckpointCommittableManager}s from.
+     * @param expectedCommittableSize expected number of {@link CheckpointCommittableManager}.
+     * @param expectedNumberOfPendingRequestsPerCommittable expected number of pending request per
+     *     {@link SubtaskCommittableManager}.
+     */
     private void assertCommittableCollector(
             String assertMessageHeading,
             int subtaskId,
-            int numberOfSubtasks,
-            CommittableCollector<Integer> committableCollector) {
+            int expectedNumberOfSubtasks,
+            CommittableCollector<Integer> committableCollector,
+            int expectedCommittableSize,
+            Object[] expectedNumberOfPendingRequestsPerCommittable) {
 
         assertAll(
                 assertMessageHeading,
                 () -> {
                     final Collection<CheckpointCommittableManagerImpl<Integer>>
                             checkpointCommittables =
                                     committableCollector.getCheckpointCommittables();
-                    assertThat(checkpointCommittables).hasSize(2);
+                    assertThat(checkpointCommittables).hasSize(expectedCommittableSize);
 
                     final Iterator<CheckpointCommittableManagerImpl<Integer>> committablesIterator =
                             checkpointCommittables.iterator();
-                    final CheckpointCommittableManagerImpl<Integer> checkpointCommittableManager1 =
-                            committablesIterator.next();
-                    final SubtaskCommittableManager<Integer> subtaskCommittableManagerCheckpoint1 =
-                            checkpointCommittableManager1.getSubtaskCommittableManager(subtaskId);
-
-                    SinkV2Assertions.assertThat(checkpointCommittableManager1.getSummary())
-                            .hasSubtaskId(subtaskId)
-                            .hasNumberOfSubtasks(numberOfSubtasks);
-                    assertThat(
-                                    subtaskCommittableManagerCheckpoint1
-                                            .getPendingRequests()
-                                            .map(CommitRequestImpl::getCommittable)
-                                            .collect(Collectors.toList()))
-                            .containsExactly(1);
-                    assertThat(subtaskCommittableManagerCheckpoint1.getSubtaskId())
-                            .isEqualTo(subtaskId);
-
-                    final CheckpointCommittableManagerImpl<Integer> checkpointCommittableManager2 =
-                            committablesIterator.next();
-                    final SubtaskCommittableManager<Integer> subtaskCommittableManagerCheckpoint2 =
-                            checkpointCommittableManager2.getSubtaskCommittableManager(subtaskId);
-
-                    SinkV2Assertions.assertThat(checkpointCommittableManager2.getSummary())
-                            .hasSubtaskId(subtaskId)
-                            .hasNumberOfSubtasks(numberOfSubtasks);
-                    assertThat(
-                                    subtaskCommittableManagerCheckpoint2
-                                            .getPendingRequests()
-                                            .map(CommitRequestImpl::getCommittable)
-                                            .collect(Collectors.toList()))
-                            .containsExactly(2);
-                    assertThat(subtaskCommittableManagerCheckpoint2.getSubtaskId())
-                            .isEqualTo(subtaskId);
+
+                    int i = 0;
+                    while (committablesIterator.hasNext()) {
+                        final CheckpointCommittableManagerImpl<Integer>
+                                checkpointCommittableManager = committablesIterator.next();
+
+                        final SubtaskCommittableManager<Integer> subtaskCommittableManager =
+                                checkpointCommittableManager.getSubtaskCommittableManager(
+                                        subtaskId);
+
+                        SinkV2Assertions.assertThat(checkpointCommittableManager.getSummary())
+                                .hasSubtaskId(subtaskId)
+                                .hasNumberOfSubtasks(expectedNumberOfSubtasks);
+
+                        assertPendingRequests(
+                                subtaskCommittableManager,
+                                expectedNumberOfPendingRequestsPerCommittable[i++]);
+
+                        assertThat(subtaskCommittableManager.getSubtaskId()).isEqualTo(subtaskId);
+                    }
                 });
     }
+
+    private void assertPendingRequests(
+            SubtaskCommittableManager<Integer> subtaskCommittableManagerCheckpoint,
+            Object expectedPendingRequestCount) {
+
+        if (expectedPendingRequestCount instanceof Integer) {
+            assertThat(
+                            subtaskCommittableManagerCheckpoint
+                                    .getPendingRequests()
+                                    .map(CommitRequestImpl::getCommittable)
+                                    .collect(Collectors.toList()))
+                    .containsExactly((Integer) expectedPendingRequestCount);
+        } else if (expectedPendingRequestCount instanceof Integer[]) {

Review Comment:
   Thanks for this hint, I was looking for some alternative method in the API but I must have missed that one, I was focused to find the one that checks also the order as the `containsExactly` does.
   
   I've implemented your proposition which made `assertPendingRequests` method much simpler. 
   However `assertCommittableCollector` now has to accept `List<List<Integer>> expectedNumbersOfPendingRequestsPerCommittable` as an argument.
   
   The reason for this is that for `testCommittableCollectorV2SerDe` test we will have Two committable Managers with one committable each, where for `testCommittablesForSameSubtaskIdV2SerDe`  test we will have one committable manager but with two committables after recovery.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org