You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/06/07 00:20:06 UTC

[iceberg] branch master updated: Flink 1.15: Port PR #4329 to add FLIP-27 enumerator classes (#4979)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d6c6ccec Flink 1.15: Port PR #4329 to add FLIP-27 enumerator classes (#4979)
5d6c6ccec is described below

commit 5d6c6ccecc43f9d9d2348fddbde45b747016d643
Author: Steven Zhen Wu <st...@gmail.com>
AuthorDate: Mon Jun 6 17:20:01 2022 -0700

    Flink 1.15: Port PR #4329 to add FLIP-27 enumerator classes (#4979)
---
 .../iceberg/flink/source/FlinkSplitPlanner.java    |  79 +++-
 .../apache/iceberg/flink/source/ScanContext.java   | 149 ++++---
 .../flink/source/StreamingStartingStrategy.java    |  58 +++
 .../enumerator/ContinuousEnumerationResult.java    |  58 +++
 .../source/enumerator/ContinuousSplitPlanner.java  |  35 ++
 .../enumerator/ContinuousSplitPlannerImpl.java     | 189 +++++++++
 .../enumerator/IcebergEnumeratorPosition.java      |  82 ++++
 .../apache/iceberg/flink/HadoopTableResource.java  |  93 +++++
 .../apache/iceberg/flink/source/TestFlinkScan.java |   6 +-
 .../enumerator/ManualContinuousSplitPlanner.java   |  57 +++
 .../enumerator/TestContinuousSplitPlannerImpl.java | 463 +++++++++++++++++++++
 ...estContinuousSplitPlannerImplStartStrategy.java | 187 +++++++++
 12 files changed, 1383 insertions(+), 73 deletions(-)

diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
index a2df652e8..2e5024324 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import org.apache.flink.annotation.Internal;
 import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.IncrementalAppendScan;
+import org.apache.iceberg.Scan;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
@@ -76,50 +78,81 @@ public class FlinkSplitPlanner {
   }
 
   static CloseableIterable<CombinedScanTask> planTasks(Table table, ScanContext context, ExecutorService workerPool) {
-    TableScan scan = table
-        .newScan()
-        .caseSensitive(context.caseSensitive())
-        .project(context.project())
-        .planWith(workerPool);
+    ScanMode scanMode = checkScanMode(context);
+    if (scanMode == ScanMode.INCREMENTAL_APPEND_SCAN) {
+      IncrementalAppendScan scan = table.newIncrementalAppendScan();
+      scan = refineScanWithBaseConfigs(scan, context, workerPool);
 
-    if (context.includeColumnStats()) {
-      scan = scan.includeColumnStats();
-    }
+      if (context.startSnapshotId() != null) {
+        scan = scan.fromSnapshotExclusive(context.startSnapshotId());
+      }
+
+      if (context.endSnapshotId() != null) {
+        scan = scan.toSnapshot(context.endSnapshotId());
+      }
+
+      return scan.planTasks();
+    } else {
+      TableScan scan = table.newScan();
+      scan = refineScanWithBaseConfigs(scan, context, workerPool);
+
+      if (context.snapshotId() != null) {
+        scan = scan.useSnapshot(context.snapshotId());
+      }
+
+      if (context.asOfTimestamp() != null) {
+        scan = scan.asOfTime(context.asOfTimestamp());
+      }
 
-    if (context.snapshotId() != null) {
-      scan = scan.useSnapshot(context.snapshotId());
+      return scan.planTasks();
     }
+  }
+
+  private enum ScanMode {
+    BATCH,
+    INCREMENTAL_APPEND_SCAN
+  }
 
-    if (context.asOfTimestamp() != null) {
-      scan = scan.asOfTime(context.asOfTimestamp());
+  private static ScanMode checkScanMode(ScanContext context) {
+    if (context.isStreaming() || context.startSnapshotId() != null || context.endSnapshotId() != null) {
+      return ScanMode.INCREMENTAL_APPEND_SCAN;
+    } else {
+      return ScanMode.BATCH;
     }
+  }
 
-    if (context.startSnapshotId() != null) {
-      if (context.endSnapshotId() != null) {
-        scan = scan.appendsBetween(context.startSnapshotId(), context.endSnapshotId());
-      } else {
-        scan = scan.appendsAfter(context.startSnapshotId());
-      }
+  /**
+   * refine scan with common configs
+   */
+  private static <T extends Scan<T>> T refineScanWithBaseConfigs(
+      T scan, ScanContext context, ExecutorService workerPool) {
+    T refinedScan = scan
+        .caseSensitive(context.caseSensitive())
+        .project(context.project())
+        .planWith(workerPool);
+
+    if (context.includeColumnStats()) {
+      refinedScan = refinedScan.includeColumnStats();
     }
 
     if (context.splitSize() != null) {
-      scan = scan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString());
+      refinedScan = refinedScan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString());
     }
 
     if (context.splitLookback() != null) {
-      scan = scan.option(TableProperties.SPLIT_LOOKBACK, context.splitLookback().toString());
+      refinedScan = refinedScan.option(TableProperties.SPLIT_LOOKBACK, context.splitLookback().toString());
     }
 
     if (context.splitOpenFileCost() != null) {
-      scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString());
+      refinedScan = refinedScan.option(TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString());
     }
 
     if (context.filters() != null) {
       for (Expression filter : context.filters()) {
-        scan = scan.filter(filter);
+        refinedScan = refinedScan.filter(filter);
       }
     }
 
-    return scan.planTasks();
+    return refinedScan;
   }
 }
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 806006dd1..84c7652ef 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -23,9 +23,11 @@ import java.io.Serializable;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.FlinkConfigOptions;
@@ -35,7 +37,8 @@ import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
 /**
  * Context object with optional arguments for a Flink Scan.
  */
