You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2023/05/18 08:55:12 UTC
[iceberg] branch master updated: Flink: backport Add config for max allowed consecutive planning failures in IcebergSource before failing the job (#7571) to 1.16 and 1.15 (#7629)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 477da36e3f Flink: backport Add config for max allowed consecutive planning failures in IcebergSource before failing the job (#7571) to 1.16 and 1.15 (#7629)
477da36e3f is described below
commit 477da36e3fd5be7f96903010557ddf5ebfad7c6d
Author: pvary <pe...@gmail.com>
AuthorDate: Thu May 18 10:55:02 2023 +0200
Flink: backport Add config for max allowed consecutive planning failures in IcebergSource before failing the job (#7571) to 1.16 and 1.15 (#7629)
---
.../org/apache/iceberg/flink/FlinkReadConf.java | 9 ++
.../org/apache/iceberg/flink/FlinkReadOptions.java | 4 +
.../apache/iceberg/flink/source/IcebergSource.java | 7 ++
.../apache/iceberg/flink/source/ScanContext.java | 24 ++++-
.../enumerator/ContinuousIcebergEnumerator.java | 12 ++-
.../enumerator/ManualContinuousSplitPlanner.java | 9 +-
.../TestContinuousIcebergEnumerator.java | 115 ++++++++++++++++++++-
.../org/apache/iceberg/flink/FlinkReadConf.java | 9 ++
.../org/apache/iceberg/flink/FlinkReadOptions.java | 4 +
.../apache/iceberg/flink/source/IcebergSource.java | 7 ++
.../apache/iceberg/flink/source/ScanContext.java | 24 ++++-
.../enumerator/ContinuousIcebergEnumerator.java | 12 ++-
.../enumerator/ManualContinuousSplitPlanner.java | 9 +-
.../TestContinuousIcebergEnumerator.java | 115 ++++++++++++++++++++-
14 files changed, 346 insertions(+), 14 deletions(-)
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
index baef57a8e7..0e04c9affb 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
@@ -181,4 +181,13 @@ public class FlinkReadConf {
.defaultValue(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue())
.parse();
}
+
+ public int maxAllowedPlanningFailures() {
+ return confParser
+ .intConf()
+ .option(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES)
+ .flinkConfig(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION)
+ .defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue())
+ .parse();
+ }
}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
index d75b2234d7..55c5aca3b6 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
@@ -105,4 +105,8 @@ public class FlinkReadOptions {
public static final String LIMIT = "limit";
public static final ConfigOption<Long> LIMIT_OPTION =
ConfigOptions.key(PREFIX + LIMIT).longType().defaultValue(-1L);
+
+ public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
+ public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
+ ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);
}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 0675305e10..cbdd184870 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -391,6 +391,13 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
return this;
}
+ public Builder<T> maxAllowedPlanningFailures(int maxAllowedPlanningFailures) {
+ readOptions.put(
+ FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.key(),
+ Integer.toString(maxAllowedPlanningFailures));
+ return this;
+ }
+
/**
* Set the read properties for Flink source. View the supported properties in {@link
* FlinkReadOptions}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 23f33e6d2e..e380204e87 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -64,6 +64,7 @@ public class ScanContext implements Serializable {
private final boolean includeColumnStats;
private final Integer planParallelism;
private final int maxPlanningSnapshotCount;
+ private final int maxAllowedPlanningFailures;
private ScanContext(
boolean caseSensitive,
@@ -86,6 +87,7 @@ public class ScanContext implements Serializable {
boolean exposeLocality,
Integer planParallelism,
int maxPlanningSnapshotCount,
+ int maxAllowedPlanningFailures,
String branch,
String tag,
String startTag,
@@ -115,6 +117,7 @@ public class ScanContext implements Serializable {
this.exposeLocality = exposeLocality;
this.planParallelism = planParallelism;
this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
+ this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
validate();
}
@@ -155,6 +158,10 @@ public class ScanContext implements Serializable {
Preconditions.checkArgument(
!(endTag != null && endSnapshotId() != null),
"END_SNAPSHOT_ID and END_TAG cannot both be set.");
+
+ Preconditions.checkArgument(
+ maxAllowedPlanningFailures >= -1,
+ "Cannot set maxAllowedPlanningFailures to a negative number other than -1.");
}
public boolean caseSensitive() {
@@ -253,6 +260,10 @@ public class ScanContext implements Serializable {
return maxPlanningSnapshotCount;
}
+ public int maxAllowedPlanningFailures() {
+ return maxAllowedPlanningFailures;
+ }
+
public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
@@ -277,6 +288,7 @@ public class ScanContext implements Serializable {
.exposeLocality(exposeLocality)
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
+ .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.build();
}
@@ -304,6 +316,7 @@ public class ScanContext implements Serializable {
.exposeLocality(exposeLocality)
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
+ .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.build();
}
@@ -341,6 +354,8 @@ public class ScanContext implements Serializable {
FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue();
private int maxPlanningSnapshotCount =
FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
+ private int maxAllowedPlanningFailures =
+ FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();
private Builder() {}
@@ -464,6 +479,11 @@ public class ScanContext implements Serializable {
return this;
}
+ public Builder maxAllowedPlanningFailures(int newMaxAllowedPlanningFailures) {
+ this.maxAllowedPlanningFailures = newMaxAllowedPlanningFailures;
+ return this;
+ }
+
public Builder resolveConfig(
Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
@@ -488,7 +508,8 @@ public class ScanContext implements Serializable {
.limit(flinkReadConf.limit())
.planParallelism(flinkReadConf.workerPoolSize())
.includeColumnStats(flinkReadConf.includeColumnStats())
- .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount());
+ .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount())
+ .maxAllowedPlanningFailures(maxAllowedPlanningFailures);
}
public ScanContext build() {
@@ -513,6 +534,7 @@ public class ScanContext implements Serializable {
exposeLocality,
planParallelism,
maxPlanningSnapshotCount,
+ maxAllowedPlanningFailures,
branch,
tag,
startTag,
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
index b84dab190a..b1dadfb9a6 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
@@ -55,6 +55,9 @@ public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
/** Track enumeration result history for split discovery throttling. */
private final EnumerationHistory enumerationHistory;
+ /** Count the consecutive failures and throw exception if the max allowed failures are reached */
+ private transient int consecutiveFailures = 0;
+
public ContinuousIcebergEnumerator(
SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
SplitAssigner assigner,
@@ -122,6 +125,7 @@ public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
/** This method is executed in a single coordinator thread. */
private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
if (error == null) {
+ consecutiveFailures = 0;
if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
// Multiple discoverSplits() may be triggered with the same starting snapshot to the I/O
// thread pool. E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit
@@ -161,7 +165,13 @@ public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
LOG.info("Update enumerator position to {}", result.toPosition());
}
} else {
- LOG.error("Failed to discover new splits", error);
+ consecutiveFailures++;
+ if (scanContext.maxAllowedPlanningFailures() < 0
+ || consecutiveFailures <= scanContext.maxAllowedPlanningFailures()) {
+ LOG.error("Failed to discover new splits", error);
+ } else {
+ throw new RuntimeException("Failed to discover new splits", error);
+ }
}
}
}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
index 7575beed6e..ebc92df023 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
@@ -32,16 +32,23 @@ class ManualContinuousSplitPlanner implements ContinuousSplitPlanner {
// track splits per snapshot
private final NavigableMap<Long, List<IcebergSourceSplit>> splits;
private long latestSnapshotId;
+ private int remainingFailures;
- ManualContinuousSplitPlanner(ScanContext scanContext) {
+ ManualContinuousSplitPlanner(ScanContext scanContext, int expectedFailures) {
this.maxPlanningSnapshotCount = scanContext.maxPlanningSnapshotCount();
this.splits = new TreeMap<>();
this.latestSnapshotId = 0L;
+ this.remainingFailures = expectedFailures;
}
@Override
public synchronized ContinuousEnumerationResult planSplits(
IcebergEnumeratorPosition lastPosition) {
+ if (remainingFailures > 0) {
+ remainingFailures--;
+ throw new RuntimeException("Expected failure at planning");
+ }
+
long fromSnapshotIdExclusive = 0;
if (lastPosition != null && lastPosition.snapshotId() != null) {
fromSnapshotIdExclusive = lastPosition.snapshotId();
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
index a051a4de0f..d0ae8fdf77 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
@@ -51,7 +51,7 @@ public class TestContinuousIcebergEnumerator {
.streaming(true)
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
- ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+ ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);
@@ -81,7 +81,7 @@ public class TestContinuousIcebergEnumerator {
.streaming(true)
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
- ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+ ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);
@@ -110,7 +110,7 @@ public class TestContinuousIcebergEnumerator {
.streaming(true)
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
- ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+ ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);
@@ -163,7 +163,7 @@ public class TestContinuousIcebergEnumerator {
// discover one snapshot at a time
.maxPlanningSnapshotCount(1)
.build();
- ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+ ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);
@@ -227,6 +227,113 @@ public class TestContinuousIcebergEnumerator {
splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
}
+ @Test
+ public void testTransientPlanningErrorsWithSuccessfulRetry() throws Exception {
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(2)
+ .build();
+ ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // Make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+ SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+
+ // Trigger a planning and check that no splits returned due to the planning error
+ enumeratorContext.triggerAllActions();
+ Assert.assertEquals(0, enumerator.snapshotState(2).pendingSplits().size());
+
+ // Second scan planning should succeed and discover the expected splits
+ enumeratorContext.triggerAllActions();
+ Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(3).pendingSplits();
+ Assert.assertEquals(1, pendingSplits.size());
+ IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+ Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+ Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+ }
+
+ @Test
+ public void testOverMaxAllowedPlanningErrors() throws Exception {
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(1)
+ .build();
+ ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 2);
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // Make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+ SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+
+ // Check that the scheduler response ignores the current error and continues to run until the
+ // failure limit is reached
+ enumeratorContext.triggerAllActions();
+ Assert.assertFalse(
+ enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+
+ // Check that the task has failed with the expected exception after the failure limit is reached
+ enumeratorContext.triggerAllActions();
+ Assert.assertTrue(
+ enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+ Assertions.assertThatThrownBy(
+ () -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+ .hasCauseInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Failed to discover new split");
+ }
+
+ @Test
+ public void testPlanningIgnoringErrors() throws Exception {
+ int expectedFailures = 3;
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(-1)
+ .build();
+ ManualContinuousSplitPlanner splitPlanner =
+ new ManualContinuousSplitPlanner(scanContext, expectedFailures);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // Make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+ SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+
+ Collection<IcebergSourceSplitState> pendingSplits;
+ // Can not discover the new split with planning failures
+ for (int i = 0; i < expectedFailures; ++i) {
+ enumeratorContext.triggerAllActions();
+ pendingSplits = enumerator.snapshotState(i).pendingSplits();
+ Assert.assertEquals(0, pendingSplits.size());
+ }
+
+ // Discovered the new split after a successful scan planning
+ enumeratorContext.triggerAllActions();
+ pendingSplits = enumerator.snapshotState(expectedFailures + 1).pendingSplits();
+ Assert.assertEquals(1, pendingSplits.size());
+ IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+ Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+ Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+ }
+
private static ContinuousIcebergEnumerator createEnumerator(
SplitEnumeratorContext<IcebergSourceSplit> context,
ScanContext scanContext,
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
index baef57a8e7..0e04c9affb 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
@@ -181,4 +181,13 @@ public class FlinkReadConf {
.defaultValue(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue())
.parse();
}
+
+ public int maxAllowedPlanningFailures() {
+ return confParser
+ .intConf()
+ .option(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES)
+ .flinkConfig(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION)
+ .defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue())
+ .parse();
+ }
}
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
index d75b2234d7..55c5aca3b6 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
@@ -105,4 +105,8 @@ public class FlinkReadOptions {
public static final String LIMIT = "limit";
public static final ConfigOption<Long> LIMIT_OPTION =
ConfigOptions.key(PREFIX + LIMIT).longType().defaultValue(-1L);
+
+ public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
+ public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
+ ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);
}
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 0675305e10..cbdd184870 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -391,6 +391,13 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
return this;
}
+ public Builder<T> maxAllowedPlanningFailures(int maxAllowedPlanningFailures) {
+ readOptions.put(
+ FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.key(),
+ Integer.toString(maxAllowedPlanningFailures));
+ return this;
+ }
+
/**
* Set the read properties for Flink source. View the supported properties in {@link
* FlinkReadOptions}
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 23f33e6d2e..e380204e87 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -64,6 +64,7 @@ public class ScanContext implements Serializable {
private final boolean includeColumnStats;
private final Integer planParallelism;
private final int maxPlanningSnapshotCount;
+ private final int maxAllowedPlanningFailures;
private ScanContext(
boolean caseSensitive,
@@ -86,6 +87,7 @@ public class ScanContext implements Serializable {
boolean exposeLocality,
Integer planParallelism,
int maxPlanningSnapshotCount,
+ int maxAllowedPlanningFailures,
String branch,
String tag,
String startTag,
@@ -115,6 +117,7 @@ public class ScanContext implements Serializable {
this.exposeLocality = exposeLocality;
this.planParallelism = planParallelism;
this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
+ this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
validate();
}
@@ -155,6 +158,10 @@ public class ScanContext implements Serializable {
Preconditions.checkArgument(
!(endTag != null && endSnapshotId() != null),
"END_SNAPSHOT_ID and END_TAG cannot both be set.");
+
+ Preconditions.checkArgument(
+ maxAllowedPlanningFailures >= -1,
+ "Cannot set maxAllowedPlanningFailures to a negative number other than -1.");
}
public boolean caseSensitive() {
@@ -253,6 +260,10 @@ public class ScanContext implements Serializable {
return maxPlanningSnapshotCount;
}
+ public int maxAllowedPlanningFailures() {
+ return maxAllowedPlanningFailures;
+ }
+
public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
@@ -277,6 +288,7 @@ public class ScanContext implements Serializable {
.exposeLocality(exposeLocality)
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
+ .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.build();
}
@@ -304,6 +316,7 @@ public class ScanContext implements Serializable {
.exposeLocality(exposeLocality)
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
+ .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.build();
}
@@ -341,6 +354,8 @@ public class ScanContext implements Serializable {
FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue();
private int maxPlanningSnapshotCount =
FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
+ private int maxAllowedPlanningFailures =
+ FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();
private Builder() {}
@@ -464,6 +479,11 @@ public class ScanContext implements Serializable {
return this;
}
+ public Builder maxAllowedPlanningFailures(int newMaxAllowedPlanningFailures) {
+ this.maxAllowedPlanningFailures = newMaxAllowedPlanningFailures;
+ return this;
+ }
+
public Builder resolveConfig(
Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
@@ -488,7 +508,8 @@ public class ScanContext implements Serializable {
.limit(flinkReadConf.limit())
.planParallelism(flinkReadConf.workerPoolSize())
.includeColumnStats(flinkReadConf.includeColumnStats())
- .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount());
+ .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount())
+ .maxAllowedPlanningFailures(maxAllowedPlanningFailures);
}
public ScanContext build() {
@@ -513,6 +534,7 @@ public class ScanContext implements Serializable {
exposeLocality,
planParallelism,
maxPlanningSnapshotCount,
+ maxAllowedPlanningFailures,
branch,
tag,
startTag,
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
index b84dab190a..b1dadfb9a6 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
@@ -55,6 +55,9 @@ public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
/** Track enumeration result history for split discovery throttling. */
private final EnumerationHistory enumerationHistory;
+ /** Count the consecutive failures and throw exception if the max allowed failures are reached */
+ private transient int consecutiveFailures = 0;
+
public ContinuousIcebergEnumerator(
SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
SplitAssigner assigner,
@@ -122,6 +125,7 @@ public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
/** This method is executed in a single coordinator thread. */
private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
if (error == null) {
+ consecutiveFailures = 0;
if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
// Multiple discoverSplits() may be triggered with the same starting snapshot to the I/O
// thread pool. E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit
@@ -161,7 +165,13 @@ public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
LOG.info("Update enumerator position to {}", result.toPosition());
}
} else {
- LOG.error("Failed to discover new splits", error);
+ consecutiveFailures++;
+ if (scanContext.maxAllowedPlanningFailures() < 0
+ || consecutiveFailures <= scanContext.maxAllowedPlanningFailures()) {
+ LOG.error("Failed to discover new splits", error);
+ } else {
+ throw new RuntimeException("Failed to discover new splits", error);
+ }
}
}
}
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
index 7575beed6e..ebc92df023 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
@@ -32,16 +32,23 @@ class ManualContinuousSplitPlanner implements ContinuousSplitPlanner {
// track splits per snapshot
private final NavigableMap<Long, List<IcebergSourceSplit>> splits;
private long latestSnapshotId;
+ private int remainingFailures;
- ManualContinuousSplitPlanner(ScanContext scanContext) {
+ ManualContinuousSplitPlanner(ScanContext scanContext, int expectedFailures) {
this.maxPlanningSnapshotCount = scanContext.maxPlanningSnapshotCount();
this.splits = new TreeMap<>();
this.latestSnapshotId = 0L;
+ this.remainingFailures = expectedFailures;
}
@Override
public synchronized ContinuousEnumerationResult planSplits(
IcebergEnumeratorPosition lastPosition) {
+ if (remainingFailures > 0) {
+ remainingFailures--;
+ throw new RuntimeException("Expected failure at planning");
+ }
+
long fromSnapshotIdExclusive = 0;
if (lastPosition != null && lastPosition.snapshotId() != null) {
fromSnapshotIdExclusive = lastPosition.snapshotId();
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
index a051a4de0f..d0ae8fdf77 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
@@ -51,7 +51,7 @@ public class TestContinuousIcebergEnumerator {
.streaming(true)
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
- ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+ ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);
@@ -81,7 +81,7 @@ public class TestContinuousIcebergEnumerator {
.streaming(true)
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
- ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+ ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);
@@ -110,7 +110,7 @@ public class TestContinuousIcebergEnumerator {
.streaming(true)
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
- ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+ ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);
@@ -163,7 +163,7 @@ public class TestContinuousIcebergEnumerator {
// discover one snapshot at a time
.maxPlanningSnapshotCount(1)
.build();
- ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext);
+ ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 0);
ContinuousIcebergEnumerator enumerator =
createEnumerator(enumeratorContext, scanContext, splitPlanner);
@@ -227,6 +227,113 @@ public class TestContinuousIcebergEnumerator {
splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
}
+ @Test
+ public void testTransientPlanningErrorsWithSuccessfulRetry() throws Exception {
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(2)
+ .build();
+ ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // Make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+ SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+
+ // Trigger a planning and check that no splits are returned due to the planning error
+ enumeratorContext.triggerAllActions();
+ Assert.assertEquals(0, enumerator.snapshotState(2).pendingSplits().size());
+
+ // Second scan planning should succeed and discover the expected splits
+ enumeratorContext.triggerAllActions();
+ Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(3).pendingSplits();
+ Assert.assertEquals(1, pendingSplits.size());
+ IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+ Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+ Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+ }
+
+ @Test
+ public void testOverMaxAllowedPlanningErrors() throws Exception {
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(1)
+ .build();
+ ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 2);
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // Make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+ SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+
+ // Check that the scheduler response ignores the current error and continues to run until the
+ // failure limit is reached
+ enumeratorContext.triggerAllActions();
+ Assert.assertFalse(
+ enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+
+ // Check that the task has failed with the expected exception after the failure limit is reached
+ enumeratorContext.triggerAllActions();
+ Assert.assertTrue(
+ enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+ Assertions.assertThatThrownBy(
+ () -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+ .hasCauseInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Failed to discover new split");
+ }
+
+ @Test
+ public void testPlanningIgnoringErrors() throws Exception {
+ int expectedFailures = 3;
+ TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+ new TestingSplitEnumeratorContext<>(4);
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .maxPlanningSnapshotCount(1)
+ .maxAllowedPlanningFailures(-1)
+ .build();
+ ManualContinuousSplitPlanner splitPlanner =
+ new ManualContinuousSplitPlanner(scanContext, expectedFailures);
+ ContinuousIcebergEnumerator enumerator =
+ createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+ // Make one split available and trigger the periodic discovery
+ List<IcebergSourceSplit> splits =
+ SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ splitPlanner.addSplits(splits);
+
+ Collection<IcebergSourceSplitState> pendingSplits;
+ // Cannot discover the new split due to planning failures
+ for (int i = 0; i < expectedFailures; ++i) {
+ enumeratorContext.triggerAllActions();
+ pendingSplits = enumerator.snapshotState(i).pendingSplits();
+ Assert.assertEquals(0, pendingSplits.size());
+ }
+
+ // Discover the new split after successful scan planning
+ enumeratorContext.triggerAllActions();
+ pendingSplits = enumerator.snapshotState(expectedFailures + 1).pendingSplits();
+ Assert.assertEquals(1, pendingSplits.size());
+ IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+ Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+ Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+ }
+
private static ContinuousIcebergEnumerator createEnumerator(
SplitEnumeratorContext<IcebergSourceSplit> context,
ScanContext scanContext,