Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/11/30 06:37:18 UTC

[GitHub] [iceberg] pvary commented on a diff in pull request #6299: Flink: support split discovery throttling for streaming read

pvary commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1035577623


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -92,34 +103,52 @@ public IcebergEnumeratorState snapshotState(long checkpointId) {
 
   /** This method is executed in an IO thread pool. */
   private ContinuousEnumerationResult discoverSplits() {
-    return splitPlanner.planSplits(enumeratorPosition.get());
+    int pendingSplitCountFromAssigner = assigner.pendingSplitCount();
+    if (enumerationHistory.shouldPauseSplitDiscovery(pendingSplitCountFromAssigner)) {
+      // If the assigner already has many pending splits, it is better to pause split discovery.
+      // Otherwise, eagerly discovering more splits will just increase assigner memory footprint
+      // and enumerator checkpoint state size.
+      LOG.info(
+          "Pause split discovery as the assigner already has too many pending splits: {}",
+          pendingSplitCountFromAssigner);
+      return new ContinuousEnumerationResult(
+          Collections.emptyList(), enumeratorPosition.get(), enumeratorPosition.get());
+    } else {
+      return splitPlanner.planSplits(enumeratorPosition.get());
+    }
   }
 
   /** This method is executed in a single coordinator thread. */
   private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
     if (error == null) {
       if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
         // Multiple discoverSplits() may be triggered with the same starting snapshot to the I/O
-        // thread pool.
-        // E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit tests) or the
-        // thread
-        // pool is busy and multiple discovery actions are executed concurrently. Discovery result
-        // should
-        // only be accepted if the starting position matches the enumerator position (like
-        // compare-and-swap).
+        // thread pool. E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit
+        // tests) or the thread pool is busy and multiple discovery actions are executed
+        // concurrently. Discovery result should only be accepted if the starting position
+        // matches the enumerator position (like compare-and-swap).
         LOG.info(
             "Skip {} discovered splits because the scan starting position doesn't match "
                 + "the current enumerator position: enumerator position = {}, scan starting position = {}",
             result.splits().size(),
             enumeratorPosition.get(),
             result.fromPosition());
       } else {
-        assigner.onDiscoveredSplits(result.splits());
-        LOG.info(
-            "Added {} splits discovered between ({}, {}] to the assigner",
-            result.splits().size(),
-            result.fromPosition(),
-            result.toPosition());
+        // Sometimes, enumeration may yield no splits for a few reasons.
+        // - upstream paused or delayed streaming writes to the Iceberg table.
+        // - enumeration frequency is higher than the upstream write frequency.
+        if (!result.splits().isEmpty()) {
+          assigner.onDiscoveredSplits(result.splits());
+          // EnumerationHistory makes throttling decision on split discovery
+          // based on the total number of splits discovered in the last a few cycles.
+          // Only update enumeration history when there are some discovered splits.
+          enumerationHistory.add(result.splits().size());
+          LOG.info(
+              "Added {} splits discovered between ({}, {}] to the assigner",
+              result.splits().size(),
+              result.fromPosition(),
+              result.toPosition());
+        }

Review Comment:
   nit: Maybe add an info log here, or at minimum a debug log, saying that there were no splits to add?
   Otherwise it might be hard to understand why the discovery is "not working" 😄
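
   A rough sketch of what the suggested else branch could look like, reduced to a standalone class so it can be run on its own. Everything here is a stand-in and an assumption, not the PR's code: `EmptyDiscoveryLoggingSketch`, `assignedSplits`, and `logLines` are hypothetical names that replace the real assigner and the SLF4J `LOG`, and the message wording and the DEBUG level are only one possible choice.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the enumerator branch discussed in the review. */
final class EmptyDiscoveryLoggingSketch {
  // Stand-in for the split assigner: collects the splits handed over to it.
  final List<String> assignedSplits = new ArrayList<>();
  // Stand-in for the logger: each entry is "<LEVEL> <message>".
  final List<String> logLines = new ArrayList<>();

  void processDiscoveredSplits(List<String> splits, long fromPosition, long toPosition) {
    if (!splits.isEmpty()) {
      assignedSplits.addAll(splits);
      logLines.add(
          String.format(
              "INFO Added %d splits discovered between (%d, %d] to the assigner",
              splits.size(), fromPosition, toPosition));
    } else {
      // The branch the review asks for: make an empty discovery cycle visible in the logs.
      logLines.add(
          String.format(
              "DEBUG No new splits discovered between (%d, %d]", fromPosition, toPosition));
    }
  }

  public static void main(String[] args) {
    EmptyDiscoveryLoggingSketch sketch = new EmptyDiscoveryLoggingSketch();
    sketch.processDiscoveredSplits(List.of(), 5L, 5L);
    sketch.processDiscoveredSplits(List.of("split-1", "split-2"), 5L, 7L);
    sketch.logLines.forEach(System.out::println);
  }
}
```

   Keeping the empty case at debug level would avoid flooding the logs when the discovery interval is short, while still leaving a trace for anyone who turns debug logging on.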



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

