Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/30 10:40:00 UTC

[GitHub] [flink] XComp commented on a diff in pull request #21416: [FLINK-30202][tests] Do not assert on checkpointId

XComp commented on code in PR #21416:
URL: https://github.com/apache/flink/pull/21416#discussion_r1035801369


##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java:
##########
@@ -195,40 +195,31 @@ void testGatedRateLimiter() throws Exception {
 
         final DataStreamSource<Long> streamSource =
                 env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator");
-        final DataStream<Tuple2<Integer, Long>> map =
-                streamSource.map(new SubtaskAndCheckpointMapper());
-        final List<Tuple2<Integer, Long>> results = map.executeAndCollect(1000);
-
-        final Map<Tuple2<Integer, Long>, Integer> collect =
-                results.stream()
-                        .collect(
-                                Collectors.groupingBy(
-                                        x -> (new Tuple2<>(x.f0, x.f1)), summingInt(x -> 1)));
-        for (Map.Entry<Tuple2<Integer, Long>, Integer> entry : collect.entrySet()) {
-            assertThat(entry.getValue()).isEqualTo(capacityPerSubtaskPerCycle);
-        }
+        final DataStream<Long> map = streamSource.flatMap(new FirstCheckpointFilter());
+        final List<Long> results = map.executeAndCollect(1000);
+
+        assertThat(results).hasSize(capacityPerCycle);
     }
 
-    private static class SubtaskAndCheckpointMapper
-            extends RichMapFunction<Long, Tuple2<Integer, Long>> implements CheckpointListener {
+    private static class FirstCheckpointFilter
+            implements FlatMapFunction<Long, Long>, CheckpointedFunction {
 
-        private long checkpointId = 0;
-        private int subtaskIndex;
+        private volatile boolean firstCheckpoint = true;
 
         @Override
-        public void open(Configuration parameters) {
-            subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+        public void flatMap(Long value, Collector<Long> out) throws Exception {
+            if (firstCheckpoint) {
+                out.collect(value);
+            }
         }
 
         @Override
-        public Tuple2<Integer, Long> map(Long value) {
-            return new Tuple2<>(subtaskIndex, checkpointId);
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {

Review Comment:
   > Additionally, the RateLimitedSourceReader may reset the checkpoint limit at the wrong time. We don't really want that to happen when the checkpoint is complete, but rather when the next checkpoint starts (== when snapshotState was called).
   
   I don't understand that reasoning: why do we stop processing records when a new checkpoint is triggered rather than when the checkpoint is actually committed? :thinking: Could you elaborate a bit more here?
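
To make the two timings in this question concrete, here is a minimal, Flink-free sketch. It is not the `RateLimitedSourceReader` implementation: the class `GatedLimiterSketch`, its enums, and the event sequence are all hypothetical and only model a per-cycle budget that is refilled either when a snapshot starts or when the checkpoint completes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy, Flink-free model of a checkpoint-gated rate limiter (all names hypothetical). */
public class GatedLimiterSketch {

    /** Events one source subtask observes, in arrival order. */
    public enum Event { RECORD_REQUEST, SNAPSHOT_STARTED, CHECKPOINT_COMPLETED }

    /** Which checkpoint event refills the per-cycle budget. */
    public enum ResetOn { SNAPSHOT_STARTED, CHECKPOINT_COMPLETED }

    /** Returns one flag per RECORD_REQUEST: true if the record was emitted. */
    public static List<Boolean> grants(
            List<Event> events, int capacityPerCycle, ResetOn resetOn) {
        List<Boolean> granted = new ArrayList<>();
        int budget = capacityPerCycle; // assumption: the first cycle starts with a full budget
        for (Event event : events) {
            switch (event) {
                case RECORD_REQUEST:
                    granted.add(budget > 0);
                    if (budget > 0) {
                        budget--;
                    }
                    break;
                case SNAPSHOT_STARTED:
                    if (resetOn == ResetOn.SNAPSHOT_STARTED) {
                        budget = capacityPerCycle;
                    }
                    break;
                case CHECKPOINT_COMPLETED:
                    if (resetOn == ResetOn.CHECKPOINT_COMPLETED) {
                        budget = capacityPerCycle;
                    }
                    break;
            }
        }
        return granted;
    }

    /** Three requests, a snapshot, two requests, the completion, three more requests. */
    public static List<Event> sampleEvents() {
        return Arrays.asList(
                Event.RECORD_REQUEST, Event.RECORD_REQUEST, Event.RECORD_REQUEST,
                Event.SNAPSHOT_STARTED,
                Event.RECORD_REQUEST, Event.RECORD_REQUEST,
                Event.CHECKPOINT_COMPLETED,
                Event.RECORD_REQUEST, Event.RECORD_REQUEST, Event.RECORD_REQUEST);
    }

    public static void main(String[] args) {
        System.out.println(grants(sampleEvents(), 2, ResetOn.SNAPSHOT_STARTED));
        System.out.println(grants(sampleEvents(), 2, ResetOn.CHECKPOINT_COMPLETED));
    }
}
```

In this toy model with a capacity of 2, both variants emit exactly two records before the first snapshot. They differ only afterwards: resetting on the snapshot lets records flow while the checkpoint is still in flight, whereas resetting on completion holds them back until the checkpoint is committed. Whether the real reader behaves this way is exactly what the question above asks.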



##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java:
##########
@@ -184,8 +183,9 @@ void testGatedRateLimiter() throws Exception {
 
         final GeneratorFunction<Long, Long> generatorFunction = index -> 1L;
 
+        int numCycles = 3;
         // Allow each subtask to produce at least 3 cycles, gated by checkpoints
-        int count = capacityPerCycle * 3;
+        int count = capacityPerCycle * numCycles;

Review Comment:
   nit: `numCycles = 3` is not really necessary anymore, is it? We now stop data generation after reaching the first checkpoint. I guess `capacityPerSubtaskPerCycle` and `capacityPerCycle` could be replaced by variables that better reflect the new test setup.


