Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/06/03 00:03:30 UTC

[GitHub] [iceberg] kbendick commented on a diff in pull request #4943: Flink: add an option to set monitoring snapshot number

kbendick commented on code in PR #4943:
URL: https://github.com/apache/iceberg/pull/4943#discussion_r888501221


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java:
##########
@@ -72,6 +72,9 @@ class ScanContext implements Serializable {
   private static final ConfigOption<Boolean> INCLUDE_COLUMN_STATS =
       ConfigOptions.key("include-column-stats").booleanType().defaultValue(false);
 
+  private static final ConfigOption<Integer> MONITOR_SNAPSHOT_NUMBER =
+      ConfigOptions.key("monitor-snapshot-number").intType().defaultValue(Integer.MAX_VALUE);

Review Comment:
   Nit: Is there perhaps a more descriptive name for this? This is the number of snapshots to consider within each monitor interval loop, correct?
   
   Kafka has more or less the same concept in `max.poll.records`, alongside `max.poll.interval.ms` and its other consumer-related configuration properties around polling.
   
   Maybe we can take some inspiration from that naming. Off the top of my head, something like `monitor-max-snapshots-per-interval` might be more instructive to users, given that we already have `monitor-interval` as a configuration property.
   
   cc @stevenzwu for your thoughts as well
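To make the proposed name concrete, here is a minimal sketch of reading such a cap from a plain option map. The key `monitor-max-snapshots-per-interval` is only the hypothetical name floated in this comment, the `Map<String, String>` lookup stands in for Flink's configuration objects, and the default of `Integer.MAX_VALUE` mirrors `MONITOR_SNAPSHOT_NUMBER` in the diff. The sketch also rejects non-positive values, since a cap of 0 would make `snapshotIds.get(snapshotIds.size() - scanContext.monitorSnapshotNumber())` index past the end of the list.

```java
import java.util.Map;

class MonitorOptionSketch {
  // Hypothetical key name suggested in the review; not an existing Iceberg option.
  static final String MAX_SNAPSHOTS_PER_INTERVAL = "monitor-max-snapshots-per-interval";

  // Same default as MONITOR_SNAPSHOT_NUMBER in the diff: effectively unbounded.
  static final int DEFAULT_MAX_SNAPSHOTS = Integer.MAX_VALUE;

  /** Reads the per-interval snapshot cap, falling back to the default and rejecting values <= 0. */
  static int maxSnapshotsPerInterval(Map<String, String> options) {
    String raw = options.get(MAX_SNAPSHOTS_PER_INTERVAL);
    int value = raw == null ? DEFAULT_MAX_SNAPSHOTS : Integer.parseInt(raw.trim());
    if (value <= 0) {
      // A cap of 0 or less would make the index computed in the monitor loop out of range.
      throw new IllegalArgumentException(
          MAX_SNAPSHOTS_PER_INTERVAL + " must be positive, got " + value);
    }
    return value;
  }

  public static void main(String[] args) {
    System.out.println(maxSnapshotsPerInterval(Map.of()));
    System.out.println(maxSnapshotsPerInterval(Map.of(MAX_SNAPSHOTS_PER_INTERVAL, "5")));
  }
}
```

Pairing the key with the existing `monitor-interval` keeps the two related settings grouped under the same `monitor-` prefix.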



##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java:
##########
@@ -208,6 +208,37 @@ public void testCheckpointRestore() throws Exception {
     }
   }
 
+  @Test
+  public void testConsumeWithDifferentMonitorSnapshotNumbers() throws Exception {
+    List<List<Record>> recordsList = generateRecordsAndCommitTxn(10);
+
+    for (int monitorNumber = 1; monitorNumber < 11; monitorNumber = monitorNumber + 1) {
+      ScanContext scanContext = ScanContext.builder()
+          .monitorInterval(Duration.ofMillis(100))
+          .monitorSnapshotNumber(monitorNumber)
+          .build();

Review Comment:
   Are there any assertions we can apply to this whole outer loop that wouldn't be too flaky? It seems like we should have 10 splits total, correct?
   
   Also, a nit: could the for loop start at 0 instead of 1, if possible?
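One deterministic way to express that expectation is to model the monitor loop without timers. This is a minimal sketch, assuming each of the 10 commits from `generateRecordsAndCommitTxn(10)` adds exactly one snapshot (and, if each commit writes a single data file, one split); `consumeInBatches` is a hypothetical model of the loop, not Iceberg code. It checks that every cap from 1 to 10 consumes all 10 snapshots exactly once, in commit order, in ceil(10 / cap) loops. Because these counts depend only on the number of snapshots and the cap, not on timing, assertions of this shape should not be flaky.

```java
import java.util.ArrayList;
import java.util.List;

class MonitorLoopSketch {
  /**
   * Models one monitor loop per interval: each loop consumes at most maxPerInterval of the
   * snapshots not yet consumed. Returns the batches in the order they would be read.
   */
  static List<List<Long>> consumeInBatches(List<Long> committed, int maxPerInterval) {
    List<List<Long>> batches = new ArrayList<>();
    int next = 0; // index of the oldest snapshot not yet consumed
    while (next < committed.size()) {
      // Written as next + min(...) so a cap of Integer.MAX_VALUE cannot overflow.
      int end = next + Math.min(maxPerInterval, committed.size() - next);
      batches.add(new ArrayList<>(committed.subList(next, end)));
      next = end;
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Long> committed = new ArrayList<>();
    for (long id = 1; id <= 10; id++) {
      committed.add(id);
    }
    for (int maxPerInterval = 1; maxPerInterval <= 10; maxPerInterval++) {
      List<List<Long>> batches = consumeInBatches(committed, maxPerInterval);
      List<Long> flattened = new ArrayList<>();
      batches.forEach(flattened::addAll);
      // Every snapshot is consumed exactly once, in commit order...
      if (!flattened.equals(committed)) {
        throw new AssertionError("lost or reordered snapshots for cap " + maxPerInterval);
      }
      // ...and the number of loops is ceil(10 / maxPerInterval).
      int expectedLoops = (committed.size() + maxPerInterval - 1) / maxPerInterval;
      if (batches.size() != expectedLoops) {
        throw new AssertionError("unexpected loop count for cap " + maxPerInterval);
      }
    }
    System.out.println("ok");
  }
}
```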



##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java:
##########
@@ -155,6 +162,12 @@ private void monitorAndForwardSplits() {
       if (lastSnapshotId == INIT_LAST_SNAPSHOT_ID) {
         newScanContext = scanContext.copyWithSnapshotId(snapshotId);
       } else {
+        List<Long> snapshotIds = SnapshotUtil.snapshotIdsBetween(table, lastSnapshotId, snapshot.snapshotId());
+        if (snapshotIds.size() < scanContext.monitorSnapshotNumber()) {
+          snapshotId = snapshot.snapshotId();
+        } else {
+          snapshotId = snapshotIds.get(snapshotIds.size() - scanContext.monitorSnapshotNumber());
+        }

Review Comment:
   Can you elaborate on this logic, or walk me through an example where `snapshotId` has to be chosen because the number of pending snapshots is equal to (or possibly greater than?) `monitorSnapshotNumber`?
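A worked example of the branch may help here. This is a minimal sketch that assumes `SnapshotUtil.snapshotIdsBetween(table, lastSnapshotId, snapshot.snapshotId())` returns snapshot IDs newest first and excludes `lastSnapshotId` (our reading of Iceberg's implementation, which walks ancestors back from the current snapshot). `chooseSnapshotId` is a hypothetical standalone copy of the logic in the diff, with small sequential numbers standing in for real snapshot IDs.

```java
import java.util.ArrayList;
import java.util.List;

class SnapshotSelectionSketch {
  /**
   * Mirrors the branch in the diff. snapshotIdsBetween is assumed to be ordered newest first
   * and to exclude lastSnapshotId.
   */
  static long chooseSnapshotId(
      List<Long> snapshotIdsBetween, long currentSnapshotId, int monitorSnapshotNumber) {
    if (snapshotIdsBetween.size() < monitorSnapshotNumber) {
      // Fewer pending snapshots than the cap: read everything up to the current snapshot.
      return currentSnapshotId;
    }
    // Otherwise stop at the monitorSnapshotNumber-th oldest pending snapshot.
    return snapshotIdsBetween.get(snapshotIdsBetween.size() - monitorSnapshotNumber);
  }

  public static void main(String[] args) {
    // lastSnapshotId = 0; snapshots 1..10 were committed since, and 10 is current.
    List<Long> newestFirst = new ArrayList<>();
    for (long id = 10; id >= 1; id--) {
      newestFirst.add(id);
    }
    System.out.println(chooseSnapshotId(newestFirst, 10L, 3));  // 3: this loop reads 1, 2, 3
    System.out.println(chooseSnapshotId(newestFirst, 10L, 10)); // 10: size == cap, get(0)
    System.out.println(chooseSnapshotId(newestFirst, 10L, 20)); // 10: size < cap
  }
}
```

Under these assumptions, a cap of 3 selects snapshot 3, and if `lastSnapshotId` is then advanced to that ID, the next interval resumes from snapshot 4. When the list size equals the cap, `get(0)` is already the current snapshot, so both branches agree and `<` versus `<=` makes no difference. With the default of `Integer.MAX_VALUE`, the first branch is always taken, which keeps the existing behavior.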



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


For additional commands, e-mail: issues-help@iceberg.apache.org