Posted to commits@iceberg.apache.org by pv...@apache.org on 2023/05/18 08:55:12 UTC

[iceberg] branch master updated: Flink: backport "Add config for max allowed consecutive planning failures in IcebergSource before failing the job" (#7571) to 1.16 and 1.15 (#7629)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 477da36e3f Flink: backport "Add config for max allowed consecutive planning failures in IcebergSource before failing the job" (#7571) to 1.16 and 1.15 (#7629)
477da36e3f is described below

commit 477da36e3fd5be7f96903010557ddf5ebfad7c6d
Author: pvary <pe...@gmail.com>
AuthorDate: Thu May 18 10:55:02 2023 +0200

    Flink: backport "Add config for max allowed consecutive planning failures in IcebergSource before failing the job" (#7571) to 1.16 and 1.15 (#7629)
---
 .../org/apache/iceberg/flink/FlinkReadConf.java    |   9 ++
 .../org/apache/iceberg/flink/FlinkReadOptions.java |   4 +
 .../apache/iceberg/flink/source/IcebergSource.java |   7 ++
 .../apache/iceberg/flink/source/ScanContext.java   |  24 ++++-
 .../enumerator/ContinuousIcebergEnumerator.java    |  12 ++-
 .../enumerator/ManualContinuousSplitPlanner.java   |   9 +-
 .../TestContinuousIcebergEnumerator.java           | 115 ++++++++++++++++++++-
 .../org/apache/iceberg/flink/FlinkReadConf.java    |   9 ++
 .../org/apache/iceberg/flink/FlinkReadOptions.java |   4 +
 .../apache/iceberg/flink/source/IcebergSource.java |   7 ++
 .../apache/iceberg/flink/source/ScanContext.java   |  24 ++++-
 .../enumerator/ContinuousIcebergEnumerator.java    |  12 ++-
 .../enumerator/ManualContinuousSplitPlanner.java   |   9 +-
 .../TestContinuousIcebergEnumerator.java           | 115 ++++++++++++++++++++-
 14 files changed, 346 insertions(+), 14 deletions(-)
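
For context before the patch itself: the new option caps how many consecutive split-planning failures the streaming IcebergSource tolerates before the job fails. A hedged usage sketch, not part of this commit; the class name, tableLoader, and monitor interval below are illustrative assumptions:

    // Sketch only; names are assumptions for illustration, not code from this patch.
    import java.time.Duration;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.source.IcebergSource;

    class PlanningFailureExample {
      IcebergSource<RowData> buildSource(TableLoader tableLoader) {
        return IcebergSource.forRowData()
            .tableLoader(tableLoader)
            .streaming(true)
            .monitorInterval(Duration.ofSeconds(60))
            // -1 retries planning forever, 0 fails on the first error; the default
            // of 3 tolerates three consecutive failures before failing the job.
            .maxAllowedPlanningFailures(5)
            .build();
      }
    }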

diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
index baef57a8e7..0e04c9affb 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
@@ -181,4 +181,13 @@ public class FlinkReadConf {
         .defaultValue(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue())
         .parse();
   }
+
+  public int maxAllowedPlanningFailures() {
+    return confParser
+        .intConf()
+        .option(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES)
+        .flinkConfig(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION)
+        .defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue())
+        .parse();
+  }
 }
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
index d75b2234d7..55c5aca3b6 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
@@ -105,4 +105,8 @@ public class FlinkReadOptions {
   public static final String LIMIT = "limit";
   public static final ConfigOption<Long> LIMIT_OPTION =
       ConfigOptions.key(PREFIX + LIMIT).longType().defaultValue(-1L);
+
+  public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
+  public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
+      ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);
 }
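
The unprefixed key registered above is resolved from per-query read options, while the prefixed ConfigOption is resolved from the Flink configuration. A hedged sketch of the per-query route through a SQL hint, assuming a TableEnvironment named tableEnv and a registered Iceberg table db.sample (both illustrative):

    // Sketch only; tableEnv and db.sample are assumptions, not part of this patch.
    tableEnv.executeSql(
        "SELECT * FROM db.sample "
            + "/*+ OPTIONS('streaming'='true', 'max-allowed-planning-failures'='5') */");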
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 0675305e10..cbdd184870 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -391,6 +391,13 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
       return this;
     }
 
+    public Builder<T> maxAllowedPlanningFailures(int maxAllowedPlanningFailures) {
+      readOptions.put(
+          FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.key(),
+          Integer.toString(maxAllowedPlanningFailures));
+      return this;
+    }
+
     /**
      * Set the read properties for Flink source. View the supported properties in {@link
      * FlinkReadOptions}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 23f33e6d2e..e380204e87 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -64,6 +64,7 @@ public class ScanContext implements Serializable {
   private final boolean includeColumnStats;
   private final Integer planParallelism;
   private final int maxPlanningSnapshotCount;
+  private final int maxAllowedPlanningFailures;
 
   private ScanContext(
       boolean caseSensitive,
@@ -86,6 +87,7 @@ public class ScanContext implements Serializable {
       boolean exposeLocality,
       Integer planParallelism,
       int maxPlanningSnapshotCount,
+      int maxAllowedPlanningFailures,
       String branch,
       String tag,
       String startTag,
@@ -115,6 +117,7 @@ public class ScanContext implements Serializable {
     this.exposeLocality = exposeLocality;
     this.planParallelism = planParallelism;
     this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
+    this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
 
     validate();
   }
@@ -155,6 +158,10 @@ public class ScanContext implements Serializable {
     Preconditions.checkArgument(
         !(endTag != null && endSnapshotId() != null),
         "END_SNAPSHOT_ID and END_TAG cannot both be set.");
+
+    Preconditions.checkArgument(
+        maxAllowedPlanningFailures >= -1,
+        "Cannot set maxAllowedPlanningFailures to a negative number other than -1.");
   }
 
   public boolean caseSensitive() {
@@ -253,6 +260,10 @@ public class ScanContext implements Serializable {
     return maxPlanningSnapshotCount;
   }
 
+  public int maxAllowedPlanningFailures() {
+    return maxAllowedPlanningFailures;
+  }
+
   public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
     return ScanContext.builder()
         .caseSensitive(caseSensitive)
@@ -277,6 +288,7 @@ public class ScanContext implements Serializable {
         .exposeLocality(exposeLocality)
         .planParallelism(planParallelism)
         .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
+        .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
         .build();
   }
 
@@ -304,6 +316,7 @@ public class ScanContext implements Serializable {
         .exposeLocality(exposeLocality)
         .planParallelism(planParallelism)
         .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
+        .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
         .build();
   }
 
@@ -341,6 +354,8 @@ public class ScanContext implements Serializable {
         FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue();
     private int maxPlanningSnapshotCount =
         FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
+    private int maxAllowedPlanningFailures =
+        FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();
 
     private Builder() {}
 
@@ -464,6 +479,11 @@ public class ScanContext implements Serializable {
       return this;
     }
 
+    public Builder maxAllowedPlanningFailures(int newMaxAllowedPlanningFailures) {
+      this.maxAllowedPlanningFailures = newMaxAllowedPlanningFailures;
+      return this;
+    }
+
     public Builder resolveConfig(
         Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
       FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
@@ -488,7 +508,8 @@ public class ScanContext implements Serializable {
           .limit(flinkReadConf.limit())
           .planParallelism(flinkReadConf.workerPoolSize())
           .includeColumnStats(flinkReadConf.includeColumnStats())
-          .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount());
+          .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount())
+          .maxAllowedPlanningFailures(maxAllowedPlanningFailures);
     }
 
     public ScanContext build() {
@@ -513,6 +534,7 @@ public class ScanContext implements Serializable {
           exposeLocality,
           planParallelism,
           maxPlanningSnapshotCount,
+          maxAllowedPlanningFailures,
           branch,
           tag,
           startTag,
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
index b84dab190a..b1dadfb9a6 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
@@ -55,6 +55,9 @@ public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
   /** Track enumeration result history for split discovery throttling. */
   private final EnumerationHistory enumerationHistory;
 
+  /** Count the consecutive failures and throw an exception once the allowed maximum is exceeded. */
+  private transient int consecutiveFailures = 0;
+
   public ContinuousIcebergEnumerator(
       SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
       SplitAssigner assigner,
@@ -122,6 +125,7 @@ public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
   /** This method is executed in a single coordinator thread. */
   private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
     if (error == null) {
+      consecutiveFailures = 0;
       if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
         // Multiple discoverSplits() may be triggered with the same starting snapshot to the I/O
         // thread pool. E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit
@@ -161,7 +165,13 @@ public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
         LOG.info("Update enumerator position to {}", result.toPosition());
       }
     } else {
-      LOG.error("Failed to discover new splits", error);
+      consecutiveFailures++;
+      if (scanContext.maxAllowedPlanningFailures() < 0
+          || consecutiveFailures <= scanContext.maxAllowedPlanningFailures()) {
+        LOG.error("Failed to discover new splits", error);
+      } else {
+        throw new RuntimeException("Failed to discover new splits", error);
+      }
     }
   }
 }
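
The retry rule added above, restated as a standalone predicate for clarity; a sketch for illustration, not code from this commit:

    // Mirrors the branch in processDiscoveredSplits(): -1 (the only negative
    // value accepted by ScanContext validation) means log and retry forever;
    // otherwise the job fails on failure number maxAllowed + 1.
    static boolean shouldFailJob(int consecutiveFailures, int maxAllowed) {
      return maxAllowed >= 0 && consecutiveFailures > maxAllowed;
    }

With the default of 3, the first three consecutive errors are only logged and the fourth fails the job; a single successful planning run resets the counter.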
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
index 7575beed6e..ebc92df023 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
@@ -32,16 +32,23 @@ class ManualContinuousSplitPlanner implements ContinuousSplitPlanner {
   // track splits per snapshot
   private final NavigableMap<Long, List<IcebergSourceSplit>> splits;
   private long latestSnapshotId;
+  private int remainingFailures;
 
-  ManualContinuousSplitPlanner(ScanContext scanContext) {
+  ManualContinuousSplitPlanner(ScanContext scanContext, int expectedFailures) {
     this.maxPlanningSnapshotCount = scanContext.maxPlanningSnapshotCount();
     this.splits = new TreeMap<>();
     this.latestSnapshotId = 0L;
+    this.remainingFailures = expectedFailures;
   }
 
   @Override
   public synchronized ContinuousEnumerationResult planSplits(
       IcebergEnumeratorPosition lastPosition) {
+    if (remainingFailures > 0) {
+      remainingFailures--;
+      throw new RuntimeException("Expected failure at planning");
+    }
+
     long fromSnapshotIdExclusive = 0;
     if (lastPosition != null && lastPosition.snapshotId() != null) {
       fromSnapshotIdExclusive = lastPosition.snapshotId();
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
index a051a4de0f..d0ae8fdf77 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
@@ -51,7 +51,7 @@ public class TestContinuousIcebergEnumerator {
             .streaming(true)
             .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
             .build();
-    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
     ContinuousIcebergEnumerator enumerator =
         createEnumerator(enumeratorContext, scanContext, splitPlanner);
 
@@ -81,7 +81,7 @@ public class TestContinuousIcebergEnumerator {
             .streaming(true)
             .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
             .build();
-    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
     ContinuousIcebergEnumerator enumerator =
         createEnumerator(enumeratorContext, scanContext, splitPlanner);
 
@@ -110,7 +110,7 @@ public class TestContinuousIcebergEnumerator {
             .streaming(true)
             .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
             .build();
-    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
     ContinuousIcebergEnumerator enumerator =
         createEnumerator(enumeratorContext, scanContext, splitPlanner);
 
@@ -163,7 +163,7 @@ public class TestContinuousIcebergEnumerator {
             // discover one snapshot at a time
             .maxPlanningSnapshotCount(1)
             .build();
-    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
     ContinuousIcebergEnumerator enumerator =
         createEnumerator(enumeratorContext, scanContext, splitPlanner);
 
@@ -227,6 +227,113 @@ public class TestContinuousIcebergEnumerator {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testTransientPlanningErrorsWithSuccessfulRetry() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    // Trigger a planning attempt and check that no splits are returned due to the planning error
+    enumeratorContext.triggerAllActions();
+    Assert.assertEquals(0, enumerator.snapshotState(2).pendingSplits().size());
+
+    // Second scan planning should succeed and discover the expected splits
+    enumeratorContext.triggerAllActions();
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(3).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testOverMaxAllowedPlanningErrors() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 2);
+    createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    // Check that the scheduled discovery task tolerates the current error and keeps running until
+    // the failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertFalse(
+        enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+
+    // Check that the task has failed with the expected exception after the failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertTrue(
+        enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+    Assertions.assertThatThrownBy(
+            () -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+        .hasCauseInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Failed to discover new split");
+  }
+
+  @Test
+  public void testPlanningIgnoringErrors() throws Exception {
+    int expectedFailures = 3;
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(-1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner =
+        new ManualContinuousSplitPlanner(scanContext, expectedFailures);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    Collection<IcebergSourceSplitState> pendingSplits;
+    // Cannot discover the new split while planning keeps failing
+    for (int i = 0; i < expectedFailures; ++i) {
+      enumeratorContext.triggerAllActions();
+      pendingSplits = enumerator.snapshotState(i).pendingSplits();
+      Assert.assertEquals(0, pendingSplits.size());
+    }
+
+    // Discover the new split after scan planning finally succeeds
+    enumeratorContext.triggerAllActions();
+    pendingSplits = enumerator.snapshotState(expectedFailures + 1).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
   private static ContinuousIcebergEnumerator createEnumerator(
       SplitEnumeratorContext<IcebergSourceSplit> context,
       ScanContext scanContext,
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
index baef57a8e7..0e04c9affb 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
@@ -181,4 +181,13 @@ public class FlinkReadConf {
         .defaultValue(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue())
         .parse();
   }
+
+  public int maxAllowedPlanningFailures() {
+    return confParser
+        .intConf()
+        .option(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES)
+        .flinkConfig(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION)
+        .defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue())
+        .parse();
+  }
 }
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
index d75b2234d7..55c5aca3b6 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
@@ -105,4 +105,8 @@ public class FlinkReadOptions {
   public static final String LIMIT = "limit";
   public static final ConfigOption<Long> LIMIT_OPTION =
       ConfigOptions.key(PREFIX + LIMIT).longType().defaultValue(-1L);
+
+  public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
+  public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
+      ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);
 }
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 0675305e10..cbdd184870 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -391,6 +391,13 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
       return this;
     }
 
+    public Builder<T> maxAllowedPlanningFailures(int maxAllowedPlanningFailures) {
+      readOptions.put(
+          FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.key(),
+          Integer.toString(maxAllowedPlanningFailures));
+      return this;
+    }
+
     /**
      * Set the read properties for Flink source. View the supported properties in {@link
      * FlinkReadOptions}
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 23f33e6d2e..e380204e87 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -64,6 +64,7 @@ public class ScanContext implements Serializable {
   private final boolean includeColumnStats;
   private final Integer planParallelism;
   private final int maxPlanningSnapshotCount;
+  private final int maxAllowedPlanningFailures;
 
   private ScanContext(
       boolean caseSensitive,
@@ -86,6 +87,7 @@ public class ScanContext implements Serializable {
       boolean exposeLocality,
       Integer planParallelism,
       int maxPlanningSnapshotCount,
+      int maxAllowedPlanningFailures,
       String branch,
       String tag,
       String startTag,
@@ -115,6 +117,7 @@ public class ScanContext implements Serializable {
     this.exposeLocality = exposeLocality;
     this.planParallelism = planParallelism;
     this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
+    this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
 
     validate();
   }
@@ -155,6 +158,10 @@ public class ScanContext implements Serializable {
     Preconditions.checkArgument(
         !(endTag != null && endSnapshotId() != null),
         "END_SNAPSHOT_ID and END_TAG cannot both be set.");
+
+    Preconditions.checkArgument(
+        maxAllowedPlanningFailures >= -1,
+        "Cannot set maxAllowedPlanningFailures to a negative number other than -1.");
   }
 
   public boolean caseSensitive() {
@@ -253,6 +260,10 @@ public class ScanContext implements Serializable {
     return maxPlanningSnapshotCount;
   }
 
+  public int maxAllowedPlanningFailures() {
+    return maxAllowedPlanningFailures;
+  }
+
   public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
     return ScanContext.builder()
         .caseSensitive(caseSensitive)
@@ -277,6 +288,7 @@ public class ScanContext implements Serializable {
         .exposeLocality(exposeLocality)
         .planParallelism(planParallelism)
         .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
+        .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
         .build();
   }
 
@@ -304,6 +316,7 @@ public class ScanContext implements Serializable {
         .exposeLocality(exposeLocality)
         .planParallelism(planParallelism)
         .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
+        .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
         .build();
   }
 
@@ -341,6 +354,8 @@ public class ScanContext implements Serializable {
         FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue();
     private int maxPlanningSnapshotCount =
         FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
+    private int maxAllowedPlanningFailures =
+        FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();
 
     private Builder() {}
 
@@ -464,6 +479,11 @@ public class ScanContext implements Serializable {
       return this;
     }
 
+    public Builder maxAllowedPlanningFailures(int newMaxAllowedPlanningFailures) {
+      this.maxAllowedPlanningFailures = newMaxAllowedPlanningFailures;
+      return this;
+    }
+
     public Builder resolveConfig(
         Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
       FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
@@ -488,7 +508,8 @@ public class ScanContext implements Serializable {
           .limit(flinkReadConf.limit())
           .planParallelism(flinkReadConf.workerPoolSize())
           .includeColumnStats(flinkReadConf.includeColumnStats())
-          .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount());
+          .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount())
+          .maxAllowedPlanningFailures(maxAllowedPlanningFailures);
     }
 
     public ScanContext build() {
@@ -513,6 +534,7 @@ public class ScanContext implements Serializable {
           exposeLocality,
           planParallelism,
           maxPlanningSnapshotCount,
+          maxAllowedPlanningFailures,
           branch,
           tag,
           startTag,
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
index b84dab190a..b1dadfb9a6 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
@@ -55,6 +55,9 @@ public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
   /** Track enumeration result history for split discovery throttling. */
   private final EnumerationHistory enumerationHistory;
 
+  /** Count the consecutive failures and throw an exception once the allowed maximum is exceeded. */
+  private transient int consecutiveFailures = 0;
+
   public ContinuousIcebergEnumerator(
       SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
       SplitAssigner assigner,
@@ -122,6 +125,7 @@ public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
   /** This method is executed in a single coordinator thread. */
   private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
     if (error == null) {
+      consecutiveFailures = 0;
       if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
         // Multiple discoverSplits() may be triggered with the same starting snapshot to the I/O
         // thread pool. E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit
@@ -161,7 +165,13 @@ public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
         LOG.info("Update enumerator position to {}", result.toPosition());
       }
     } else {
-      LOG.error("Failed to discover new splits", error);
+      consecutiveFailures++;
+      if (scanContext.maxAllowedPlanningFailures() < 0
+          || consecutiveFailures <= scanContext.maxAllowedPlanningFailures()) {
+        LOG.error("Failed to discover new splits", error);
+      } else {
+        throw new RuntimeException("Failed to discover new splits", error);
+      }
     }
   }
 }
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
index 7575beed6e..ebc92df023 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
@@ -32,16 +32,23 @@ class ManualContinuousSplitPlanner implements ContinuousSplitPlanner {
   // track splits per snapshot
   private final NavigableMap<Long, List<IcebergSourceSplit>> splits;
   private long latestSnapshotId;
+  private int remainingFailures;
 
-  ManualContinuousSplitPlanner(ScanContext scanContext) {
+  ManualContinuousSplitPlanner(ScanContext scanContext, int expectedFailures) {
     this.maxPlanningSnapshotCount = scanContext.maxPlanningSnapshotCount();
     this.splits = new TreeMap<>();
     this.latestSnapshotId = 0L;
+    this.remainingFailures = expectedFailures;
   }
 
   @Override
   public synchronized ContinuousEnumerationResult planSplits(
       IcebergEnumeratorPosition lastPosition) {
+    if (remainingFailures > 0) {
+      remainingFailures--;
+      throw new RuntimeException("Expected failure at planning");
+    }
+
     long fromSnapshotIdExclusive = 0;
     if (lastPosition != null && lastPosition.snapshotId() != null) {
       fromSnapshotIdExclusive = lastPosition.snapshotId();
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
index a051a4de0f..d0ae8fdf77 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
@@ -51,7 +51,7 @@ public class TestContinuousIcebergEnumerator {
             .streaming(true)
             .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
             .build();
-    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
     ContinuousIcebergEnumerator enumerator =
         createEnumerator(enumeratorContext, scanContext, splitPlanner);
 
@@ -81,7 +81,7 @@ public class TestContinuousIcebergEnumerator {
             .streaming(true)
             .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
             .build();
-    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
     ContinuousIcebergEnumerator enumerator =
         createEnumerator(enumeratorContext, scanContext, splitPlanner);
 
@@ -110,7 +110,7 @@ public class TestContinuousIcebergEnumerator {
             .streaming(true)
             .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
             .build();
-    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
     ContinuousIcebergEnumerator enumerator =
         createEnumerator(enumeratorContext, scanContext, splitPlanner);
 
@@ -163,7 +163,7 @@ public class TestContinuousIcebergEnumerator {
             // discover one snapshot at a time
             .maxPlanningSnapshotCount(1)
             .build();
-    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
     ContinuousIcebergEnumerator enumerator =
         createEnumerator(enumeratorContext, scanContext, splitPlanner);
 
@@ -227,6 +227,113 @@ public class TestContinuousIcebergEnumerator {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testTransientPlanningErrorsWithSuccessfulRetry() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    // Trigger a planning attempt and check that no splits are returned due to the planning error
+    enumeratorContext.triggerAllActions();
+    Assert.assertEquals(0, enumerator.snapshotState(2).pendingSplits().size());
+
+    // Second scan planning should succeed and discover the expected splits
+    enumeratorContext.triggerAllActions();
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(3).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testOverMaxAllowedPlanningErrors() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 2);
+    createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    // Check that the scheduled discovery task tolerates the current error and keeps running until
+    // the failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertFalse(
+        enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+
+    // Check that the task has failed with the expected exception after the failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertTrue(
+        enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+    Assertions.assertThatThrownBy(
+            () -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+        .hasCauseInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Failed to discover new split");
+  }
+
+  @Test
+  public void testPlanningIgnoringErrors() throws Exception {
+    int expectedFailures = 3;
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(-1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner =
+        new ManualContinuousSplitPlanner(scanContext, expectedFailures);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    Collection<IcebergSourceSplitState> pendingSplits;
+    // Cannot discover the new split while planning keeps failing
+    for (int i = 0; i < expectedFailures; ++i) {
+      enumeratorContext.triggerAllActions();
+      pendingSplits = enumerator.snapshotState(i).pendingSplits();
+      Assert.assertEquals(0, pendingSplits.size());
+    }
+
+    // Discover the new split after scan planning finally succeeds
+    enumeratorContext.triggerAllActions();
+    pendingSplits = enumerator.snapshotState(expectedFailures + 1).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
   private static ContinuousIcebergEnumerator createEnumerator(
       SplitEnumeratorContext<IcebergSourceSplit> context,
       ScanContext scanContext,