Posted to issues@iceberg.apache.org by "pvary (via GitHub)" <gi...@apache.org> on 2023/05/09 17:44:26 UTC

[GitHub] [iceberg] pvary opened a new pull request, #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

pvary opened a new pull request, #7571:
URL: https://github.com/apache/iceberg/pull/7571

   We found that when the current implementation of the IcebergSource hits a downstream error, it silently retries the planning again and again for as long as the error persists. The only effect on the job is that no new records are emitted, and no other alarms are raised.
   
   This is similar to how the Flink FileSource works, but it is confusing for users.
   Other sources handle errors differently; Kafka, for example, fails immediately on an error. That is also not desirable for our user base, because we expect more resiliency from our jobs.
   
   Based on our discussion with @zhen-wu2 and @gyula-fora, I have created this PR which adds the possibility to retry the failed planning and to configure the number of retries in a few different ways:
   - `IcebergSource.Builder.planRetryNum` - if the source is created from Java code
   - Using the Flink configuration key `connector.iceberg.plan-retry-num`
   - Through the read option `plan-retry-num`
   
   The default value is 3, which means that the Flink job fails if the 4th planning attempt fails.
   If the original behaviour is needed, use the value `-1`.
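
The retry semantics described above can be illustrated with a minimal, self-contained sketch. It is not the code from this PR: `PlanRetrySketch` and `planWithRetries` are hypothetical names, and the planner is modelled as a plain `Supplier`. It only shows how a limit of 3 allows four attempts in total and how `-1` disables the limit.

```java
import java.util.function.Supplier;

/** Hypothetical helper for illustration only; not part of Iceberg or of this PR. */
final class PlanRetrySketch {
  private PlanRetrySketch() {}

  /**
   * Calls the planner and retries when it throws.
   *
   * @param planner stand-in for one split planning call
   * @param planRetryNum number of retries allowed after the first failure; -1 retries forever
   */
  static <T> T planWithRetries(Supplier<T> planner, int planRetryNum) {
    int failedAttempts = 0;
    while (true) {
      try {
        return planner.get();
      } catch (RuntimeException e) {
        failedAttempts++;
        // With planRetryNum = 3 the exception propagates on the 4th failed attempt.
        // With planRetryNum = -1 this branch is never taken, so the loop keeps retrying.
        if (planRetryNum >= 0 && failedAttempts > planRetryNum) {
          throw e;
        }
      }
    }
  }
}
```

In this sketch a propagated exception stands in for failing the Flink job. A real implementation would probably also wait between attempts, which is left out here.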


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1190615363


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java:
##########
@@ -105,4 +105,8 @@ private FlinkReadOptions() {}
   public static final String LIMIT = "limit";
   public static final ConfigOption<Long> LIMIT_OPTION =
       ConfigOptions.key(PREFIX + LIMIT).longType().defaultValue(-1L);
+
+  public static final String PLAN_RETRY_NUM = "plan-retry-num";

Review Comment:
   I suggested a different name in the doc. It applies to the other variables and methods as well.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java:
##########
@@ -105,4 +105,8 @@ private FlinkReadOptions() {}
   public static final String LIMIT = "limit";
   public static final ConfigOption<Long> LIMIT_OPTION =
       ConfigOptions.key(PREFIX + LIMIT).longType().defaultValue(-1L);
+
+  public static final String PLAN_RETRY_NUM = "plan-retry-num";

Review Comment:
   I suggested a different name in the doc.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -115,7 +116,12 @@ private ContinuousEnumerationResult discoverSplits() {
       return new ContinuousEnumerationResult(
           Collections.emptyList(), enumeratorPosition.get(), enumeratorPosition.get());
     } else {
-      return splitPlanner.planSplits(enumeratorPosition.get());
+      AtomicReference<ContinuousEnumerationResult> result = new AtomicReference<>();
+      Tasks.foreach(enumeratorPosition.get())

Review Comment:
   I was thinking of something more like a maximum number of allowed consecutive planning failures (not retries), because planning is already scheduled periodically.
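
One way to read this suggestion, sketched under assumptions: the periodic planning loop stays as it is, and a counter tracks failures in a row, resetting on every successful cycle. `ConsecutiveFailureTracker` and its methods are hypothetical names chosen for illustration, not code from the PR.

```java
/** Hypothetical sketch of a consecutive-failure limit; not the PR's implementation. */
final class ConsecutiveFailureTracker {
  // Maximum number of failures in a row that are tolerated; -1 means never fail the job.
  private final int maxAllowedFailures;
  private int consecutiveFailures = 0;

  ConsecutiveFailureTracker(int maxAllowedFailures) {
    this.maxAllowedFailures = maxAllowedFailures;
  }

  /** Called after a planning cycle succeeds; any earlier failures are forgotten. */
  void onSuccess() {
    consecutiveFailures = 0;
  }

  /**
   * Called after a planning cycle fails.
   *
   * @return true if the limit is exceeded and the job should be failed
   */
  boolean onFailure() {
    consecutiveFailures++;
    return maxAllowedFailures >= 0 && consecutiveFailures > maxAllowedFailures;
  }
}
```

No retry loop is needed inside a single planning call in this variant: the next scheduled planning cycle acts as the retry, and only an unbroken run of failures trips the limit.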



##########
docs/flink-configuration.md:
##########
@@ -110,27 +110,28 @@ env.getConfig()
 
 `Read option` has the highest priority, followed by `Flink configuration` and then `Table property`.
 
-| Read option                 | Flink configuration                           | Table property               | Default                          | Description                                                  |
-| --------------------------- | --------------------------------------------- | ---------------------------- | -------------------------------- | ------------------------------------------------------------ |
-| snapshot-id                 | N/A                                           | N/A                          | null                             | For time travel in batch mode. Read data from the specified snapshot-id. |
-| case-sensitive              | connector.iceberg.case-sensitive              | N/A                          | false                            | If true, match column name in a case sensitive way.          |
-| as-of-timestamp             | N/A                                           | N/A                          | null                             | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. |
+| Read option                 | Flink configuration                           | Table property               | Default                          | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        
                 |
+|-----------------------------|-----------------------------------------------|------------------------------|----------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 ----------------|
+| snapshot-id                 | N/A                                           | N/A                          | null                             | For time travel in batch mode. Read data from the specified snapshot-id.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           
                 |
+| case-sensitive              | connector.iceberg.case-sensitive              | N/A                          | false                            | If true, match column name in a case sensitive way.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                
                 |
+| as-of-timestamp             | N/A                                           | N/A                          | null                             | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       
                 |
 | starting-strategy           | connector.iceberg.starting-strategy           | N/A                          | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode. The incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id inclusive. INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp inclusive. If the timestamp is between two snapshots, it should start from the snapshot after the timestamp. Just for FIP27 Source. |
-| start-snapshot-timestamp    | N/A                                           | N/A                          | null                             | Start to read data from the most recent snapshot as of the given time in milliseconds. |
-| start-snapshot-id           | N/A                                           | N/A                          | null                             | Start to read data from the specified snapshot-id.           |
-| end-snapshot-id             | N/A                                           | N/A                          | The latest snapshot id           | Specifies the end snapshot.  
-| branch                     | N/A                                            | N/A             | main       | Specifies the branch to read from in batch mode
-| tag                        | N/A                                            | N/A             | null       | Specifies the tag to read from in batch mode
-| start-tag                  | N/A                                            | N/A             | null       | Specifies the starting tag to read from for incremental reads
-| end-tag                    | N/A                                            | N/A             | null       | Specifies the ending tag to to read from for incremental reads                                |
-| split-size                  | connector.iceberg.split-size                  | read.split.target-size       | 128 MB                           | Target size when combining input splits.                     |
-| split-lookback              | connector.iceberg.split-file-open-cost        | read.split.planning-lookback | 10                               | Number of bins to consider when combining input splits.      |
-| split-file-open-cost        | connector.iceberg.split-file-open-cost        | read.split.open-file-cost    | 4MB                              | The estimated cost to open a file, used as a minimum weight when combining splits. |
-| streaming                   | connector.iceberg.streaming                   | N/A                          | false                            | Sets whether the current task runs in streaming or batch mode. |
-| monitor-interval            | connector.iceberg.monitor-interval            | N/A                          | 60s                              | Monitor interval to discover splits from new snapshots. Applicable only for streaming read. |
-| include-column-stats        | connector.iceberg.include-column-stats        | N/A                          | false                            | Create a new scan from this that loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds. |
-| max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A                          | Integer.MAX_VALUE                | Max number of snapshots limited per split enumeration. Applicable only to streaming read. |
-| limit                       | connector.iceberg.limit                       | N/A                          | -1                               | Limited output number of rows.                               |
+| start-snapshot-timestamp    | N/A                                           | N/A                          | null                             | Start to read data from the most recent snapshot as of the given time in milliseconds.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             
                 |
+| start-snapshot-id           | N/A                                           | N/A                          | null                             | Start to read data from the specified snapshot-id.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 
                 |
+| end-snapshot-id             | N/A                                           | N/A                          | The latest snapshot id           | Specifies the end snapshot.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        
                 |
+| branch                      | N/A                                           | N/A                          | main                             | Specifies the branch to read from in batch mode                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    
                 |
+| tag                         | N/A                                           | N/A                          | null                             | Specifies the tag to read from in batch mode                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       
                 |
+| start-tag                   | N/A                                           | N/A                          | null                             | Specifies the starting tag to read from for incremental reads                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      
                 |
+| end-tag                     | N/A                                           | N/A                          | null                             | Specifies the ending tag to to read from for incremental reads                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     
                 |
+| split-size                  | connector.iceberg.split-size                  | read.split.target-size       | 128 MB                           | Target size when combining input splits.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           
                 |
+| split-lookback              | connector.iceberg.split-file-open-cost        | read.split.planning-lookback | 10                               | Number of bins to consider when combining input splits.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            
                 |
+| split-file-open-cost        | connector.iceberg.split-file-open-cost        | read.split.open-file-cost    | 4MB                              | The estimated cost to open a file, used as a minimum weight when combining splits.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 
                 |
+| streaming                   | connector.iceberg.streaming                   | N/A                          | false                            | Sets whether the current task runs in streaming or batch mode.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     
                 |
+| monitor-interval            | connector.iceberg.monitor-interval            | N/A                          | 60s                              | Monitor interval to discover splits from new snapshots. Applicable only for streaming read.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        
                 |
+| include-column-stats        | connector.iceberg.include-column-stats        | N/A                          | false                            | Create a new scan from this that loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  
                 |
+| max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A                          | Integer.MAX_VALUE                | Max number of snapshots limited per split enumeration. Applicable only to streaming read.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          
                 |
+| limit                       | connector.iceberg.limit                       | N/A                          | -1                               | Limited output number of rows.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     
                 |
+| plan-retry-num              | connector.iceberg.plan-retry-num              | N/A                          | 3                                | Retries before job failure when there is an error during continuous planning. Set to -1 to not fail on error.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      
                 |

Review Comment:
   `max-allowed-consecutive-planning-failures`? I know it is a bit long, but it is probably clearer.
   
   Regarding the description, `retries` is not accurate. How about the following?
   ```
   Max allowed consecutive failures for scan planning before failing the job. Set to -1 to never fail the job on scan planning failures.
   ```





[GitHub] [iceberg] hililiwei commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1189462306


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -458,7 +468,12 @@ public IcebergSource<T> build() {
       checkRequired();
       // Since builder already load the table, pass it to the source to avoid double loading
       return new IcebergSource<T>(
-          tableLoader, context, readerFunction, splitAssignerFactory, table);
+          tableLoader,
+          context,
+          readerFunction,
+          splitAssignerFactory,
+          flinkReadConf.planRetryNum(),

Review Comment:
   By the way, I once thought about deleting `ScanContext`, because we can use `flinkReadConf` to parse these parameters uniformly, but that did not work out because of serialization issues. Maybe we can reconsider simplifying this part of the logic by removing `ScanContext`. Of course, that is outside the scope of this PR.





[GitHub] [iceberg] pvary commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1192912637


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -115,7 +116,12 @@ private ContinuousEnumerationResult discoverSplits() {
       return new ContinuousEnumerationResult(
           Collections.emptyList(), enumeratorPosition.get(), enumeratorPosition.get());
     } else {
-      return splitPlanner.planSplits(enumeratorPosition.get());
+      AtomicReference<ContinuousEnumerationResult> result = new AtomicReference<>();
+      Tasks.foreach(enumeratorPosition.get())

Review Comment:
   @stevenzwu: Changed the code. Is this what you had in mind?





[GitHub] [iceberg] stevenzwu merged pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu merged PR #7571:
URL: https://github.com/apache/iceberg/pull/7571




[GitHub] [iceberg] hililiwei commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1189300663


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -458,7 +468,12 @@ public IcebergSource<T> build() {
       checkRequired();
       // Since builder already load the table, pass it to the source to avoid double loading
       return new IcebergSource<T>(
-          tableLoader, context, readerFunction, splitAssignerFactory, table);
+          tableLoader,
+          context,
+          readerFunction,
+          splitAssignerFactory,
+          flinkReadConf.planRetryNum(),

Review Comment:
   We use `ScanContext` to pass parameters, but this change breaks that pattern.





[GitHub] [iceberg] pvary commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1194832600


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,113 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testTransientPlanningFailure() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    // Trigger a planning and check that we did not get splits due to the planning error
+    enumeratorContext.triggerAllActions();
+    Assert.assertEquals(0, enumerator.snapshotState(2).pendingSplits().size());
+
+    // Trigger the planning again to recover from the failure, and we get the expected splits
+    enumeratorContext.triggerAllActions();
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testOverMaxAllowedPlanningFailures() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 3);
+    createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    // Check that the scheduler response ignores the current error and continues to run until the
+    // failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertFalse(
+        enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+
+    // Check that the task has failed with the expected exception after the failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertTrue(
+        enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+    Assertions.assertThatThrownBy(
+            () -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+        .hasCauseInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Failed to discover new split");
+  }
+
+  @Test
+  public void testOriginalRetry() throws Exception {
+    int expectedFailures = 3;
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(-1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner =
+        new ManualContinuousSplitPlanner(scanContext, expectedFailures);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    Collection<IcebergSourceSplitState> pendingSplits;
+    // Can not discover the new split with planning failures
+    for (int i = 0; i < expectedFailures; ++i) {
+      enumeratorContext.triggerAllActions();
+      pendingSplits = enumerator.snapshotState(2).pendingSplits();

Review Comment:
   I checked the checkpoint IDs and changed them. They are not really relevant for the tests, but it is still a valid point.
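
For readers following the tests above: they rely on a planner that fails a configured number of times before returning splits (the second constructor argument of `ManualContinuousSplitPlanner`). The following is only a minimal, self-contained sketch of that idea. The class `FailingPlannerSketch`, its use of `String` as a stand-in for a split, and the exception message are all hypothetical and are not taken from the Iceberg test code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a test planner: fails a fixed number of times, then returns splits. */
public class FailingPlannerSketch {
  private final List<String> splits = new ArrayList<>();
  private int remainingFailures;

  public FailingPlannerSketch(int expectedFailures) {
    this.remainingFailures = expectedFailures;
  }

  /** Makes splits available for the next successful planning call. */
  public void addSplits(List<String> newSplits) {
    splits.addAll(newSplits);
  }

  /** Throws until the configured number of failures is used up, then hands out the pending splits. */
  public List<String> planSplits() {
    if (remainingFailures > 0) {
      remainingFailures--;
      throw new RuntimeException("Simulated planning failure");
    }
    List<String> result = new ArrayList<>(splits);
    splits.clear();
    return result;
  }

  public static void main(String[] args) {
    FailingPlannerSketch planner = new FailingPlannerSketch(1);
    planner.addSplits(List.of("split-0"));
    try {
      planner.planSplits();
    } catch (RuntimeException e) {
      System.out.println("first call failed: " + e.getMessage());
    }
    System.out.println("second call returned: " + planner.planSplits());
  }
}
```

With one configured failure, the first planning call throws and the second returns the pending split, which mirrors the two `triggerAllActions()` calls in `testTransientPlanningFailure`.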





[GitHub] [iceberg] stevenzwu commented on pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#issuecomment-1550610451

   Thanks @pvary for the contribution and @hililiwei for the review.




[GitHub] [iceberg] pvary commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1193435710


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testRetryAndFinish() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();
+    enumeratorContext.triggerAllActions();
+
+    // Check that we got the expected splits
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testRetryLimitReached() throws Exception {

Review Comment:
   Done





[GitHub] [iceberg] pvary commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1193433236


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -161,7 +165,14 @@ private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwab
         LOG.info("Update enumerator position to {}", result.toPosition());
       }
     } else {
-      LOG.error("Failed to discover new splits", error);
+      consecutiveFailures++;
+      if (scanContext.maxAllowedPlanningFailures() == -1
+          || consecutiveFailures < scanContext.maxAllowedPlanningFailures()) {
+        // To have an option for the original behavior - unlimited retries without job failure

Review Comment:
   Removed the comment
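
To make the condition in the hunk above easier to follow, here is a minimal sketch of the same check in isolation. `PlanningFailureTracker` is a hypothetical helper and not part of the enumerator. The comparison is copied from the diff as quoted here, while the reset on success is an assumption inferred from the word "consecutive".

```java
/** Hypothetical helper, not part of Iceberg: tracks consecutive planning failures. */
public class PlanningFailureTracker {
  private final int maxAllowedFailures; // -1 means never fail the job
  private int consecutiveFailures = 0;

  public PlanningFailureTracker(int maxAllowedFailures) {
    this.maxAllowedFailures = maxAllowedFailures;
  }

  /** Assumed behavior: a successful planning round resets the counter. */
  public void onSuccess() {
    consecutiveFailures = 0;
  }

  /** Returns true if the job should keep running, false if the failure limit is reached. */
  public boolean onFailure() {
    consecutiveFailures++;
    return maxAllowedFailures == -1 || consecutiveFailures < maxAllowedFailures;
  }

  public static void main(String[] args) {
    PlanningFailureTracker tracker = new PlanningFailureTracker(2);
    System.out.println(tracker.onFailure()); // true: 1 < 2, keep running
    System.out.println(tracker.onFailure()); // false: 2 is not < 2, fail the job
  }
}
```

With the comparison as written in this revision of the diff, a limit of 2 fails the job on the second consecutive failure, and `-1` keeps the original behavior of never failing.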





[GitHub] [iceberg] pvary commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1189809142


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -426,7 +432,8 @@ public IcebergSource<T> build() {
         }
       }
 
-      contextBuilder.resolveConfig(table, readOptions, flinkConfig);
+      FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, flinkConfig);
+      contextBuilder.resolveConfig(flinkReadConf);

Review Comment:
   Correct!
   Done
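
For context on what `FlinkReadConf` is given here (the table, the read options, and the Flink configuration): the docs state that the read option has the highest priority, followed by the Flink configuration and then the table property. The sketch below shows that precedence with plain maps. `ReadConfPrecedenceSketch` and its `resolve` method are hypothetical and do not reflect the actual `FlinkReadConf` implementation; the keys in the usage example are the ones listed in the docs table for `split-size`.

```java
import java.util.Map;

/** Hypothetical illustration of read option > Flink configuration > table property > default. */
public class ReadConfPrecedenceSketch {

  /** A null key means the setting is not available (N/A) at that level. */
  public static String resolve(
      Map<String, String> readOptions, String readOptionKey,
      Map<String, String> flinkConfig, String flinkConfigKey,
      Map<String, String> tableProperties, String tablePropertyKey,
      String defaultValue) {
    if (readOptionKey != null && readOptions.containsKey(readOptionKey)) {
      return readOptions.get(readOptionKey);
    }
    if (flinkConfigKey != null && flinkConfig.containsKey(flinkConfigKey)) {
      return flinkConfig.get(flinkConfigKey);
    }
    if (tablePropertyKey != null && tableProperties.containsKey(tablePropertyKey)) {
      return tableProperties.get(tablePropertyKey);
    }
    return defaultValue;
  }

  public static void main(String[] args) {
    Map<String, String> readOptions = Map.of();
    Map<String, String> flinkConfig = Map.of("connector.iceberg.split-size", "268435456");
    Map<String, String> tableProperties = Map.of("read.split.target-size", "67108864");

    // No read option is set, so the Flink configuration value wins over the table property.
    String splitSize =
        resolve(
            readOptions, "split-size",
            flinkConfig, "connector.iceberg.split-size",
            tableProperties, "read.split.target-size",
            "134217728");
    System.out.println(splitSize); // 268435456
  }
}
```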





[GitHub] [iceberg] pvary commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1190871072


##########
docs/flink-configuration.md:
##########
@@ -110,27 +110,28 @@ env.getConfig()
 
 `Read option` has the highest priority, followed by `Flink configuration` and then `Table property`.
 
-| Read option                 | Flink configuration                           | Table property               | Default                          | Description                                                  |
-| --------------------------- | --------------------------------------------- | ---------------------------- | -------------------------------- | ------------------------------------------------------------ |
-| snapshot-id                 | N/A                                           | N/A                          | null                             | For time travel in batch mode. Read data from the specified snapshot-id. |
-| case-sensitive              | connector.iceberg.case-sensitive              | N/A                          | false                            | If true, match column name in a case sensitive way.          |
-| as-of-timestamp             | N/A                                           | N/A                          | null                             | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. |
+| Read option                 | Flink configuration                           | Table property               | Default                          | Description |
+|-----------------------------|-----------------------------------------------|------------------------------|----------------------------------|-------------|
+| snapshot-id                 | N/A                                           | N/A                          | null                             | For time travel in batch mode. Read data from the specified snapshot-id. |
+| case-sensitive              | connector.iceberg.case-sensitive              | N/A                          | false                            | If true, match column name in a case sensitive way. |
+| as-of-timestamp             | N/A                                           | N/A                          | null                             | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. |
 | starting-strategy           | connector.iceberg.starting-strategy           | N/A                          | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode. The incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id inclusive. INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp inclusive. If the timestamp is between two snapshots, it should start from the snapshot after the timestamp. Just for FIP27 Source. |
-| start-snapshot-timestamp    | N/A                                           | N/A                          | null                             | Start to read data from the most recent snapshot as of the given time in milliseconds. |
-| start-snapshot-id           | N/A                                           | N/A                          | null                             | Start to read data from the specified snapshot-id.           |
-| end-snapshot-id             | N/A                                           | N/A                          | The latest snapshot id           | Specifies the end snapshot.  
-| branch                     | N/A                                            | N/A             | main       | Specifies the branch to read from in batch mode
-| tag                        | N/A                                            | N/A             | null       | Specifies the tag to read from in batch mode
-| start-tag                  | N/A                                            | N/A             | null       | Specifies the starting tag to read from for incremental reads
-| end-tag                    | N/A                                            | N/A             | null       | Specifies the ending tag to to read from for incremental reads                                |
-| split-size                  | connector.iceberg.split-size                  | read.split.target-size       | 128 MB                           | Target size when combining input splits.                     |
-| split-lookback              | connector.iceberg.split-file-open-cost        | read.split.planning-lookback | 10                               | Number of bins to consider when combining input splits.      |
-| split-file-open-cost        | connector.iceberg.split-file-open-cost        | read.split.open-file-cost    | 4MB                              | The estimated cost to open a file, used as a minimum weight when combining splits. |
-| streaming                   | connector.iceberg.streaming                   | N/A                          | false                            | Sets whether the current task runs in streaming or batch mode. |
-| monitor-interval            | connector.iceberg.monitor-interval            | N/A                          | 60s                              | Monitor interval to discover splits from new snapshots. Applicable only for streaming read. |
-| include-column-stats        | connector.iceberg.include-column-stats        | N/A                          | false                            | Create a new scan from this that loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds. |
-| max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A                          | Integer.MAX_VALUE                | Max number of snapshots limited per split enumeration. Applicable only to streaming read. |
-| limit                       | connector.iceberg.limit                       | N/A                          | -1                               | Limited output number of rows.                               |
+| start-snapshot-timestamp    | N/A                                           | N/A                          | null                             | Start to read data from the most recent snapshot as of the given time in milliseconds. |
+| start-snapshot-id           | N/A                                           | N/A                          | null                             | Start to read data from the specified snapshot-id. |
+| end-snapshot-id             | N/A                                           | N/A                          | The latest snapshot id           | Specifies the end snapshot. |
+| branch                      | N/A                                           | N/A                          | main                             | Specifies the branch to read from in batch mode |
+| tag                         | N/A                                           | N/A                          | null                             | Specifies the tag to read from in batch mode |
+| start-tag                   | N/A                                           | N/A                          | null                             | Specifies the starting tag to read from for incremental reads                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      
                 |
+| end-tag                     | N/A                                           | N/A                          | null                             | Specifies the ending tag to read from for incremental reads |
+| split-size                  | connector.iceberg.split-size                  | read.split.target-size       | 128 MB                           | Target size when combining input splits.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           
                 |
+| split-lookback              | connector.iceberg.split-lookback              | read.split.planning-lookback | 10                               | Number of bins to consider when combining input splits. |
+| split-file-open-cost        | connector.iceberg.split-file-open-cost        | read.split.open-file-cost    | 4MB                              | The estimated cost to open a file, used as a minimum weight when combining splits.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 
                 |
+| streaming                   | connector.iceberg.streaming                   | N/A                          | false                            | Sets whether the current task runs in streaming or batch mode.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     
                 |
+| monitor-interval            | connector.iceberg.monitor-interval            | N/A                          | 60s                              | Monitor interval to discover splits from new snapshots. Applicable only for streaming read.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        
                 |
+| include-column-stats        | connector.iceberg.include-column-stats        | N/A                          | false                            | Create a new scan from this that loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  
                 |
+| max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A                          | Integer.MAX_VALUE                | Max number of snapshots limited per split enumeration. Applicable only to streaming read.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          
                 |
+| limit                       | connector.iceberg.limit                       | N/A                          | -1                               | Limited output number of rows.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     
                 |
+| plan-retry-num              | connector.iceberg.plan-retry-num              | N/A                          | 3                                | Retries before job failure when there is an error during continuous planning. Set to -1 to not fail on error.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      
                 |

Review Comment:
   After some back and forth, I decided to go with `max-allowed-planning-failures`.
   I agree that your name is better, but I kept "continuous" only in the description to keep the config key and the variable names shorter.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java:
##########
@@ -105,4 +105,8 @@ private FlinkReadOptions() {}
   public static final String LIMIT = "limit";
   public static final ConfigOption<Long> LIMIT_OPTION =
       ConfigOptions.key(PREFIX + LIMIT).longType().defaultValue(-1L);
+
+  public static final String PLAN_RETRY_NUM = "plan-retry-num";

Review Comment:
   Done





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1192754161


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testRetryAndFinish() throws Exception {

Review Comment:
   nit: should the method name be adjusted? Something like `testTransientPlanningFailure`.



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testRetryAndFinish() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();
+    enumeratorContext.triggerAllActions();
+
+    // Check that we got the expected splits
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testRetryLimitReached() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 3);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();

Review Comment:
   nit: add a comment line to explain the expected behavior.
   
   maybe also add an assertion with `getAllScheduledTasks` similar to the block below?
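
   For illustration only (not code from the PR): the assertions in this test use `hasCauseInstanceOf(RuntimeException.class)`, which suggests that calling `get()` on a failed scheduled task wraps the original error. A minimal, self-contained sketch of that wrapping with plain `java.util.concurrent` types follows. The class `FailedTaskInspection` and its methods are hypothetical names, and whether the Flink test executor behaves exactly like `CompletableFuture` is an assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/** Hypothetical helper, not part of Flink or Iceberg. */
final class FailedTaskInspection {
  private FailedTaskInspection() {}

  /** Builds a task that has already failed with a RuntimeException carrying the given message. */
  static Future<Void> failedTask(String message) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    future.completeExceptionally(new RuntimeException(message));
    return future;
  }

  /** Returns the cause of a failed task, or null if the task completed normally. */
  static Throwable failureCause(Future<?> task) {
    try {
      task.get();
      return null;
    } catch (ExecutionException e) {
      // Future.get() wraps the task's own exception; the original error is the cause.
      return e.getCause();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for the task", e);
    }
  }
}
```

   With this shape, a test can first assert that the task list is non-empty and then check the cause type and message of the first task.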



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -161,7 +165,14 @@ private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwab
         LOG.info("Update enumerator position to {}", result.toPosition());
       }
     } else {
-      LOG.error("Failed to discover new splits", error);
+      consecutiveFailures++;
+      if (scanContext.maxAllowedPlanningFailures() == -1
+          || consecutiveFailures < scanContext.maxAllowedPlanningFailures()) {
+        // To have an option for the original behavior - unlimited retries without job failure

Review Comment:
   nit: this comment probably needs to be adjusted



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -161,7 +165,14 @@ private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwab
         LOG.info("Update enumerator position to {}", result.toPosition());
       }
     } else {
-      LOG.error("Failed to discover new splits", error);
+      consecutiveFailures++;
+      if (scanContext.maxAllowedPlanningFailures() == -1

Review Comment:
   nit: I know we have a Preconditions check of `>= -1`, but I am still wondering if the check here could be just `< 0`?
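
   To make the suggestion concrete, here is a minimal, self-contained sketch of the retry decision with the `< 0` check. `PlanningFailureTracker` is a hypothetical class, not the enumerator code from the PR. It keeps the `consecutiveFailures < maxAllowedPlanningFailures` comparison from the diff, and it assumes the counter is reset after a successful planning round (the reset is not visible in this hunk).

```java
/** Hypothetical helper, not part of Iceberg: tracks consecutive planning failures. */
final class PlanningFailureTracker {
  private final int maxAllowedPlanningFailures;
  private int consecutiveFailures = 0;

  PlanningFailureTracker(int maxAllowedPlanningFailures) {
    this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
  }

  /**
   * Records one failed planning attempt.
   *
   * @return true if the failure is tolerated and planning should be retried,
   *     false if the limit is reached and the job should fail
   */
  boolean recordFailure() {
    consecutiveFailures++;
    // Any negative limit means "never fail the job", so -1 needs no special case.
    return maxAllowedPlanningFailures < 0
        || consecutiveFailures < maxAllowedPlanningFailures;
  }

  /** Assumption: a successful planning round resets the counter. */
  void recordSuccess() {
    consecutiveFailures = 0;
  }

  int consecutiveFailures() {
    return consecutiveFailures;
  }
}
```

   With a limit of 2, the first failure is tolerated and the second one is not, which matches what the limit test in this PR expects after two `triggerAllActions()` calls.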



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testRetryAndFinish() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();
+    enumeratorContext.triggerAllActions();

Review Comment:
   Move this second `triggerAllActions()` call after the comment line at 252 to separate the two blocks.



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testRetryAndFinish() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();
+    enumeratorContext.triggerAllActions();
+
+    // Check that we got the expected splits
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testRetryLimitReached() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 3);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();
+    enumeratorContext.triggerAllActions();

Review Comment:
   nit: move this line after the comment line 282 that explains the expected behavior.



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testRetryAndFinish() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();

Review Comment:
   We should add an assertion here that no splits are discovered, due to the expected planning failure. Add a comment explaining the expected behavior too.
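
   A rough, self-contained sketch of the shape such a check could take, using simplified stand-ins rather than the real test classes. `FlakyPlanner` and `PendingSplits` are hypothetical; they only mimic a planner that fails a fixed number of times and a collector that tolerates those failures.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a planner that fails a fixed number of times before succeeding. */
final class FlakyPlanner {
  private final List<String> available = new ArrayList<>();
  private int remainingFailures;

  FlakyPlanner(int expectedFailures) {
    this.remainingFailures = expectedFailures;
  }

  void addSplit(String splitId) {
    available.add(splitId);
  }

  /** Throws while failures remain; afterwards hands out all available splits once. */
  List<String> planSplits() {
    if (remainingFailures > 0) {
      remainingFailures--;
      throw new RuntimeException("Simulated planning failure");
    }
    List<String> result = new ArrayList<>(available);
    available.clear();
    return result;
  }
}

/** Hypothetical collector that tolerates planning failures, like the enumerator under test. */
final class PendingSplits {
  private final List<String> pending = new ArrayList<>();

  void discover(FlakyPlanner planner) {
    try {
      pending.addAll(planner.planSplits());
    } catch (RuntimeException e) {
      // Tolerated failure: no splits are added in this round.
    }
  }

  int size() {
    return pending.size();
  }
}
```

   In the real test, the equivalent would be asserting that `pendingSplits` is empty after the first `triggerAllActions()` and contains one split after the second.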



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testRetryAndFinish() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();
+    enumeratorContext.triggerAllActions();
+
+    // Check that we got the expected splits
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testRetryLimitReached() throws Exception {

Review Comment:
   nit: the method name needs adjustment, e.g. `testOverMaxAllowedPlanningFailures`?



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testRetryAndFinish() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();
+    enumeratorContext.triggerAllActions();
+
+    // Check that we got the expected splits
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testRetryLimitReached() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 3);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();
+    enumeratorContext.triggerAllActions();
+
+    // Check that the task has failed with the expected exception
+    Assertions.assertThatThrownBy(
+            () -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+        .hasCauseInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Failed to discover new split");
+  }
+
+  @Test
+  public void testOriginalRetry() throws Exception {
+    int expectedFailures = 3;
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(-1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner =
+        new ManualContinuousSplitPlanner(scanContext, expectedFailures);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    Collection<IcebergSourceSplitState> pendingSplits;
+    // Until we have errors, we do not have result
+    for (int i = 0; i < expectedFailures; ++i) {
+      enumeratorContext.triggerAllActions();
+      pendingSplits = enumerator.snapshotState(2).pendingSplits();
+      Assert.assertEquals(0, pendingSplits.size());
+    }
+
+    // After the errors are fixed we can continue

Review Comment:
   nit: `Discovered the new split after a successful scan planning`



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testRetryAndFinish() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();
+    enumeratorContext.triggerAllActions();
+
+    // Check that we got the expected splits
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testRetryLimitReached() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 3);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();
+    enumeratorContext.triggerAllActions();
+
+    // Check that the task has failed with the expected exception
+    Assertions.assertThatThrownBy(
+            () -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+        .hasCauseInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Failed to discover new split");
+  }
+
+  @Test
+  public void testOriginalRetry() throws Exception {
+    int expectedFailures = 3;
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(-1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner =
+        new ManualContinuousSplitPlanner(scanContext, expectedFailures);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    Collection<IcebergSourceSplitState> pendingSplits;
+    // Until we have errors, we do not have result

Review Comment:
   nit: Iceberg style doesn't use words like `we` in the comment. Maybe `Can not discover the new split with planning failures`





[GitHub] [iceberg] pvary commented on pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#issuecomment-1550971523

   Thanks @hililiwei and @stevenzwu for the thorough review and the merge!




[GitHub] [iceberg] pvary commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1189364519


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -458,7 +468,12 @@ public IcebergSource<T> build() {
       checkRequired();
       // Since builder already load the table, pass it to the source to avoid double loading
       return new IcebergSource<T>(
-          tableLoader, context, readerFunction, splitAssignerFactory, table);
+          tableLoader,
+          context,
+          readerFunction,
+          splitAssignerFactory,
+          flinkReadConf.planRetryNum(),

Review Comment:
   Thanks for the review @hililiwei!
   
   What I have seen is that all of the `ScanContext` configurations are passed down to the Iceberg scan. This one is a Flink-only configuration, so we do not have to pass it down there. With that in mind, do you still think that `planRetryNum` belongs there? No strong opinions here, it just feels a bit strange.





[GitHub] [iceberg] hililiwei commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1189454683


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -458,7 +468,12 @@ public IcebergSource<T> build() {
       checkRequired();
       // Since builder already load the table, pass it to the source to avoid double loading
       return new IcebergSource<T>(
-          tableLoader, context, readerFunction, splitAssignerFactory, table);
+          tableLoader,
+          context,
+          readerFunction,
+          splitAssignerFactory,
+          flinkReadConf.planRetryNum(),

Review Comment:
   But this is not entirely true. For example, `monitorInterval` (which configures the monitoring interval), `exposeLocality`, and `planParallelism` are all used only by Flink.
   
   





[GitHub] [iceberg] pvary commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1192912637


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -115,7 +116,12 @@ private ContinuousEnumerationResult discoverSplits() {
       return new ContinuousEnumerationResult(
           Collections.emptyList(), enumeratorPosition.get(), enumeratorPosition.get());
     } else {
-      return splitPlanner.planSplits(enumeratorPosition.get());
+      AtomicReference<ContinuousEnumerationResult> result = new AtomicReference<>();
+      Tasks.foreach(enumeratorPosition.get())

Review Comment:
   Changed the code. Is this what you had in mind?





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1192497994


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -161,7 +167,12 @@ private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwab
         LOG.info("Update enumerator position to {}", result.toPosition());
       }
     } else {
-      LOG.error("Failed to discover new splits", error);
+      if (scanContext.maxAllowedPlanningFailures() == -1) {

Review Comment:
   @hililiwei that is correct. It is similar to Flink's tolerable checkpoint failures.
   
   https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-tolerable-failed-checkpoints





[GitHub] [iceberg] pvary commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1194831305


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -161,7 +165,13 @@ private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwab
         LOG.info("Update enumerator position to {}", result.toPosition());
       }
     } else {
-      LOG.error("Failed to discover new splits", error);
+      consecutiveFailures++;
+      if (scanContext.maxAllowedPlanningFailures() < 0
+          || consecutiveFailures < scanContext.maxAllowedPlanningFailures()) {

Review Comment:
   Changed this and the corresponding tests





[GitHub] [iceberg] hililiwei commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1191970434


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -161,7 +167,12 @@ private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwab
         LOG.info("Update enumerator position to {}", result.toPosition());
       }
     } else {
-      LOG.error("Failed to discover new splits", error);
+      if (scanContext.maxAllowedPlanningFailures() == -1) {

Review Comment:
   @stevenzwu Do you mean that we don't retry in `discoverSplits()`, but instead count the errors here?
   
   





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1194199897


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,113 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testTransientPlanningFailure() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    // Trigger a planning and check that we did not get splits due to the planning error
+    enumeratorContext.triggerAllActions();
+    Assert.assertEquals(0, enumerator.snapshotState(2).pendingSplits().size());
+
+    // Trigger the planning again to recover from the failure, and we get the expected splits

Review Comment:
   nit: avoid `we` in the comment. maybe like `Second scan planning should succeed and discover the expected splits`?



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -161,7 +165,13 @@ private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwab
         LOG.info("Update enumerator position to {}", result.toPosition());
       }
     } else {
-      LOG.error("Failed to discover new splits", error);
+      consecutiveFailures++;
+      if (scanContext.maxAllowedPlanningFailures() < 0
+          || consecutiveFailures < scanContext.maxAllowedPlanningFailures()) {

Review Comment:
   nit: should be `<=`. If `maxAllowedPlanningFailures` is 3, we should allow 3 consecutive failures.
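
The boundary in question can be checked with a small standalone sketch. It is not the PR's code: `PlanningFailureBoundary` and `toleratedFailures` are hypothetical names, and a non-negative limit is assumed (the `-1` "unlimited" case is left out). It counts how many consecutive failures pass the check under each comparison.

```java
public class PlanningFailureBoundary {

  // Hypothetical helper: simulates back-to-back planning failures and returns
  // how many of them pass the tolerance check before the first one that does not.
  // Assumes maxAllowed >= 0; a negative limit would loop until int overflow.
  public static int toleratedFailures(int maxAllowed, boolean inclusive) {
    int consecutiveFailures = 0;
    while (true) {
      consecutiveFailures++;
      boolean tolerated =
          inclusive
              ? consecutiveFailures <= maxAllowed
              : consecutiveFailures < maxAllowed;
      if (!tolerated) {
        // The current failure is the first one that is not tolerated
        return consecutiveFailures - 1;
      }
    }
  }

  public static void main(String[] args) {
    // With a limit of 3, `<` tolerates only 2 failures, `<=` tolerates 3
    System.out.println("exclusive (<):  " + toleratedFailures(3, false));
    System.out.println("inclusive (<=): " + toleratedFailures(3, true));
  }
}
```

With `<`, a limit of 3 fails on the third consecutive failure; with `<=`, it fails on the fourth, which matches the name `maxAllowedPlanningFailures`.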



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,113 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testTransientPlanningFailure() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    // Trigger a planning and check that we did not get splits due to the planning error
+    enumeratorContext.triggerAllActions();
+    Assert.assertEquals(0, enumerator.snapshotState(2).pendingSplits().size());
+
+    // Trigger the planning again to recover from the failure, and we get the expected splits
+    enumeratorContext.triggerAllActions();
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testOverMaxAllowedPlanningFailures() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 3);
+    createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    // Check that the scheduler response ignores the current error and continues to run until the
+    // failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertFalse(
+        enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+
+    // Check that the task has failed with the expected exception after the failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertTrue(
+        enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+    Assertions.assertThatThrownBy(
+            () -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+        .hasCauseInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Failed to discover new split");
+  }
+
+  @Test
+  public void testOriginalRetry() throws Exception {

Review Comment:
   nit: this method name is not informative. What does `original` refer to? Also, how is this test different from `testTransientPlanningFailure`?



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,113 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testTransientPlanningFailure() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    // Trigger a planning and check that we did not get splits due to the planning error
+    enumeratorContext.triggerAllActions();
+    Assert.assertEquals(0, enumerator.snapshotState(2).pendingSplits().size());
+
+    // Trigger the planning again to recover from the failure, and we get the expected splits
+    enumeratorContext.triggerAllActions();
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testOverMaxAllowedPlanningFailures() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 3);
+    createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    // Check that the scheduler response ignores the current error and continues to run until the
+    // failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertFalse(
+        enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+
+    // Check that the task has failed with the expected exception after the failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertTrue(
+        enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+    Assertions.assertThatThrownBy(
+            () -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+        .hasCauseInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Failed to discover new split");
+  }
+
+  @Test
+  public void testOriginalRetry() throws Exception {
+    int expectedFailures = 3;
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(-1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner =
+        new ManualContinuousSplitPlanner(scanContext, expectedFailures);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    Collection<IcebergSourceSplitState> pendingSplits;
+    // Can not discover the new split with planning failures
+    for (int i = 0; i < expectedFailures; ++i) {
+      enumeratorContext.triggerAllActions();
+      pendingSplits = enumerator.snapshotState(2).pendingSplits();

Review Comment:
   nit: change the checkpoint id to `i`.





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1190624918


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -115,7 +116,12 @@ private ContinuousEnumerationResult discoverSplits() {
       return new ContinuousEnumerationResult(
           Collections.emptyList(), enumeratorPosition.get(), enumeratorPosition.get());
     } else {
-      return splitPlanner.planSplits(enumeratorPosition.get());
+      AtomicReference<ContinuousEnumerationResult> result = new AtomicReference<>();
+      Tasks.foreach(enumeratorPosition.get())

Review Comment:
   I was thinking of something more like a maximum number of allowed consecutive planning failures (not retries), because planning is already scheduled periodically. Later planning runs act like retries.





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1191304024


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -115,7 +116,12 @@ private ContinuousEnumerationResult discoverSplits() {
       return new ContinuousEnumerationResult(
           Collections.emptyList(), enumeratorPosition.get(), enumeratorPosition.get());
     } else {
-      return splitPlanner.planSplits(enumeratorPosition.get());
+      AtomicReference<ContinuousEnumerationResult> result = new AtomicReference<>();
+      Tasks.foreach(enumeratorPosition.get())

Review Comment:
   I was thinking about something along the same lines as Flink's tolerable checkpoint failures.
   
   https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-tolerable-failed-checkpoints
   
   We don't do any retry here.
   
   



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -161,7 +167,12 @@ private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwab
         LOG.info("Update enumerator position to {}", result.toPosition());
       }
     } else {
-      LOG.error("Failed to discover new splits", error);
+      if (scanContext.maxAllowedPlanningFailures() == -1) {

Review Comment:
   Here, we check the consecutive failures. If the count is over the threshold, an exception is thrown. Otherwise, we just log an error and move on.
   
   If the planning is successful, the consecutive failure counter is reset to 0.
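
The behaviour described above could be sketched roughly as follows. This is an illustration only, not the actual `ContinuousIcebergEnumerator` code: the class `ConsecutiveFailureTracker` and its methods are hypothetical, logging is replaced by `System.err`, and the inclusive `<=` bound is an assumption about the intended semantics. A negative limit is treated as "never fail", as with the `-1` setting discussed in this thread.

```java
public class ConsecutiveFailureTracker {

  // Hypothetical stand-in for scanContext.maxAllowedPlanningFailures();
  // a negative value means failures are tolerated indefinitely.
  private final int maxAllowedFailures;
  private int consecutiveFailures = 0;

  public ConsecutiveFailureTracker(int maxAllowedFailures) {
    this.maxAllowedFailures = maxAllowedFailures;
  }

  // A successful planning resets the counter
  public void onSuccess() {
    consecutiveFailures = 0;
  }

  // A failed planning is logged while under the threshold, and rethrown once over it
  public void onFailure(Throwable error) {
    consecutiveFailures++;
    if (maxAllowedFailures < 0 || consecutiveFailures <= maxAllowedFailures) {
      System.err.println("Failed to discover new splits: " + error.getMessage());
    } else {
      throw new RuntimeException("Failed to discover new splits", error);
    }
  }

  public int consecutiveFailures() {
    return consecutiveFailures;
  }

  public static void main(String[] args) {
    ConsecutiveFailureTracker tracker = new ConsecutiveFailureTracker(2);
    tracker.onFailure(new IllegalStateException("first failure"));
    tracker.onSuccess(); // counter goes back to 0
    tracker.onFailure(new IllegalStateException("second failure"));
    System.out.println("consecutive failures: " + tracker.consecutiveFailures());
  }
}
```

Because the next planning is already scheduled periodically, nothing is retried inside the tracker; it only decides whether a failure is logged or escalated.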





[GitHub] [iceberg] pvary commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1193433566


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -161,7 +165,14 @@ private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwab
         LOG.info("Update enumerator position to {}", result.toPosition());
       }
     } else {
-      LOG.error("Failed to discover new splits", error);
+      consecutiveFailures++;
+      if (scanContext.maxAllowedPlanningFailures() == -1

Review Comment:
   Done



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testRetryAndFinish() throws Exception {

Review Comment:
   Done





[GitHub] [iceberg] pvary commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1194833279


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,113 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testTransientPlanningFailure() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    // Trigger a planning and check that we did not get splits due to the planning error
+    enumeratorContext.triggerAllActions();
+    Assert.assertEquals(0, enumerator.snapshotState(2).pendingSplits().size());
+
+    // Trigger the planning again to recover from the failure, and we get the expected splits
+    enumeratorContext.triggerAllActions();
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testOverMaxAllowedPlanningFailures() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 3);
+    createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    // Check that the scheduler response ignores the current error and continues to run until the
+    // failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertFalse(
+        enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+
+    // Check that the task has failed with the expected exception after the failure limit is reached
+    enumeratorContext.triggerAllActions();
+    Assert.assertTrue(
+        enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).isDone());
+    Assertions.assertThatThrownBy(
+            () -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+        .hasCauseInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Failed to discover new split");
+  }
+
+  @Test
+  public void testOriginalRetry() throws Exception {

Review Comment:
   This tests that failures are ignored when the limit is set to `-1`.
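
   For readers following the thread, the limit semantics these tests exercise can be sketched roughly as below. This is a hypothetical helper, not code from the PR: the class `PlanningFailureTracker` and its methods are made up for illustration. It assumes, following the PR description, that the job fails once the number of consecutive failures exceeds the limit and that `-1` disables the limit. That a successful planning resets the counter is an assumption based on the `consecutiveFailures` name.

```java
/**
 * Hypothetical illustration only (not part of Iceberg): tracks consecutive
 * planning failures and fails once a configured limit is exceeded.
 */
final class PlanningFailureTracker {
  // Assumption from the PR description: -1 means "retry forever".
  private final int maxAllowedFailures;
  private int consecutiveFailures = 0;

  PlanningFailureTracker(int maxAllowedFailures) {
    this.maxAllowedFailures = maxAllowedFailures;
  }

  /** Assumption: a successful planning cycle resets the counter. */
  void onSuccess() {
    consecutiveFailures = 0;
  }

  /** Records a failure; throws once the limit is exceeded, unless the limit is -1. */
  void onFailure(Throwable error) {
    consecutiveFailures++;
    if (maxAllowedFailures != -1 && consecutiveFailures > maxAllowedFailures) {
      throw new RuntimeException("Failed to discover new splits", error);
    }
  }

  int consecutiveFailures() {
    return consecutiveFailures;
  }

  public static void main(String[] args) {
    PlanningFailureTracker limited = new PlanningFailureTracker(2);
    limited.onFailure(new IllegalStateException("transient error"));
    limited.onSuccess(); // recovered, so the counter starts again from zero
    System.out.println("failures after recovery: " + limited.consecutiveFailures());

    PlanningFailureTracker unlimited = new PlanningFailureTracker(-1);
    for (int i = 0; i < 100; i++) {
      unlimited.onFailure(new IllegalStateException("still failing")); // never throws
    }
    System.out.println("failures tolerated with -1: " + unlimited.consecutiveFailures());
  }
}
```

   With a limit of 2, the first two consecutive failures are only counted and the third one throws. With `-1` the counter grows without ever failing, which is the behaviour `testOriginalRetry` covers.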





[GitHub] [iceberg] pvary commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1194831727


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,113 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testTransientPlanningFailure() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // Make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    // Trigger a planning and check that we did not get splits due to the planning error
+    enumeratorContext.triggerAllActions();
+    Assert.assertEquals(0, enumerator.snapshotState(2).pendingSplits().size());
+
+    // Trigger the planning again to recover from the failure, and we get the expected splits

Review Comment:
   Done.
   Found another `we` and fixed it as well





[GitHub] [iceberg] pvary commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1193436087


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testRetryAndFinish() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();
+    enumeratorContext.triggerAllActions();
+
+    // Check that we got the expected splits
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testRetryLimitReached() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 3);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();
+    enumeratorContext.triggerAllActions();
+
+    // Check that the task has failed with the expected exception
+    Assertions.assertThatThrownBy(
+            () -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+        .hasCauseInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Failed to discover new split");
+  }
+
+  @Test
+  public void testOriginalRetry() throws Exception {
+    int expectedFailures = 3;
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(-1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner =
+        new ManualContinuousSplitPlanner(scanContext, expectedFailures);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    Collection<IcebergSourceSplitState> pendingSplits;
+    // Until we have errors, we do not have result

Review Comment:
   Done



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testRetryAndFinish() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();
+    enumeratorContext.triggerAllActions();
+
+    // Check that we got the expected splits
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testRetryLimitReached() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 3);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();
+    enumeratorContext.triggerAllActions();
+
+    // Check that the task has failed with the expected exception
+    Assertions.assertThatThrownBy(
+            () -> enumeratorContext.getExecutorService().getAllScheduledTasks().get(0).get())
+        .hasCauseInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Failed to discover new split");
+  }
+
+  @Test
+  public void testOriginalRetry() throws Exception {
+    int expectedFailures = 3;
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(-1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner =
+        new ManualContinuousSplitPlanner(scanContext, expectedFailures);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+
+    Collection<IcebergSourceSplitState> pendingSplits;
+    // Until we have errors, we do not have result
+    for (int i = 0; i < expectedFailures; ++i) {
+      enumeratorContext.triggerAllActions();
+      pendingSplits = enumerator.snapshotState(2).pendingSplits();
+      Assert.assertEquals(0, pendingSplits.size());
+    }
+
+    // After the errors are fixed we can continue

Review Comment:
   Done





[GitHub] [iceberg] pvary commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1193435338


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testRetryAndFinish() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();

Review Comment:
   Added comments to the sections below



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -227,6 +227,104 @@ public void testThrottlingDiscovery() throws Exception {
         splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
   }
 
+  @Test
+  public void testRetryAndFinish() throws Exception {
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            .maxPlanningSnapshotCount(1)
+            .maxAllowedPlanningFailures(2)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext, 1);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits);
+    enumeratorContext.triggerAllActions();
+    enumeratorContext.triggerAllActions();

Review Comment:
   Done





[GitHub] [iceberg] pvary commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1190872525


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -115,7 +116,12 @@ private ContinuousEnumerationResult discoverSplits() {
       return new ContinuousEnumerationResult(
           Collections.emptyList(), enumeratorPosition.get(), enumeratorPosition.get());
     } else {
-      return splitPlanner.planSplits(enumeratorPosition.get());
+      AtomicReference<ContinuousEnumerationResult> result = new AtomicReference<>();
+      Tasks.foreach(enumeratorPosition.get())

Review Comment:
   Done





[GitHub] [iceberg] pvary commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1189602791


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -458,7 +468,12 @@ public IcebergSource<T> build() {
       checkRequired();
       // Since builder already load the table, pass it to the source to avoid double loading
       return new IcebergSource<T>(
-          tableLoader, context, readerFunction, splitAssignerFactory, table);
+          tableLoader,
+          context,
+          readerFunction,
+          splitAssignerFactory,
+          flinkReadConf.planRetryNum(),

Review Comment:
   Thanks @hililiwei! Makes sense. Added the config to the `ScanContext`.
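
   As a rough illustration of how the retry setting could be resolved from the sources listed in the PR description, here is a minimal sketch using plain maps. The class `PlanRetryNumResolver` is hypothetical and does not exist in Iceberg. The key names and the default of 3 come from the PR description. The precedence (read option first, then Flink configuration, then the default) is an assumption made for this example.

```java
import java.util.Map;

/** Hypothetical illustration only: resolves the retry limit from two config sources. */
final class PlanRetryNumResolver {
  // Key names and default value as given in the PR description.
  static final String READ_OPTION_KEY = "plan-retry-num";
  static final String FLINK_CONF_KEY = "connector.iceberg.plan-retry-num";
  static final int DEFAULT_PLAN_RETRY_NUM = 3;

  private PlanRetryNumResolver() {}

  /**
   * Assumed precedence for this sketch: read option, then Flink configuration,
   * then the default value.
   */
  static int resolve(Map<String, String> readOptions, Map<String, String> flinkConf) {
    String value = readOptions.get(READ_OPTION_KEY);
    if (value == null) {
      value = flinkConf.get(FLINK_CONF_KEY);
    }
    return value == null ? DEFAULT_PLAN_RETRY_NUM : Integer.parseInt(value);
  }

  public static void main(String[] args) {
    // Nothing configured: falls back to the default.
    System.out.println(resolve(Map.of(), Map.of()));
    // Read option set to -1: the original "retry forever" behaviour is requested.
    System.out.println(resolve(Map.of(READ_OPTION_KEY, "-1"), Map.of()));
  }
}
```

   Keeping the resolved value in one place, such as the `ScanContext`, means the enumerator only needs to read a single field regardless of where the value came from.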





[GitHub] [iceberg] hililiwei commented on a diff in pull request #7571: Flink: Add retry limit for IcebergSource continuous split planning errors

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #7571:
URL: https://github.com/apache/iceberg/pull/7571#discussion_r1189669699


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java:
##########
@@ -426,7 +432,8 @@ public IcebergSource<T> build() {
         }
       }
 
-      contextBuilder.resolveConfig(table, readOptions, flinkConfig);
+      FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, flinkConfig);
+      contextBuilder.resolveConfig(flinkReadConf);

Review Comment:
   Since the setting now lives in `ScanContext`, the modification here seems unnecessary. What do you think?


