You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/28 04:20:44 UTC

[incubator-paimon] branch master updated: [core][bug] TableScan#plan should not return null (#698)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7032eb869 [core][bug] TableScan#plan should not return null (#698)
7032eb869 is described below

commit 7032eb869d40f3ae242e1a9d1aa7ee61cba5f505
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Tue Mar 28 12:20:39 2023 +0800

    [core][bug] TableScan#plan should not return null (#698)
---
 .../paimon/table/AbstractFileStoreTable.java       |   2 +-
 .../paimon/table/source/AbstractDataTableScan.java |  27 ------
 .../paimon/table/source/BatchDataTableScan.java    |  15 +++
 .../table/source/BatchDataTableScanImpl.java       |  34 ++++++-
 .../apache/paimon/table/source/DataTableScan.java  |  10 +-
 .../paimon/table/source/StreamDataTableScan.java   |  15 +++
 .../table/source/StreamDataTableScanImpl.java      |  47 +++++++--
 .../paimon/table/source/StreamTableScan.java       |   2 +
 .../paimon/table/source/TableStreamingReader.java  |   4 +-
 .../source/snapshot/CompactedStartingScanner.java  |   9 +-
 .../CompactionChangelogFollowUpScanner.java        |   3 +-
 .../ContinuousCompactorFollowUpScanner.java        |   3 +-
 .../ContinuousCompactorStartingScanner.java        |  11 +--
 .../ContinuousFromSnapshotStartingScanner.java     |  12 +--
 .../ContinuousFromTimestampStartingScanner.java    |   9 +-
 .../snapshot/ContinuousLatestStartingScanner.java  |   9 +-
 .../source/snapshot/DeltaFollowUpScanner.java      |   3 +-
 .../table/source/snapshot/FollowUpScanner.java     |   4 +-
 .../table/source/snapshot/FullStartingScanner.java |   9 +-
 .../snapshot/InputChangelogFollowUpScanner.java    |   3 +-
 .../table/source/snapshot/StartingScanner.java     |  32 ++++++-
 .../StaticFromSnapshotStartingScanner.java         |   9 +-
 .../StaticFromTimestampStartingScanner.java        |  11 +--
 .../apache/paimon/table/system/AuditLogTable.java  |  16 ++--
 .../org/apache/paimon/table/system/FilesTable.java |  10 +-
 .../table/ChangelogWithKeyFileStoreTableTest.java  |   4 +-
 .../table/source/BatchDataTableScanTest.java       |   1 -
 .../table/source/StreamDataTableScanTest.java      |  40 ++++----
 .../snapshot/CompactedStartingScannerTest.java     |  13 ++-
 .../CompactionChangelogFollowUpScannerTest.java    |   8 +-
 .../ContinuousCompactorFollowUpScannerTest.java    |   8 +-
 .../ContinuousCompactorStartingScannerTest.java    |  11 +--
 ...ContinuousFromTimestampStartingScannerTest.java |  17 ++--
 .../ContinuousLatestStartingScannerTest.java       |  11 +--
 .../source/snapshot/DeltaFollowUpScannerTest.java  |   8 +-
 .../source/snapshot/FullStartingScannerTest.java   |  11 +--
 .../InputChangelogFollowUpScannerTest.java         |   8 +-
 .../table/source/snapshot/ScannerTestBase.java     |   5 +
 .../flink/lookup/FileStoreLookupFunction.java      |   2 +-
 .../flink/source/CompactorSourceBuilder.java       |   1 +
 .../source/ContinuousFileSplitEnumerator.java      |  16 ++--
 .../flink/source/ContinuousFileStoreSource.java    |   2 +-
 .../paimon/flink/source/FlinkSourceBuilder.java    |   3 +-
 .../flink/source/LogHybridSourceFactory.java       |  81 ++++++++++++++++
 .../paimon/flink/source/StaticFileStoreSource.java |  17 +---
 .../paimon/flink/action/CompactActionITCase.java   |   1 -
 .../source/ContinuousFileSplitEnumeratorTest.java  | 106 ++++++++++++++++-----
 .../source/FileStoreSourceSplitGeneratorTest.java  |   2 +-
 .../paimon/hive/mapred/PaimonInputFormat.java      |   9 +-
 .../java/org/apache/paimon/spark/SparkScan.java    |   6 +-
 50 files changed, 437 insertions(+), 263 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 4e83ed8e6..507b4a385 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -200,7 +200,7 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
                 return Optional.empty();
             case FROM_TIMESTAMP:
                 Snapshot snapshot =
