Posted to issues@flink.apache.org by "vahmed-hamdy (via GitHub)" <gi...@apache.org> on 2024/04/18 08:06:23 UTC

Re: [PR] [FLINK-35115][Connectors/Kinesis] Allow kinesis consumer to snapshotState after operator had been cancelled [flink-connector-aws]

vahmed-hamdy commented on code in PR #138:
URL: https://github.com/apache/flink-connector-aws/pull/138#discussion_r1570201177


##########
flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java:
##########
@@ -312,6 +316,148 @@ public void testListStateChangedAfterSnapshotState() throws Exception {
         }
     }
 
+    @Test
+    public void testSnapshotStateDuringStopWithSavepoint() throws Exception {
+
+        // ----------------------------------------------------------------------
+        // setup config, initial state and expected state snapshot
+        // ----------------------------------------------------------------------
+        Properties config = TestUtils.getStandardProperties();
+
+        ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> initialState = new ArrayList<>(1);
+        initialState.add(
+                Tuple2.of(
+                        KinesisDataFetcher.convertToStreamShardMetadata(
+                                new StreamShardHandle(
+                                        "fakeStream1",
+                                        new Shard()
+                                                .withShardId(
+                                                        KinesisShardIdGenerator
+                                                                .generateFromShardOrder(0)))),
+                        new SequenceNumber("11")));
+
+        ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> expectedStateSnapshot1 =
+                new ArrayList<>(1);
+        expectedStateSnapshot1.add(
+                Tuple2.of(
+                        KinesisDataFetcher.convertToStreamShardMetadata(
+                                new StreamShardHandle(
+                                        "fakeStream1",
+                                        new Shard()
+                                                .withShardId(
+                                                        KinesisShardIdGenerator
+                                                                .generateFromShardOrder(0)))),
+                        new SequenceNumber("12")));
+        ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> expectedStateSnapshot2 =
+                new ArrayList<>(1);
+        expectedStateSnapshot2.add(
+                Tuple2.of(
+                        KinesisDataFetcher.convertToStreamShardMetadata(
+                                new StreamShardHandle(
+                                        "fakeStream1",
+                                        new Shard()
+                                                .withShardId(
+                                                        KinesisShardIdGenerator
+                                                                .generateFromShardOrder(0)))),
+                        new SequenceNumber("13")));
+
+        // ----------------------------------------------------------------------
+        // mock operator state backend and initial state for initializeState()
+        // ----------------------------------------------------------------------
+
+        TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState =
+                new TestingListState<>();
+        for (Tuple2<StreamShardMetadata, SequenceNumber> state : initialState) {
+            listState.add(state);
+        }
+
+        OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);

Review Comment:
   Using Mockito is against the [coding standards](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#avoid-mockito---use-reusable-test-implementations). I am aware that this test suite already breaks that rule, but I believe we shouldn't add more debt.
   Can we try using an [existing test util](https://github.com/apache/flink/blob/43a3d50ce3982b9abf04b81407fed46c5c25f819/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java#L34) instead?
   
   We could also extend the existing `TestableFlinkKinesisConsumer` hierarchy.
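
To illustrate the kind of reusable test implementation I have in mind, here is a rough, self-contained sketch. `SimpleListState` and `SimpleOperatorStateStore` are hypothetical, simplified stand-ins and not Flink's real `ListState` / `OperatorStateStore` interfaces (the real ones have more methods and take state descriptors rather than plain names), and the state name `"shard-state"` is made up. The point is only that a small in-memory class can replace `mock(OperatorStateStore.class)` plus its stubbing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for a list state (NOT Flink's real interface).
interface SimpleListState<T> {
    void add(T value);

    void clear();

    List<T> get();
}

// Hypothetical, simplified stand-in for an operator state store (NOT Flink's real interface).
interface SimpleOperatorStateStore {
    <T> SimpleListState<T> getUnionListState(String name);
}

// Reusable in-memory list state that a test can inspect directly.
final class InMemoryListState<T> implements SimpleListState<T> {
    private final List<T> values = new ArrayList<>();
    private boolean clearCalled;

    @Override
    public void add(T value) {
        values.add(value);
    }

    @Override
    public void clear() {
        values.clear();
        clearCalled = true;
    }

    @Override
    public List<T> get() {
        // Return a copy so callers cannot mutate the stored state by accident.
        return new ArrayList<>(values);
    }

    boolean isClearCalled() {
        return clearCalled;
    }
}

// Reusable in-memory store: hands out the same state instance for the same name,
// so no when(...).thenReturn(...) stubbing is needed.
final class InMemoryOperatorStateStore implements SimpleOperatorStateStore {
    private final Map<String, SimpleListState<?>> states = new HashMap<>();

    @Override
    @SuppressWarnings("unchecked")
    public <T> SimpleListState<T> getUnionListState(String name) {
        return (SimpleListState<T>)
                states.computeIfAbsent(name, k -> new InMemoryListState<T>());
    }
}
```

A test would pre-populate the state through `store.getUnionListState("shard-state").add(...)`, hand the store to the code under test, and then assert on `get()` directly instead of verifying mock interactions.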