-class ScanContext implements Serializable {
+@Internal
+public class ScanContext implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
@@ -48,6 +51,13 @@ class ScanContext implements Serializable {
   private static final ConfigOption<Long> AS_OF_TIMESTAMP =
       ConfigOptions.key("as-of-timestamp").longType().defaultValue(null);
 
+  private static final ConfigOption<StreamingStartingStrategy> STARTING_STRATEGY =
+      ConfigOptions.key("starting-strategy").enumType(StreamingStartingStrategy.class)
+          .defaultValue(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT);
+
+  private static final ConfigOption<Long> START_SNAPSHOT_TIMESTAMP =
+      ConfigOptions.key("start-snapshot-timestamp").longType().defaultValue(null);
+
   private static final ConfigOption<Long> START_SNAPSHOT_ID =
       ConfigOptions.key("start-snapshot-id").longType().defaultValue(null);
 
@@ -75,7 +85,9 @@ class ScanContext implements Serializable {
   private final boolean caseSensitive;
   private final boolean exposeLocality;
   private final Long snapshotId;
+  private final StreamingStartingStrategy startingStrategy;
   private final Long startSnapshotId;
+  private final Long startSnapshotTimestamp;
   private final Long endSnapshotId;
   private final Long asOfTimestamp;
   private final Long splitSize;
@@ -91,13 +103,15 @@ class ScanContext implements Serializable {
   private final boolean includeColumnStats;
   private final Integer planParallelism;
 
-  private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId,
-                      Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost,
-                      boolean isStreaming, Duration monitorInterval, String nameMapping, Schema schema,
-                      List<Expression> filters, long limit, boolean includeColumnStats, boolean exposeLocality,
-                      Integer planParallelism) {
+  private ScanContext(boolean caseSensitive, Long snapshotId, StreamingStartingStrategy startingStrategy,
+                      Long startSnapshotTimestamp, Long startSnapshotId, Long endSnapshotId, Long asOfTimestamp,
+                      Long splitSize, Integer splitLookback, Long splitOpenFileCost, boolean isStreaming,
+                      Duration monitorInterval, String nameMapping, Schema schema, List<Expression> filters,
+                      long limit, boolean includeColumnStats, boolean exposeLocality, Integer planParallelism) {
     this.caseSensitive = caseSensitive;
     this.snapshotId = snapshotId;
+    this.startingStrategy = startingStrategy;
+    this.startSnapshotTimestamp = startSnapshotTimestamp;
     this.startSnapshotId = startSnapshotId;
     this.endSnapshotId = endSnapshotId;
     this.asOfTimestamp = asOfTimestamp;
@@ -114,77 +128,104 @@ class ScanContext implements Serializable {
     this.includeColumnStats = includeColumnStats;
     this.exposeLocality = exposeLocality;
     this.planParallelism = planParallelism;
+
+    validate();
+  }
+
+  private void validate() {
+    if (isStreaming) {
+      if (startingStrategy == StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID) {
+        Preconditions.checkArgument(startSnapshotId != null,
+            "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: null");
+        Preconditions.checkArgument(startSnapshotTimestamp == null,
+            "Invalid starting snapshot timestamp for SPECIFIC_START_SNAPSHOT_ID strategy: not null");
+      }
+      if (startingStrategy == StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP) {
+        Preconditions.checkArgument(startSnapshotTimestamp != null,
+            "Invalid starting snapshot timestamp for SPECIFIC_START_SNAPSHOT_TIMESTAMP strategy: null");
+        Preconditions.checkArgument(startSnapshotId == null,
+            "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null");
+      }
+    }
   }
 
-  boolean caseSensitive() {
+  public boolean caseSensitive() {
     return caseSensitive;
   }
 
-  Long snapshotId() {
+  public Long snapshotId() {
     return snapshotId;
   }
 
-  Long startSnapshotId() {
+  public StreamingStartingStrategy startingStrategy() {
+    return startingStrategy;
+  }
+
+  public Long startSnapshotTimestamp() {
+    return startSnapshotTimestamp;
+  }
+
+  public Long startSnapshotId() {
     return startSnapshotId;
   }
 
-  Long endSnapshotId() {
+  public Long endSnapshotId() {
     return endSnapshotId;
   }
 
-  Long asOfTimestamp() {
+  public Long asOfTimestamp() {
     return asOfTimestamp;
   }
 
-  Long splitSize() {
+  public Long splitSize() {
     return splitSize;
   }
 
-  Integer splitLookback() {
+  public Integer splitLookback() {
     return splitLookback;
   }
 
-  Long splitOpenFileCost() {
+  public Long splitOpenFileCost() {
     return splitOpenFileCost;
   }
 
-  boolean isStreaming() {
+  public boolean isStreaming() {
     return isStreaming;
   }
 
-  Duration monitorInterval() {
+  public Duration monitorInterval() {
     return monitorInterval;
   }
 
-  String nameMapping() {
+  public String nameMapping() {
     return nameMapping;
   }
 
-  Schema project() {
+  public Schema project() {
     return schema;
   }
 
-  List<Expression> filters() {
+  public List<Expression> filters() {
     return filters;
   }
 
-  long limit() {
+  public long limit() {
     return limit;
   }
 
-  boolean includeColumnStats() {
+  public boolean includeColumnStats() {
     return includeColumnStats;
   }
 
-  boolean exposeLocality() {
+  public boolean exposeLocality() {
     return exposeLocality;
   }
 
-  Integer planParallelism() {
+  public Integer planParallelism() {
     return planParallelism;
   }
 
-  ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId) {
+  public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
     return ScanContext.builder()
         .caseSensitive(caseSensitive)
         .useSnapshotId(null)
@@ -206,7 +247,7 @@ class ScanContext implements Serializable {
         .build();
   }
 
-  ScanContext copyWithSnapshotId(long newSnapshotId) {
+  public ScanContext copyWithSnapshotId(long newSnapshotId) {
     return ScanContext.builder()
         .caseSensitive(caseSensitive)
         .useSnapshotId(newSnapshotId)
@@ -228,13 +269,15 @@ class ScanContext implements Serializable {
         .build();
   }
 
-  static Builder builder() {
+  public static Builder builder() {
     return new Builder();
   }
 
-  static class Builder {
+  public static class Builder {
     private boolean caseSensitive = CASE_SENSITIVE.defaultValue();
     private Long snapshotId = SNAPSHOT_ID.defaultValue();
+    private StreamingStartingStrategy startingStrategy = STARTING_STRATEGY.defaultValue();
+    private Long startSnapshotTimestamp = START_SNAPSHOT_TIMESTAMP.defaultValue();
     private Long startSnapshotId = START_SNAPSHOT_ID.defaultValue();
     private Long endSnapshotId = END_SNAPSHOT_ID.defaultValue();
     private Long asOfTimestamp = AS_OF_TIMESTAMP.defaultValue();
@@ -254,98 +297,110 @@ class ScanContext implements Serializable {
     private Builder() {
     }
 
-    Builder caseSensitive(boolean newCaseSensitive) {
+    public Builder caseSensitive(boolean newCaseSensitive) {
       this.caseSensitive = newCaseSensitive;
       return this;
     }
 
-    Builder useSnapshotId(Long newSnapshotId) {
+    public Builder useSnapshotId(Long newSnapshotId) {
       this.snapshotId = newSnapshotId;
       return this;
     }
 
-    Builder startSnapshotId(Long newStartSnapshotId) {
+    public Builder startingStrategy(StreamingStartingStrategy newStartingStrategy) {
+      this.startingStrategy = newStartingStrategy;
+      return this;
+    }
+
+    public Builder startSnapshotTimestamp(Long newStartSnapshotTimestamp) {
+      this.startSnapshotTimestamp = newStartSnapshotTimestamp;
+      return this;
+    }
+
+    public Builder startSnapshotId(Long newStartSnapshotId) {
       this.startSnapshotId = newStartSnapshotId;
       return this;
     }
 
-    Builder endSnapshotId(Long newEndSnapshotId) {
+    public Builder endSnapshotId(Long newEndSnapshotId) {
       this.endSnapshotId = newEndSnapshotId;
       return this;
     }
 
-    Builder asOfTimestamp(Long newAsOfTimestamp) {
+    public Builder asOfTimestamp(Long newAsOfTimestamp) {
       this.asOfTimestamp = newAsOfTimestamp;
       return this;
     }
 
-    Builder splitSize(Long newSplitSize) {
+    public Builder splitSize(Long newSplitSize) {
       this.splitSize = newSplitSize;
       return this;
     }
 
-    Builder splitLookback(Integer newSplitLookback) {
+    public Builder splitLookback(Integer newSplitLookback) {
       this.splitLookback = newSplitLookback;
       return this;
     }
 
-    Builder splitOpenFileCost(Long newSplitOpenFileCost) {
+    public Builder splitOpenFileCost(Long newSplitOpenFileCost) {
       this.splitOpenFileCost = newSplitOpenFileCost;
       return this;
     }
 
-    Builder streaming(boolean streaming) {
+    public Builder streaming(boolean streaming) {
       this.isStreaming = streaming;
       return this;
     }
 
-    Builder monitorInterval(Duration newMonitorInterval) {
+    public Builder monitorInterval(Duration newMonitorInterval) {
       this.monitorInterval = newMonitorInterval;
       return this;
     }
 
-    Builder nameMapping(String newNameMapping) {
+    public Builder nameMapping(String newNameMapping) {
       this.nameMapping = newNameMapping;
       return this;
     }
 
-    Builder project(Schema newProjectedSchema) {
+    public Builder project(Schema newProjectedSchema) {
       this.projectedSchema = newProjectedSchema;
       return this;
     }
 
-    Builder filters(List<Expression> newFilters) {
+    public Builder filters(List<Expression> newFilters) {
       this.filters = newFilters;
       return this;
     }
 
-    Builder limit(long newLimit) {
+    public Builder limit(long newLimit) {
       this.limit = newLimit;
       return this;
     }
 
-    Builder includeColumnStats(boolean newIncludeColumnStats) {
+    public Builder includeColumnStats(boolean newIncludeColumnStats) {
       this.includeColumnStats = newIncludeColumnStats;
       return this;
     }
 
-    Builder exposeLocality(boolean newExposeLocality) {
+    public Builder exposeLocality(boolean newExposeLocality) {
       this.exposeLocality = newExposeLocality;
       return this;
     }
 
-    Builder planParallelism(Integer parallelism) {
+    public Builder planParallelism(Integer parallelism) {
       this.planParallelism = parallelism;
       return this;
     }
 
-    Builder fromProperties(Map<String, String> properties) {
+    public Builder fromProperties(Map<String, String> properties) {
       Configuration config = new Configuration();
       properties.forEach(config::setString);
 
       return this.useSnapshotId(config.get(SNAPSHOT_ID))
           .caseSensitive(config.get(CASE_SENSITIVE))
           .asOfTimestamp(config.get(AS_OF_TIMESTAMP))
+          .startingStrategy(config.get(STARTING_STRATEGY))
+          .startSnapshotTimestamp(config.get(START_SNAPSHOT_TIMESTAMP))
           .startSnapshotId(config.get(START_SNAPSHOT_ID))
           .endSnapshotId(config.get(END_SNAPSHOT_ID))
           .splitSize(config.get(SPLIT_SIZE))
@@ -358,8 +413,8 @@ class ScanContext implements Serializable {
     }
 
     public ScanContext build() {
-      return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
-          endSnapshotId, asOfTimestamp, splitSize, splitLookback,
+      return new ScanContext(caseSensitive, snapshotId, startingStrategy, startSnapshotTimestamp,
+          startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, splitLookback,
           splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema,
           filters, limit, includeColumnStats, exposeLocality, planParallelism);
     }
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingStartingStrategy.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingStartingStrategy.java
new file mode 100644
index 000000000..3e83fbe7f
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingStartingStrategy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+/**
+ * Starting strategy for streaming execution.
+ */
+public enum StreamingStartingStrategy {
+  /**
+   * Do a regular table scan then switch to the incremental mode.
+   * <p>
+   * The incremental mode starts from the current snapshot exclusive.
+   */
+  TABLE_SCAN_THEN_INCREMENTAL,
+
+  /**
+   * Start incremental mode from the latest snapshot inclusive.
+   * <p>
+   * If it is an empty map, all future append snapshots should be discovered.
+   */
+  INCREMENTAL_FROM_LATEST_SNAPSHOT,
+
+  /**
+   * Start incremental mode from the earliest snapshot inclusive.
+   * <p>
+   * If it is an empty map, all future append snapshots should be discovered.
+   */
+  INCREMENTAL_FROM_EARLIEST_SNAPSHOT,
+
+  /**
+   * Start incremental mode from a snapshot with a specific id inclusive.
+   */
+  INCREMENTAL_FROM_SNAPSHOT_ID,
+
+  /**
+   * Start incremental mode from a snapshot with a specific timestamp inclusive.
+   * <p>
+   * If the timestamp is between two snapshots, it should start from the snapshot after the timestamp.
+   */
+  INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousEnumerationResult.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousEnumerationResult.java
new file mode 100644
index 000000000..8c20f2cf2
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousEnumerationResult.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.util.Collection;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+class ContinuousEnumerationResult {
+  private final Collection<IcebergSourceSplit> splits;
+  private final IcebergEnumeratorPosition fromPosition;
+  private final IcebergEnumeratorPosition toPosition;
+
+  /**
+   * @param splits should never be null. But it can be an empty collection
+   * @param fromPosition can be null
+   * @param toPosition should never be null. But it can have null snapshotId and snapshotTimestampMs
+   */
+  ContinuousEnumerationResult(
+      Collection<IcebergSourceSplit> splits,
+      IcebergEnumeratorPosition fromPosition,
+      IcebergEnumeratorPosition toPosition) {
+    Preconditions.checkArgument(splits != null, "Invalid to splits collection: null");
+    Preconditions.checkArgument(toPosition != null, "Invalid end position: null");
+    this.splits = splits;
+    this.fromPosition = fromPosition;
+    this.toPosition = toPosition;
+  }
+
+  public Collection<IcebergSourceSplit> splits() {
+    return splits;
+  }
+
+  public IcebergEnumeratorPosition fromPosition() {
+    return fromPosition;
+  }
+
+  public IcebergEnumeratorPosition toPosition() {
+    return toPosition;
+  }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlanner.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlanner.java
new file mode 100644
index 000000000..1737ae6a5
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlanner.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.Closeable;
+import org.apache.flink.annotation.Internal;
+
+/**
+ * This interface is introduced so that we can plug in different split planner for unit test
+ */
+@Internal
+public interface ContinuousSplitPlanner extends Closeable {
+
+  /**
+   * Discover the files appended between {@code lastPosition} and current table snapshot
+   */
+  ContinuousEnumerationResult planSplits(IcebergEnumeratorPosition lastPosition);
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
new file mode 100644
index 000000000..4d6e89684
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.source.FlinkSplitPlanner;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner {
+  private static final Logger LOG = LoggerFactory.getLogger(ContinuousSplitPlannerImpl.class);
+
+  private final Table table;
+  private final ScanContext scanContext;
+  private final boolean isSharedPool;
+  private final ExecutorService workerPool;
+
+  /**
+   * @param threadName thread name prefix for worker pool to run the split planning.
+   *                   If null, a shared worker pool will be used.
+   */
+  public ContinuousSplitPlannerImpl(Table table, ScanContext scanContext, String threadName) {
+    this.table = table;
+    this.scanContext = scanContext;
+    this.isSharedPool = threadName == null;
+    this.workerPool = isSharedPool ? ThreadPools.getWorkerPool()
+        : ThreadPools.newWorkerPool("iceberg-plan-worker-pool-" + threadName, scanContext.planParallelism());
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!isSharedPool) {
+      workerPool.shutdown();
+    }
+  }
+
+  @Override
+  public ContinuousEnumerationResult planSplits(IcebergEnumeratorPosition lastPosition) {
+    table.refresh();
+    if (lastPosition != null) {
+      return discoverIncrementalSplits(lastPosition);
+    } else {
+      return discoverInitialSplits();
+    }
+  }
+
+  /**
+   * Discover incremental changes between @{code lastPosition} and current table snapshot
+   */
+  private ContinuousEnumerationResult discoverIncrementalSplits(IcebergEnumeratorPosition lastPosition) {
+    Snapshot currentSnapshot = table.currentSnapshot();
+    if (currentSnapshot == null) {
+      // empty table
+      Preconditions.checkArgument(lastPosition.snapshotId() == null,
+          "Invalid last enumerated position for an empty table: not null");
+      LOG.info("Skip incremental scan because table is empty");
+      return new ContinuousEnumerationResult(Collections.emptyList(), lastPosition, lastPosition);
+    } else if (lastPosition.snapshotId() != null && currentSnapshot.snapshotId() == lastPosition.snapshotId()) {
+      LOG.info("Current table snapshot is already enumerated: {}", currentSnapshot.snapshotId());
+      return new ContinuousEnumerationResult(Collections.emptyList(), lastPosition, lastPosition);
+    } else {
+      IcebergEnumeratorPosition newPosition = IcebergEnumeratorPosition.of(
+          currentSnapshot.snapshotId(), currentSnapshot.timestampMillis());
+      ScanContext incrementalScan = scanContext
+          .copyWithAppendsBetween(lastPosition.snapshotId(), currentSnapshot.snapshotId());
+      List<IcebergSourceSplit> splits = FlinkSplitPlanner.planIcebergSourceSplits(table, incrementalScan, workerPool);
+      LOG.info("Discovered {} splits from incremental scan: " +
+              "from snapshot (exclusive) is {}, to snapshot (inclusive) is {}",
+          splits.size(), lastPosition, newPosition);
+      return new ContinuousEnumerationResult(splits, lastPosition, newPosition);
+    }
+  }
+
+  /**
+   * Discovery initial set of splits based on {@link StreamingStartingStrategy}.
+   *
+   * <li>{@link ContinuousEnumerationResult#splits()} should contain initial splits
+   * discovered from table scan for {@link StreamingStartingStrategy#TABLE_SCAN_THEN_INCREMENTAL}.
+   * For all other strategies, splits collection should be empty.
+   * <li>{@link ContinuousEnumerationResult#toPosition()} points to the starting position
+   * for the next incremental split discovery with exclusive behavior. Meaning files committed
+   * by the snapshot from the position in {@code ContinuousEnumerationResult} won't be included
+   * in the next incremental scan.
+   */
+  private ContinuousEnumerationResult discoverInitialSplits() {
+    Optional<Snapshot> startSnapshotOptional = startSnapshot(table, scanContext);
+    if (!startSnapshotOptional.isPresent()) {
+      return new ContinuousEnumerationResult(Collections.emptyList(), null,
+          IcebergEnumeratorPosition.empty());
+    }
+
+    Snapshot startSnapshot = startSnapshotOptional.get();
+    LOG.info("Get starting snapshot id {} based on strategy {}",
+        startSnapshot.snapshotId(), scanContext.startingStrategy());
+    List<IcebergSourceSplit> splits;
+    IcebergEnumeratorPosition toPosition;
+    if (scanContext.startingStrategy() == StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) {
+      // do a batch table scan first
+      splits = FlinkSplitPlanner.planIcebergSourceSplits(table, scanContext, workerPool);
+      LOG.info("Discovered {} splits from initial batch table scan with snapshot Id {}",
+          splits.size(), startSnapshot.snapshotId());
+      // For TABLE_SCAN_THEN_INCREMENTAL, incremental mode starts exclusive from the startSnapshot
+      toPosition = IcebergEnumeratorPosition.of(startSnapshot.snapshotId(), startSnapshot.timestampMillis());
+    } else {
+      // For all other modes, starting snapshot should be consumed inclusively.
+      // Use parentId to achieve the inclusive behavior. It is fine if parentId is null.
+      splits = Collections.emptyList();
+      Long parentSnapshotId = startSnapshot.parentId();
+      if (parentSnapshotId != null) {
+        Snapshot parentSnapshot = table.snapshot(parentSnapshotId);
+        Long parentSnapshotTimestampMs = parentSnapshot != null ? parentSnapshot.timestampMillis() : null;
+        toPosition = IcebergEnumeratorPosition.of(parentSnapshotId, parentSnapshotTimestampMs);
+      } else {
+        toPosition = IcebergEnumeratorPosition.empty();
+      }
+
+      LOG.info("Start incremental scan with start snapshot (inclusive): id = {}, timestamp = {}",
+          startSnapshot.snapshotId(), startSnapshot.timestampMillis());
+    }
+
+    return new ContinuousEnumerationResult(splits, null, toPosition);
+  }
+
+  /**
+   * Calculate the starting snapshot based on the {@link StreamingStartingStrategy} defined in {@code ScanContext}.
+   * <p>
+   * If the {@link StreamingStartingStrategy} is not {@link StreamingStartingStrategy#TABLE_SCAN_THEN_INCREMENTAL},
+   * the start snapshot should be consumed inclusively.
+   */
+  @VisibleForTesting
+  static Optional<Snapshot> startSnapshot(Table table, ScanContext scanContext) {
+    switch (scanContext.startingStrategy()) {
+      case TABLE_SCAN_THEN_INCREMENTAL:
+      case INCREMENTAL_FROM_LATEST_SNAPSHOT:
+        return Optional.ofNullable(table.currentSnapshot());
+      case INCREMENTAL_FROM_EARLIEST_SNAPSHOT:
+        return Optional.ofNullable(SnapshotUtil.oldestAncestor(table));
+      case INCREMENTAL_FROM_SNAPSHOT_ID:
+        Snapshot matchedSnapshotById = table.snapshot(scanContext.startSnapshotId());
+        Preconditions.checkArgument(matchedSnapshotById != null,
+            "Start snapshot id not found in history: " + scanContext.startSnapshotId());
+        return Optional.of(matchedSnapshotById);
+      case INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP:
+        long snapshotIdAsOfTime = SnapshotUtil.snapshotIdAsOfTime(table, scanContext.startSnapshotTimestamp());
+        Snapshot matchedSnapshotByTimestamp = table.snapshot(snapshotIdAsOfTime);
+        if (matchedSnapshotByTimestamp.timestampMillis() == scanContext.startSnapshotTimestamp()) {
+          return Optional.of(matchedSnapshotByTimestamp);
+        } else {
+          // if the snapshotIdAsOfTime has the timestamp value smaller than the scanContext.startSnapshotTimestamp(),
+          // return the child snapshot whose timestamp value is larger
+          return Optional.of(SnapshotUtil.snapshotAfter(table, snapshotIdAsOfTime));
+        }
+      default:
+        throw new IllegalArgumentException("Unknown starting strategy: " + scanContext.startingStrategy());
+    }
+  }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPosition.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPosition.java
new file mode 100644
index 000000000..e024473da
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPosition.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+
+class IcebergEnumeratorPosition {
+  private final Long snapshotId;
+  // Track snapshot timestamp mainly for info logging
+  private final Long snapshotTimestampMs;
+
+  static IcebergEnumeratorPosition empty() {
+    return new IcebergEnumeratorPosition(null, null);
+  }
+
+  static IcebergEnumeratorPosition of(long snapshotId, Long snapshotTimestampMs) {
+    return new IcebergEnumeratorPosition(snapshotId, snapshotTimestampMs);
+  }
+
+  private IcebergEnumeratorPosition(Long snapshotId, Long snapshotTimestampMs) {
+    this.snapshotId = snapshotId;
+    this.snapshotTimestampMs = snapshotTimestampMs;
+  }
+
+  boolean isEmpty() {
+    return snapshotId == null;
+  }
+
+  Long snapshotId() {
+    return snapshotId;
+  }
+
+  Long snapshotTimestampMs() {
+    return snapshotTimestampMs;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("snapshotId", snapshotId)
+        .add("snapshotTimestampMs", snapshotTimestampMs)
+        .toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(
+        snapshotId,
+        snapshotTimestampMs);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    IcebergEnumeratorPosition other = (IcebergEnumeratorPosition) o;
+    return Objects.equal(snapshotId, other.snapshotId()) &&
+        Objects.equal(snapshotTimestampMs, other.snapshotTimestampMs());
+  }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/HadoopTableResource.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/HadoopTableResource.java
new file mode 100644
index 000000000..bc4e209a4
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/HadoopTableResource.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TemporaryFolder;
+
+public class HadoopTableResource extends ExternalResource {
+  private final TemporaryFolder temporaryFolder;
+  private final String database;
+  private final String tableName;
+  private final Schema schema;
+  private final PartitionSpec partitionSpec;
+
+  private HadoopCatalog catalog;
+  private TableLoader tableLoader;
+  private Table table;
+
+  public HadoopTableResource(TemporaryFolder temporaryFolder, String database, String tableName, Schema schema) {
+    this(temporaryFolder, database, tableName, schema, null);
+  }
+
+  public HadoopTableResource(TemporaryFolder temporaryFolder, String database, String tableName,
+                             Schema schema, PartitionSpec partitionSpec) {
+    this.temporaryFolder = temporaryFolder;
+    this.database = database;
+    this.tableName = tableName;
+    this.schema = schema;
+    this.partitionSpec = partitionSpec;
+  }
+
+  @Override
+  protected void before() throws Throwable {
+    File warehouseFile = temporaryFolder.newFolder();
+    Assert.assertTrue(warehouseFile.delete());
+    // before variables
+    String warehouse = "file:" + warehouseFile;
+    Configuration hadoopConf = new Configuration();
+    this.catalog = new HadoopCatalog(hadoopConf, warehouse);
+    String location = String.format("%s/%s/%s", warehouse, database, tableName);
+    this.tableLoader = TableLoader.fromHadoopTable(location);
+    if (partitionSpec == null) {
+      this.table = catalog.createTable(TableIdentifier.of(database, tableName), schema);
+    } else {
+      this.table = catalog.createTable(TableIdentifier.of(database, tableName), schema, partitionSpec);
+    }
+    tableLoader.open();
+  }
+
+  @Override
+  protected void after() {
+    try {
+      catalog.dropTable(TableIdentifier.of(database, tableName));
+      catalog.close();
+      tableLoader.close();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to close catalog resource");
+    }
+  }
+
+  public TableLoader tableLoader() {
+    return tableLoader;
+  }
+
+  public Table table() {
+    return table;
+  }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
index 357f5ab14..9284b8fa9 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
@@ -237,15 +237,15 @@ public abstract class TestFlinkScan {
     long snapshotId1 = table.currentSnapshot().snapshotId();
 
     // snapshot 2
-    List<Record> records2 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
+    List<Record> records2 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 1L);
     helper.appendToTable(records2);
 
-    List<Record> records3 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
+    List<Record> records3 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 2L);
     helper.appendToTable(records3);
     long snapshotId3 = table.currentSnapshot().snapshotId();
 
     // snapshot 4
-    helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L));
+    helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 3L));
 
     List<Record> expected2 = Lists.newArrayList();
     expected2.addAll(records2);
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
new file mode 100644
index 000000000..f1db8ef5d
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.List;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+class ManualContinuousSplitPlanner implements ContinuousSplitPlanner {
+  private final ArrayDeque<IcebergSourceSplit> splits = new ArrayDeque<>();
+  private IcebergEnumeratorPosition latestPosition;
+
+  @Override
+  public ContinuousEnumerationResult planSplits(IcebergEnumeratorPosition lastPosition) {
+    ContinuousEnumerationResult result = new ContinuousEnumerationResult(
+        Lists.newArrayList(splits), lastPosition, latestPosition);
+    return result;
+  }
+
+  /**
+   * Add new splits to the collection
+   */
+  public void addSplits(List<IcebergSourceSplit> newSplits, IcebergEnumeratorPosition newPosition) {
+    splits.addAll(newSplits);
+    this.latestPosition = newPosition;
+  }
+
+  /**
+   * Clear the splits collection
+   */
+  public void clearSplits() {
+    splits.clear();
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java
new file mode 100644
index 000000000..2bcf2f07d
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.joda.time.format.DateTimeFormat;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+public class TestContinuousSplitPlannerImpl {
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private static final FileFormat fileFormat = FileFormat.PARQUET;
+  private static final AtomicLong randomSeed = new AtomicLong();
+
+  @Rule
+  public final HadoopTableResource tableResource = new HadoopTableResource(TEMPORARY_FOLDER,
+      TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.SCHEMA);
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private GenericAppenderHelper dataAppender;
+  private DataFile dataFile1;
+  private Snapshot snapshot1;
+  private DataFile dataFile2;
+  private Snapshot snapshot2;
+
+  @Before
+  public void before() throws IOException {
+    dataAppender = new GenericAppenderHelper(tableResource.table(), fileFormat, TEMPORARY_FOLDER);
+  }
+
+  private void appendTwoSnapshots() throws IOException {
+    // snapshot1
+    List<Record> batch1 = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
+    dataFile1 = dataAppender.writeFile(null, batch1);
+    dataAppender.appendToTable(dataFile1);
+    snapshot1 = tableResource.table().currentSnapshot();
+
+    // snapshot2
+    List<Record> batch2 = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 1L);
+    dataFile2 = dataAppender.writeFile(null, batch2);
+    dataAppender.appendToTable(dataFile2);
+    snapshot2 = tableResource.table().currentSnapshot();
+  }
+
+  /**
+   * @return the last enumerated snapshot id
+   */
+  private IcebergEnumeratorPosition verifyOneCycle(
+      ContinuousSplitPlannerImpl splitPlanner, IcebergEnumeratorPosition lastPosition) throws Exception {
+    List<Record> batch = RandomGenericData.generate(TestFixtures.SCHEMA, 2, randomSeed.incrementAndGet());
+    DataFile dataFile = dataAppender.writeFile(null, batch);
+    dataAppender.appendToTable(dataFile);
+    Snapshot snapshot = tableResource.table().currentSnapshot();
+
+    ContinuousEnumerationResult result = splitPlanner.planSplits(lastPosition);
+    Assert.assertEquals(lastPosition.snapshotId(), result.fromPosition().snapshotId());
+    Assert.assertEquals(lastPosition.snapshotTimestampMs(), result.fromPosition().snapshotTimestampMs());
+    Assert.assertEquals(snapshot.snapshotId(), result.toPosition().snapshotId().longValue());
+    Assert.assertEquals(snapshot.timestampMillis(), result.toPosition().snapshotTimestampMs().longValue());
+    Assert.assertEquals(1, result.splits().size());
+    IcebergSourceSplit split = Iterables.getOnlyElement(result.splits());
+    Assert.assertEquals(1, split.task().files().size());
+    Assert.assertEquals(dataFile.path().toString(),
+        Iterables.getOnlyElement(split.task().files()).file().path().toString());
+    return result.toPosition();
+  }
+
+  @Test
+  public void testTableScanThenIncrementalWithEmptyTable() throws Exception {
+    ScanContext scanContext = ScanContext.builder()
+        .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+        .build();
+    ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+        tableResource.table(), scanContext, null);
+
+    ContinuousEnumerationResult emptyTableInitialDiscoveryResult = splitPlanner.planSplits(null);
+    Assert.assertTrue(emptyTableInitialDiscoveryResult.splits().isEmpty());
+    Assert.assertNull(emptyTableInitialDiscoveryResult.fromPosition());
+    Assert.assertTrue(emptyTableInitialDiscoveryResult.toPosition().isEmpty());
+    Assert.assertNull(emptyTableInitialDiscoveryResult.toPosition().snapshotTimestampMs());
+
+    ContinuousEnumerationResult emptyTableSecondDiscoveryResult = splitPlanner
+        .planSplits(emptyTableInitialDiscoveryResult.toPosition());
+    Assert.assertTrue(emptyTableSecondDiscoveryResult.splits().isEmpty());
+    Assert.assertTrue(emptyTableSecondDiscoveryResult.fromPosition().isEmpty());
+    Assert.assertNull(emptyTableSecondDiscoveryResult.fromPosition().snapshotTimestampMs());
+    Assert.assertTrue(emptyTableSecondDiscoveryResult.toPosition().isEmpty());
+    Assert.assertNull(emptyTableSecondDiscoveryResult.toPosition().snapshotTimestampMs());
+
+    // next 3 snapshots
+    IcebergEnumeratorPosition lastPosition = emptyTableSecondDiscoveryResult.toPosition();
+    for (int i = 0; i < 3; ++i) {
+      lastPosition = verifyOneCycle(splitPlanner, lastPosition);
+    }
+  }
+
+  @Test
+  public void testTableScanThenIncrementalWithNonEmptyTable() throws Exception {
+    appendTwoSnapshots();
+
+    ScanContext scanContext = ScanContext.builder()
+        .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+        .build();
+    ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+        tableResource.table(), scanContext, null);
+
+    ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
+    Assert.assertNull(initialResult.fromPosition());
+    Assert.assertEquals(snapshot2.snapshotId(), initialResult.toPosition().snapshotId().longValue());
+    Assert.assertEquals(snapshot2.timestampMillis(), initialResult.toPosition().snapshotTimestampMs().longValue());
+    Assert.assertEquals(1, initialResult.splits().size());
+    IcebergSourceSplit split = Iterables.getOnlyElement(initialResult.splits());
+    Assert.assertEquals(2, split.task().files().size());
+    Set<String> discoveredFiles = split.task().files().stream()
+        .map(fileScanTask -> fileScanTask.file().path().toString())
+        .collect(Collectors.toSet());
+    Set<String> expectedFiles = ImmutableSet.of(dataFile1.path().toString(), dataFile2.path().toString());
+    Assert.assertEquals(expectedFiles, discoveredFiles);
+
+    IcebergEnumeratorPosition lastPosition = initialResult.toPosition();
+    for (int i = 0; i < 3; ++i) {
+      lastPosition = verifyOneCycle(splitPlanner, lastPosition);
+    }
+  }
+
+  @Test
+  public void testIncrementalFromLatestSnapshotWithEmptyTable() throws Exception {
+    ScanContext scanContext = ScanContext.builder()
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+        .splitSize(1L)
+        .build();
+    ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+        tableResource.table(), scanContext, null);
+
+    ContinuousEnumerationResult emptyTableInitialDiscoveryResult = splitPlanner.planSplits(null);
+    Assert.assertTrue(emptyTableInitialDiscoveryResult.splits().isEmpty());
+    Assert.assertNull(emptyTableInitialDiscoveryResult.fromPosition());
+    Assert.assertTrue(emptyTableInitialDiscoveryResult.toPosition().isEmpty());
+    Assert.assertNull(emptyTableInitialDiscoveryResult.toPosition().snapshotTimestampMs());
+
+    ContinuousEnumerationResult emptyTableSecondDiscoveryResult = splitPlanner
+        .planSplits(emptyTableInitialDiscoveryResult.toPosition());
+    Assert.assertTrue(emptyTableSecondDiscoveryResult.splits().isEmpty());
+    Assert.assertTrue(emptyTableSecondDiscoveryResult.fromPosition().isEmpty());
+    Assert.assertNull(emptyTableSecondDiscoveryResult.fromPosition().snapshotTimestampMs());
+    Assert.assertTrue(emptyTableSecondDiscoveryResult.toPosition().isEmpty());
+    Assert.assertNull(emptyTableSecondDiscoveryResult.toPosition().snapshotTimestampMs());
+
+    // latest mode should discover both snapshots, as latest position is marked by when job starts
+    appendTwoSnapshots();
+    ContinuousEnumerationResult afterTwoSnapshotsAppended = splitPlanner
+        .planSplits(emptyTableSecondDiscoveryResult.toPosition());
+    Assert.assertEquals(2, afterTwoSnapshotsAppended.splits().size());
+
+    // next 3 snapshots
+    IcebergEnumeratorPosition lastPosition = afterTwoSnapshotsAppended.toPosition();
+    for (int i = 0; i < 3; ++i) {
+      lastPosition = verifyOneCycle(splitPlanner, lastPosition);
+    }
+  }
+
+  @Test
+  public void testIncrementalFromLatestSnapshotWithNonEmptyTable() throws Exception {
+    appendTwoSnapshots();
+
+    ScanContext scanContext = ScanContext.builder()
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+        .build();
+    ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+        tableResource.table(), scanContext, null);
+
+    ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
+    Assert.assertNull(initialResult.fromPosition());
+    // For inclusive behavior, the initial result should point to snapshot1
+    // Then the next incremental scan shall discover files from latest snapshot2 (inclusive)
+    Assert.assertEquals(snapshot1.snapshotId(), initialResult.toPosition().snapshotId().longValue());
+    Assert.assertEquals(snapshot1.timestampMillis(), initialResult.toPosition().snapshotTimestampMs().longValue());
+    Assert.assertEquals(0, initialResult.splits().size());
+
+    ContinuousEnumerationResult secondResult = splitPlanner.planSplits(initialResult.toPosition());
+    Assert.assertEquals(snapshot1.snapshotId(), secondResult.fromPosition().snapshotId().longValue());
+    Assert.assertEquals(snapshot1.timestampMillis(), secondResult.fromPosition().snapshotTimestampMs().longValue());
+    Assert.assertEquals(snapshot2.snapshotId(), secondResult.toPosition().snapshotId().longValue());
+    Assert.assertEquals(snapshot2.timestampMillis(), secondResult.toPosition().snapshotTimestampMs().longValue());
+    IcebergSourceSplit split = Iterables.getOnlyElement(secondResult.splits());
+    Assert.assertEquals(1, split.task().files().size());
+    Set<String> discoveredFiles = split.task().files().stream()
+        .map(fileScanTask -> fileScanTask.file().path().toString())
+        .collect(Collectors.toSet());
+    // should discover dataFile2 appended in snapshot2
+    Set<String> expectedFiles = ImmutableSet.of(dataFile2.path().toString());
+    Assert.assertEquals(expectedFiles, discoveredFiles);
+
+    IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
+    for (int i = 0; i < 3; ++i) {
+      lastPosition = verifyOneCycle(splitPlanner, lastPosition);
+    }
+  }
+
+  @Test
+  public void testIncrementalFromEarliestSnapshotWithEmptyTable() throws Exception {
+    ScanContext scanContext = ScanContext.builder()
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+        .build();
+    ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+        tableResource.table(), scanContext, null);
+
+    ContinuousEnumerationResult emptyTableInitialDiscoveryResult = splitPlanner.planSplits(null);
+    Assert.assertTrue(emptyTableInitialDiscoveryResult.splits().isEmpty());
+    Assert.assertNull(emptyTableInitialDiscoveryResult.fromPosition());
+    Assert.assertNull(emptyTableInitialDiscoveryResult.toPosition().snapshotId());
+    Assert.assertNull(emptyTableInitialDiscoveryResult.toPosition().snapshotTimestampMs());
+
+    ContinuousEnumerationResult emptyTableSecondDiscoveryResult = splitPlanner
+        .planSplits(emptyTableInitialDiscoveryResult.toPosition());
+    Assert.assertTrue(emptyTableSecondDiscoveryResult.splits().isEmpty());
+    Assert.assertNull(emptyTableSecondDiscoveryResult.fromPosition().snapshotId());
+    Assert.assertNull(emptyTableSecondDiscoveryResult.fromPosition().snapshotTimestampMs());
+    Assert.assertNull(emptyTableSecondDiscoveryResult.toPosition().snapshotId());
+    Assert.assertNull(emptyTableSecondDiscoveryResult.toPosition().snapshotTimestampMs());
+
+    // next 3 snapshots
+    IcebergEnumeratorPosition lastPosition = emptyTableSecondDiscoveryResult.toPosition();
+    for (int i = 0; i < 3; ++i) {
+      lastPosition = verifyOneCycle(splitPlanner, lastPosition);
+    }
+  }
+
+  @Test
+  public void testIncrementalFromEarliestSnapshotWithNonEmptyTable() throws Exception {
+    appendTwoSnapshots();
+
+    ScanContext scanContext = ScanContext.builder()
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+        .build();
+    ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+        tableResource.table(), scanContext, null);
+
+    ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
+    Assert.assertNull(initialResult.fromPosition());
+    // For inclusive behavior, the initial result should point to snapshot1's parent,
+    // which leads to null snapshotId and snapshotTimestampMs.
+    Assert.assertNull(initialResult.toPosition().snapshotId());
+    Assert.assertNull(initialResult.toPosition().snapshotTimestampMs());
+    Assert.assertEquals(0, initialResult.splits().size());
+
+    ContinuousEnumerationResult secondResult = splitPlanner.planSplits(initialResult.toPosition());
+    Assert.assertNull(secondResult.fromPosition().snapshotId());
+    Assert.assertNull(secondResult.fromPosition().snapshotTimestampMs());
+    Assert.assertEquals(snapshot2.snapshotId(), secondResult.toPosition().snapshotId().longValue());
+    Assert.assertEquals(snapshot2.timestampMillis(), secondResult.toPosition().snapshotTimestampMs().longValue());
+    IcebergSourceSplit split = Iterables.getOnlyElement(secondResult.splits());
+    Assert.assertEquals(2, split.task().files().size());
+    Set<String> discoveredFiles = split.task().files().stream()
+        .map(fileScanTask -> fileScanTask.file().path().toString())
+        .collect(Collectors.toSet());
+    // should discover files appended in both snapshot1 and snapshot2
+    Set<String> expectedFiles = ImmutableSet.of(dataFile1.path().toString(), dataFile2.path().toString());
+    Assert.assertEquals(expectedFiles, discoveredFiles);
+
+    IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
+    for (int i = 0; i < 3; ++i) {
+      lastPosition = verifyOneCycle(splitPlanner, lastPosition);
+    }
+  }
+
+  @Test
+  public void testIncrementalFromSnapshotIdWithEmptyTable() throws Exception {
+    ScanContext scanContextWithInvalidSnapshotId = ScanContext.builder()
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
+        .startSnapshotId(1L)
+        .build();
+    ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+        tableResource.table(), scanContextWithInvalidSnapshotId, null);
+
+    AssertHelpers.assertThrows("Should detect invalid starting snapshot id",
+        IllegalArgumentException.class,
+        "Start snapshot id not found in history: 1",
+        () -> splitPlanner.planSplits(null));
+  }
+
+  @Test
+  public void testIncrementalFromSnapshotIdWithInvalidIds() throws Exception {
+    appendTwoSnapshots();
+
+    // find an invalid snapshotId
+    long invalidSnapshotId = 0L;
+    while (invalidSnapshotId == snapshot1.snapshotId() || invalidSnapshotId == snapshot2.snapshotId()) {
+      invalidSnapshotId++;
+    }
+
+    ScanContext scanContextWithInvalidSnapshotId = ScanContext.builder()
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
+        .startSnapshotId(invalidSnapshotId)
+        .build();
+
+    ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+        tableResource.table(), scanContextWithInvalidSnapshotId, null);
+
+    AssertHelpers.assertThrows("Should detect invalid starting snapshot id",
+        IllegalArgumentException.class,
+        "Start snapshot id not found in history: " + invalidSnapshotId,
+        () -> splitPlanner.planSplits(null));
+  }
+
+  @Test
+  public void testIncrementalFromSnapshotId() throws Exception {
+    appendTwoSnapshots();
+
+    ScanContext scanContext = ScanContext.builder()
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
+        .startSnapshotId(snapshot2.snapshotId())
+        .build();
+    ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+        tableResource.table(), scanContext, null);
+
+    ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
+    Assert.assertNull(initialResult.fromPosition());
+    // For inclusive behavior of snapshot2, the initial result should point to snapshot1 (as snapshot2's parent)
+    Assert.assertEquals(snapshot1.snapshotId(), initialResult.toPosition().snapshotId().longValue());
+    Assert.assertEquals(snapshot1.timestampMillis(), initialResult.toPosition().snapshotTimestampMs().longValue());
+    Assert.assertEquals(0, initialResult.splits().size());
+
+    ContinuousEnumerationResult secondResult = splitPlanner.planSplits(initialResult.toPosition());
+    Assert.assertEquals(snapshot1.snapshotId(), secondResult.fromPosition().snapshotId().longValue());
+    Assert.assertEquals(snapshot1.timestampMillis(), secondResult.fromPosition().snapshotTimestampMs().longValue());
+    Assert.assertEquals(snapshot2.snapshotId(), secondResult.toPosition().snapshotId().longValue());
+    Assert.assertEquals(snapshot2.timestampMillis(), secondResult.toPosition().snapshotTimestampMs().longValue());
+    IcebergSourceSplit split = Iterables.getOnlyElement(secondResult.splits());
+    Assert.assertEquals(1, split.task().files().size());
+    Set<String> discoveredFiles = split.task().files().stream()
+        .map(fileScanTask -> fileScanTask.file().path().toString())
+        .collect(Collectors.toSet());
+    // should  discover dataFile2 appended in snapshot2
+    Set<String> expectedFiles = ImmutableSet.of(dataFile2.path().toString());
+    Assert.assertEquals(expectedFiles, discoveredFiles);
+
+    IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
+    for (int i = 0; i < 3; ++i) {
+      lastPosition = verifyOneCycle(splitPlanner, lastPosition);
+    }
+  }
+
+  @Test
+  public void testIncrementalFromSnapshotTimestampWithEmptyTable() throws Exception {
+    ScanContext scanContextWithInvalidSnapshotId = ScanContext.builder()
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
+        .startSnapshotTimestamp(1L)
+        .build();
+    ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+        tableResource.table(), scanContextWithInvalidSnapshotId, null);
+
+    AssertHelpers.assertThrows("Should detect invalid starting snapshot timestamp",
+        IllegalArgumentException.class,
+        "Cannot find a snapshot older than 1970-01-01 00:00:00.001",
+        () -> splitPlanner.planSplits(null));
+  }
+
+  @Test
+  public void testIncrementalFromSnapshotTimestampWithInvalidIds() throws Exception {
+    appendTwoSnapshots();
+
+    long invalidSnapshotTimestampMs = snapshot1.timestampMillis() - 1000L;
+    String invalidSnapshotTimestampMsStr = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
+        .withZoneUTC()
+        .print(invalidSnapshotTimestampMs);
+
+    ScanContext scanContextWithInvalidSnapshotId = ScanContext.builder()
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
+        .startSnapshotTimestamp(invalidSnapshotTimestampMs)
+        .build();
+
+    ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+        tableResource.table(), scanContextWithInvalidSnapshotId, null);
+
+    AssertHelpers.assertThrows("Should detect invalid starting snapshot timestamp",
+        IllegalArgumentException.class,
+        "Cannot find a snapshot older than " + invalidSnapshotTimestampMsStr,
+        () -> splitPlanner.planSplits(null));
+  }
+
+  @Test
+  public void testIncrementalFromSnapshotTimestamp() throws Exception {
+    appendTwoSnapshots();
+
+    ScanContext scanContext = ScanContext.builder()
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
+        .startSnapshotTimestamp(snapshot2.timestampMillis())
+        .build();
+    ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+        tableResource.table(), scanContext, null);
+
+    ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
+    Assert.assertNull(initialResult.fromPosition());
+    // For inclusive behavior, the initial result should point to snapshot1 (as snapshot2's parent).
+    Assert.assertEquals(snapshot1.snapshotId(), initialResult.toPosition().snapshotId().longValue());
+    Assert.assertEquals(snapshot1.timestampMillis(), initialResult.toPosition().snapshotTimestampMs().longValue());
+    Assert.assertEquals(0, initialResult.splits().size());
+
+    ContinuousEnumerationResult secondResult = splitPlanner.planSplits(initialResult.toPosition());
+    Assert.assertEquals(snapshot1.snapshotId(), secondResult.fromPosition().snapshotId().longValue());
+    Assert.assertEquals(snapshot1.timestampMillis(), secondResult.fromPosition().snapshotTimestampMs().longValue());
+    Assert.assertEquals(snapshot2.snapshotId(), secondResult.toPosition().snapshotId().longValue());
+    Assert.assertEquals(snapshot2.timestampMillis(), secondResult.toPosition().snapshotTimestampMs().longValue());
+    IcebergSourceSplit split = Iterables.getOnlyElement(secondResult.splits());
+    Assert.assertEquals(1, split.task().files().size());
+    Set<String> discoveredFiles = split.task().files().stream()
+        .map(fileScanTask -> fileScanTask.file().path().toString())
+        .collect(Collectors.toSet());
+    // should discover dataFile2 appended in snapshot2
+    Set<String> expectedFiles = ImmutableSet.of(dataFile2.path().toString());
+    Assert.assertEquals(expectedFiles, discoveredFiles);
+
+    IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
+    for (int i = 0; i < 3; ++i) {
+      lastPosition = verifyOneCycle(splitPlanner, lastPosition);
+    }
+  }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java
new file mode 100644
index 000000000..ef5265340
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestRule;
+
+public class TestContinuousSplitPlannerImplStartStrategy {
+  private static final FileFormat FILE_FORMAT = FileFormat.PARQUET;
+
+  public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+  public final HadoopTableResource tableResource = new HadoopTableResource(temporaryFolder,
+      TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.SCHEMA);
+  @Rule
+  public final TestRule chain = RuleChain
+      .outerRule(temporaryFolder)
+      .around(tableResource);
+
+  private GenericAppenderHelper dataAppender;
+  private Snapshot snapshot1;
+  private Snapshot snapshot2;
+  private Snapshot snapshot3;
+
+  @Before
+  public void before() throws IOException {
+    dataAppender = new GenericAppenderHelper(tableResource.table(), FILE_FORMAT, temporaryFolder);
+  }
+
+  private void appendThreeSnapshots() throws IOException {
+    List<Record> batch1 = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
+    dataAppender.appendToTable(batch1);
+    snapshot1 = tableResource.table().currentSnapshot();
+
+    List<Record> batch2 = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 1L);
+    dataAppender.appendToTable(batch2);
+    snapshot2 = tableResource.table().currentSnapshot();
+
+    List<Record> batch3 = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 2L);
+    dataAppender.appendToTable(batch3);
+    snapshot3 = tableResource.table().currentSnapshot();
+  }
+
+  @Test
+  public void testTableScanThenIncrementalStrategy()  throws IOException {
+    ScanContext scanContext = ScanContext.builder()
+        .streaming(true)
+        .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+        .build();
+
+    // emtpy table
+    Assert.assertFalse(ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).isPresent());
+
+    appendThreeSnapshots();
+    Snapshot startSnapshot = ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).get();
+    Assert.assertEquals(snapshot3.snapshotId(), startSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testForLatestSnapshotStrategy() throws IOException {
+    ScanContext scanContext = ScanContext.builder()
+        .streaming(true)
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+        .build();
+
+    // emtpy table
+    Assert.assertFalse(ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).isPresent());
+
+    appendThreeSnapshots();
+    Snapshot startSnapshot = ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).get();
+    Assert.assertEquals(snapshot3.snapshotId(), startSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testForEarliestSnapshotStrategy() throws IOException {
+    ScanContext scanContext = ScanContext.builder()
+        .streaming(true)
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+        .build();
+
+    // emtpy table
+    Assert.assertFalse(ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).isPresent());
+
+    appendThreeSnapshots();
+    Snapshot startSnapshot = ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).get();
+    Assert.assertEquals(snapshot1.snapshotId(), startSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testForSpecificSnapshotIdStrategy() throws IOException {
+    ScanContext scanContextInvalidSnapshotId = ScanContext.builder()
+        .streaming(true)
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
+        .startSnapshotId(1L)
+        .build();
+
+    // emtpy table
+    AssertHelpers.assertThrows("Should detect invalid starting snapshot id",
+        IllegalArgumentException.class,
+        "Start snapshot id not found in history: 1",
+        () -> ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContextInvalidSnapshotId));
+
+    appendThreeSnapshots();
+
+    ScanContext scanContext = ScanContext.builder()
+        .streaming(true)
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
+        .startSnapshotId(snapshot2.snapshotId())
+        .build();
+
+    Snapshot startSnapshot = ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).get();
+    Assert.assertEquals(snapshot2.snapshotId(), startSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testForSpecificSnapshotTimestampStrategySnapshot2() throws IOException {
+    ScanContext scanContextInvalidSnapshotTimestamp = ScanContext.builder()
+        .streaming(true)
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
+        .startSnapshotTimestamp(1L)
+        .build();
+
+    // emtpy table
+    AssertHelpers.assertThrows("Should detect invalid starting snapshot timestamp",
+        IllegalArgumentException.class,
+        "Cannot find a snapshot older than 1970-01-01 00:00:00.001",
+        () -> ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContextInvalidSnapshotTimestamp));
+
+    appendThreeSnapshots();
+
+    ScanContext scanContext = ScanContext.builder()
+        .streaming(true)
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
+        .startSnapshotTimestamp(snapshot2.timestampMillis())
+        .build();
+
+    Snapshot startSnapshot = ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).get();
+    Assert.assertEquals(snapshot2.snapshotId(), startSnapshot.snapshotId());
+  }
+
+  @Test
+  public void testForSpecificSnapshotTimestampStrategySnapshot2Minus1() throws IOException {
+    appendThreeSnapshots();
+
+    ScanContext config = ScanContext.builder()
+        .streaming(true)
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
+        .startSnapshotTimestamp(snapshot2.timestampMillis() - 1L)
+        .build();
+
+    Snapshot startSnapshot = ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), config).get();
+    Assert.assertEquals(snapshot2.snapshotId(), startSnapshot.snapshotId());
+  }
+}