Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/02 09:25:35 UTC

[GitHub] [flink] XComp commented on a diff in pull request #21416: [FLINK-30202][tests] Do not assert on checkpointId

XComp commented on code in PR #21416:
URL: https://github.com/apache/flink/pull/21416#discussion_r1037924554


##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
                         new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < 3; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            reader.pollNext(out);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+                reader.isAvailable().get();
+                reader.pollNext(out);

Review Comment:
   ```suggestion
               assertThat(reader.pollNext(out))
                       .as(
                               "Each poll should return a NOTHING_AVAILABLE status to explicitly trigger the availability check through SourceReader.isAvailable")
                       .isEqualTo(InputStatus.NOTHING_AVAILABLE);
               for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
                   reader.isAvailable().get();
                   assertThat(reader.pollNext(out))
                           .as(
                                   "Each poll should return a NOTHING_AVAILABLE status to explicitly trigger the availability check through SourceReader.isAvailable")
                           .isEqualTo(InputStatus.NOTHING_AVAILABLE);
   ```
   I feel like we should also add an assertion here, with a description that explains why `NOTHING_AVAILABLE` is expected. Otherwise, it's kind of unintuitive that `NOTHING_AVAILABLE` is returned even though there is actually data available. WDYT?
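
To illustrate why a poll can report `NOTHING_AVAILABLE` while elements are still pending, here is a minimal toy model. It is not Flink's reader: `ToyReader`, `Status`, and `runCycle` are hypothetical names, and the "armed" flag is only an assumed stand-in for whatever forces the caller back through the availability check.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

final class ToyReaderSketch {
    enum Status { NOTHING_AVAILABLE, END_OF_INPUT }

    /** Hypothetical reader: emits one element per poll, but only after isAvailable() was called. */
    static final class ToyReader {
        private final Queue<Long> pending = new ArrayDeque<>();
        private boolean armed = false;

        ToyReader(long from, long to) {
            for (long i = from; i <= to; i++) {
                pending.add(i);
            }
        }

        /** Arms the next poll; the future is already complete because nothing is rate-limited. */
        CompletableFuture<Void> isAvailable() {
            armed = true;
            return CompletableFuture.completedFuture(null);
        }

        /** Always answers NOTHING_AVAILABLE until the data runs out, so callers must re-check. */
        Status pollNext(List<Long> out) {
            if (pending.isEmpty()) {
                return Status.END_OF_INPUT;
            }
            if (armed) {
                out.add(pending.poll());
                armed = false;
            }
            return Status.NOTHING_AVAILABLE;
        }
    }

    /** Same loop shape as the diff: one unarmed poll, then check-and-poll per element. */
    static List<Long> runCycle(ToyReader reader, int elementsPerCycle) {
        List<Long> out = new ArrayList<>();
        if (reader.pollNext(out) != Status.NOTHING_AVAILABLE) {
            throw new IllegalStateException("unarmed poll should report NOTHING_AVAILABLE");
        }
        for (int i = 0; i < elementsPerCycle; i++) {
            reader.isAvailable().join();
            if (reader.pollNext(out) != Status.NOTHING_AVAILABLE) {
                throw new IllegalStateException("armed poll should still report NOTHING_AVAILABLE");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runCycle(new ToyReader(1, 10), 3)); // prints [1, 2, 3]
    }
}
```

In this model the status tells the caller what to do next (go back to `isAvailable`), not whether the last poll emitted something, which is the point an assertion description would need to spell out.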



##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
                         new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < 3; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            reader.pollNext(out);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+                reader.isAvailable().get();
+                reader.pollNext(out);
+            }
+            // checkpoint
+            List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);

Review Comment:
   Considering the test's name, should we also assert here that the right splits are returned in each cycle? :thinking: 
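
A rough sketch of how the expected splits per cycle could be computed for such an assertion. Everything here is an assumption for illustration: `Split` is a hypothetical stand-in for `NumberSequenceSplit`, and `remainingAfter` assumes splits are consumed strictly in order and that a partially read split keeps its id. The sketch uses a record, so it needs Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitSnapshotSketch {
    /** Hypothetical stand-in for a number-sequence split: an id plus an inclusive range. */
    record Split(String id, long from, long to) {}

    /** Expected snapshot after `consumed` elements were read from the splits in order. */
    static List<Split> remainingAfter(List<Split> splits, long consumed) {
        List<Split> remaining = new ArrayList<>();
        for (Split split : splits) {
            long size = split.to() - split.from() + 1;
            if (consumed >= size) {
                // Split fully read: it no longer appears in the snapshot.
                consumed -= size;
            } else {
                // Partially read or untouched split: keep the unread tail.
                remaining.add(new Split(split.id(), split.from() + consumed, split.to()));
                consumed = 0;
            }
        }
        return remaining;
    }

    public static void main(String[] args) {
        List<Split> splits = List.of(new Split("split-1", 1, 5), new Split("split-2", 6, 10));
        System.out.println(remainingAfter(splits, 3));
    }
}
```

In the test, the result of `reader.snapshotState(...)` could then be compared against such an expected list after each cycle, provided the real reader follows the same ordering assumption.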



##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
                         new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < 3; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            reader.pollNext(out);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+                reader.isAvailable().get();
+                reader.pollNext(out);
+            }
+            // checkpoint
+            List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
+
+            // re-create and restore
+            reader = createReader();
+            if (splits.isEmpty()) {
+                reader.notifyNoMoreSplits();
+            } else {
+                reader.addSplits(splits);
             }
         }
 
+        reader.isAvailable().get();

Review Comment:
   ```suggestion
   ```
   Is this really a requirement? Looking at [StreamTask#processInput:561](https://github.com/apache/flink/blob/5601da7cc251eac479fe24167c0f58dbd963072f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L561) shows that we no longer loop over `isAvailable` once the end of the data is reached. Also, from a logical standpoint, I would expect `isAvailable` to complete when we reach the end of the data stream. :thinking: 



##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
                         new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < 3; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            reader.pollNext(out);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+                reader.isAvailable().get();

Review Comment:
   ```suggestion
                   assertThat(reader.isAvailable())
                           .as(
                                   "There should always be data available because the test doesn't rely on any rate-limiting strategy and splits are provided.")
                           .isCompleted();
   ```
   nit: I would use `assertj` functionality here. `get()` might imply that we're waiting for some concurrent operation to finish, but we don't have any concurrent behavior here.
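
The difference can be shown with plain JDK calls. This is a minimal sketch: `isCompletedNormally` is a hypothetical helper that only approximates what a "future is completed" assertion checks (done, and neither failed nor cancelled), without blocking the way `get()` would on a pending future.

```java
import java.util.concurrent.CompletableFuture;

final class FutureCompletionSketch {
    /** Non-blocking check: true only if the future has already finished normally. */
    static boolean isCompletedNormally(CompletableFuture<?> future) {
        return future.isDone() && !future.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> ready = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> pending = new CompletableFuture<>();

        System.out.println(isCompletedNormally(ready));   // prints true
        System.out.println(isCompletedNormally(pending)); // prints false; pending.get() would block here
    }
}
```

A completion check fails immediately if the future is unexpectedly pending, whereas `get()` would hang the test until a timeout.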


