Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/29 08:55:12 UTC

[GitHub] [flink] zentol opened a new pull request, #21416: [FLINK-30202][tests] Do not assert on checkpointId

zentol opened a new pull request, #21416:
URL: https://github.com/apache/flink/pull/21416

   Capturing the checkpointId for a generated record in a subsequent map function is impossible, since the `notifyCheckpointComplete` notification may arrive at any time (or not at all). Instead, just assert that each subtask received exactly as many records as expected, which can only happen (reliably) if the rate limiting works as expected.
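   A minimal plain-Java sketch of the per-subtask check described above, assuming every collected record has already been tagged with the index of the subtask that produced it. `SubtaskCounts` and its methods are hypothetical helpers, not part of the Flink test:

   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.function.Function;
   import java.util.stream.Collectors;

   /** Hypothetical helper: counts how many records each subtask produced. */
   final class SubtaskCounts {
       private SubtaskCounts() {}

       /** Groups the subtask indices (one entry per received record) and counts them. */
       static Map<Integer, Long> countPerSubtask(List<Integer> subtaskOfEachRecord) {
           return subtaskOfEachRecord.stream()
                   .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
       }

       /** True if every subtask in [0, parallelism) produced exactly the expected number of records. */
       static boolean eachSubtaskProduced(
               List<Integer> subtaskOfEachRecord, int parallelism, long expectedPerSubtask) {
           Map<Integer, Long> counts = countPerSubtask(subtaskOfEachRecord);
           if (counts.size() != parallelism) {
               return false; // a subtask is missing, or an unexpected index showed up
           }
           for (int subtask = 0; subtask < parallelism; subtask++) {
               if (counts.getOrDefault(subtask, 0L) != expectedPerSubtask) {
                   return false;
               }
           }
           return true;
       }
   }
   ```

   For example, `eachSubtaskProduced(List.of(0, 1, 0, 1), 2, 2)` holds, while a run in which subtask 0 emitted three records and subtask 1 only one does not. Nothing in this check depends on when `notifyCheckpointComplete` arrives.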
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zentol commented on a diff in pull request #21416: [FLINK-30202][tests] Do not assert on checkpointId

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #21416:
URL: https://github.com/apache/flink/pull/21416#discussion_r1038124348


##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
                         new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < 3; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            reader.pollNext(out);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+                reader.isAvailable().get();
+                reader.pollNext(out);
+            }
+            // checkpoint
+            List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
+
+            // re-create and restore
+            reader = createReader();
+            if (splits.isEmpty()) {
+                reader.notifyNoMoreSplits();
+            } else {
+                reader.addSplits(splits);
             }
         }
 
+        reader.isAvailable().get();

Review Comment:
   yes and no.
   
   > we're not looping over isAvailable anymore if reaching the end of data
   
   Indeed, but the devil is in the details. Whether we reached the end of data isn't based on the input being exhausted, but on the source returning `END_OF_INPUT` on a call to pollNext().
   And IteratorSourceReaderBase#pollNext (which the data generator source uses internally) doesn't return end-of-input when the input is exhausted, but only when _another_ call is made to pollNext while the input is _already_ exhausted (which the source reader interface allows).
   
   That's why we need this additional call: the last call to pollNext within the loop does not return `END_OF_INPUT` (as you expected, I suppose) but `NOTHING_AVAILABLE`, so we have to call it again.
   
   It's all a bit finicky because the source reader API isn't intuitive in the slightest.
   
   (fixing this in the IteratorSourceReaderBase isn't trivial :/)
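   A simplified plain-Java model of the behaviour described above (not the actual `IteratorSourceReaderBase` code). `LazyEndOfInputReader` is a hypothetical name, and `ReaderStatus` is a local stand-in for the `InputStatus` values discussed in this thread. Emitting the last element still reports `MORE_AVAILABLE`; only the following poll reports `END_OF_INPUT`:

   ```java
   import java.util.Iterator;
   import java.util.List;

   /** Local stand-in for the three InputStatus values discussed in this thread. */
   enum ReaderStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

   /**
    * Hypothetical, simplified model: exhaustion is detected lazily, i.e. END_OF_INPUT is only
    * returned by a poll that finds the iterator already empty.
    */
   final class LazyEndOfInputReader {
       private final Iterator<Long> remaining;

       LazyEndOfInputReader(Iterator<Long> remaining) {
           this.remaining = remaining;
       }

       ReaderStatus pollNext(List<Long> out) {
           if (remaining.hasNext()) {
               out.add(remaining.next());
               // Even if this was the last element, exhaustion is not reported yet.
               return ReaderStatus.MORE_AVAILABLE;
           }
           return ReaderStatus.END_OF_INPUT;
       }
   }
   ```

   With three elements, the first three polls each emit a record and the fourth returns `END_OF_INPUT` without emitting anything. Per the comment above, the data generator's reader reports `NOTHING_AVAILABLE` rather than `MORE_AVAILABLE` on that last emitting poll, but the consequence is the same: one extra poll is needed after the loop.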
   





[GitHub] [flink] zentol commented on pull request #21416: [FLINK-30202][tests] Do not assert on checkpointId

Posted by GitBox <gi...@apache.org>.
zentol commented on PR #21416:
URL: https://github.com/apache/flink/pull/21416#issuecomment-1330691876

   There is a `RateLimitedSourceReaderITCase`.
   
   I'll try finding another way to test this; I'm currently leaning towards using a FlatMapFunction that stops emitting values after the first call to `snapshotState`, so it _should_ truly only emit the values of the first checkpoint. But so far that's also not working; too many values get emitted...




[GitHub] [flink] XComp commented on a diff in pull request #21416: [FLINK-30202][tests] Do not assert on checkpointId

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #21416:
URL: https://github.com/apache/flink/pull/21416#discussion_r1035964091


##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java:
##########
@@ -195,40 +195,31 @@ void testGatedRateLimiter() throws Exception {
 
         final DataStreamSource<Long> streamSource =
                 env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator");
-        final DataStream<Tuple2<Integer, Long>> map =
-                streamSource.map(new SubtaskAndCheckpointMapper());
-        final List<Tuple2<Integer, Long>> results = map.executeAndCollect(1000);
-
-        final Map<Tuple2<Integer, Long>, Integer> collect =
-                results.stream()
-                        .collect(
-                                Collectors.groupingBy(
-                                        x -> (new Tuple2<>(x.f0, x.f1)), summingInt(x -> 1)));
-        for (Map.Entry<Tuple2<Integer, Long>, Integer> entry : collect.entrySet()) {
-            assertThat(entry.getValue()).isEqualTo(capacityPerSubtaskPerCycle);
-        }
+        final DataStream<Long> map = streamSource.flatMap(new FirstCheckpointFilter());
+        final List<Long> results = map.executeAndCollect(1000);
+
+        assertThat(results).hasSize(capacityPerCycle);
     }
 
-    private static class SubtaskAndCheckpointMapper
-            extends RichMapFunction<Long, Tuple2<Integer, Long>> implements CheckpointListener {
+    private static class FirstCheckpointFilter
+            implements FlatMapFunction<Long, Long>, CheckpointedFunction {
 
-        private long checkpointId = 0;
-        private int subtaskIndex;
+        private volatile boolean firstCheckpoint = true;
 
         @Override
-        public void open(Configuration parameters) {
-            subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+        public void flatMap(Long value, Collector<Long> out) throws Exception {
+            if (firstCheckpoint) {
+                out.collect(value);
+            }
         }
 
         @Override
-        public Tuple2<Integer, Long> map(Long value) {
-            return new Tuple2<>(subtaskIndex, checkpointId);
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {

Review Comment:
   OK, now I know what confused me. I was worried about the individual records not being included in the most recent checkpoint. But that's actually not relevant for the rate limiting. :facepalm: The only thing we care about is resetting the counter that limits the amount of data being produced. Therefore, resetting it when a new checkpoint is triggered is reasonable.
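   A plain-Java sketch of that idea, assuming a simple permit counter. `SnapshotGatedLimiter` is a hypothetical model, not Flink's rate limiter: the counter is refilled when a checkpoint is triggered (`snapshotState`) and completion notifications are ignored, so each interval between two snapshots can contain at most `capacityPerCycle` records, no matter when (or whether) `notifyCheckpointComplete` arrives.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical model: a permit counter refilled on checkpoint trigger, not on completion. */
   final class SnapshotGatedLimiter {
       private final int capacityPerCycle;
       private int remaining;

       SnapshotGatedLimiter(int capacityPerCycle) {
           this.capacityPerCycle = capacityPerCycle;
           this.remaining = capacityPerCycle;
       }

       /** Returns true if one more record may be emitted in the current cycle. */
       boolean tryAcquire() {
           if (remaining > 0) {
               remaining--;
               return true;
           }
           return false;
       }

       /** Checkpoint triggered (snapshotState): start a new cycle. */
       void onSnapshot() {
           remaining = capacityPerCycle;
       }

       /** Completion may be late or never arrive, so it deliberately leaves the counter alone. */
       void onCheckpointComplete() {}

       /**
        * Replays events ("try", "snapshot", "notify") and returns the number of records emitted
        * in each interval between snapshots, including the trailing open interval.
        */
       static List<Integer> emitsPerInterval(int capacityPerCycle, String... events) {
           SnapshotGatedLimiter limiter = new SnapshotGatedLimiter(capacityPerCycle);
           List<Integer> perInterval = new ArrayList<>();
           int emitted = 0;
           for (String event : events) {
               switch (event) {
                   case "try":
                       if (limiter.tryAcquire()) {
                           emitted++;
                       }
                       break;
                   case "snapshot":
                       perInterval.add(emitted);
                       emitted = 0;
                       limiter.onSnapshot();
                       break;
                   case "notify":
                       limiter.onCheckpointComplete();
                       break;
                   default:
                       throw new IllegalArgumentException("unknown event: " + event);
               }
           }
           perInterval.add(emitted);
           return perInterval;
       }
   }
   ```

   Replaying `try, try, try, snapshot, snapshot, notify, try, notify, try, try` with a capacity of 2 yields intervals of 2, 0 and 2 records: the third attempt is blocked, and the late notifications have no effect on the bound.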





[GitHub] [flink] zentol commented on a diff in pull request #21416: [FLINK-30202][tests] Do not assert on checkpointId

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #21416:
URL: https://github.com/apache/flink/pull/21416#discussion_r1035881699


##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java:
##########
@@ -195,40 +195,31 @@ void testGatedRateLimiter() throws Exception {
 
         final DataStreamSource<Long> streamSource =
                 env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator");
-        final DataStream<Tuple2<Integer, Long>> map =
-                streamSource.map(new SubtaskAndCheckpointMapper());
-        final List<Tuple2<Integer, Long>> results = map.executeAndCollect(1000);
-
-        final Map<Tuple2<Integer, Long>, Integer> collect =
-                results.stream()
-                        .collect(
-                                Collectors.groupingBy(
-                                        x -> (new Tuple2<>(x.f0, x.f1)), summingInt(x -> 1)));
-        for (Map.Entry<Tuple2<Integer, Long>, Integer> entry : collect.entrySet()) {
-            assertThat(entry.getValue()).isEqualTo(capacityPerSubtaskPerCycle);
-        }
+        final DataStream<Long> map = streamSource.flatMap(new FirstCheckpointFilter());
+        final List<Long> results = map.executeAndCollect(1000);
+
+        assertThat(results).hasSize(capacityPerCycle);
     }
 
-    private static class SubtaskAndCheckpointMapper
-            extends RichMapFunction<Long, Tuple2<Integer, Long>> implements CheckpointListener {
+    private static class FirstCheckpointFilter
+            implements FlatMapFunction<Long, Long>, CheckpointedFunction {
 
-        private long checkpointId = 0;
-        private int subtaskIndex;
+        private volatile boolean firstCheckpoint = true;
 
         @Override
-        public void open(Configuration parameters) {
-            subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+        public void flatMap(Long value, Collector<Long> out) throws Exception {
+            if (firstCheckpoint) {
+                out.collect(value);
+            }
         }
 
         @Override
-        public Tuple2<Integer, Long> map(Long value) {
-            return new Tuple2<>(subtaskIndex, checkpointId);
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {

Review Comment:
   What we want to happen is that at most X records are emitted per checkpoint.
   `emit|emit|snapshot|emit|emit|snapshot`
   
   In the ideal world, this happens:
   `emit|emit|snapshot|notify|emit|emit|...`
   
   But since `notifyCheckpointComplete` can arrive at arbitrary times, or not at all, there are some edge cases.
   We may not emit anything for one checkpoint if the RPC is lost:
   `emit|emit|snapshot|snapshot|notify|emit|emit|...`
   We may emit more records per checkpoint if the RPC arrives late:
   `emit|emit|snapshot|snapshot|notify|emit|notify|emit|emit|...`
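   These edge cases can be reproduced with a small plain-Java simulation, assuming a limiter with a capacity of X records per cycle that is refilled only when `notifyCheckpointComplete` arrives. `NotifyGatedLimiterSimulation` and its event names are hypothetical: `try` is an attempt to emit one record, `snapshot` a triggered checkpoint, and `notify` an arriving completion RPC.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical simulation (not Flink code): a limiter refilled only on checkpoint completion. */
   final class NotifyGatedLimiterSimulation {
       private NotifyGatedLimiterSimulation() {}

       /**
        * Returns the number of records emitted in each interval between snapshots,
        * including the trailing interval after the last snapshot.
        */
       static List<Integer> emitsPerInterval(int capacityPerCycle, String... events) {
           int remaining = capacityPerCycle;
           int emittedInInterval = 0;
           List<Integer> perInterval = new ArrayList<>();
           for (String event : events) {
               switch (event) {
                   case "try":
                       if (remaining > 0) {
                           remaining--;
                           emittedInInterval++;
                       }
                       break;
                   case "snapshot":
                       perInterval.add(emittedInInterval);
                       emittedInInterval = 0;
                       break;
                   case "notify":
                       remaining = capacityPerCycle; // refill whenever the RPC happens to arrive
                       break;
                   default:
                       throw new IllegalArgumentException("unknown event: " + event);
               }
           }
           perInterval.add(emittedInInterval);
           return perInterval;
       }
   }
   ```

   With X = 2, the lost-RPC schedule `try, try, snapshot, try, snapshot, notify, try, try` yields 2, 0 and 2 records per interval, while the late-RPC schedule `try, try, snapshot, try, snapshot, notify, try, notify, try, try` yields 2, 0 and 3, i.e. more than X in the last interval. Asserting an exact count per checkpoint therefore cannot be reliable.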





[GitHub] [flink] XComp commented on pull request #21416: [FLINK-30202][tests] Do not assert on checkpointId

Posted by GitBox <gi...@apache.org>.
XComp commented on PR #21416:
URL: https://github.com/apache/flink/pull/21416#issuecomment-1330673297

   yeah, that's something I was wondering as well. But the behavior of the `RateLimitedStrategy` doesn't necessarily need to be tested here, I guess. It feels like we're missing a `RateLimitedSourceReaderTest` for that kind of functionality. :thinking: 




[GitHub] [flink] XComp commented on a diff in pull request #21416: [FLINK-30202][tests] Do not assert on checkpointId

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #21416:
URL: https://github.com/apache/flink/pull/21416#discussion_r1035801369


##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java:
##########
@@ -195,40 +195,31 @@ void testGatedRateLimiter() throws Exception {
 
         final DataStreamSource<Long> streamSource =
                 env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator");
-        final DataStream<Tuple2<Integer, Long>> map =
-                streamSource.map(new SubtaskAndCheckpointMapper());
-        final List<Tuple2<Integer, Long>> results = map.executeAndCollect(1000);
-
-        final Map<Tuple2<Integer, Long>, Integer> collect =
-                results.stream()
-                        .collect(
-                                Collectors.groupingBy(
-                                        x -> (new Tuple2<>(x.f0, x.f1)), summingInt(x -> 1)));
-        for (Map.Entry<Tuple2<Integer, Long>, Integer> entry : collect.entrySet()) {
-            assertThat(entry.getValue()).isEqualTo(capacityPerSubtaskPerCycle);
-        }
+        final DataStream<Long> map = streamSource.flatMap(new FirstCheckpointFilter());
+        final List<Long> results = map.executeAndCollect(1000);
+
+        assertThat(results).hasSize(capacityPerCycle);
     }
 
-    private static class SubtaskAndCheckpointMapper
-            extends RichMapFunction<Long, Tuple2<Integer, Long>> implements CheckpointListener {
+    private static class FirstCheckpointFilter
+            implements FlatMapFunction<Long, Long>, CheckpointedFunction {
 
-        private long checkpointId = 0;
-        private int subtaskIndex;
+        private volatile boolean firstCheckpoint = true;
 
         @Override
-        public void open(Configuration parameters) {
-            subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+        public void flatMap(Long value, Collector<Long> out) throws Exception {
+            if (firstCheckpoint) {
+                out.collect(value);
+            }
         }
 
         @Override
-        public Tuple2<Integer, Long> map(Long value) {
-            return new Tuple2<>(subtaskIndex, checkpointId);
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {

Review Comment:
   > Additionally, the RateLimitedSourceReader may reset the checkpoint limit at the wrong time. We don't really want that to happen when the checkpoint is complete, but rather when the next checkpoint starts (== when snapshotState was called).
   
   I don't understand that reasoning: Why do we stop processing the record when a new checkpoint is triggered rather than when the actual checkpoint is committed? :thinking: Could you elaborate here a bit more?



##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java:
##########
@@ -184,8 +183,9 @@ void testGatedRateLimiter() throws Exception {
 
         final GeneratorFunction<Long, Long> generatorFunction = index -> 1L;
 
+        int numCycles = 3;
         // Allow each subtask to produce at least 3 cycles, gated by checkpoints
-        int count = capacityPerCycle * 3;
+        int count = capacityPerCycle * numCycles;

Review Comment:
   nit: The `numCycles = 3` is not really necessary anymore, is it? We're stopping the data generation now after reaching the first checkpoint. I guess `capacityPerSubtaskPerCycle` and `capacityPerCycle` could be replaced by variables that better reflect the new test setup.





[GitHub] [flink] zentol commented on a diff in pull request #21416: [FLINK-30202][tests] Do not assert on checkpointId

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #21416:
URL: https://github.com/apache/flink/pull/21416#discussion_r1038131318


##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
                         new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < 3; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            reader.pollNext(out);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+                reader.isAvailable().get();
+                reader.pollNext(out);
+            }
+            // checkpoint
+            List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);

Review Comment:
   We can do that, yes.





[GitHub] [flink] zentol merged pull request #21416: [FLINK-30202][tests] Do not assert on checkpointId

Posted by GitBox <gi...@apache.org>.
zentol merged PR #21416:
URL: https://github.com/apache/flink/pull/21416




[GitHub] [flink] XComp commented on a diff in pull request #21416: [FLINK-30202][tests] Do not assert on checkpointId

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #21416:
URL: https://github.com/apache/flink/pull/21416#discussion_r1037924554


##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
                         new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < 3; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            reader.pollNext(out);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+                reader.isAvailable().get();
+                reader.pollNext(out);

Review Comment:
   ```suggestion
               assertThat(reader.pollNext(out))
                       .as(
                               "Each poll should return a NOTHING_AVAILABLE status to explicitly trigger the availability check through in SourceReader.isAvailable")
                       .isEqualTo(InputStatus.NOTHING_AVAILABLE);
               for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
                   reader.isAvailable().get();
                   assertThat(reader.pollNext(out))
                           .as(
                                   "Each poll should return a NOTHING_AVAILABLE status to explicitly trigger the availability check through in SourceReader.isAvailable")
                           .isEqualTo(InputStatus.NOTHING_AVAILABLE);
   ```
   I feel like we should also add an assertion here, with a description explaining why `NOTHING_AVAILABLE` is expected. Otherwise, it's somewhat unintuitive that `NOTHING_AVAILABLE` is returned even though data is actually available. WDYT?
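   A plain-Java model of why every poll can report `NOTHING_AVAILABLE` even though data is left. `AvailabilityGatedReader` and `PollStatus` are hypothetical stand-ins (not Flink's `RateLimitedSourceReader` or `InputStatus`), assuming the reader deliberately hides `MORE_AVAILABLE` so that the caller has to go back through `isAvailable()`, which is where a rate limiter could delay the next record:

   ```java
   import java.util.Iterator;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   /** Local stand-in for the InputStatus values discussed in this thread. */
   enum PollStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

   /** Hypothetical model: each successful poll forces a round-trip through isAvailable(). */
   final class AvailabilityGatedReader {
       private final Iterator<Long> remaining;

       AvailabilityGatedReader(Iterator<Long> remaining) {
           this.remaining = remaining;
       }

       /** Without a rate limiter the availability future is always already complete. */
       CompletableFuture<Void> isAvailable() {
           return CompletableFuture.completedFuture(null);
       }

       PollStatus pollNext(List<Long> out) {
           if (remaining.hasNext()) {
               out.add(remaining.next());
               // Data may well be left, but report NOTHING_AVAILABLE to force an availability check.
               return PollStatus.NOTHING_AVAILABLE;
           }
           return PollStatus.END_OF_INPUT;
       }
   }
   ```

   In this model every emitting poll returns `NOTHING_AVAILABLE`, which is exactly what the suggested assertion and its description spell out for the reader of the test.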



##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
                         new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < 3; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            reader.pollNext(out);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+                reader.isAvailable().get();
+                reader.pollNext(out);
+            }
+            // checkpoint
+            List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);

Review Comment:
   Considering the test's name, should we also assert here that the right splits are returned in each cycle? :thinking: 
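   One way to express such an assertion, as a plain-Java sketch: compute the splits expected to remain after a given number of records. This assumes the splits are read one after another in the order they were added and that a fully consumed split no longer appears in the snapshot; how the real reader represents a split whose last element was just emitted (see the lazy end-of-input discussion in this thread) is not covered. `ExpectedSplits` and `Range` are hypothetical stand-ins for `NumberSequenceSource.NumberSequenceSplit`.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical helper computing the splits expected in a snapshot after some records were read. */
   final class ExpectedSplits {
       private ExpectedSplits() {}

       /** Stand-in for a number-sequence split covering the closed range [from, to]. */
       static final class Range {
           final String id;
           final long from;
           final long to;

           Range(String id, long from, long to) {
               this.id = id;
               this.from = from;
               this.to = to;
           }

           @Override
           public String toString() {
               return id + ":[" + from + "," + to + "]";
           }
       }

       /** Returns the remaining splits, rendered as "id:[from,to]", after `consumed` records. */
       static List<String> remainingAfter(long consumed, List<Range> splits) {
           List<String> remaining = new ArrayList<>();
           long toSkip = consumed;
           for (Range split : splits) {
               long size = split.to - split.from + 1;
               if (toSkip >= size) {
                   toSkip -= size; // fully consumed: assumed to be absent from the snapshot
                   continue;
               }
               remaining.add(new Range(split.id, split.from + toSkip, split.to).toString());
               toSkip = 0; // later splits are untouched
           }
           return remaining;
       }
   }
   ```

   For splits `split-1` = [1, 5] and `split-2` = [6, 10], `remainingAfter(3, ...)` yields `split-1:[4,5]` and `split-2:[6,10]`, and `remainingAfter(7, ...)` yields only `split-2:[8,10]`.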



##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
                         new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < 3; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            reader.pollNext(out);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+                reader.isAvailable().get();
+                reader.pollNext(out);
+            }
+            // checkpoint
+            List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
+
+            // re-create and restore
+            reader = createReader();
+            if (splits.isEmpty()) {
+                reader.notifyNoMoreSplits();
+            } else {
+                reader.addSplits(splits);
             }
         }
 
+        reader.isAvailable().get();

Review Comment:
   ```suggestion
   ```
   Is this really a requirement? Looking at [StreamTask#processInput:561](https://github.com/apache/flink/blob/5601da7cc251eac479fe24167c0f58dbd963072f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L561) shows that we're not looping over `isAvailable` anymore once we reach the end of data. Also, from a logical standpoint, I would expect `isAvailable` to complete if we reach the end of the data stream. :thinking: 
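   The loop behaviour discussed here can be sketched with a simplified input loop in plain Java. It is only loosely modeled on the runtime and is not the actual `StreamTask#processInput` logic: the loop waits on `isAvailable()` only after `NOTHING_AVAILABLE` and terminates as soon as a poll returns `END_OF_INPUT`. `InputLoop`, `PollingReader` and `DriverStatus` are hypothetical names.

   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   /** Local stand-in for the InputStatus values discussed in this thread. */
   enum DriverStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

   /** Minimal reader contract for this sketch (hypothetical). */
   interface PollingReader {
       DriverStatus pollNext(List<Long> out);

       CompletableFuture<Void> isAvailable();
   }

   /** Simplified input loop: stops only once a poll reports END_OF_INPUT. */
   final class InputLoop {
       private InputLoop() {}

       /** Drains the reader and returns {records collected, number of polls}. */
       static long[] drain(PollingReader reader) {
           List<Long> out = new ArrayList<>();
           long polls = 0;
           while (true) {
               DriverStatus status = reader.pollNext(out);
               polls++;
               if (status == DriverStatus.END_OF_INPUT) {
                   break; // isAvailable() is not consulted after end of input
               }
               if (status == DriverStatus.NOTHING_AVAILABLE) {
                   reader.isAvailable().join(); // wait until the reader signals availability
               }
           }
           return new long[] {out.size(), polls};
       }

       /** Reader that reports NOTHING_AVAILABLE after each record and END_OF_INPUT only once drained. */
       static PollingReader lazyReader(List<Long> data) {
           Iterator<Long> it = data.iterator();
           return new PollingReader() {
               @Override
               public DriverStatus pollNext(List<Long> out) {
                   if (it.hasNext()) {
                       out.add(it.next());
                       return DriverStatus.NOTHING_AVAILABLE;
                   }
                   return DriverStatus.END_OF_INPUT;
               }

               @Override
               public CompletableFuture<Void> isAvailable() {
                   return CompletableFuture.completedFuture(null);
               }
           };
       }
   }
   ```

   With a reader that detects exhaustion lazily, draining three records takes four polls; the fourth poll is the one that finally reports `END_OF_INPUT`.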



##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
                         new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < 3; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            reader.pollNext(out);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+                reader.isAvailable().get();

Review Comment:
   ```suggestion
                   assertThat(reader.isAvailable())
                           .as(
                                   "There should be always data available because the test doesn't rely on any no rate-limiting strategy and splits are provided.")
                           .isCompleted();
   ```
   nit: I would use `assertj` functionality. `get()` might imply that we're waiting for concurrent functionality to finish. We don't have concurrent behavior here, though.
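   The suggested `isCompleted()` check is non-blocking: a pending future fails the assertion immediately instead of hanging the test the way a plain `get()` would. A plain-Java equivalent, as a sketch (`FutureChecks` is a hypothetical helper; the suggestion itself relies on AssertJ's `CompletableFutureAssert`):

   ```java
   import java.util.concurrent.CompletableFuture;

   /** Hypothetical helper: non-blocking check similar in spirit to AssertJ's isCompleted(). */
   final class FutureChecks {
       private FutureChecks() {}

       /** True if the future already finished normally; never blocks, unlike get(). */
       static boolean isCompletedNormally(CompletableFuture<?> future) {
           // Cancellation also counts as exceptional completion for CompletableFuture.
           return future.isDone() && !future.isCompletedExceptionally();
       }
   }
   ```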





[GitHub] [flink] zentol commented on pull request #21416: [FLINK-30202][tests] Do not assert on checkpointId

Posted by GitBox <gi...@apache.org>.
zentol commented on PR #21416:
URL: https://github.com/apache/flink/pull/21416#issuecomment-1332429814

   Another test relied on the previous (bugged) behavior :(




[GitHub] [flink] zentol commented on pull request #21416: [FLINK-30202][tests] Do not assert on checkpointId

Posted by GitBox <gi...@apache.org>.
zentol commented on PR #21416:
URL: https://github.com/apache/flink/pull/21416#issuecomment-1332369793

   > I think it would make sense to create a new Jira ticket for that. WDYT?
   
   yes, it's a separate issue (and for the one in this ticket we at least already have a test that shows the issue).




[GitHub] [flink] XComp commented on a diff in pull request #21416: [FLINK-30202][tests] Do not assert on checkpointId

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #21416:
URL: https://github.com/apache/flink/pull/21416#discussion_r1038158237


##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +100,48 @@ void testReaderCheckpoints() throws Exception {
                         new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < numCycles; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            assertThat(reader.pollNext(out))
+                    .as(
+                            "Each poll should return a NOTHING_AVAILABLE status to explicitly trigger the availability check through in SourceReader.isAvailable")
+                    .isSameAs(InputStatus.NOTHING_AVAILABLE);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+                assertThat(reader.isAvailable())
+                        .as(
+                                "There should be always data available because the test doesn't rely on any no rate-limiting strategy and splits are provided.")

Review Comment:
   ```suggestion
                                   "There should be always data available because the test utilizes no rate-limiting strategy and splits are provided.")
   ```



##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +100,48 @@ void testReaderCheckpoints() throws Exception {
                         new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < numCycles; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            assertThat(reader.pollNext(out))
+                    .as(
+                            "Each poll should return a NOTHING_AVAILABLE status to explicitly trigger the availability check through in SourceReader.isAvailable")
+                    .isSameAs(InputStatus.NOTHING_AVAILABLE);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+                assertThat(reader.isAvailable())
+                        .as(
+                                "There should be always data available because the test doesn't rely on any no rate-limiting strategy and splits are provided.")
+                        .isCompleted();
+                // this never returns END_OF_INPUT because IteratorSourceReaderBase#pollNext does
+                // not immediately return END_OF_INPUT when the input is exhausted
+                assertThat(reader.pollNext(out))
+                        .as(
+                                "Each poll should return a NOTHING_AVAILABLE status to explicitly trigger the availability check through in SourceReader.isAvailable")
+                        .isSameAs(InputStatus.NOTHING_AVAILABLE);
+            }
+            // checkpoint
+            List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
+            // first cycle partially consumes the first split
+            // second cycle consumes the remaining first split and partially consumes the second
+            // third cycle consumes remaining second split
+            assertThat(splits).hasSize(numCycles - cycle - 1);
+
+            // re-create and restore
+            reader = createReader();
+            if (splits.isEmpty()) {
+                reader.notifyNoMoreSplits();
+            } else {
+                reader.addSplits(splits);
             }
         }
 
+        // we need to go again through isAvailable because IteratorSourceReaderBase#pollNext does
+        // not immediately return END_OF_INPUT when the input is exhausted
+        assertThat(reader.isAvailable())
+                .as(
+                        "There should be always data available because the test doesn't rely on any no rate-limiting strategy and splits are provided.")

Review Comment:
   ```suggestion
                           "There should be always data available because the test utilizes no rate-limiting strategy and splits are provided.")
   ```



##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
                         new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < 3; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            reader.pollNext(out);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+                reader.isAvailable().get();
+                reader.pollNext(out);
+            }
+            // checkpoint
+            List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
+
+            // re-create and restore
+            reader = createReader();
+            if (splits.isEmpty()) {
+                reader.notifyNoMoreSplits();
+            } else {
+                reader.addSplits(splits);
             }
         }
 
+        reader.isAvailable().get();

Review Comment:
   Ah, true: it relies on [IteratorSourceReaderBase#noMoreSplits](https://github.com/apache/flink/blob/a5667e82e25cb87dc5523b82b08aec3e1408e9c6/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java#L136). Thanks for the clarification. :+1:
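   For reference, here is a simplified, hypothetical model of the behaviour the test relies on. It is not Flink's `IteratorSourceReaderBase`, and `IteratorReaderModel` and its members are invented for illustration. Once all splits are drained, `pollNext` reports END_OF_INPUT only after `notifyNoMoreSplits()` has been called. Until then it resets the availability future and reports NOTHING_AVAILABLE. This is why the test has to go through `isAvailable()` once more before the final poll.

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.Iterator;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.function.Consumer;

   // Hypothetical, simplified model of an iterator-based source reader.
   // Names mirror Flink concepts, but this is NOT Flink's implementation.
   class IteratorReaderModel {
       enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

       private final Deque<List<Long>> remainingSplits = new ArrayDeque<>();
       private Iterator<Long> current;
       private boolean noMoreSplits;
       private CompletableFuture<Void> availability = new CompletableFuture<>();

       void addSplits(List<List<Long>> splits) {
           remainingSplits.addAll(splits);
           availability.complete(null); // new data: wake up the caller
       }

       void notifyNoMoreSplits() {
           noMoreSplits = true;
           // Wake up the caller; it must poll once more to observe END_OF_INPUT.
           availability.complete(null);
       }

       CompletableFuture<Void> isAvailable() {
           return availability;
       }

       InputStatus pollNext(Consumer<Long> out) {
           while (current == null || !current.hasNext()) {
               List<Long> next = remainingSplits.poll();
               if (next != null) {
                   current = next.iterator();
                   continue;
               }
               if (noMoreSplits) {
                   return InputStatus.END_OF_INPUT;
               }
               // Drained, but more splits may still arrive: reset availability
               // so that the caller waits instead of polling in a busy loop.
               if (availability.isDone()) {
                   availability = new CompletableFuture<>();
               }
               return InputStatus.NOTHING_AVAILABLE;
           }
           out.accept(current.next());
           return InputStatus.MORE_AVAILABLE;
       }
   }
   ```

   Draining a single split `[1, 2]` yields MORE_AVAILABLE twice and then NOTHING_AVAILABLE. Only after `notifyNoMoreSplits()` does the availability future complete again and the next poll return END_OF_INPUT.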



##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java:
##########
@@ -184,8 +183,9 @@ void testGatedRateLimiter() throws Exception {
 
         final GeneratorFunction<Long, Long> generatorFunction = index -> 1L;
 
+        int numCycles = 3;
         // Allow each subtask to produce at least 3 cycles, gated by checkpoints
-        int count = capacityPerCycle * 3;
+        int count = capacityPerCycle * numCycles;

Review Comment:
   FYI, in case you overlooked it. 8)





[GitHub] [flink] flinkbot commented on pull request #21416: [FLINK-30202][tests] Do not assert on checkpointId

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21416:
URL: https://github.com/apache/flink/pull/21416#issuecomment-1330298212

   ## CI report:
   
   * b372c77de18aef2ceea6b00ad64a3278fd91a8d1 UNKNOWN
   
   Bot commands:
   The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build




[GitHub] [flink] zentol commented on pull request #21416: [FLINK-30202][tests] Do not assert on checkpointId

Posted by GitBox <gi...@apache.org>.
zentol commented on PR #21416:
URL: https://github.com/apache/flink/pull/21416#issuecomment-1330711423

   I think we actually found a bug.
   
   If a split was already assigned to a reader, then the first call to `SourceReader#pollNext` (which happens _before_ `SourceReader#isAvailable`) circumvents rate-limiting.
   We need to force this first call to also go through `isAvailable`.
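   Below is a minimal sketch of that kind of fix. It is a hypothetical model, not the actual change in this PR and not Flink's `RateLimitedSourceReader`, and `GatedReaderModel` and its members are invented names. It assumes the wrapper hands out one permit per `isAvailable()` call. `pollNext` refuses to emit unless a permit was acquired first, so the very first call, which the runtime issues before `isAvailable()`, can no longer bypass the limiter.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.function.Consumer;
   import java.util.function.Supplier;

   // Hypothetical, simplified model of a rate-limited reader wrapper.
   // It is NOT Flink's RateLimitedSourceReader; names are illustrative only.
   class GatedReaderModel {
       // Mirrors the names of Flink's InputStatus values.
       enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

       private final Supplier<Long> source; // stands in for a wrapped reader that always has data
       private int permits;                 // permits left in the current cycle
       private boolean permitAcquired;      // granted by isAvailable(), consumed by pollNext()

       GatedReaderModel(Supplier<Long> source, int permitsPerCycle) {
           this.source = source;
           this.permits = permitsPerCycle;
       }

       CompletableFuture<Void> isAvailable() {
           if (permitAcquired) {
               return CompletableFuture.completedFuture(null); // permit already held
           }
           if (permits > 0) {
               permits--;
               permitAcquired = true;
               return CompletableFuture.completedFuture(null);
           }
           // Out of permits: a real limiter would complete this when the next
           // cycle starts; this model never refills, so it stays pending.
           return new CompletableFuture<>();
       }

       InputStatus pollNext(Consumer<Long> out) {
           if (!permitAcquired) {
               // Also covers the first call, which happens before isAvailable():
               // emit nothing, so the caller has to go through the limiter.
               return InputStatus.NOTHING_AVAILABLE;
           }
           permitAcquired = false;
           out.accept(source.get());
           // Always report NOTHING_AVAILABLE so every record is gated by isAvailable().
           return InputStatus.NOTHING_AVAILABLE;
       }
   }
   ```

   With three permits, a runtime-style sequence (one `pollNext` first, then `isAvailable`/`pollNext` pairs) emits exactly three records. The initial call emits nothing.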




[GitHub] [flink] zentol commented on pull request #21416: [FLINK-30202][tests] Do not assert on checkpointId

Posted by GitBox <gi...@apache.org>.
zentol commented on PR #21416:
URL: https://github.com/apache/flink/pull/21416#issuecomment-1330669367

   > You're saying that we would introduce a test instability here if the RateLimitedStrategy wouldn't perform as expected?
   
   Yes. At least, that was the idea. Now I'm not so sure anymore whether this makes sense. Given that we limit the count, we invariably end up with `capacityPerCycle * numCycles` elements, regardless of whether rate-limiting was applied or not.
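   A small, hypothetical simulation of this point. It is not the ITCase itself, it models a single subtask, and `CountInvarianceDemo` is an invented name. With the source bounded to `capacityPerCycle * numCycles` elements, a working limiter and a broken one emit the same total. Only the number of checkpoint cycles needed to emit them differs.

   ```java
   // Hypothetical simulation, not Flink code: how many elements are emitted,
   // and over how many checkpoint cycles, with and without a per-cycle limit.
   class CountInvarianceDemo {

       /** Returns {totalEmitted, cyclesUsed} for a source bounded to `count` elements. */
       static long[] run(long count, long capacityPerCycle, boolean limiterWorks) {
           long emitted = 0;
           long cycles = 0;
           while (emitted < count) {
               long remaining = count - emitted;
               // A working limiter caps each cycle; a broken one lets everything through.
               long budget = limiterWorks ? Math.min(capacityPerCycle, remaining) : remaining;
               emitted += budget;
               cycles++;
           }
           return new long[] {emitted, cycles};
       }

       public static void main(String[] args) {
           long capacityPerCycle = 2;
           long numCycles = 3;
           long count = capacityPerCycle * numCycles; // the source is bounded to this many elements
           long[] limited = run(count, capacityPerCycle, true);
           long[] unlimited = run(count, capacityPerCycle, false);
           System.out.println("limited:   total=" + limited[0] + ", cycles=" + limited[1]);
           System.out.println("unlimited: total=" + unlimited[0] + ", cycles=" + unlimited[1]);
       }
   }
   ```

   Running `main` prints `limited:   total=6, cycles=3` and `unlimited: total=6, cycles=1`. An assertion on the total alone therefore cannot tell the two cases apart.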




[GitHub] [flink] zentol commented on pull request #21416: [FLINK-30202][tests] Do not assert on checkpointId

Posted by GitBox <gi...@apache.org>.
zentol commented on PR #21416:
URL: https://github.com/apache/flink/pull/21416#issuecomment-1330713451

   Additionally, the RateLimitedSourceReader may reset the checkpoint limit at the wrong time. We don't really want that to happen when the checkpoint completes, but rather when the next checkpoint starts (== when snapshotState is called).
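   A hypothetical sketch of the proposed timing. `SnapshotGatedLimiter`, `acquire`, `onSnapshot` and `onCheckpointComplete` are invented names, not Flink's rate-limiter API. It assumes a single caller, as with a source reader's task thread. The per-cycle budget is refilled when `snapshotState` is called, and the checkpoint-complete notification is ignored because it may arrive late or not at all.

   ```java
   import java.util.concurrent.CompletableFuture;

   // Hypothetical sketch, not Flink's GatedRateLimiter or RateLimitedSourceReader.
   // The per-cycle budget is refilled when a snapshot is taken (the start of the
   // next checkpoint), not when a checkpoint-complete notification arrives.
   // Assumes a single caller, like a source reader's task thread.
   class SnapshotGatedLimiter {
       private final int permitsPerCycle;
       private int available;
       private CompletableFuture<Void> pending; // the caller waiting for the next cycle, if any

       SnapshotGatedLimiter(int permitsPerCycle) {
           if (permitsPerCycle <= 0) {
               throw new IllegalArgumentException("permitsPerCycle must be positive");
           }
           this.permitsPerCycle = permitsPerCycle;
           this.available = permitsPerCycle;
       }

       /** Completes immediately if a permit is left, otherwise at the next snapshot. */
       CompletableFuture<Void> acquire() {
           if (available > 0) {
               available--;
               return CompletableFuture.completedFuture(null);
           }
           if (pending == null) {
               pending = new CompletableFuture<>();
           }
           return pending;
       }

       /** To be called from snapshotState(checkpointId): a new cycle begins. */
       void onSnapshot() {
           available = permitsPerCycle;
           CompletableFuture<Void> waiting = pending;
           pending = null;
           if (waiting != null) {
               available--; // the waiting caller takes the first permit of the new cycle
               waiting.complete(null);
           }
       }

       /** Intentionally a no-op: this notification may arrive late or not at all. */
       void onCheckpointComplete(long checkpointId) {}
   }
   ```

   With two permits per cycle, a third `acquire()` stays pending through `onCheckpointComplete` and completes only on the next `onSnapshot()`. That call also consumes one permit of the new cycle.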


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org