You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/11/28 19:32:29 UTC

[GitHub] [iceberg] stevenzwu opened a new pull request, #6299: Flink: support split discovery throttling for streaming read in case …

stevenzwu opened a new pull request, #6299:
URL: https://github.com/apache/iceberg/pull/6299

   …of lagging behind.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] pvary commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1035617455


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -147,6 +148,85 @@ public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exceptio
         .contains(splits.get(0));
   }
 
+  @Test
+  public void testThrottlingDiscovery() throws Exception {
+    // create 10 splits
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 10, 1);
+
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            // discover one snapshot at a time
+            .maxPlanningSnapshotCount(1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // register reader-2, and let it request a split
+    enumeratorContext.registerReader(2, "localhost");
+    enumerator.addReader(2);
+    enumerator.handleSourceEvent(2, new SplitRequestEvent());
+
+    // add splits[0] to the planner for next discovery
+    splitPlanner.addSplits(Arrays.asList(splits.get(0)));
+    enumeratorContext.triggerAllActions();
+
+    // because discovered split was assigned to reader, pending splits should be empty
+    Assert.assertEquals(0, enumerator.snapshotState(1).pendingSplits().size());
+    // split assignment to reader-2 should contain splits[0, 1)
+    Assert.assertEquals(
+        splits.subList(0, 1), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // add the remaining 9 splits (one for every snapshot)
+    // run discovery cycles while reader-2 still processing the splits[0]
+    for (int i = 1; i < 10; ++i) {
+      splitPlanner.addSplits(Arrays.asList(splits.get(i)));
+      enumeratorContext.triggerAllActions();
+    }
+
+    // can only discover up to 3 snapshots/splits
+    Assert.assertEquals(3, enumerator.snapshotState(2).pendingSplits().size());
+    // split assignment to reader-2 should be splits[0, 1)
+    Assert.assertEquals(
+        splits.subList(0, 1), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // now reader-2 finished splits[0]
+    enumerator.handleSourceEvent(2, new SplitRequestEvent(Arrays.asList(splits.get(0).splitId())));
+    enumeratorContext.triggerAllActions();
+    // still have 3 pending splits. After assigned splits[1] to reader-2, one more split was
+    // discovered and added.
+    Assert.assertEquals(3, enumerator.snapshotState(3).pendingSplits().size());
+    // split assignment to reader-2 should be splits[0, 2)
+    Assert.assertEquals(
+        splits.subList(0, 2), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // run 3 more split discovery cycles
+    for (int i = 0; i < 3; ++i) {
+      enumeratorContext.triggerAllActions();
+    }
+
+    // no more splits are discovered due to throttling
+    Assert.assertEquals(3, enumerator.snapshotState(4).pendingSplits().size());
+    // split assignment to reader-2 should still be splits[0, 2)
+    Assert.assertEquals(
+        splits.subList(0, 2), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // now reader-2 finished splits[1]
+    enumerator.handleSourceEvent(2, new SplitRequestEvent(Arrays.asList(splits.get(1).splitId())));
+    enumeratorContext.triggerAllActions();
+    // still have 3 pending splits. After assigned new splits[2] to reader-2, one more split was
+    // discovered and added.
+    Assert.assertEquals(3, enumerator.snapshotState(5).pendingSplits().size());
+    // split assignment to reader-2 should be splits[0, 3)
+    Assert.assertEquals(
+        splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());

Review Comment:
   Seems like a duplicated check - maybe use a method for this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] pvary commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1037872916


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -147,6 +148,85 @@ public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exceptio
         .contains(splits.get(0));
   }
 
+  @Test
+  public void testThrottlingDiscovery() throws Exception {
+    // create 10 splits
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 10, 1);
+
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            // discover one snapshot at a time
+            .maxPlanningSnapshotCount(1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // register reader-2, and let it request a split
+    enumeratorContext.registerReader(2, "localhost");
+    enumerator.addReader(2);
+    enumerator.handleSourceEvent(2, new SplitRequestEvent());
+
+    // add splits[0] to the planner for next discovery
+    splitPlanner.addSplits(Arrays.asList(splits.get(0)));
+    enumeratorContext.triggerAllActions();
+
+    // because discovered split was assigned to reader, pending splits should be empty
+    Assert.assertEquals(0, enumerator.snapshotState(1).pendingSplits().size());
+    // split assignment to reader-2 should contain splits[0, 1)
+    Assert.assertEquals(
+        splits.subList(0, 1), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // add the remaining 9 splits (one for every snapshot)
+    // run discovery cycles while reader-2 still processing the splits[0]
+    for (int i = 1; i < 10; ++i) {
+      splitPlanner.addSplits(Arrays.asList(splits.get(i)));
+      enumeratorContext.triggerAllActions();
+    }
+
+    // can only discover up to 3 snapshots/splits
+    Assert.assertEquals(3, enumerator.snapshotState(2).pendingSplits().size());
+    // split assignment to reader-2 should be splits[0, 1)
+    Assert.assertEquals(
+        splits.subList(0, 1), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // now reader-2 finished splits[0]
+    enumerator.handleSourceEvent(2, new SplitRequestEvent(Arrays.asList(splits.get(0).splitId())));
+    enumeratorContext.triggerAllActions();
+    // still have 3 pending splits. After assigned splits[1] to reader-2, one more split was
+    // discovered and added.
+    Assert.assertEquals(3, enumerator.snapshotState(3).pendingSplits().size());
+    // split assignment to reader-2 should be splits[0, 2)
+    Assert.assertEquals(
+        splits.subList(0, 2), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // run 3 more split discovery cycles
+    for (int i = 0; i < 3; ++i) {
+      enumeratorContext.triggerAllActions();
+    }
+
+    // no more splits are discovered due to throttling
+    Assert.assertEquals(3, enumerator.snapshotState(4).pendingSplits().size());
+    // split assignment to reader-2 should still be splits[0, 2)
+    Assert.assertEquals(
+        splits.subList(0, 2), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // now reader-2 finished splits[1]
+    enumerator.handleSourceEvent(2, new SplitRequestEvent(Arrays.asList(splits.get(1).splitId())));
+    enumeratorContext.triggerAllActions();
+    // still have 3 pending splits. After assigned new splits[2] to reader-2, one more split was
+    // discovered and added.
+    Assert.assertEquals(3, enumerator.snapshotState(5).pendingSplits().size());
+    // split assignment to reader-2 should be splits[0, 3)
+    Assert.assertEquals(
+        splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());

Review Comment:
   Same here - no strong feelings about it on a second glance -, but my first impression was that there seems to me plenty of occurrences of this block, just with different parameters:
   ```
       Assert.assertEquals(3, enumerator.snapshotState(3).pendingSplits().size());
       Assert.assertEquals(
           splits.subList(0, 2), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1036310449


##########
bundled-guava/src/main/java/org/apache/iceberg/GuavaClasses.java:
##########
@@ -71,6 +72,7 @@ public class GuavaClasses {
     Splitter.class.getName();
     Throwables.class.getName();
     BiMap.class.getName();
+    EvictingQueue.class.getName();

Review Comment:
   we can avoid it by using an array and a round-robin cursor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1036434639


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.calcite.shaded.com.google.common.collect.EvictingQueue;
+
+/**
+ * This enumeration history is used for split discovery throttling. It wraps Guava {@link
+ * EvictingQueue} to provide thread safety.
+ */
+@ThreadSafe
+class EnumerationHistory {
+
+  // EvictingQueue is not thread safe.
+  private final EvictingQueue<Integer> enumerationSplitCountHistory;
+
+  EnumerationHistory(int maxHistorySize) {
+    this.enumerationSplitCountHistory = EvictingQueue.create(maxHistorySize);
+  }
+
+  /** Add the split count from the last enumeration result. */
+  synchronized void add(int splitCount) {
+    enumerationSplitCountHistory.add(splitCount);
+  }
+
+  /** @return true if split discovery should pause because assigner has too many splits already. */
+  synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) {
+    if (enumerationSplitCountHistory.remainingCapacity() > 0) {
+      // only check throttling when full history is obtained.

Review Comment:
   @pvary you are correct here. This is the downside of not checkpointing enumeration history. After checkpoint recovery, enumerator can discover more splits because totalSplitCountFromRecentDiscovery is 0 after recovery. There are two scenarios.
   1. job was stuck in a failure/restart loop. this is not a concern as there is no successful checkpoint. newly discovered splits aren't saved. the tracked splits won't grow forever.
   2. here is a pathological scenario. Job was in a restart loop with successful checkpoints (like 1 or 2) btw restart. This would essentially bypass the throttling check. Potentially the tracked splits can keep growing. That is the small concern I has regarding not checkpointing. The concern is small, because this scenario is very unusual. Hence, I didn't implement the checkpointing of enumeration history (for throttling purpose).
   
   On the other hand, it's not too difficult to add the checkpointing of enumeration history for completeness or ability to handle the above edge case. I am open to add it if desired.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] pvary commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1035587241


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java:
##########
@@ -78,6 +79,22 @@ public ContinuousEnumerationResult planSplits(IcebergEnumeratorPosition lastPosi
     }
   }
 
+  private Snapshot toSnapshotInclusive(
+      Long lastConsumedSnapshotId, Snapshot currentSnapshot, int maxPlanningSnapshotCount) {
+    // snapshots are in reverse order (latest snapshot first)
+    List<Snapshot> snapshots =
+        Lists.newArrayList(
+            SnapshotUtil.ancestorsBetween(
+                table, currentSnapshot.snapshotId(), lastConsumedSnapshotId));
+    if (snapshots.size() <= maxPlanningSnapshotCount) {
+      return currentSnapshot;
+    } else {
+      // Because snapshots are in reverse order of commit history, this index returns
+      // the max allowed number of snapshots from the lastConsumedSnapshotId.
+      return snapshots.get(snapshots.size() - maxPlanningSnapshotCount);
+    }
+  }
+
   /** Discover incremental changes between {@code lastPosition} and current table snapshot */

Review Comment:
   Nit: Maybe update the comment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hililiwei commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
hililiwei commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1035451563


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.calcite.shaded.com.google.common.collect.EvictingQueue;
+
+/**
+ * This enumeration history is used for split discovery throttling. It wraps Guava {@link
+ * EvictingQueue} to provide thread safety.
+ */
+@ThreadSafe
+class EnumerationHistory {
+
+  // EvictingQueue is not thread safe.
+  private final EvictingQueue<Integer> enumerationSplitCountHistory;
+
+  EnumerationHistory(int maxHistorySize) {
+    this.enumerationSplitCountHistory = EvictingQueue.create(maxHistorySize);
+  }
+
+  /** Add the split count from the last enumeration result. */
+  synchronized void add(int splitCount) {
+    enumerationSplitCountHistory.add(splitCount);
+  }
+
+  /** @return true if split discovery should pause because assigner has too many splits already. */
+  synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) {
+    if (enumerationSplitCountHistory.remainingCapacity() > 0) {
+      // only check throttling when full history is obtained.

Review Comment:
   I prefer the current no-checkpoint approach. To some extent, an empty history seems to be beneficial for quickly processing the backlog of data when the application restarts?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] pvary commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1035617455


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -147,6 +148,85 @@ public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exceptio
         .contains(splits.get(0));
   }
 
+  @Test
+  public void testThrottlingDiscovery() throws Exception {
+    // create 10 splits
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 10, 1);
+
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            // discover one snapshot at a time
+            .maxPlanningSnapshotCount(1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // register reader-2, and let it request a split
+    enumeratorContext.registerReader(2, "localhost");
+    enumerator.addReader(2);
+    enumerator.handleSourceEvent(2, new SplitRequestEvent());
+
+    // add splits[0] to the planner for next discovery
+    splitPlanner.addSplits(Arrays.asList(splits.get(0)));
+    enumeratorContext.triggerAllActions();
+
+    // because discovered split was assigned to reader, pending splits should be empty
+    Assert.assertEquals(0, enumerator.snapshotState(1).pendingSplits().size());
+    // split assignment to reader-2 should contain splits[0, 1)
+    Assert.assertEquals(
+        splits.subList(0, 1), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // add the remaining 9 splits (one for every snapshot)
+    // run discovery cycles while reader-2 still processing the splits[0]
+    for (int i = 1; i < 10; ++i) {
+      splitPlanner.addSplits(Arrays.asList(splits.get(i)));
+      enumeratorContext.triggerAllActions();
+    }
+
+    // can only discover up to 3 snapshots/splits
+    Assert.assertEquals(3, enumerator.snapshotState(2).pendingSplits().size());
+    // split assignment to reader-2 should be splits[0, 1)
+    Assert.assertEquals(
+        splits.subList(0, 1), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // now reader-2 finished splits[0]
+    enumerator.handleSourceEvent(2, new SplitRequestEvent(Arrays.asList(splits.get(0).splitId())));
+    enumeratorContext.triggerAllActions();
+    // still have 3 pending splits. After assigned splits[1] to reader-2, one more split was
+    // discovered and added.
+    Assert.assertEquals(3, enumerator.snapshotState(3).pendingSplits().size());
+    // split assignment to reader-2 should be splits[0, 2)
+    Assert.assertEquals(
+        splits.subList(0, 2), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // run 3 more split discovery cycles
+    for (int i = 0; i < 3; ++i) {
+      enumeratorContext.triggerAllActions();
+    }
+
+    // no more splits are discovered due to throttling
+    Assert.assertEquals(3, enumerator.snapshotState(4).pendingSplits().size());
+    // split assignment to reader-2 should still be splits[0, 2)
+    Assert.assertEquals(
+        splits.subList(0, 2), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // now reader-2 finished splits[1]
+    enumerator.handleSourceEvent(2, new SplitRequestEvent(Arrays.asList(splits.get(1).splitId())));
+    enumeratorContext.triggerAllActions();
+    // still have 3 pending splits. After assigned new splits[2] to reader-2, one more split was
+    // discovered and added.
+    Assert.assertEquals(3, enumerator.snapshotState(5).pendingSplits().size());
+    // split assignment to reader-2 should be splits[0, 3)
+    Assert.assertEquals(
+        splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());

Review Comment:
   Seems like a duplicated check - maybe use a method for this with the appropriate parameters?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] pvary commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1035581797


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -92,34 +103,52 @@ public IcebergEnumeratorState snapshotState(long checkpointId) {
 
   /** This method is executed in an IO thread pool. */
   private ContinuousEnumerationResult discoverSplits() {
-    return splitPlanner.planSplits(enumeratorPosition.get());
+    int pendingSplitCountFromAssigner = assigner.pendingSplitCount();
+    if (enumerationHistory.shouldPauseSplitDiscovery(pendingSplitCountFromAssigner)) {
+      // If the assigner already has many pending splits, it is better to pause split discovery.
+      // Otherwise, eagerly discovering more splits will just increase assigner memory footprint
+      // and enumerator checkpoint state size.
+      LOG.info(
+          "Pause split discovery as the assigner already has too many pending splits: {}",
+          pendingSplitCountFromAssigner);
+      return new ContinuousEnumerationResult(
+          Collections.emptyList(), enumeratorPosition.get(), enumeratorPosition.get());
+    } else {
+      return splitPlanner.planSplits(enumeratorPosition.get());
+    }
   }
 
   /** This method is executed in a single coordinator thread. */
   private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
     if (error == null) {
       if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
         // Multiple discoverSplits() may be triggered with the same starting snapshot to the I/O
-        // thread pool.
-        // E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit tests) or the
-        // thread
-        // pool is busy and multiple discovery actions are executed concurrently. Discovery result
-        // should
-        // only be accepted if the starting position matches the enumerator position (like
-        // compare-and-swap).
+        // thread pool. E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit
+        // tests) or the thread pool is busy and multiple discovery actions are executed
+        // concurrently. Discovery result should only be accepted if the starting position
+        // matches the enumerator position (like compare-and-swap).
         LOG.info(
             "Skip {} discovered splits because the scan starting position doesn't match "
                 + "the current enumerator position: enumerator position = {}, scan starting position = {}",
             result.splits().size(),
             enumeratorPosition.get(),
             result.fromPosition());
       } else {
-        assigner.onDiscoveredSplits(result.splits());
-        LOG.info(
-            "Added {} splits discovered between ({}, {}] to the assigner",
-            result.splits().size(),
-            result.fromPosition(),
-            result.toPosition());
+        // Sometimes, enumeration may yield no splits for a few reasons.
+        // - upstream paused or delayed streaming writes to the Iceberg table.
+        // - enumeration frequency is higher than the upstream write frequency.
+        if (!result.splits().isEmpty()) {
+          assigner.onDiscoveredSplits(result.splits());
+          // EnumerationHistory makes throttling decision on split discovery
+          // based on the total number of splits discovered in the last a few cycles.
+          // Only update enumeration history when there are some discovered splits.
+          enumerationHistory.add(result.splits().size());

Review Comment:
   Question: Do I understand correctly that we do not have to address concurrent calls to `processDiscoveredSplits` because Flink makes sure that these calls are serialized. So there is no race condition between:
   - `assigner.onDiscoveredSplits`  - this one updates `assigner.pendingSplits`
   - `enumerationHistory.add`
   - `enumeratorPosition.set`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] pvary commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1035614823


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java:
##########
@@ -507,4 +507,71 @@ public void testIncrementalFromSnapshotTimestamp() throws Exception {
       lastPosition = verifyOneCycle(splitPlanner, lastPosition);
     }
   }
+
+  @Test
+  public void testMaxPlanningSnapshotCount() throws Exception {
+    appendTwoSnapshots();
+    // append 3 more snapshots
+    for (int i = 2; i < 5; ++i) {
+      appendSnapshot(i, 2);
+    }
+
+    ScanContext scanContext =
+        ScanContext.builder()
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            // limit to 1 snapshot per discovery
+            .maxPlanningSnapshotCount(1)
+            .build();
+    ContinuousSplitPlannerImpl splitPlanner =
+        new ContinuousSplitPlannerImpl(tableResource.table(), scanContext, null);
+
+    ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
+    Assert.assertNull(initialResult.fromPosition());
+    // For inclusive behavior, the initial result should point to snapshot1's parent,
+    // which leads to null snapshotId and snapshotTimestampMs.
+    Assert.assertNull(initialResult.toPosition().snapshotId());
+    Assert.assertNull(initialResult.toPosition().snapshotTimestampMs());
+    Assert.assertEquals(0, initialResult.splits().size());
+
+    ContinuousEnumerationResult secondResult = splitPlanner.planSplits(initialResult.toPosition());
+    Assert.assertNull(secondResult.fromPosition().snapshotId());
+    Assert.assertNull(secondResult.fromPosition().snapshotTimestampMs());
+    Assert.assertEquals(snapshot1.snapshotId(), secondResult.toPosition().snapshotId().longValue());
+    Assert.assertEquals(
+        snapshot1.timestampMillis(), secondResult.toPosition().snapshotTimestampMs().longValue());
+    IcebergSourceSplit splitSecond = Iterables.getOnlyElement(secondResult.splits());
+    Assert.assertEquals(1, splitSecond.task().files().size());
+    Set<String> discoveredFilesSecond =
+        splitSecond.task().files().stream()
+            .map(fileScanTask -> fileScanTask.file().path().toString())
+            .collect(Collectors.toSet());
+    // should discover dataFile1 appended in snapshot1
+    Set<String> expectedFilesSecond = ImmutableSet.of(dataFile1.path().toString());
+    Assert.assertEquals(expectedFilesSecond, discoveredFilesSecond);
+
+    ContinuousEnumerationResult thirdResult = splitPlanner.planSplits(secondResult.toPosition());
+    Assert.assertEquals(
+        snapshot1.snapshotId(), thirdResult.fromPosition().snapshotId().longValue());
+    Assert.assertEquals(
+        snapshot1.timestampMillis(), thirdResult.fromPosition().snapshotTimestampMs().longValue());
+    Assert.assertEquals(snapshot2.snapshotId(), thirdResult.toPosition().snapshotId().longValue());
+    Assert.assertEquals(
+        snapshot2.timestampMillis(), thirdResult.toPosition().snapshotTimestampMs().longValue());
+    IcebergSourceSplit splitThird = Iterables.getOnlyElement(thirdResult.splits());
+    Assert.assertEquals(1, splitThird.task().files().size());
+    Set<String> discoveredFilesThird =
+        splitThird.task().files().stream()
+            .map(fileScanTask -> fileScanTask.file().path().toString())
+            .collect(Collectors.toSet());
+    // should discover dataFile2 appended in snapshot2
+    Set<String> expectedFilesThird = ImmutableSet.of(dataFile2.path().toString());
+    Assert.assertEquals(expectedFilesThird, discoveredFilesThird);

Review Comment:
   Seems like a duplicated code - would it worth to create a method for it, and add some javadoc to describe what it checks?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read in case …

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1033979400


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java:
##########
@@ -51,7 +51,7 @@ public SimpleSplitAssigner(Collection<IcebergSourceSplitState> assignerState) {
   }
 
   @Override
-  public GetSplitResult getNext(@Nullable String hostname) {
+  public synchronized GetSplitResult getNext(@Nullable String hostname) {

Review Comment:
   added `synchronized` because the new `pendingSplitCount` method can be called from I/O threads, while other methods are only called from the coordinator thread. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] pvary commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1035577623


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -92,34 +103,52 @@ public IcebergEnumeratorState snapshotState(long checkpointId) {
 
   /** This method is executed in an IO thread pool. */
   private ContinuousEnumerationResult discoverSplits() {
-    return splitPlanner.planSplits(enumeratorPosition.get());
+    int pendingSplitCountFromAssigner = assigner.pendingSplitCount();
+    if (enumerationHistory.shouldPauseSplitDiscovery(pendingSplitCountFromAssigner)) {
+      // If the assigner already has many pending splits, it is better to pause split discovery.
+      // Otherwise, eagerly discovering more splits will just increase assigner memory footprint
+      // and enumerator checkpoint state size.
+      LOG.info(
+          "Pause split discovery as the assigner already has too many pending splits: {}",
+          pendingSplitCountFromAssigner);
+      return new ContinuousEnumerationResult(
+          Collections.emptyList(), enumeratorPosition.get(), enumeratorPosition.get());
+    } else {
+      return splitPlanner.planSplits(enumeratorPosition.get());
+    }
   }
 
   /** This method is executed in a single coordinator thread. */
   private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
     if (error == null) {
       if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
         // Multiple discoverSplits() may be triggered with the same starting snapshot to the I/O
-        // thread pool.
-        // E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit tests) or the
-        // thread
-        // pool is busy and multiple discovery actions are executed concurrently. Discovery result
-        // should
-        // only be accepted if the starting position matches the enumerator position (like
-        // compare-and-swap).
+        // thread pool. E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit
+        // tests) or the thread pool is busy and multiple discovery actions are executed
+        // concurrently. Discovery result should only be accepted if the starting position
+        // matches the enumerator position (like compare-and-swap).
         LOG.info(
             "Skip {} discovered splits because the scan starting position doesn't match "
                 + "the current enumerator position: enumerator position = {}, scan starting position = {}",
             result.splits().size(),
             enumeratorPosition.get(),
             result.fromPosition());
       } else {
-        assigner.onDiscoveredSplits(result.splits());
-        LOG.info(
-            "Added {} splits discovered between ({}, {}] to the assigner",
-            result.splits().size(),
-            result.fromPosition(),
-            result.toPosition());
+        // Sometimes, enumeration may yield no splits for a few reasons.
+        // - upstream paused or delayed streaming writes to the Iceberg table.
+        // - enumeration frequency is higher than the upstream write frequency.
+        if (!result.splits().isEmpty()) {
+          assigner.onDiscoveredSplits(result.splits());
+          // EnumerationHistory makes throttling decision on split discovery
+          // based on the total number of splits discovered in the last a few cycles.
+          // Only update enumeration history when there are some discovered splits.
+          enumerationHistory.add(result.splits().size());
+          LOG.info(
+              "Added {} splits discovered between ({}, {}] to the assigner",
+              result.splits().size(),
+              result.fromPosition(),
+              result.toPosition());
+        }

Review Comment:
   nit: Maybe an info log here, or minimally a debug log, that there were no split to add?
   Otherwise it might cause problems understanding why the discovery is "not working" 😄 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] pvary commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1037864725


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -92,34 +103,52 @@ public IcebergEnumeratorState snapshotState(long checkpointId) {
 
   /** This method is executed in an IO thread pool. */
   private ContinuousEnumerationResult discoverSplits() {
-    return splitPlanner.planSplits(enumeratorPosition.get());
+    int pendingSplitCountFromAssigner = assigner.pendingSplitCount();
+    if (enumerationHistory.shouldPauseSplitDiscovery(pendingSplitCountFromAssigner)) {
+      // If the assigner already has many pending splits, it is better to pause split discovery.
+      // Otherwise, eagerly discovering more splits will just increase assigner memory footprint
+      // and enumerator checkpoint state size.
+      LOG.info(
+          "Pause split discovery as the assigner already has too many pending splits: {}",
+          pendingSplitCountFromAssigner);
+      return new ContinuousEnumerationResult(
+          Collections.emptyList(), enumeratorPosition.get(), enumeratorPosition.get());
+    } else {
+      return splitPlanner.planSplits(enumeratorPosition.get());
+    }
   }
 
   /** This method is executed in a single coordinator thread. */
   private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
     if (error == null) {
       if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
         // Multiple discoverSplits() may be triggered with the same starting snapshot to the I/O
-        // thread pool.
-        // E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit tests) or the
-        // thread
-        // pool is busy and multiple discovery actions are executed concurrently. Discovery result
-        // should
-        // only be accepted if the starting position matches the enumerator position (like
-        // compare-and-swap).
+        // thread pool. E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit
+        // tests) or the thread pool is busy and multiple discovery actions are executed
+        // concurrently. Discovery result should only be accepted if the starting position
+        // matches the enumerator position (like compare-and-swap).
         LOG.info(
             "Skip {} discovered splits because the scan starting position doesn't match "
                 + "the current enumerator position: enumerator position = {}, scan starting position = {}",
             result.splits().size(),
             enumeratorPosition.get(),
             result.fromPosition());
       } else {
-        assigner.onDiscoveredSplits(result.splits());
-        LOG.info(
-            "Added {} splits discovered between ({}, {}] to the assigner",
-            result.splits().size(),
-            result.fromPosition(),
-            result.toPosition());
+        // Sometimes, enumeration may yield no splits for a few reasons.
+        // - upstream paused or delayed streaming writes to the Iceberg table.
+        // - enumeration frequency is higher than the upstream write frequency.
+        if (!result.splits().isEmpty()) {
+          assigner.onDiscoveredSplits(result.splits());
+          // EnumerationHistory makes throttling decision on split discovery
+          // based on the total number of splits discovered in the last a few cycles.
+          // Only update enumeration history when there are some discovered splits.
+          enumerationHistory.add(result.splits().size());

Review Comment:
   Thanks for the explanation!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1038422914


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.calcite.shaded.com.google.common.collect.EvictingQueue;
+
+/**
+ * This enumeration history is used for split discovery throttling. It wraps Guava {@link
+ * EvictingQueue} to provide thread safety.
+ */
+@ThreadSafe
+class EnumerationHistory {
+
+  // EvictingQueue is not thread safe.
+  private final EvictingQueue<Integer> enumerationSplitCountHistory;
+
+  EnumerationHistory(int maxHistorySize) {
+    this.enumerationSplitCountHistory = EvictingQueue.create(maxHistorySize);
+  }
+
+  /** Add the split count from the last enumeration result. */
+  synchronized void add(int splitCount) {
+    enumerationSplitCountHistory.add(splitCount);
+  }
+
+  /** @return true if split discovery should pause because assigner has too many splits already. */
+  synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) {
+    if (enumerationSplitCountHistory.remainingCapacity() > 0) {
+      // only check throttling when full history is obtained.

Review Comment:
   @pvary Just to make sure that I understand you correctly. 
   
   You agree that the big file and restart is not a problem for causing infinite amount of enumerated splits. 
   
   You see that constant restarts with completed checkpoints btw can lead to ever-growing enumerated splits. For that reason, you are in favor of adding the checkpointing of the enumeration history for more proper throttling behavior, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read in case …

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1034003585


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.calcite.shaded.com.google.common.collect.EvictingQueue;
+
+/**
+ * This enumeration history is used for split discovery throttling. It wraps Guava {@link
+ * EvictingQueue} to provide thread safety.
+ */
+@ThreadSafe
+class EnumerationHistory {
+
+  // EvictingQueue is not thread safe.
+  private final EvictingQueue<Integer> enumerationSplitCountHistory;
+
+  EnumerationHistory(int maxHistorySize) {
+    this.enumerationSplitCountHistory = EvictingQueue.create(maxHistorySize);
+  }
+
+  /** Add the split count from the last enumeration result. */
+  synchronized void add(int splitCount) {
+    enumerationSplitCountHistory.add(splitCount);
+  }
+
+  /** @return true if split discovery should pause because assigner has too many splits already. */
+  synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) {
+    if (enumerationSplitCountHistory.remainingCapacity() > 0) {
+      // only check throttling when full history is obtained.

Review Comment:
   I am taking this simple approach now. I am open to checkpoint the enumeration history as part of the enumerator state if people think it is better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] pvary commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1039860822


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.calcite.shaded.com.google.common.collect.EvictingQueue;
+
+/**
+ * This enumeration history is used for split discovery throttling. It wraps Guava {@link
+ * EvictingQueue} to provide thread safety.
+ */
+@ThreadSafe
+class EnumerationHistory {
+
+  // EvictingQueue is not thread safe.
+  private final EvictingQueue<Integer> enumerationSplitCountHistory;
+
+  EnumerationHistory(int maxHistorySize) {
+    this.enumerationSplitCountHistory = EvictingQueue.create(maxHistorySize);
+  }
+
+  /** Add the split count from the last enumeration result. */
+  synchronized void add(int splitCount) {
+    enumerationSplitCountHistory.add(splitCount);
+  }
+
+  /** @return true if split discovery should pause because assigner has too many splits already. */
+  synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) {
+    if (enumerationSplitCountHistory.remainingCapacity() > 0) {
+      // only check throttling when full history is obtained.

Review Comment:
   Thx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1038429027


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -147,6 +148,85 @@ public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exceptio
         .contains(splits.get(0));
   }
 
+  @Test
+  public void testThrottlingDiscovery() throws Exception {
+    // create 10 splits
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 10, 1);
+
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            // discover one snapshot at a time
+            .maxPlanningSnapshotCount(1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // register reader-2, and let it request a split
+    enumeratorContext.registerReader(2, "localhost");
+    enumerator.addReader(2);
+    enumerator.handleSourceEvent(2, new SplitRequestEvent());
+
+    // add splits[0] to the planner for next discovery
+    splitPlanner.addSplits(Arrays.asList(splits.get(0)));
+    enumeratorContext.triggerAllActions();
+
+    // because discovered split was assigned to reader, pending splits should be empty
+    Assert.assertEquals(0, enumerator.snapshotState(1).pendingSplits().size());
+    // split assignment to reader-2 should contain splits[0, 1)
+    Assert.assertEquals(
+        splits.subList(0, 1), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // add the remaining 9 splits (one for every snapshot)
+    // run discovery cycles while reader-2 still processing the splits[0]
+    for (int i = 1; i < 10; ++i) {
+      splitPlanner.addSplits(Arrays.asList(splits.get(i)));
+      enumeratorContext.triggerAllActions();
+    }
+
+    // can only discover up to 3 snapshots/splits
+    Assert.assertEquals(3, enumerator.snapshotState(2).pendingSplits().size());
+    // split assignment to reader-2 should be splits[0, 1)
+    Assert.assertEquals(
+        splits.subList(0, 1), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // now reader-2 finished splits[0]
+    enumerator.handleSourceEvent(2, new SplitRequestEvent(Arrays.asList(splits.get(0).splitId())));
+    enumeratorContext.triggerAllActions();
+    // still have 3 pending splits. After assigned splits[1] to reader-2, one more split was
+    // discovered and added.
+    Assert.assertEquals(3, enumerator.snapshotState(3).pendingSplits().size());
+    // split assignment to reader-2 should be splits[0, 2)
+    Assert.assertEquals(
+        splits.subList(0, 2), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // run 3 more split discovery cycles
+    for (int i = 0; i < 3; ++i) {
+      enumeratorContext.triggerAllActions();
+    }
+
+    // no more splits are discovered due to throttling
+    Assert.assertEquals(3, enumerator.snapshotState(4).pendingSplits().size());
+    // split assignment to reader-2 should still be splits[0, 2)
+    Assert.assertEquals(
+        splits.subList(0, 2), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // now reader-2 finished splits[1]
+    enumerator.handleSourceEvent(2, new SplitRequestEvent(Arrays.asList(splits.get(1).splitId())));
+    enumeratorContext.triggerAllActions();
+    // still have 3 pending splits. After assigned new splits[2] to reader-2, one more split was
+    // discovered and added.
+    Assert.assertEquals(3, enumerator.snapshotState(5).pendingSplits().size());
+    // split assignment to reader-2 should be splits[0, 3)
+    Assert.assertEquals(
+        splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());

Review Comment:
   I will keep it as it is then for better readability



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] pvary commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1038742272


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.calcite.shaded.com.google.common.collect.EvictingQueue;
+
+/**
+ * This enumeration history is used for split discovery throttling. It wraps Guava {@link
+ * EvictingQueue} to provide thread safety.
+ */
+@ThreadSafe
+class EnumerationHistory {
+
+  // EvictingQueue is not thread safe.
+  private final EvictingQueue<Integer> enumerationSplitCountHistory;
+
+  EnumerationHistory(int maxHistorySize) {
+    this.enumerationSplitCountHistory = EvictingQueue.create(maxHistorySize);
+  }
+
+  /** Add the split count from the last enumeration result. */
+  synchronized void add(int splitCount) {
+    enumerationSplitCountHistory.add(splitCount);
+  }
+
+  /** @return true if split discovery should pause because assigner has too many splits already. */
+  synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) {
+    if (enumerationSplitCountHistory.remainingCapacity() > 0) {
+      // only check throttling when full history is obtained.

Review Comment:
   What I missed before, but realized now is that the reader stores the row position of the file reader in the checkpoint state. So the reader will not start from the beginning, and this way we will not advance the checkpoint infinitely...
   
   Still feel that it will cause issues/misunderstanding later if only some part of the IcebergSource state is checkpointed. The source should be either fully checkpointed or not at all.
   I would not have exceptions for this rule without a very strong reason.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] pvary commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1037864475


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.calcite.shaded.com.google.common.collect.EvictingQueue;
+
+/**
+ * This enumeration history is used for split discovery throttling. It wraps Guava {@link
+ * EvictingQueue} to provide thread safety.
+ */
+@ThreadSafe
+class EnumerationHistory {
+
+  // EvictingQueue is not thread safe.
+  private final EvictingQueue<Integer> enumerationSplitCountHistory;
+
+  EnumerationHistory(int maxHistorySize) {
+    this.enumerationSplitCountHistory = EvictingQueue.create(maxHistorySize);
+  }
+
+  /** Add the split count from the last enumeration result. */
+  synchronized void add(int splitCount) {
+    enumerationSplitCountHistory.add(splitCount);
+  }
+
+  /** @return true if split discovery should pause because assigner has too many splits already. */
+  synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) {
+    if (enumerationSplitCountHistory.remainingCapacity() > 0) {
+      // only check throttling when full history is obtained.

Review Comment:
   We might have a big file where the read takes a while (or short checkpointing period), and we face an error at the end of the file.
   
   If it is not too much effort/overhead, I would add the checkpointing here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1035464292


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.calcite.shaded.com.google.common.collect.EvictingQueue;
+
+/**
+ * This enumeration history is used for split discovery throttling. It wraps Guava {@link
+ * EvictingQueue} to provide thread safety.
+ */
+@ThreadSafe
+class EnumerationHistory {
+
+  // EvictingQueue is not thread safe.
+  private final EvictingQueue<Integer> enumerationSplitCountHistory;
+
+  EnumerationHistory(int maxHistorySize) {
+    this.enumerationSplitCountHistory = EvictingQueue.create(maxHistorySize);
+  }
+
+  /** Add the split count from the last enumeration result. */
+  synchronized void add(int splitCount) {
+    enumerationSplitCountHistory.add(splitCount);
+  }
+
+  /** @return true if split discovery should pause because assigner has too many splits already. */
+  synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) {
+    if (enumerationSplitCountHistory.remainingCapacity() > 0) {
+      // only check throttling when full history is obtained.

Review Comment:
   well, there is still subjective to the `maxPlanningSnapshotCount` limit. its default is `Integer.MAX_VALUE`. For streaming read, I actually think it should be set to some reasonable value like 6 or 10 to avoid potentially discovering too many splits into enumerator memory. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] pvary commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1035604579


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.calcite.shaded.com.google.common.collect.EvictingQueue;
+
+/**
+ * This enumeration history is used for split discovery throttling. It wraps Guava {@link
+ * EvictingQueue} to provide thread safety.
+ */
+@ThreadSafe
+class EnumerationHistory {
+
+  // EvictingQueue is not thread safe.
+  private final EvictingQueue<Integer> enumerationSplitCountHistory;
+
+  EnumerationHistory(int maxHistorySize) {
+    this.enumerationSplitCountHistory = EvictingQueue.create(maxHistorySize);
+  }
+
+  /** Add the split count from the last enumeration result. */
+  synchronized void add(int splitCount) {
+    enumerationSplitCountHistory.add(splitCount);
+  }
+
+  /** @return true if split discovery should pause because assigner has too many splits already. */
+  synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) {
+    if (enumerationSplitCountHistory.remainingCapacity() > 0) {
+      // only check throttling when full history is obtained.

Review Comment:
   Do we checkpoint the already planned splits?
   
   If so then after a recovery:
   - pendingSplitCountFromAssigner - returns the valid value
   - totalSplitCountFromRecentDiscovery - is 0 after a recovery
   
   If my reasoning above is right then `shouldPauseSplitDiscovery` will return `true` until there is no split left pending which might cause a small hickup after a checkpoint recovery.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] pvary commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1035608761


##########
bundled-guava/src/main/java/org/apache/iceberg/GuavaClasses.java:
##########
@@ -71,6 +72,7 @@ public class GuavaClasses {
     Splitter.class.getName();
     Throwables.class.getName();
     BiMap.class.getName();
+    EvictingQueue.class.getName();

Review Comment:
   Do we have an easy alternative here?
   Last time I have tried to add classes here, Ryan said that he tries to avoid adding further guava classes, if it is not strictly necessary.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1035464292


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.calcite.shaded.com.google.common.collect.EvictingQueue;
+
+/**
+ * This enumeration history is used for split discovery throttling. It wraps Guava {@link
+ * EvictingQueue} to provide thread safety.
+ */
+@ThreadSafe
+class EnumerationHistory {
+
+  // EvictingQueue is not thread safe.
+  private final EvictingQueue<Integer> enumerationSplitCountHistory;
+
+  EnumerationHistory(int maxHistorySize) {
+    this.enumerationSplitCountHistory = EvictingQueue.create(maxHistorySize);
+  }
+
+  /** Add the split count from the last enumeration result. */
+  synchronized void add(int splitCount) {
+    enumerationSplitCountHistory.add(splitCount);
+  }
+
+  /** @return true if split discovery should pause because assigner has too many splits already. */
+  synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) {
+    if (enumerationSplitCountHistory.remainingCapacity() > 0) {
+      // only check throttling when full history is obtained.

Review Comment:
   well, there is still the limit of to `maxPlanningSnapshotCount`. its default is `Integer.MAX_VALUE`. For streaming read, I actually think it should be set to some reasonable value like 6 or 10 to avoid potentially discovering too many splits into enumerator memory. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1036451142


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -147,6 +148,85 @@ public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exceptio
         .contains(splits.get(0));
   }
 
+  @Test
+  public void testThrottlingDiscovery() throws Exception {
+    // create 10 splits
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 10, 1);
+
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            // discover one snapshot at a time
+            .maxPlanningSnapshotCount(1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // register reader-2, and let it request a split
+    enumeratorContext.registerReader(2, "localhost");
+    enumerator.addReader(2);
+    enumerator.handleSourceEvent(2, new SplitRequestEvent());
+
+    // add splits[0] to the planner for next discovery
+    splitPlanner.addSplits(Arrays.asList(splits.get(0)));
+    enumeratorContext.triggerAllActions();
+
+    // because discovered split was assigned to reader, pending splits should be empty
+    Assert.assertEquals(0, enumerator.snapshotState(1).pendingSplits().size());
+    // split assignment to reader-2 should contain splits[0, 1)
+    Assert.assertEquals(
+        splits.subList(0, 1), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // add the remaining 9 splits (one for every snapshot)
+    // run discovery cycles while reader-2 still processing the splits[0]
+    for (int i = 1; i < 10; ++i) {
+      splitPlanner.addSplits(Arrays.asList(splits.get(i)));
+      enumeratorContext.triggerAllActions();
+    }
+
+    // can only discover up to 3 snapshots/splits
+    Assert.assertEquals(3, enumerator.snapshotState(2).pendingSplits().size());
+    // split assignment to reader-2 should be splits[0, 1)
+    Assert.assertEquals(
+        splits.subList(0, 1), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // now reader-2 finished splits[0]
+    enumerator.handleSourceEvent(2, new SplitRequestEvent(Arrays.asList(splits.get(0).splitId())));
+    enumeratorContext.triggerAllActions();
+    // still have 3 pending splits. After assigned splits[1] to reader-2, one more split was
+    // discovered and added.
+    Assert.assertEquals(3, enumerator.snapshotState(3).pendingSplits().size());
+    // split assignment to reader-2 should be splits[0, 2)
+    Assert.assertEquals(
+        splits.subList(0, 2), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // run 3 more split discovery cycles
+    for (int i = 0; i < 3; ++i) {
+      enumeratorContext.triggerAllActions();
+    }
+
+    // no more splits are discovered due to throttling
+    Assert.assertEquals(3, enumerator.snapshotState(4).pendingSplits().size());
+    // split assignment to reader-2 should still be splits[0, 2)
+    Assert.assertEquals(
+        splits.subList(0, 2), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // now reader-2 finished splits[1]
+    enumerator.handleSourceEvent(2, new SplitRequestEvent(Arrays.asList(splits.get(1).splitId())));
+    enumeratorContext.triggerAllActions();
+    // still have 3 pending splits. After assigned new splits[2] to reader-2, one more split was
+    // discovered and added.
+    Assert.assertEquals(3, enumerator.snapshotState(5).pendingSplits().size());
+    // split assignment to reader-2 should be splits[0, 3)
+    Assert.assertEquals(
+        splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());

Review Comment:
   I think you are talking about the duplication btw 199-206 and 220-227. We can extract a method to save the some code duplication. I didn't do it for the following reasons.
   1. it is 5 lines of code
   2. the readability may not be as easy when calling a method like `runOneCycle(int subtask, int pendingSplitCount, List<Integer> expectedSplitAssignment, long checkpointId)`, as it is hard to see what the integer arg means.
   
   But I am not very opinionated here. Please let me know if you think the method extraction is better here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1036531697


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java:
##########
@@ -507,4 +507,71 @@ public void testIncrementalFromSnapshotTimestamp() throws Exception {
       lastPosition = verifyOneCycle(splitPlanner, lastPosition);
     }
   }
+
+  @Test
+  public void testMaxPlanningSnapshotCount() throws Exception {
+    appendTwoSnapshots();
+    // append 3 more snapshots
+    for (int i = 2; i < 5; ++i) {
+      appendSnapshot(i, 2);
+    }
+
+    ScanContext scanContext =
+        ScanContext.builder()
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            // limit to 1 snapshot per discovery
+            .maxPlanningSnapshotCount(1)
+            .build();
+    ContinuousSplitPlannerImpl splitPlanner =
+        new ContinuousSplitPlannerImpl(tableResource.table(), scanContext, null);
+
+    ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
+    Assert.assertNull(initialResult.fromPosition());
+    // For inclusive behavior, the initial result should point to snapshot1's parent,
+    // which leads to null snapshotId and snapshotTimestampMs.
+    Assert.assertNull(initialResult.toPosition().snapshotId());
+    Assert.assertNull(initialResult.toPosition().snapshotTimestampMs());
+    Assert.assertEquals(0, initialResult.splits().size());
+
+    ContinuousEnumerationResult secondResult = splitPlanner.planSplits(initialResult.toPosition());
+    Assert.assertNull(secondResult.fromPosition().snapshotId());
+    Assert.assertNull(secondResult.fromPosition().snapshotTimestampMs());
+    Assert.assertEquals(snapshot1.snapshotId(), secondResult.toPosition().snapshotId().longValue());
+    Assert.assertEquals(
+        snapshot1.timestampMillis(), secondResult.toPosition().snapshotTimestampMs().longValue());
+    IcebergSourceSplit splitSecond = Iterables.getOnlyElement(secondResult.splits());
+    Assert.assertEquals(1, splitSecond.task().files().size());
+    Set<String> discoveredFilesSecond =
+        splitSecond.task().files().stream()
+            .map(fileScanTask -> fileScanTask.file().path().toString())
+            .collect(Collectors.toSet());
+    // should discover dataFile1 appended in snapshot1
+    Set<String> expectedFilesSecond = ImmutableSet.of(dataFile1.path().toString());
+    Assert.assertEquals(expectedFilesSecond, discoveredFilesSecond);
+
+    ContinuousEnumerationResult thirdResult = splitPlanner.planSplits(secondResult.toPosition());
+    Assert.assertEquals(
+        snapshot1.snapshotId(), thirdResult.fromPosition().snapshotId().longValue());
+    Assert.assertEquals(
+        snapshot1.timestampMillis(), thirdResult.fromPosition().snapshotTimestampMs().longValue());
+    Assert.assertEquals(snapshot2.snapshotId(), thirdResult.toPosition().snapshotId().longValue());
+    Assert.assertEquals(
+        snapshot2.timestampMillis(), thirdResult.toPosition().snapshotTimestampMs().longValue());
+    IcebergSourceSplit splitThird = Iterables.getOnlyElement(thirdResult.splits());
+    Assert.assertEquals(1, splitThird.task().files().size());
+    Set<String> discoveredFilesThird =
+        splitThird.task().files().stream()
+            .map(fileScanTask -> fileScanTask.file().path().toString())
+            .collect(Collectors.toSet());
+    // should discover dataFile2 appended in snapshot2
+    Set<String> expectedFilesThird = ImmutableSet.of(dataFile2.path().toString());
+    Assert.assertEquals(expectedFilesThird, discoveredFilesThird);

Review Comment:
   agree. will refactor



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#issuecomment-1338169362

   Thanks @pvary and @hililiwei for the review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu merged pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
stevenzwu merged PR #6299:
URL: https://github.com/apache/iceberg/pull/6299


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1038246186


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.calcite.shaded.com.google.common.collect.EvictingQueue;
+
+/**
+ * This enumeration history is used for split discovery throttling. It wraps Guava {@link
+ * EvictingQueue} to provide thread safety.
+ */
+@ThreadSafe
+class EnumerationHistory {
+
+  // EvictingQueue is not thread safe.
+  private final EvictingQueue<Integer> enumerationSplitCountHistory;
+
+  EnumerationHistory(int maxHistorySize) {
+    this.enumerationSplitCountHistory = EvictingQueue.create(maxHistorySize);
+  }
+
+  /** Add the split count from the last enumeration result. */
+  synchronized void add(int splitCount) {
+    enumerationSplitCountHistory.add(splitCount);
+  }
+
+  /** @return true if split discovery should pause because assigner has too many splits already. */
+  synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) {
+    if (enumerationSplitCountHistory.remainingCapacity() > 0) {
+      // only check throttling when full history is obtained.

Review Comment:
   > We might have a big file where the read takes a while (or short checkpointing period), and we face an error at the end of the file
   
   This is orthogonal to the split discovery throttling feature in this PR. It is about processing semantics / duplicates upon failure recovery. Flink can achieve exactly-once processing semantics regarding state consistency. Whether it is e2e exactly-once semantics depends on sink (transactional or not). For Iceberg sink (with transactional commit), recovery won't cause duplicates in the sink Iceberg table.
   
   > here is a pathological scenario. Job was in a restart loop with successful checkpoints (like 1 or 2) btw restart. This would essentially bypass the throttling check. Potentially the tracked splits can keep growing. That is the small concern I has regarding not checkpointing. The concern is small, because this scenario is very unusual. Hence, I didn't implement the checkpointing of enumeration history (for throttling purpose).
   
   Whether we checkpoint the enumeration history or not really only affects the throttling feature in this PR. I can only think of the above pathological scenario where not checkpointing can be a potential problem. But again, it is a unusual scenario but it could happen in theory.
   
   We can add the checkpointing here without breaking the compatibility as the `IcebergEnumeratorStateSerializer` is versioned. We can just bump up the version and handle both the old and new versions.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1039049527


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.calcite.shaded.com.google.common.collect.EvictingQueue;
+
+/**
+ * This enumeration history is used for split discovery throttling. It wraps Guava {@link
+ * EvictingQueue} to provide thread safety.
+ */
+@ThreadSafe
+class EnumerationHistory {
+
+  // EvictingQueue is not thread safe.
+  private final EvictingQueue<Integer> enumerationSplitCountHistory;
+
+  EnumerationHistory(int maxHistorySize) {
+    this.enumerationSplitCountHistory = EvictingQueue.create(maxHistorySize);
+  }
+
+  /** Add the split count from the last enumeration result. */
+  synchronized void add(int splitCount) {
+    enumerationSplitCountHistory.add(splitCount);
+  }
+
+  /** @return true if split discovery should pause because assigner has too many splits already. */
+  synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) {
+    if (enumerationSplitCountHistory.remainingCapacity() > 0) {
+      // only check throttling when full history is obtained.

Review Comment:
   fair enough. added the checkpoint of the enumeration split count history (for throttling purpose)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1036434639


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.calcite.shaded.com.google.common.collect.EvictingQueue;
+
+/**
+ * This enumeration history is used for split discovery throttling. It wraps Guava {@link
+ * EvictingQueue} to provide thread safety.
+ */
+@ThreadSafe
+class EnumerationHistory {
+
+  // EvictingQueue is not thread safe.
+  private final EvictingQueue<Integer> enumerationSplitCountHistory;
+
+  EnumerationHistory(int maxHistorySize) {
+    this.enumerationSplitCountHistory = EvictingQueue.create(maxHistorySize);
+  }
+
+  /** Add the split count from the last enumeration result. */
+  synchronized void add(int splitCount) {
+    enumerationSplitCountHistory.add(splitCount);
+  }
+
+  /** @return true if split discovery should pause because assigner has too many splits already. */
+  synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) {
+    if (enumerationSplitCountHistory.remainingCapacity() > 0) {
+      // only check throttling when full history is obtained.

Review Comment:
   @pvary you are correct here. This is the downside of not checkpointing enumeration history. After checkpoint recovery, enumerator can discover more splits because totalSplitCountFromRecentDiscovery is 0 after recovery. There are two scenarios.
   1. job was stuck in a failure/restart loop. this is not a concern as there is no successful checkpoint. newly discovered splits aren't saved. the tracked splits won't grow forever.
   2. here is a pathological scenario. Job was in a restart loop with successful checkpoints (like 1 or 2) btw restart. This would essentially bypass the throttling check. That is the small concern I has regarding not checkpointing. The concern is small, because this scenario is very unusual. Hence, I didn't implement the checkpointing of enumeration history (for throttling purpose).
   
   On the other hand, it's not too difficult to add the checkpointing of enumeration history for completeness or ability to handle the above edge case. I am open to add it if desired.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1036307161


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -92,34 +103,52 @@ public IcebergEnumeratorState snapshotState(long checkpointId) {
 
   /** This method is executed in an IO thread pool. */
   private ContinuousEnumerationResult discoverSplits() {
-    return splitPlanner.planSplits(enumeratorPosition.get());
+    int pendingSplitCountFromAssigner = assigner.pendingSplitCount();
+    if (enumerationHistory.shouldPauseSplitDiscovery(pendingSplitCountFromAssigner)) {
+      // If the assigner already has many pending splits, it is better to pause split discovery.
+      // Otherwise, eagerly discovering more splits will just increase assigner memory footprint
+      // and enumerator checkpoint state size.
+      LOG.info(
+          "Pause split discovery as the assigner already has too many pending splits: {}",
+          pendingSplitCountFromAssigner);
+      return new ContinuousEnumerationResult(
+          Collections.emptyList(), enumeratorPosition.get(), enumeratorPosition.get());
+    } else {
+      return splitPlanner.planSplits(enumeratorPosition.get());
+    }
   }
 
   /** This method is executed in a single coordinator thread. */
   private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
     if (error == null) {
       if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
         // Multiple discoverSplits() may be triggered with the same starting snapshot to the I/O
-        // thread pool.
-        // E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit tests) or the
-        // thread
-        // pool is busy and multiple discovery actions are executed concurrently. Discovery result
-        // should
-        // only be accepted if the starting position matches the enumerator position (like
-        // compare-and-swap).
+        // thread pool. E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit
+        // tests) or the thread pool is busy and multiple discovery actions are executed
+        // concurrently. Discovery result should only be accepted if the starting position
+        // matches the enumerator position (like compare-and-swap).
         LOG.info(
             "Skip {} discovered splits because the scan starting position doesn't match "
                 + "the current enumerator position: enumerator position = {}, scan starting position = {}",
             result.splits().size(),
             enumeratorPosition.get(),
             result.fromPosition());
       } else {
-        assigner.onDiscoveredSplits(result.splits());
-        LOG.info(
-            "Added {} splits discovered between ({}, {}] to the assigner",
-            result.splits().size(),
-            result.fromPosition(),
-            result.toPosition());
+        // Sometimes, enumeration may yield no splits for a few reasons.
+        // - upstream paused or delayed streaming writes to the Iceberg table.
+        // - enumeration frequency is higher than the upstream write frequency.
+        if (!result.splits().isEmpty()) {
+          assigner.onDiscoveredSplits(result.splits());
+          // EnumerationHistory makes throttling decision on split discovery
+          // based on the total number of splits discovered in the last a few cycles.
+          // Only update enumeration history when there are some discovered splits.
+          enumerationHistory.add(result.splits().size());

Review Comment:
   that is correct. `processDiscoveredSplits` handler is executed in single coordinator thread, while `discoverSplits` callable is executed in I/O thread pool.
   
   ```
       /**
        * Invoke the given callable periodically and handover the return value to the handler which
        * will be executed by the source coordinator. When this method is invoked multiple times, The
        * <code>Callable</code>s may be executed in a thread pool concurrently.
        *
        * <p>It is important to make sure that the callable does not modify any shared state,
        * especially the states that will be a part of the {@link SplitEnumerator#snapshotState(long)}.
        * Otherwise, there might be unexpected behavior.
        *
        * <p>Note that an exception thrown from the handler would result in failing the job.
        *
        * @param callable the callable to call.
        * @param handler a handler that handles the return value of or the exception thrown from the
        *     callable.
        * @param initialDelayMillis the initial delay of calling the callable, in milliseconds.
        * @param periodMillis the period between two invocations of the callable, in milliseconds.
        */
       <T> void callAsync(
               Callable<T> callable,
               BiConsumer<T, Throwable> handler,
               long initialDelayMillis,
               long periodMillis);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1036305419


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -92,34 +103,52 @@ public IcebergEnumeratorState snapshotState(long checkpointId) {
 
   /** This method is executed in an IO thread pool. */
   private ContinuousEnumerationResult discoverSplits() {
-    return splitPlanner.planSplits(enumeratorPosition.get());
+    int pendingSplitCountFromAssigner = assigner.pendingSplitCount();
+    if (enumerationHistory.shouldPauseSplitDiscovery(pendingSplitCountFromAssigner)) {
+      // If the assigner already has many pending splits, it is better to pause split discovery.
+      // Otherwise, eagerly discovering more splits will just increase assigner memory footprint
+      // and enumerator checkpoint state size.
+      LOG.info(
+          "Pause split discovery as the assigner already has too many pending splits: {}",
+          pendingSplitCountFromAssigner);
+      return new ContinuousEnumerationResult(
+          Collections.emptyList(), enumeratorPosition.get(), enumeratorPosition.get());
+    } else {
+      return splitPlanner.planSplits(enumeratorPosition.get());
+    }
   }
 
   /** This method is executed in a single coordinator thread. */
   private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
     if (error == null) {
       if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
         // Multiple discoverSplits() may be triggered with the same starting snapshot to the I/O
-        // thread pool.
-        // E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit tests) or the
-        // thread
-        // pool is busy and multiple discovery actions are executed concurrently. Discovery result
-        // should
-        // only be accepted if the starting position matches the enumerator position (like
-        // compare-and-swap).
+        // thread pool. E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit
+        // tests) or the thread pool is busy and multiple discovery actions are executed
+        // concurrently. Discovery result should only be accepted if the starting position
+        // matches the enumerator position (like compare-and-swap).
         LOG.info(
             "Skip {} discovered splits because the scan starting position doesn't match "
                 + "the current enumerator position: enumerator position = {}, scan starting position = {}",
             result.splits().size(),
             enumeratorPosition.get(),
             result.fromPosition());
       } else {
-        assigner.onDiscoveredSplits(result.splits());
-        LOG.info(
-            "Added {} splits discovered between ({}, {}] to the assigner",
-            result.splits().size(),
-            result.fromPosition(),
-            result.toPosition());
+        // Sometimes, enumeration may yield no splits for a few reasons.
+        // - upstream paused or delayed streaming writes to the Iceberg table.
+        // - enumeration frequency is higher than the upstream write frequency.
+        if (!result.splits().isEmpty()) {
+          assigner.onDiscoveredSplits(result.splits());
+          // EnumerationHistory makes throttling decision on split discovery
+          // based on the total number of splits discovered in the last a few cycles.
+          // Only update enumeration history when there are some discovered splits.
+          enumerationHistory.add(result.splits().size());
+          LOG.info(
+              "Added {} splits discovered between ({}, {}] to the assigner",
+              result.splits().size(),
+              result.fromPosition(),
+              result.toPosition());
+        }

Review Comment:
   sure. will add INFO log



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1036309386


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java:
##########
@@ -78,6 +79,22 @@ public ContinuousEnumerationResult planSplits(IcebergEnumeratorPosition lastPosi
     }
   }
 
+  private Snapshot toSnapshotInclusive(
+      Long lastConsumedSnapshotId, Snapshot currentSnapshot, int maxPlanningSnapshotCount) {
+    // snapshots are in reverse order (latest snapshot first)
+    List<Snapshot> snapshots =
+        Lists.newArrayList(
+            SnapshotUtil.ancestorsBetween(
+                table, currentSnapshot.snapshotId(), lastConsumedSnapshotId));
+    if (snapshots.size() <= maxPlanningSnapshotCount) {
+      return currentSnapshot;
+    } else {
+      // Because snapshots are in reverse order of commit history, this index returns
+      // the max allowed number of snapshots from the lastConsumedSnapshotId.
+      return snapshots.get(snapshots.size() - maxPlanningSnapshotCount);
+    }
+  }
+
   /** Discover incremental changes between {@code lastPosition} and current table snapshot */

Review Comment:
   will just remove the comment. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1036434639


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.calcite.shaded.com.google.common.collect.EvictingQueue;
+
+/**
+ * This enumeration history is used for split discovery throttling. It wraps Guava {@link
+ * EvictingQueue} to provide thread safety.
+ */
+@ThreadSafe
+class EnumerationHistory {
+
+  // EvictingQueue is not thread safe.
+  private final EvictingQueue<Integer> enumerationSplitCountHistory;
+
+  EnumerationHistory(int maxHistorySize) {
+    this.enumerationSplitCountHistory = EvictingQueue.create(maxHistorySize);
+  }
+
+  /** Add the split count from the last enumeration result. */
+  synchronized void add(int splitCount) {
+    enumerationSplitCountHistory.add(splitCount);
+  }
+
+  /** @return true if split discovery should pause because assigner has too many splits already. */
+  synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) {
+    if (enumerationSplitCountHistory.remainingCapacity() > 0) {
+      // only check throttling when full history is obtained.

Review Comment:
   @pvary you are correct here. This is the downside of not checkpointing enumeration history. After checkpoint recovery, enumerator can discover more splits because totalSplitCountFromRecentDiscovery is 0 after recovery. There are two scenarios.
   1. job was stuck in a failure/restart loop. this is not a concern as there is no successful checkpoint. newly discovered splits aren't saved.
   2. here is a pathological scenario. Job was in a restart loop with successful checkpoints (like 1 or 2) btw restart. This would essentially bypass the throttling check. That is the small concern I has regarding not checkpointing. However, in practice, this scenario is very unusual. Hence, I didn't implement the checkpointing of enumeration history (for throttling purpose).
   
   On the other hand, it's not too difficult to add the checkpointing of enumeration history for completeness or ability to handle the above edge case. Hence I am open to add it if desired.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read in case …

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1034003585


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.calcite.shaded.com.google.common.collect.EvictingQueue;
+
+/**
+ * This enumeration history is used for split discovery throttling. It wraps Guava {@link
+ * EvictingQueue} to provide thread safety.
+ */
+@ThreadSafe
+class EnumerationHistory {
+
+  // EvictingQueue is not thread safe.
+  private final EvictingQueue<Integer> enumerationSplitCountHistory;
+
+  EnumerationHistory(int maxHistorySize) {
+    this.enumerationSplitCountHistory = EvictingQueue.create(maxHistorySize);
+  }
+
+  /** Add the split count from the last enumeration result. */
+  synchronized void add(int splitCount) {
+    enumerationSplitCountHistory.add(splitCount);
+  }
+
+  /** @return true if split discovery should pause because assigner has too many splits already. */
+  synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) {
+    if (enumerationSplitCountHistory.remainingCapacity() > 0) {
+      // only check throttling when full history is obtained.

Review Comment:
   Right now, enumeration history (for the purpose of throttling) is not checkpointed. I am taking this simple approach for now. I am open to checkpoint the enumeration history as part of the enumerator state if people think it is better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] pvary commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

Posted by GitBox <gi...@apache.org>.
pvary commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1038397611


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/EnumerationHistory.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.enumerator;
+
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.calcite.shaded.com.google.common.collect.EvictingQueue;
+
+/**
+ * This enumeration history is used for split discovery throttling. It wraps Guava {@link
+ * EvictingQueue} to provide thread safety.
+ */
+@ThreadSafe
+class EnumerationHistory {
+
+  // EvictingQueue is not thread safe.
+  private final EvictingQueue<Integer> enumerationSplitCountHistory;
+
+  EnumerationHistory(int maxHistorySize) {
+    this.enumerationSplitCountHistory = EvictingQueue.create(maxHistorySize);
+  }
+
+  /** Add the split count from the last enumeration result. */
+  synchronized void add(int splitCount) {
+    enumerationSplitCountHistory.add(splitCount);
+  }
+
+  /** @return true if split discovery should pause because assigner has too many splits already. */
+  synchronized boolean shouldPauseSplitDiscovery(int pendingSplitCountFromAssigner) {
+    if (enumerationSplitCountHistory.remainingCapacity() > 0) {
+      // only check throttling when full history is obtained.

Review Comment:
   My example for the big file was my stab at finding another edge case where we end up infinite amount of enumerated splits. Since we do not have a state, any issue which causes restarts, but creates checkpoints could cause issues. Slow and failing reads could be one such example.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org