-                        StaticFromTimestampStartingScanner.getSnapshot(
+                        StaticFromTimestampStartingScanner.timeTravelToTimestamp(
                                 snapshotManager(), coreOptions.scanTimestampMills());
                 if (snapshot != null) {
                     long schemaId = snapshot.schemaId();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index eb5604209..bc485b748 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -21,8 +21,6 @@ package org.apache.paimon.table.source;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.operation.FileStoreScan;
-import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
 import org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotStartingScanner;
 import org.apache.paimon.table.source.snapshot.ContinuousFromTimestampStartingScanner;
@@ -32,7 +30,6 @@ import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
 import org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner;
 import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
-import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Preconditions;
 
 /** An abstraction layer above {@link FileStoreScan} to provide input split generation. */
@@ -46,30 +43,6 @@ public abstract class AbstractDataTableScan implements DataTableScan {
         this.snapshotSplitReader = snapshotSplitReader;
     }
 
-    @Override
-    public AbstractDataTableScan withSnapshot(long snapshotId) {
-        snapshotSplitReader.withSnapshot(snapshotId);
-        return this;
-    }
-
-    @Override
-    public AbstractDataTableScan withFilter(Predicate predicate) {
-        snapshotSplitReader.withFilter(predicate);
-        return this;
-    }
-
-    @Override
-    public AbstractDataTableScan withKind(ScanKind scanKind) {
-        snapshotSplitReader.withKind(scanKind);
-        return this;
-    }
-
-    @Override
-    public AbstractDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
-        snapshotSplitReader.withLevelFilter(levelFilter);
-        return this;
-    }
-
     @VisibleForTesting
     public AbstractDataTableScan withBucket(int bucket) {
         snapshotSplitReader.withBucket(bucket);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java
index 4a3e0bcf7..e2839253e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScan.java
@@ -18,14 +18,29 @@
 
 package org.apache.paimon.table.source;
 
+import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
+import org.apache.paimon.utils.Filter;
 
 import java.io.Serializable;
 
 /** {@link DataTableScan} for batch planning. */
 public interface BatchDataTableScan extends DataTableScan {
 
+    @Override
+    BatchDataTableScan withSnapshot(long snapshotId);
+
+    @Override
+    BatchDataTableScan withFilter(Predicate predicate);
+
+    @Override
+    BatchDataTableScan withKind(ScanKind scanKind);
+
+    @Override
+    BatchDataTableScan withLevelFilter(Filter<Integer> levelFilter);
+
     BatchDataTableScan withStartingScanner(StartingScanner startingScanner);
 
     // ------------------------------------------------------------------------
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScanImpl.java
index 2d180cb86..d0dead613 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScanImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/BatchDataTableScanImpl.java
@@ -19,12 +19,13 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
+import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.SnapshotManager;
 
-import javax.annotation.Nullable;
-
 /** {@link DataTableScan} for batch planning. */
 public class BatchDataTableScanImpl extends AbstractDataTableScan implements BatchDataTableScan {
 
@@ -43,6 +44,30 @@ public class BatchDataTableScanImpl extends AbstractDataTableScan implements Bat
         this.hasNext = true;
     }
 
+    @Override
+    public BatchDataTableScan withSnapshot(long snapshotId) {
+        snapshotSplitReader.withSnapshot(snapshotId);
+        return this;
+    }
+
+    @Override
+    public BatchDataTableScan withFilter(Predicate predicate) {
+        snapshotSplitReader.withFilter(predicate);
+        return this;
+    }
+
+    @Override
+    public BatchDataTableScan withKind(ScanKind scanKind) {
+        snapshotSplitReader.withKind(scanKind);
+        return this;
+    }
+
+    @Override
+    public BatchDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
+        snapshotSplitReader.withLevelFilter(levelFilter);
+        return this;
+    }
+
     @Override
     public BatchDataTableScan withStartingScanner(StartingScanner startingScanner) {
         this.startingScanner = startingScanner;
@@ -50,7 +75,6 @@ public class BatchDataTableScanImpl extends AbstractDataTableScan implements Bat
     }
 
     @Override
-    @Nullable
     public DataFilePlan plan() {
         if (startingScanner == null) {
             startingScanner = createStartingScanner(false);
@@ -58,7 +82,9 @@ public class BatchDataTableScanImpl extends AbstractDataTableScan implements Bat
 
         if (hasNext) {
             hasNext = false;
-            return startingScanner.getPlan(snapshotManager, snapshotSplitReader);
+            StartingScanner.Result result =
+                    startingScanner.scan(snapshotManager, snapshotSplitReader);
+            return DataFilePlan.fromResult(result);
         } else {
             throw new EndOfScanException();
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableScan.java
index 066f0fb1b..c5249c5bc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableScan.java
@@ -21,10 +21,12 @@ package org.apache.paimon.table.source;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.source.snapshot.StartingScanner;
 import org.apache.paimon.utils.Filter;
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.List;
 
 /** A {@link TableScan} for reading data. */
@@ -43,12 +45,10 @@ public interface DataTableScan extends InnerTableScan {
     /** Scanning plan containing snapshot ID and input splits. */
     class DataFilePlan implements Plan {
 
-        @Nullable public final Long snapshotId;
         public final List<DataSplit> splits;
 
         @VisibleForTesting
-        public DataFilePlan(@Nullable Long snapshotId, List<DataSplit> splits) {
-            this.snapshotId = snapshotId;
+        public DataFilePlan(List<DataSplit> splits) {
             this.splits = splits;
         }
 
@@ -57,5 +57,9 @@ public interface DataTableScan extends InnerTableScan {
         public List<Split> splits() {
             return (List) splits;
         }
+
+        public static DataFilePlan fromResult(@Nullable StartingScanner.Result result) {
+            return new DataFilePlan(result == null ? Collections.emptyList() : result.splits());
+        }
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
index d125aec90..471c8428d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
@@ -19,11 +19,14 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.source.snapshot.BoundedChecker;
 import org.apache.paimon.table.source.snapshot.FollowUpScanner;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
+import org.apache.paimon.utils.Filter;
 
 import javax.annotation.Nullable;
 
@@ -33,6 +36,18 @@ import java.util.HashMap;
 /** {@link DataTableScan} for streaming planning. */
 public interface StreamDataTableScan extends DataTableScan, InnerStreamTableScan {
 
+    @Override
+    StreamDataTableScan withSnapshot(long snapshotId);
+
+    @Override
+    StreamDataTableScan withFilter(Predicate predicate);
+
+    @Override
+    StreamDataTableScan withKind(ScanKind scanKind);
+
+    @Override
+    StreamDataTableScan withLevelFilter(Filter<Integer> levelFilter);
+
     boolean supportStreamingReadOverwrite();
 
     StreamDataTableScan withStartingScanner(StartingScanner startingScanner);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java
index bf5336c08..afa4a302b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java
@@ -20,6 +20,8 @@ package org.apache.paimon.table.source;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.snapshot.BoundedChecker;
 import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
 import org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
@@ -29,6 +31,7 @@ import org.apache.paimon.table.source.snapshot.FullStartingScanner;
 import org.apache.paimon.table.source.snapshot.InputChangelogFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
+import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
@@ -36,6 +39,8 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
+
 /** {@link DataTableScan} for streaming planning. */
 public class StreamDataTableScanImpl extends AbstractDataTableScan implements StreamDataTableScan {
 
@@ -62,6 +67,30 @@ public class StreamDataTableScanImpl extends AbstractDataTableScan implements St
         this.supportStreamingReadOverwrite = supportStreamingReadOverwrite;
     }
 
+    @Override
+    public StreamDataTableScanImpl withSnapshot(long snapshotId) {
+        snapshotSplitReader.withSnapshot(snapshotId);
+        return this;
+    }
+
+    @Override
+    public StreamDataTableScanImpl withFilter(Predicate predicate) {
+        snapshotSplitReader.withFilter(predicate);
+        return this;
+    }
+
+    @Override
+    public StreamDataTableScanImpl withKind(ScanKind scanKind) {
+        snapshotSplitReader.withKind(scanKind);
+        return this;
+    }
+
+    @Override
+    public StreamDataTableScanImpl withLevelFilter(Filter<Integer> levelFilter) {
+        snapshotSplitReader.withLevelFilter(levelFilter);
+        return this;
+    }
+
     @Override
     public boolean supportStreamingReadOverwrite() {
         return supportStreamingReadOverwrite;
@@ -94,7 +123,6 @@ public class StreamDataTableScanImpl extends AbstractDataTableScan implements St
         return this;
     }
 
-    @Nullable
     @Override
     public DataFilePlan plan() {
         if (startingScanner == null) {
@@ -115,16 +143,15 @@ public class StreamDataTableScanImpl extends AbstractDataTableScan implements St
     }
 
     private DataFilePlan tryFirstPlan() {
-        DataTableScan.DataFilePlan plan =
-                startingScanner.getPlan(snapshotManager, snapshotSplitReader);
-        if (plan != null) {
-            long snapshot = plan.snapshotId;
-            nextSnapshotId = snapshot + 1;
-            if (boundedChecker.shouldEndInput(snapshotManager.snapshot(snapshot))) {
+        StartingScanner.Result result = startingScanner.scan(snapshotManager, snapshotSplitReader);
+        if (result != null) {
+            long snapshotId = result.snapshotId();
+            nextSnapshotId = snapshotId + 1;
+            if (boundedChecker.shouldEndInput(snapshotManager.snapshot(snapshotId))) {
                 isEnd = true;
             }
         }
-        return plan;
+        return DataFilePlan.fromResult(result);
     }
 
     private DataFilePlan nextPlan() {
@@ -137,7 +164,7 @@ public class StreamDataTableScanImpl extends AbstractDataTableScan implements St
                 LOG.debug(
                         "Next snapshot id {} does not exist, wait for the snapshot generation.",
                         nextSnapshotId);
-                return null;
+                return new DataFilePlan(Collections.emptyList());
             }
 
             Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
@@ -158,7 +185,7 @@ public class StreamDataTableScanImpl extends AbstractDataTableScan implements St
             } else if (followUpScanner.shouldScanSnapshot(snapshot)) {
                 LOG.debug("Find snapshot id {}.", nextSnapshotId);
                 DataTableScan.DataFilePlan plan =
-                        followUpScanner.getPlan(nextSnapshotId, snapshotSplitReader);
+                        followUpScanner.scan(nextSnapshotId, snapshotSplitReader);
                 nextSnapshotId++;
                 return plan;
             } else {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
index 82d4d7d38..44f0ce43a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
@@ -24,6 +24,8 @@ import org.apache.paimon.utils.Restorable;
 /**
  * {@link TableScan} for streaming, supports {@link #checkpoint} and {@link #restore}.
  *
+ * <p>NOTE: {@link #checkpoint} will return the next snapshot id.
+ *
  * @since 0.4.0
  */
 @Public
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java
index e19eeef4f..28c2d9fed 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableStreamingReader.java
@@ -87,11 +87,9 @@ public class TableStreamingReader {
         }
     }
 
-    @Nullable
     public Iterator<InternalRow> nextBatch() throws Exception {
         try {
-            DataFilePlan plan = scan.plan();
-            return plan == null ? null : read(plan);
+            return read(scan.plan());
         } catch (EndOfScanException e) {
             throw new IllegalArgumentException(
                     "TableStreamingReader does not support finished enumerator.", e);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
index fed53fd02..63f5f7578 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
@@ -20,26 +20,27 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 /** {@link StartingScanner} for the {@link CoreOptions.StartupMode#COMPACTED_FULL} startup mode. */
 public class CompactedStartingScanner implements StartingScanner {
 
     private static final Logger LOG = LoggerFactory.getLogger(CompactedStartingScanner.class);
 
     @Override
-    public DataTableScan.DataFilePlan getPlan(
-            SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
+    @Nullable
+    public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
         Long startingSnapshotId = snapshotManager.latestCompactedSnapshotId();
         if (startingSnapshotId == null) {
             LOG.debug("There is currently no compact snapshot. Waiting for snapshot generation.");
             return null;
         }
-        return new DataTableScan.DataFilePlan(
+        return new Result(
                 startingSnapshotId,
                 snapshotSplitReader
                         .withKind(ScanKind.ALL)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
index aeb5e5fb5..ac957587a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScanner.java
@@ -49,10 +49,9 @@ public class CompactionChangelogFollowUpScanner implements FollowUpScanner {
     }
 
     @Override
-    public DataTableScan.DataFilePlan getPlan(
+    public DataTableScan.DataFilePlan scan(
             long snapshotId, SnapshotSplitReader snapshotSplitReader) {
         return new DataTableScan.DataFilePlan(
-                snapshotId,
                 snapshotSplitReader.withKind(ScanKind.CHANGELOG).withSnapshot(snapshotId).splits());
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
index ccba3eef4..aa9e83e26 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
@@ -45,10 +45,9 @@ public class ContinuousCompactorFollowUpScanner implements FollowUpScanner {
     }
 
     @Override
-    public DataTableScan.DataFilePlan getPlan(
+    public DataTableScan.DataFilePlan scan(
             long snapshotId, SnapshotSplitReader snapshotSplitReader) {
         return new DataTableScan.DataFilePlan(
-                snapshotId,
                 snapshotSplitReader.withKind(ScanKind.DELTA).withSnapshot(snapshotId).splits());
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
index 0900d9ecf..e92291e5b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScanner.java
@@ -19,13 +19,12 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
+import javax.annotation.Nullable;
 
 /** {@link StartingScanner} used internally for stand-alone streaming compact job sources. */
 public class ContinuousCompactorStartingScanner implements StartingScanner {
@@ -34,8 +33,8 @@ public class ContinuousCompactorStartingScanner implements StartingScanner {
             LoggerFactory.getLogger(ContinuousCompactorStartingScanner.class);
 
     @Override
-    public DataTableScan.DataFilePlan getPlan(
-            SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
+    @Nullable
+    public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
         Long latestSnapshotId = snapshotManager.latestSnapshotId();
         Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
         if (latestSnapshotId == null || earliestSnapshotId == null) {
@@ -47,13 +46,13 @@ public class ContinuousCompactorStartingScanner implements StartingScanner {
             Snapshot snapshot = snapshotManager.snapshot(id);
             if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
                 LOG.debug("Found latest compact snapshot {}, reading from the next snapshot.", id);
-                return new DataTableScan.DataFilePlan(id, Collections.emptyList());
+                return new Result(id);
             }
         }
 
         LOG.debug(
                 "No compact snapshot found, reading from the earliest snapshot {}.",
                 earliestSnapshotId);
-        return new DataTableScan.DataFilePlan(earliestSnapshotId - 1, Collections.emptyList());
+        return new Result(earliestSnapshotId - 1);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
index 8b6b88add..355bd6d6f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java
@@ -19,10 +19,9 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.utils.SnapshotManager;
 
-import java.util.Collections;
+import javax.annotation.Nullable;
 
 /**
  * {@link StartingScanner} for the {@link CoreOptions.StartupMode#FROM_SNAPSHOT} startup mode of a
@@ -37,16 +36,15 @@ public class ContinuousFromSnapshotStartingScanner implements StartingScanner {
     }
 
     @Override
-    public DataTableScan.DataFilePlan getPlan(
-            SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
+    @Nullable
+    public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
         Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
         if (earliestSnapshotId == null) {
             return null;
         }
         // We should use `snapshotId - 1` here to start to scan delta data from specific snapshot
         // id. If the snapshotId < earliestSnapshotId, start to scan from the earliest.
-        return new DataTableScan.DataFilePlan(
-                snapshotId >= earliestSnapshotId ? snapshotId - 1 : earliestSnapshotId - 1,
-                Collections.emptyList());
+        return new Result(
+                snapshotId >= earliestSnapshotId ? snapshotId - 1 : earliestSnapshotId - 1);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
index 43087ca77..4f6f318e1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java
@@ -19,13 +19,12 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
+import javax.annotation.Nullable;
 
 /**
  * {@link StartingScanner} for the {@link CoreOptions.StartupMode#FROM_TIMESTAMP} startup mode of a
@@ -43,13 +42,13 @@ public class ContinuousFromTimestampStartingScanner implements StartingScanner {
     }
 
     @Override
-    public DataTableScan.DataFilePlan getPlan(
-            SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
+    @Nullable
+    public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
         Long startingSnapshotId = snapshotManager.earlierThanTimeMills(startupMillis);
         if (startingSnapshotId == null) {
             LOG.debug("There is currently no snapshot. Waiting for snapshot generation.");
             return null;
         }
-        return new DataTableScan.DataFilePlan(startingSnapshotId, Collections.emptyList());
+        return new Result(startingSnapshotId);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
index 11c1441c7..f0c5c6fe1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScanner.java
@@ -19,13 +19,12 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
+import javax.annotation.Nullable;
 
 /**
  * {@link StartingScanner} for the {@link CoreOptions.StartupMode#LATEST} startup mode of a
@@ -37,13 +36,13 @@ public class ContinuousLatestStartingScanner implements StartingScanner {
             LoggerFactory.getLogger(ContinuousLatestStartingScanner.class);
 
     @Override
-    public DataTableScan.DataFilePlan getPlan(
-            SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
+    @Nullable
+    public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
         Long startingSnapshotId = snapshotManager.latestSnapshotId();
         if (startingSnapshotId == null) {
             LOG.debug("There is currently no snapshot. Wait for the snapshot generation.");
             return null;
         }
-        return new DataTableScan.DataFilePlan(startingSnapshotId, Collections.emptyList());
+        return new Result(startingSnapshotId);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
index 10a698f5b..97e30d141 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScanner.java
@@ -45,10 +45,9 @@ public class DeltaFollowUpScanner implements FollowUpScanner {
     }
 
     @Override
-    public DataTableScan.DataFilePlan getPlan(
+    public DataTableScan.DataFilePlan scan(
             long snapshotId, SnapshotSplitReader snapshotSplitReader) {
         return new DataTableScan.DataFilePlan(
-                snapshotId,
                 snapshotSplitReader.withKind(ScanKind.DELTA).withSnapshot(snapshotId).splits());
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
index b34871598..33886e5a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
@@ -27,11 +27,11 @@ public interface FollowUpScanner {
 
     boolean shouldScanSnapshot(Snapshot snapshot);
 
-    DataTableScan.DataFilePlan getPlan(long snapshotId, SnapshotSplitReader snapshotSplitReader);
+    DataTableScan.DataFilePlan scan(long snapshotId, SnapshotSplitReader snapshotSplitReader);
 
     default DataTableScan.DataFilePlan getOverwriteChangesPlan(
             long snapshotId, SnapshotSplitReader snapshotSplitReader) {
         return new DataTableScan.DataFilePlan(
-                snapshotId, snapshotSplitReader.withSnapshot(snapshotId).overwriteSplits());
+                snapshotSplitReader.withSnapshot(snapshotId).overwriteSplits());
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
index c73ede04b..fdc93ab5d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
@@ -20,26 +20,27 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 /** {@link StartingScanner} for the {@link CoreOptions.StartupMode#LATEST_FULL} startup mode. */
 public class FullStartingScanner implements StartingScanner {
 
     private static final Logger LOG = LoggerFactory.getLogger(FullStartingScanner.class);
 
     @Override
-    public DataTableScan.DataFilePlan getPlan(
-            SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
+    @Nullable
+    public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
         Long startingSnapshotId = snapshotManager.latestSnapshotId();
         if (startingSnapshotId == null) {
             LOG.debug("There is currently no snapshot. Waiting for snapshot generation.");
             return null;
         }
-        return new DataTableScan.DataFilePlan(
+        return new Result(
                 startingSnapshotId,
                 snapshotSplitReader
                         .withKind(ScanKind.ALL)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
index 73187c245..c901ef528 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScanner.java
@@ -45,10 +45,9 @@ public class InputChangelogFollowUpScanner implements FollowUpScanner {
     }
 
     @Override
-    public DataTableScan.DataFilePlan getPlan(
+    public DataTableScan.DataFilePlan scan(
             long snapshotId, SnapshotSplitReader snapshotSplitReader) {
         return new DataTableScan.DataFilePlan(
-                snapshotId,
                 snapshotSplitReader.withKind(ScanKind.CHANGELOG).withSnapshot(snapshotId).splits());
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
index 39c73521e..2bedd924a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
@@ -19,16 +19,42 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.table.source.BatchDataTableScan;
-import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.StreamDataTableScan;
 import org.apache.paimon.utils.SnapshotManager;
 
+import java.util.Collections;
+import java.util.List;
+
 /**
  * Helper class for the first planning of {@link BatchDataTableScan} and {@link
  * StreamDataTableScan}.
  */
 public interface StartingScanner {
 
-    DataTableScan.DataFilePlan getPlan(
-            SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader);
+    Result scan(SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader);
+
+    /** Scan result of {@link #scan}. */
+    class Result {
+
+        private final long snapshotId;
+        private final List<DataSplit> splits;
+
+        public Result(long snapshotId) {
+            this(snapshotId, Collections.emptyList());
+        }
+
+        public Result(long snapshotId, List<DataSplit> splits) {
+            this.snapshotId = snapshotId;
+            this.splits = splits;
+        }
+
+        public long snapshotId() {
+            return snapshotId;
+        }
+
+        public List<DataSplit> splits() {
+            return splits;
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
index b84b3cbe3..a63475035 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
@@ -20,9 +20,10 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.utils.SnapshotManager;
 
+import javax.annotation.Nullable;
+
 /**
  * {@link StartingScanner} for the {@link CoreOptions.StartupMode#FROM_SNAPSHOT} startup mode of a
  * batch read.
@@ -35,13 +36,13 @@ public class StaticFromSnapshotStartingScanner implements StartingScanner {
     }
 
     @Override
-    public DataTableScan.DataFilePlan getPlan(
-            SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
+    @Nullable
+    public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
         if (snapshotManager.earliestSnapshotId() == null
                 || snapshotId < snapshotManager.earliestSnapshotId()) {
             return null;
         }
-        return new DataTableScan.DataFilePlan(
+        return new Result(
                 snapshotId,
                 snapshotSplitReader.withKind(ScanKind.ALL).withSnapshot(snapshotId).splits());
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
index 037a9ce5e..f4437e41b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
@@ -21,7 +21,6 @@ package org.apache.paimon.table.source.snapshot;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.operation.ScanKind;
-import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
@@ -45,16 +44,16 @@ public class StaticFromTimestampStartingScanner implements StartingScanner {
     }
 
     @Override
-    public DataTableScan.DataFilePlan getPlan(
-            SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
-        Snapshot startingSnapshot = getSnapshot(snapshotManager, startupMillis);
+    @Nullable
+    public Result scan(SnapshotManager snapshotManager, SnapshotSplitReader snapshotSplitReader) {
+        Snapshot startingSnapshot = timeTravelToTimestamp(snapshotManager, startupMillis);
         if (startingSnapshot == null) {
             LOG.debug(
                     "There is currently no snapshot earlier than or equal to timestamp[{}]",
                     startupMillis);
             return null;
         }
-        return new DataTableScan.DataFilePlan(
+        return new Result(
                 startingSnapshot.id(),
                 snapshotSplitReader
                         .withKind(ScanKind.ALL)
@@ -63,7 +62,7 @@ public class StaticFromTimestampStartingScanner implements StartingScanner {
     }
 
     @Nullable
-    public static Snapshot getSnapshot(SnapshotManager snapshotManager, long timestamp) {
+    public static Snapshot timeTravelToTimestamp(SnapshotManager snapshotManager, long timestamp) {
         return snapshotManager.earlierOrEqualTimeMills(timestamp);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 9f1202d40..78536c951 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -231,25 +231,25 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
         }
 
         @Override
-        public DataTableScan withFilter(Predicate predicate) {
+        public BatchDataTableScan withFilter(Predicate predicate) {
             convert(predicate).ifPresent(batchScan::withFilter);
             return this;
         }
 
         @Override
-        public DataTableScan withKind(ScanKind kind) {
+        public BatchDataTableScan withKind(ScanKind kind) {
             batchScan.withKind(kind);
             return this;
         }
 
         @Override
-        public DataTableScan withSnapshot(long snapshotId) {
+        public BatchDataTableScan withSnapshot(long snapshotId) {
             batchScan.withSnapshot(snapshotId);
             return this;
         }
 
         @Override
-        public DataTableScan withLevelFilter(Filter<Integer> levelFilter) {
+        public BatchDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
             batchScan.withLevelFilter(levelFilter);
             return this;
         }
@@ -274,25 +274,25 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
         }
 
         @Override
-        public DataTableScan withFilter(Predicate predicate) {
+        public StreamDataTableScan withFilter(Predicate predicate) {
             convert(predicate).ifPresent(streamScan::withFilter);
             return this;
         }
 
         @Override
-        public DataTableScan withKind(ScanKind kind) {
+        public StreamDataTableScan withKind(ScanKind kind) {
             streamScan.withKind(kind);
             return this;
         }
 
         @Override
-        public DataTableScan withSnapshot(long snapshotId) {
+        public StreamDataTableScan withSnapshot(long snapshotId) {
             streamScan.withSnapshot(snapshotId);
             return this;
         }
 
         @Override
-        public DataTableScan withLevelFilter(Filter<Integer> levelFilter) {
+        public StreamDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
             streamScan.withLevelFilter(levelFilter);
             return this;
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 2d2eaad7e..3c213b04c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -52,8 +52,6 @@ import org.apache.paimon.utils.SerializationUtils;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -175,7 +173,6 @@ public class FilesTable implements ReadonlyTable {
             return plan.splits.stream().mapToLong(s -> s.files().size()).sum();
         }
 
-        @Nullable
         private DataTableScan.DataFilePlan dataFilePlan() {
             return storeTable.newScan().plan();
         }
@@ -237,12 +234,7 @@ public class FilesTable implements ReadonlyTable {
             // schema id directly
             FieldStatsConverters fieldStatsConverters =
                     new FieldStatsConverters(
-                            sid -> schemaManager.schema(sid).fields(),
-                            dataFilePlan.snapshotId == null
-                                    ? table.schema().id()
-                                    : table.snapshotManager()
-                                            .snapshot(dataFilePlan.snapshotId)
-                                            .schemaId());
+                            sid -> schemaManager.schema(sid).fields(), table.schema().id());
 
             RowDataToObjectArrayConverter partitionConverter =
                     new RowDataToObjectArrayConverter(table.schema().logicalPartitionType());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
index fac1be471..61ece3833 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
@@ -457,7 +457,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         }
 
         // no more changelog
-        assertThat(scan.plan()).isNull();
+        assertThat(scan.plan().splits()).isEmpty();
 
         // write another commit
         StreamTableWrite write = table.newWrite(commitUser);
@@ -472,7 +472,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         assertNextSnapshot.apply(2);
 
         // no more changelog
-        assertThat(scan.plan()).isNull();
+        assertThat(scan.plan().splits()).isEmpty();
     }
 
     private void writeData() throws Exception {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/BatchDataTableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/BatchDataTableScanTest.java
index 14067842c..61e9236ca 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/BatchDataTableScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/BatchDataTableScanTest.java
@@ -54,7 +54,6 @@ public class BatchDataTableScanTest extends ScannerTestBase {
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
 
         DataTableScan.DataFilePlan plan = scan.plan();
-        assertThat(plan.snapshotId).isEqualTo(2);
         assertThat(getResult(table.newRead(), plan.splits()))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java
index 95adb855d..062a50856 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java
@@ -48,8 +48,8 @@ public class StreamDataTableScanTest extends ScannerTestBase {
         StreamTableCommit commit = table.newCommit(commitUser);
         StreamDataTableScan scan = table.newStreamScan();
 
-        // first call without any snapshot, should return null
-        assertThat(scan.plan()).isNull();
+        // first call without any snapshot, should return empty plan
+        assertThat(scan.plan().splits()).isEmpty();
 
         // write base data
         write.write(rowData(1, 10, 100L));
@@ -64,12 +64,11 @@ public class StreamDataTableScanTest extends ScannerTestBase {
 
         // first call with snapshot, should return complete records from 2nd commit
         DataTableScan.DataFilePlan plan = scan.plan();
-        assertThat(plan.snapshotId).isEqualTo(2);
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
 
-        // incremental call without new snapshots, should return null
-        assertThat(scan.plan()).isNull();
+        // incremental call without new snapshots, should return empty plan
+        assertThat(scan.plan().splits()).isEmpty();
 
         // write incremental data
         write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 101L));
@@ -85,18 +84,16 @@ public class StreamDataTableScanTest extends ScannerTestBase {
 
         // first incremental call, should return incremental records from 3rd commit
         plan = scan.plan();
-        assertThat(plan.snapshotId).isEqualTo(3);
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|20|201", "+I 1|40|400"));
 
         // second incremental call, should return incremental records from 4th commit
         plan = scan.plan();
-        assertThat(plan.snapshotId).isEqualTo(4);
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|103", "-D 1|40|400", "+I 1|50|500"));
 
-        // no more new snapshots, should return null
-        assertThat(scan.plan()).isNull();
+        // no more new snapshots, should return empty plan
+        assertThat(scan.plan().splits()).isEmpty();
 
         write.close();
         commit.close();
@@ -113,8 +110,8 @@ public class StreamDataTableScanTest extends ScannerTestBase {
         StreamTableCommit commit = table.newCommit(commitUser);
         StreamDataTableScan scan = table.newStreamScan();
 
-        // first call without any snapshot, should return null
-        assertThat(scan.plan()).isNull();
+        // first call without any snapshot, should return empty plan
+        assertThat(scan.plan().splits()).isEmpty();
 
         // write base data
         write.write(rowData(1, 10, 100L));
@@ -137,12 +134,11 @@ public class StreamDataTableScanTest extends ScannerTestBase {
 
         // first call with snapshot, should return full compacted records from 3rd commit
         DataTableScan.DataFilePlan plan = scan.plan();
-        assertThat(plan.snapshotId).isEqualTo(4);
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
 
-        // incremental call without new snapshots, should return null
-        assertThat(scan.plan()).isNull();
+        // incremental call without new snapshots, should return empty plan
+        assertThat(scan.plan().splits()).isEmpty();
 
         // write incremental data
         write.write(rowData(1, 10, 103L));
@@ -150,15 +146,14 @@ public class StreamDataTableScanTest extends ScannerTestBase {
         write.write(rowData(1, 50, 500L));
         commit.commit(3, write.prepareCommit(true, 3));
 
-        // no new compact snapshots, should return null
-        assertThat(scan.plan()).isNull();
+        // no new compact snapshots, should return empty plan
+        assertThat(scan.plan().splits()).isEmpty();
 
         write.compact(binaryRow(1), 0, true);
         commit.commit(4, write.prepareCommit(true, 4));
 
         // full compaction done, read new changelog
         plan = scan.plan();
-        assertThat(plan.snapshotId).isEqualTo(6);
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(
                         Arrays.asList(
@@ -168,8 +163,8 @@ public class StreamDataTableScanTest extends ScannerTestBase {
                                 "+U 1|20|201",
                                 "+I 1|50|500"));
 
-        // no more new snapshots, should return null
-        assertThat(scan.plan()).isNull();
+        // no more new snapshots, should return empty plan
+        assertThat(scan.plan().splits()).isEmpty();
 
         write.close();
         commit.close();
@@ -191,7 +186,6 @@ public class StreamDataTableScanTest extends ScannerTestBase {
         commit.commit(committable);
 
         DataTableScan.DataFilePlan plan = scan.plan();
-        assertThat(plan.snapshotId).isEqualTo(1);
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Collections.singletonList("+I 1|10|100"));
 
@@ -217,10 +211,9 @@ public class StreamDataTableScanTest extends ScannerTestBase {
         commit.commit(committable);
 
         DataTableScan.DataFilePlan plan = scan.plan();
-        assertThat(plan.snapshotId).isEqualTo(1);
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Collections.singletonList("+I 1|10|100"));
-        assertThat(scan.plan()).isNull();
+        assertThat(scan.plan().splits()).isEmpty();
 
         write.write(rowData(2, 20, 200L));
         committable = new ManifestCommittable(0, 7L);
@@ -228,10 +221,9 @@ public class StreamDataTableScanTest extends ScannerTestBase {
         commit.commit(committable);
 
         plan = scan.plan();
-        assertThat(plan.snapshotId).isEqualTo(2);
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Collections.singletonList("+I 2|20|200"));
-        assertThat(scan.plan()).isNull();
+        assertThat(scan.plan().splits()).isEmpty();
 
         write.write(rowData(3, 30, 300L));
         committable = new ManifestCommittable(0, 9L);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
index 245fb4c77..42ec5dc76 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactedStartingScannerTest.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -34,7 +33,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class CompactedStartingScannerTest extends ScannerTestBase {
 
     @Test
-    public void testGetPlan() throws Exception {
+    public void testScan() throws Exception {
         SnapshotManager snapshotManager = table.snapshotManager();
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
@@ -57,9 +56,9 @@ public class CompactedStartingScannerTest extends ScannerTestBase {
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
 
         CompactedStartingScanner scanner = new CompactedStartingScanner();
-        DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, snapshotSplitReader);
-        assertThat(plan.snapshotId).isEqualTo(3);
-        assertThat(getResult(table.newRead(), plan.splits()))
+        StartingScanner.Result result = scanner.scan(snapshotManager, snapshotSplitReader);
+        assertThat(result.snapshotId()).isEqualTo(3);
+        assertThat(getResult(table.newRead(), toSplits(result.splits())))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
 
         write.close();
@@ -70,7 +69,7 @@ public class CompactedStartingScannerTest extends ScannerTestBase {
     public void testNoSnapshot() {
         SnapshotManager snapshotManager = table.snapshotManager();
         CompactedStartingScanner scanner = new CompactedStartingScanner();
-        assertThat(scanner.getPlan(snapshotManager, snapshotSplitReader)).isNull();
+        assertThat(scanner.scan(snapshotManager, snapshotSplitReader)).isNull();
     }
 
     @Test
@@ -87,7 +86,7 @@ public class CompactedStartingScannerTest extends ScannerTestBase {
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(1);
 
         CompactedStartingScanner scanner = new CompactedStartingScanner();
-        assertThat(scanner.getPlan(snapshotManager, snapshotSplitReader)).isNull();
+        assertThat(scanner.scan(snapshotManager, snapshotSplitReader)).isNull();
 
         write.close();
         commit.close();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
index 43568f72e..bc3ed34d3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CompactionChangelogFollowUpScannerTest.java
@@ -39,7 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class CompactionChangelogFollowUpScannerTest extends ScannerTestBase {
 
     @Test
-    public void testGetPlan() throws Exception {
+    public void testScan() throws Exception {
         SnapshotManager snapshotManager = table.snapshotManager();
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
@@ -79,8 +79,7 @@ public class CompactionChangelogFollowUpScannerTest extends ScannerTestBase {
         snapshot = snapshotManager.snapshot(3);
         assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
         assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
-        DataTableScan.DataFilePlan plan = scanner.getPlan(3, snapshotSplitReader);
-        assertThat(plan.snapshotId).isEqualTo(3);
+        DataTableScan.DataFilePlan plan = scanner.scan(3, snapshotSplitReader);
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|20|200", "+I 1|30|300"));
 
@@ -91,8 +90,7 @@ public class CompactionChangelogFollowUpScannerTest extends ScannerTestBase {
         snapshot = snapshotManager.snapshot(5);
         assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
         assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
-        plan = scanner.getPlan(5, snapshotSplitReader);
-        assertThat(plan.snapshotId).isEqualTo(5);
+        plan = scanner.scan(5, snapshotSplitReader);
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(
                         Arrays.asList("-U 1|10|102", "+U 1|10|103", "-D 1|30|300", "+I 1|40|401"));
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
index 1063d17de..3ee8d0d67 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
@@ -46,7 +46,7 @@ public class ContinuousCompactorFollowUpScannerTest extends ScannerTestBase {
     private final DataFileMetaSerializer dataFileMetaSerializer = new DataFileMetaSerializer();
 
     @Test
-    public void testGetPlan() throws Exception {
+    public void testScan() throws Exception {
         SnapshotManager snapshotManager = table.snapshotManager();
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
@@ -84,16 +84,14 @@ public class ContinuousCompactorFollowUpScannerTest extends ScannerTestBase {
         Snapshot snapshot = snapshotManager.snapshot(1);
         assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
         assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
-        DataTableScan.DataFilePlan plan = scanner.getPlan(1, snapshotSplitReader);
-        assertThat(plan.snapshotId).isEqualTo(1);
+        DataTableScan.DataFilePlan plan = scanner.scan(1, snapshotSplitReader);
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Arrays.asList("+I 1|1|0|1", "+I 1|2|0|1"));
 
         snapshot = snapshotManager.snapshot(2);
         assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
         assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
-        plan = scanner.getPlan(2, snapshotSplitReader);
-        assertThat(plan.snapshotId).isEqualTo(2);
+        plan = scanner.scan(2, snapshotSplitReader);
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Collections.singletonList("+I 2|2|0|1"));
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
index 7f3402c0b..c62fa0514 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -32,7 +31,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class ContinuousCompactorStartingScannerTest extends ScannerTestBase {
 
     @Test
-    public void testGetPlan() throws Exception {
+    public void testScan() throws Exception {
         SnapshotManager snapshotManager = table.snapshotManager();
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
@@ -60,9 +59,9 @@ public class ContinuousCompactorStartingScannerTest extends ScannerTestBase {
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(5);
 
         ContinuousCompactorStartingScanner scanner = new ContinuousCompactorStartingScanner();
-        DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, snapshotSplitReader);
-        assertThat(plan.snapshotId).isEqualTo(3);
-        assertThat(plan.splits()).isEmpty();
+        StartingScanner.Result result = scanner.scan(snapshotManager, snapshotSplitReader);
+        assertThat(result.snapshotId()).isEqualTo(3);
+        assertThat(result.splits()).isEmpty();
 
         write.close();
         commit.close();
@@ -72,6 +71,6 @@ public class ContinuousCompactorStartingScannerTest extends ScannerTestBase {
     public void testNoSnapshot() {
         SnapshotManager snapshotManager = table.snapshotManager();
         ContinuousCompactorStartingScanner scanner = new ContinuousCompactorStartingScanner();
-        assertThat(scanner.getPlan(snapshotManager, snapshotSplitReader)).isNull();
+        assertThat(scanner.scan(snapshotManager, snapshotSplitReader)).isNull();
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
index 85aedecc5..69dcb55c4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -32,7 +31,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class ContinuousFromTimestampStartingScannerTest extends ScannerTestBase {
 
     @Test
-    public void testGetPlan() throws Exception {
+    public void testScan() throws Exception {
         SnapshotManager snapshotManager = table.snapshotManager();
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
@@ -60,9 +59,9 @@ public class ContinuousFromTimestampStartingScannerTest extends ScannerTestBase
 
         ContinuousFromTimestampStartingScanner scanner =
                 new ContinuousFromTimestampStartingScanner(timestamp);
-        DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, snapshotSplitReader);
-        assertThat(plan.snapshotId).isEqualTo(2);
-        assertThat(getResult(table.newRead(), plan.splits())).isEmpty();
+        StartingScanner.Result result = scanner.scan(snapshotManager, snapshotSplitReader);
+        assertThat(result.snapshotId()).isEqualTo(2);
+        assertThat(getResult(table.newRead(), toSplits(result.splits()))).isEmpty();
 
         write.close();
         commit.close();
@@ -73,7 +72,7 @@ public class ContinuousFromTimestampStartingScannerTest extends ScannerTestBase
         SnapshotManager snapshotManager = table.snapshotManager();
         ContinuousFromTimestampStartingScanner scanner =
                 new ContinuousFromTimestampStartingScanner(System.currentTimeMillis());
-        assertThat(scanner.getPlan(snapshotManager, snapshotSplitReader)).isNull();
+        assertThat(scanner.scan(snapshotManager, snapshotSplitReader)).isNull();
     }
 
     @Test
@@ -93,9 +92,9 @@ public class ContinuousFromTimestampStartingScannerTest extends ScannerTestBase
 
         ContinuousFromTimestampStartingScanner scanner =
                 new ContinuousFromTimestampStartingScanner(timestamp);
-        DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, snapshotSplitReader);
-        assertThat(plan.snapshotId).isEqualTo(0);
-        assertThat(getResult(table.newRead(), plan.splits())).isEmpty();
+        StartingScanner.Result result = scanner.scan(snapshotManager, snapshotSplitReader);
+        assertThat(result.snapshotId()).isEqualTo(0);
+        assertThat(getResult(table.newRead(), toSplits(result.splits()))).isEmpty();
 
         write.close();
         commit.close();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
index ba218410c..33ea201b7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousLatestStartingScannerTest.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -32,7 +31,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class ContinuousLatestStartingScannerTest extends ScannerTestBase {
 
     @Test
-    public void testGetPlan() throws Exception {
+    public void testScan() throws Exception {
         SnapshotManager snapshotManager = table.snapshotManager();
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
@@ -50,9 +49,9 @@ public class ContinuousLatestStartingScannerTest extends ScannerTestBase {
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
 
         ContinuousLatestStartingScanner scanner = new ContinuousLatestStartingScanner();
-        DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, snapshotSplitReader);
-        assertThat(plan.snapshotId).isEqualTo(2);
-        assertThat(getResult(table.newRead(), plan.splits())).isEmpty();
+        StartingScanner.Result result = scanner.scan(snapshotManager, snapshotSplitReader);
+        assertThat(result.snapshotId()).isEqualTo(2);
+        assertThat(getResult(table.newRead(), toSplits(result.splits()))).isEmpty();
 
         write.close();
         commit.close();
@@ -62,6 +61,6 @@ public class ContinuousLatestStartingScannerTest extends ScannerTestBase {
     public void testNoSnapshot() {
         SnapshotManager snapshotManager = table.snapshotManager();
         ContinuousLatestStartingScanner scanner = new ContinuousLatestStartingScanner();
-        assertThat(scanner.getPlan(snapshotManager, snapshotSplitReader)).isNull();
+        assertThat(scanner.scan(snapshotManager, snapshotSplitReader)).isNull();
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScannerTest.java
index 6edc68669..116b17702 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DeltaFollowUpScannerTest.java
@@ -36,7 +36,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class DeltaFollowUpScannerTest extends ScannerTestBase {
 
     @Test
-    public void testGetPlan() throws Exception {
+    public void testScan() throws Exception {
         SnapshotManager snapshotManager = table.snapshotManager();
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
@@ -61,16 +61,14 @@ public class DeltaFollowUpScannerTest extends ScannerTestBase {
         Snapshot snapshot = snapshotManager.snapshot(1);
         assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
         assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
-        DataTableScan.DataFilePlan plan = scanner.getPlan(1, snapshotSplitReader);
-        assertThat(plan.snapshotId).isEqualTo(1);
+        DataTableScan.DataFilePlan plan = scanner.scan(1, snapshotSplitReader);
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|100", "+I 1|20|200", "+I 1|40|400"));
 
         snapshot = snapshotManager.snapshot(2);
         assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
         assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
-        plan = scanner.getPlan(2, snapshotSplitReader);
-        assertThat(plan.snapshotId).isEqualTo(2);
+        plan = scanner.scan(2, snapshotSplitReader);
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|102", "+I 1|30|300", "-D 1|40|400"));
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
index 0ecc7260d..7ce631d20 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/FullStartingScannerTest.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -34,7 +33,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class FullStartingScannerTest extends ScannerTestBase {
 
     @Test
-    public void testGetPlan() throws Exception {
+    public void testScan() throws Exception {
         SnapshotManager snapshotManager = table.snapshotManager();
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
@@ -52,9 +51,9 @@ public class FullStartingScannerTest extends ScannerTestBase {
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(2);
 
         FullStartingScanner scanner = new FullStartingScanner();
-        DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, snapshotSplitReader);
-        assertThat(plan.snapshotId).isEqualTo(2);
-        assertThat(getResult(table.newRead(), plan.splits()))
+        StartingScanner.Result result = scanner.scan(snapshotManager, snapshotSplitReader);
+        assertThat(result.snapshotId()).isEqualTo(2);
+        assertThat(getResult(table.newRead(), toSplits(result.splits())))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|101", "+I 1|20|200", "+I 1|30|300"));
 
         write.close();
@@ -65,6 +64,6 @@ public class FullStartingScannerTest extends ScannerTestBase {
     public void testNoSnapshot() {
         SnapshotManager snapshotManager = table.snapshotManager();
         FullStartingScanner scanner = new FullStartingScanner();
-        assertThat(scanner.getPlan(snapshotManager, snapshotSplitReader)).isNull();
+        assertThat(scanner.scan(snapshotManager, snapshotSplitReader)).isNull();
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
index 69fac80f8..98a0bd20b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/InputChangelogFollowUpScannerTest.java
@@ -39,7 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class InputChangelogFollowUpScannerTest extends ScannerTestBase {
 
     @Test
-    public void testGetPlan() throws Exception {
+    public void testScan() throws Exception {
         SnapshotManager snapshotManager = table.snapshotManager();
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
@@ -64,16 +64,14 @@ public class InputChangelogFollowUpScannerTest extends ScannerTestBase {
         Snapshot snapshot = snapshotManager.snapshot(1);
         assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
         assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
-        DataTableScan.DataFilePlan plan = scanner.getPlan(1, snapshotSplitReader);
-        assertThat(plan.snapshotId).isEqualTo(1);
+        DataTableScan.DataFilePlan plan = scanner.scan(1, snapshotSplitReader);
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(Arrays.asList("+I 1|10|100", "+I 1|20|200", "+I 1|40|400"));
 
         snapshot = snapshotManager.snapshot(2);
         assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
         assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
-        plan = scanner.getPlan(2, snapshotSplitReader);
-        assertThat(plan.snapshotId).isEqualTo(2);
+        plan = scanner.scan(2, snapshotSplitReader);
         assertThat(getResult(read, plan.splits()))
                 .hasSameElementsAs(
                         Arrays.asList("+I 1|10|101", "+I 1|30|300", "+I 1|10|102", "-D 1|40|400"));
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
index c265141f2..313585606 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
@@ -34,6 +34,7 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.DataType;
@@ -137,4 +138,8 @@ public abstract class ScannerTestBase {
                                 ""));
         return FileStoreTableFactory.create(fileIO, tablePath, tableSchema, conf);
     }
+
+    protected List<Split> toSplits(List<DataSplit> dataSplits) {
+        return new ArrayList<>(dataSplits);
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index c6ed73628..748850284 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -194,7 +194,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
     private void refresh() throws Exception {
         while (true) {
             Iterator<InternalRow> batch = streamingReader.nextBatch();
-            if (batch == null) {
+            if (!batch.hasNext()) {
                 return;
             }
             this.lookupTable.refresh(batch);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index da68d09b9..731d2d787 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -116,6 +116,7 @@ public class CompactorSourceBuilder {
                     null,
                     partitionPredicate,
                     null,
+                    // static compactor source will compact all current files
                     table -> table.newScan().withStartingScanner(new FullStartingScanner()));
         }
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index c18458908..4bdf75f7f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.source;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.DataTableScan.DataFilePlan;
 import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.StreamDataTableScan;
 
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
@@ -41,7 +42,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -62,18 +62,18 @@ public class ContinuousFileSplitEnumerator
 
     private final FileStoreSourceSplitGenerator splitGenerator;
 
-    private final Callable<DataFilePlan> callable;
+    private final StreamDataTableScan scan;
 
-    private Long nextSnapshotId;
+    @Nullable private Long nextSnapshotId;
 
     private boolean finished = false;
 
     public ContinuousFileSplitEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             Collection<FileStoreSourceSplit> remainSplits,
-            Long nextSnapshotId,
+            @Nullable Long nextSnapshotId,
             long discoveryInterval,
-            Callable<DataFilePlan> callable) {
+            StreamDataTableScan scan) {
         checkArgument(discoveryInterval > 0L);
         this.context = checkNotNull(context);
         this.bucketSplits = new HashMap<>();
@@ -82,7 +82,7 @@ public class ContinuousFileSplitEnumerator
         this.discoveryInterval = discoveryInterval;
         this.readersAwaitingSplit = new HashSet<>();
         this.splitGenerator = new FileStoreSourceSplitGenerator();
-        this.callable = callable;
+        this.scan = scan;
     }
 
     private void addSplits(Collection<FileStoreSourceSplit> splits) {
@@ -107,7 +107,7 @@ public class ContinuousFileSplitEnumerator
 
     @Override
     public void start() {
-        context.callAsync(callable, this::processDiscoveredSplits, 0, discoveryInterval);
+        context.callAsync(scan::plan, this::processDiscoveredSplits, 0, discoveryInterval);
     }
 
     @Override
@@ -167,7 +167,7 @@ public class ContinuousFileSplitEnumerator
             return;
         }
 
-        nextSnapshotId = plan.snapshotId + 1;
+        nextSnapshotId = scan.checkpoint();
         addSplits(splitGenerator.createSplits(plan));
         assignSplits();
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index 719477542..909a736fe 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -85,7 +85,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
                 splits,
                 nextSnapshotId,
                 table.coreOptions().continuousDiscoveryInterval().toMillis(),
-                scanFactory.create(table, nextSnapshotId).withFilter(predicate)::plan);
+                scanFactory.create(table, nextSnapshotId).withFilter(predicate));
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index d9bdee83a..0876926a5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -132,7 +132,8 @@ public class FlinkSourceBuilder {
                     return logSourceProvider.createSource(null);
                 }
                 return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
-                                buildStaticFileSource())
+                                LogHybridSourceFactory.buildHybridFirstSource(
+                                        table, projectedFields, predicate))
                         .addSource(
                                 new LogHybridSourceFactory(logSourceProvider),
                                 Boundedness.CONTINUOUS_UNBOUNDED)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
index bb49967e2..ef2dac7c1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
@@ -19,13 +19,27 @@
 package org.apache.paimon.flink.source;
 
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.log.LogSourceProvider;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.DataTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.utils.SnapshotManager;
 
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.connector.base.source.hybrid.HybridSource;
 import org.apache.flink.connector.base.source.hybrid.HybridSource.SourceFactory;
 import org.apache.flink.table.data.RowData;
 
+import javax.annotation.Nullable;
+
+import java.util.Collection;
 import java.util.Map;
 
 /** Log {@link SourceFactory} from {@link StaticFileStoreSplitEnumerator}. */
@@ -49,4 +63,71 @@ public class LogHybridSourceFactory
         }
         return provider.createSource(logOffsets);
     }
+
+    public static FlinkSource buildHybridFirstSource(
+            Table table, @Nullable int[][] projectedFields, @Nullable Predicate predicate) {
+        if (!(table instanceof DataTable)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "FlinkHybridFirstSource only accepts DataTable. Unsupported table type: '%s'.",
+                            table.getClass().getSimpleName()));
+        }
+
+        DataTable dataTable = (DataTable) table;
+
+        return new FlinkHybridFirstSource(
+                table.newReadBuilder().withProjection(projectedFields).withFilter(predicate),
+                dataTable.snapshotManager(),
+                dataTable.coreOptions().toConfiguration());
+    }
+
+    /** The first source of a log {@link HybridSource}. */
+    private static class FlinkHybridFirstSource extends FlinkSource {
+
+        private static final long serialVersionUID = 3L;
+
+        private final SnapshotManager snapshotManager;
+        private final Options options;
+
+        public FlinkHybridFirstSource(
+                ReadBuilder readBuilder, SnapshotManager snapshotManager, Options options) {
+            super(readBuilder, null);
+            this.snapshotManager = snapshotManager;
+            this.options = options;
+        }
+
+        @Override
+        public Boundedness getBoundedness() {
+            return Boundedness.BOUNDED;
+        }
+
+        @Override
+        public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
+                SplitEnumeratorContext<FileStoreSourceSplit> context,
+                PendingSplitsCheckpoint checkpoint) {
+            Long snapshotId = null;
+            Collection<FileStoreSourceSplit> splits;
+            if (checkpoint == null) {
+                FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
+                // get snapshot id and splits from scan
+                StreamTableScan scan = readBuilder.newStreamScan();
+                splits = splitGenerator.createSplits(scan.plan());
+                Long nextSnapshotId = scan.checkpoint();
+                if (nextSnapshotId != null) {
+                    snapshotId = nextSnapshotId - 1;
+                }
+            } else {
+                // restore from checkpoint
+                snapshotId = checkpoint.currentSnapshotId();
+                splits = checkpoint.splits();
+            }
+
+            Snapshot snapshot = snapshotId == null ? null : snapshotManager.snapshot(snapshotId);
+            return new StaticFileStoreSplitEnumerator(
+                    context,
+                    snapshot,
+                    splits,
+                    options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE));
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
index ae601717a..3af455a67 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
@@ -18,13 +18,11 @@
 
 package org.apache.paimon.flink.source;
 
-import org.apache.paimon.Snapshot;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.source.BatchDataTableScan;
 import org.apache.paimon.table.source.DataTableScan;
-import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.SplitEnumerator;
@@ -32,7 +30,6 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
 import java.util.Collection;
 
 /** Bounded {@link FlinkSource} for reading records. It does not monitor new snapshots. */
@@ -73,31 +70,21 @@ public class StaticFileStoreSource extends FlinkSource {
     public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             PendingSplitsCheckpoint checkpoint) {
-        SnapshotManager snapshotManager = table.snapshotManager();
-
-        Long snapshotId = null;
         Collection<FileStoreSourceSplit> splits;
         if (checkpoint == null) {
-            splits = new ArrayList<>();
             FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
-
             // read all splits from scan
             DataTableScan.DataFilePlan plan =
                     scanFactory.create(table).withFilter(predicate).plan();
-            if (plan != null) {
-                snapshotId = plan.snapshotId;
-                splits.addAll(splitGenerator.createSplits(plan));
-            }
+            splits = splitGenerator.createSplits(plan);
         } else {
             // restore from checkpoint
-            snapshotId = checkpoint.currentSnapshotId();
             splits = checkpoint.splits();
         }
 
-        Snapshot snapshot = snapshotId == null ? null : snapshotManager.snapshot(snapshotId);
         return new StaticFileStoreSplitEnumerator(
                 context,
-                snapshot,
+                null,
                 splits,
                 table.coreOptions()
                         .toConfiguration()
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index 1cc40d216..e6f2e85a9 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -148,7 +148,6 @@ public class CompactActionITCase extends ActionITCaseBase {
         // no full compaction has happened, so plan should be empty
         StreamDataTableScan scan = table.newStreamScan();
         DataTableScan.DataFilePlan plan = scan.plan();
-        Assertions.assertEquals(1, (long) plan.snapshotId);
         Assertions.assertTrue(plan.splits().isEmpty());
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index 742621f31..a06cd198e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -19,9 +19,16 @@
 package org.apache.paimon.flink.source;
 
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.DataTableScan.DataFilePlan;
 import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.snapshot.BoundedChecker;
+import org.apache.paimon.table.source.snapshot.FollowUpScanner;
+import org.apache.paimon.table.source.snapshot.StartingScanner;
+import org.apache.paimon.utils.Filter;
 
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
@@ -34,7 +41,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Collectors;
 
@@ -149,21 +155,13 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(1, "test-host");
 
         Queue<DataFilePlan> results = new LinkedBlockingQueue<>();
-        Callable<DataFilePlan> callable =
-                () -> {
-                    DataFilePlan plan = results.poll();
-                    if (plan == null) {
-                        throw new EndOfScanException();
-                    }
-                    return plan;
-                };
-
+        StreamDataTableScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
                         .setSplitEnumeratorContext(context)
                         .setInitialSplits(Collections.emptyList())
                         .setDiscoveryInterval(1)
-                        .setCallable(callable)
+                        .setScan(scan)
                         .build();
         enumerator.start();
 
@@ -172,7 +170,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 0; i < 4; i++) {
             splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
         }
-        results.add(new DataFilePlan(snapshot, splits));
+        results.add(new DataFilePlan(splits));
         context.triggerAllActions();
 
         // assign to task 0
@@ -228,9 +226,8 @@ public class ContinuousFileSplitEnumeratorTest {
     private static class Builder {
         private SplitEnumeratorContext<FileStoreSourceSplit> context;
         private Collection<FileStoreSourceSplit> initialSplits = Collections.emptyList();
-        private Long nextSnapshotId;
         private long discoveryInterval = Long.MAX_VALUE;
-        private Callable<DataFilePlan> callable;
+        private StreamDataTableScan scan;
 
         public Builder setSplitEnumeratorContext(
                 SplitEnumeratorContext<FileStoreSourceSplit> context) {
@@ -243,24 +240,89 @@ public class ContinuousFileSplitEnumeratorTest {
             return this;
         }
 
-        public Builder setNextSnapshotId(Long nextSnapshotId) {
-            this.nextSnapshotId = nextSnapshotId;
-            return this;
-        }
-
         public Builder setDiscoveryInterval(long discoveryInterval) {
             this.discoveryInterval = discoveryInterval;
             return this;
         }
 
-        public Builder setCallable(Callable<DataFilePlan> callable) {
-            this.callable = callable;
+        public Builder setScan(StreamDataTableScan scan) {
+            this.scan = scan;
             return this;
         }
 
         public ContinuousFileSplitEnumerator build() {
             return new ContinuousFileSplitEnumerator(
-                    context, initialSplits, nextSnapshotId, discoveryInterval, callable);
+                    context, initialSplits, null, discoveryInterval, scan);
+        }
+    }
+
+    private static class MockScan implements StreamDataTableScan {
+        private final Queue<DataFilePlan> results;
+
+        public MockScan(Queue<DataFilePlan> results) {
+            this.results = results;
+        }
+
+        @Override
+        public StreamDataTableScan withKind(ScanKind kind) {
+            return null;
+        }
+
+        @Override
+        public StreamDataTableScan withSnapshot(long snapshotId) {
+            return null;
+        }
+
+        @Override
+        public StreamDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
+            return null;
+        }
+
+        @Override
+        public StreamDataTableScan withFilter(Predicate predicate) {
+            return null;
         }
+
+        @Override
+        public DataFilePlan plan() {
+            DataFilePlan plan = results.poll();
+            if (plan == null) {
+                throw new EndOfScanException();
+            }
+            return plan;
+        }
+
+        @Override
+        public boolean supportStreamingReadOverwrite() {
+            return false;
+        }
+
+        @Override
+        public StreamDataTableScan withStartingScanner(StartingScanner startingScanner) {
+            return null;
+        }
+
+        @Override
+        public StreamDataTableScan withFollowUpScanner(FollowUpScanner followUpScanner) {
+            return null;
+        }
+
+        @Override
+        public StreamDataTableScan withBoundedChecker(BoundedChecker boundedChecker) {
+            return null;
+        }
+
+        @Override
+        public StreamDataTableScan withSnapshotStarting() {
+            return null;
+        }
+
+        @Override
+        public Long checkpoint() {
+            return null;
+        }
+
+        @Override
+        public void restore(Long state) {}
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 4863bb66a..45c53d18c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -80,7 +80,7 @@ public class FileStoreSourceSplitGeneratorTest {
                         false,
                         Collections::singletonList,
                         FileStoreScan.Plan.groupByPartFiles(plan.files(FileKind.ADD)));
-        DataTableScan.DataFilePlan tableScanPlan = new DataTableScan.DataFilePlan(1L, scanSplits);
+        DataTableScan.DataFilePlan tableScanPlan = new DataTableScan.DataFilePlan(scanSplits);
 
         List<FileStoreSourceSplit> splits =
                 new FileStoreSourceSplitGenerator().createSplits(tableScanPlan);
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
index 98ca11288..f707ba843 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
@@ -28,7 +28,6 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.ReadBuilder;
 
@@ -43,8 +42,6 @@ import org.apache.hadoop.mapred.Reporter;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
 import java.util.Optional;
 
 /**
@@ -58,11 +55,7 @@ public class PaimonInputFormat implements InputFormat<Void, RowDataContainer> {
         FileStoreTable table = createFileStoreTable(jobConf);
         DataTableScan scan = table.newScan();
         createPredicate(table.schema(), jobConf).ifPresent(scan::withFilter);
-
-        // TODO: Roll back modification after refactoring scan interface
-        DataTableScan.DataFilePlan plan = scan.plan();
-        List<DataSplit> splits = plan == null ? Collections.emptyList() : plan.splits;
-        return splits.stream()
+        return scan.plan().splits.stream()
                 .map(split -> new PaimonInputSplit(table.location().toString(), split))
                 .toArray(PaimonInputSplit[]::new);
     }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
index ecf0c6de2..1c5c7b50a 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
@@ -20,7 +20,6 @@ package org.apache.paimon.spark;
 
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableScan;
 
 import org.apache.spark.sql.connector.read.Batch;
 import org.apache.spark.sql.connector.read.InputPartition;
@@ -30,7 +29,6 @@ import org.apache.spark.sql.connector.read.Statistics;
 import org.apache.spark.sql.connector.read.SupportsReportStatistics;
 import org.apache.spark.sql.types.StructType;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.OptionalLong;
 
@@ -79,9 +77,7 @@ public class SparkScan implements Scan, SupportsReportStatistics {
 
     protected List<Split> splits() {
         if (splits == null) {
-            // TODO: Roll back modification after refactoring scan interface
-            TableScan.Plan plan = readBuilder.newScan().plan();
-            splits = plan == null ? Collections.emptyList() : plan.splits();
+            splits = readBuilder.newScan().plan().splits();
         }
         return splits;
     }