Posted to commits@paimon.apache.org by lz...@apache.org on 2023/05/04 05:05:09 UTC

[incubator-paimon] branch master updated: [core] Introduce consumer-id for restoring consumption and reserving snapshots (#1022)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 50d562252 [core] Introduce consumer-id for restoring consumption and reserving snapshots (#1022)
50d562252 is described below

commit 50d56225201dd934e1e40e5932ef3bde2cb38436
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu May 4 13:05:04 2023 +0800

    [core] Introduce consumer-id for restoring consumption and reserving snapshots (#1022)
---
 docs/content/how-to/querying-tables.md             |  23 ++
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  11 +
 .../java/org/apache/paimon/consumer/Consumer.java  |  69 ++++++
 .../apache/paimon/consumer/ConsumerManager.java    | 100 +++++++++
 .../paimon/operation/FileStoreExpireImpl.java      |  10 +
 .../table/source/AbstractInnerTableScan.java       |  14 ++
 .../table/source/InnerStreamTableScanImpl.java     |  15 ++
 .../paimon/table/source/StreamTableScan.java       |  17 +-
 .../table/source/snapshot/SnapshotSplitReader.java |   3 +
 .../source/snapshot/SnapshotSplitReaderImpl.java   |  10 +
 .../apache/paimon/table/system/AuditLogTable.java  |  11 +
 .../org/apache/paimon/utils/SnapshotManager.java   |   8 +
 .../paimon/consumer/ConsumerManagerTest.java       |  64 ++++++
 .../paimon/table/FileStoreTableTestBase.java       |  63 ++++++
 .../java/org/apache/paimon/flink/FlinkRowData.java |   7 +-
 .../paimon/flink/source/DataTableSource.java       |   4 +-
 .../paimon/flink/source/FlinkSourceBuilder.java    | 119 ++++++----
 .../flink/source/operator/MonitorFunction.java     | 222 +++++++++++++++++++
 .../paimon/flink/source/operator/ReadOperator.java |  71 ++++++
 .../apache/paimon/flink/utils/JavaSerializer.java  | 161 ++++++++++++++
 .../apache/paimon/flink/utils/JavaTypeInfo.java    | 128 +++++++++++
 .../paimon/flink/ContinuousFileStoreITCase.java    |  24 ++
 .../org/apache/paimon/flink/FileStoreITCase.java   |  17 +-
 .../source/ContinuousFileSplitEnumeratorTest.java  |   4 +
 .../flink/source/operator/OperatorSourceTest.java  | 243 +++++++++++++++++++++
 .../paimon/flink/utils/JavaSerializerTest.java     |  86 ++++++++
 .../paimon/flink/utils/JavaTypeInfoTest.java       |  30 ++-
 28 files changed, 1478 insertions(+), 62 deletions(-)

diff --git a/docs/content/how-to/querying-tables.md b/docs/content/how-to/querying-tables.md
index cf8aad5e6..6614b8fa7 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -130,6 +130,29 @@ SELECT * FROM t TIMESTAMP AS OF 1678883047;
 
 {{< /tabs >}}
 
+## Consumer ID
+
+{{< hint info >}}
+This is an experimental feature.
+{{< /hint >}}
+
+You can specify the `consumer-id` option when reading a table in streaming mode:
+```sql
+SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid') */;
+```
+
+When reading a Paimon table in streaming mode, the next snapshot id to consume is recorded in the file system. This has several advantages:
+
+1. When the previous job is stopped, a newly started job can continue to consume from the previous progress without
+   restoring from state: the new read will start from the next snapshot id found in the consumer file.
+2. When deciding whether a snapshot has expired, Paimon looks at all the consumers of the table in the file system,
+   and if any consumer still depends on this snapshot, the snapshot will not be deleted by expiration.
+
+{{< hint info >}}
+NOTE: If a consumer will no longer be used, please delete it; otherwise it will hold back the expiration
+of snapshots. The consumer file is located at `${table-path}/consumer/consumer-${consumer-id}`.
+{{< /hint >}}
+
 ## System Tables
 
 System tables contain metadata and information about each table, such as the snapshots created and the options in use. Users can access system tables with batch queries.
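
[Editor's note] For a concrete picture of what this feature stores, here is a hedged sketch using the `Consumer` class added later in this commit (the exact JSON whitespace may differ):

```java
import org.apache.paimon.consumer.Consumer;

// For 'consumer-id' = 'myid' on a table at /warehouse/db.db/t, the file is
//   /warehouse/db.db/t/consumer/consumer-myid
// and its body is the JSON form of Consumer, roughly {"nextSnapshot" : 5}.
Consumer consumer = new Consumer(5);
String json = consumer.toJson();              // what ConsumerManager#recordConsumer writes
Consumer restored = Consumer.fromJson(json);  // what a restarted job reads back
assert restored.nextSnapshot() == 5;
```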
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 16138d13b..6c704aa38 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -68,6 +68,12 @@
             <td>Integer</td>
             <td>Percentage flexibility while comparing sorted run size for changelog mode table. If the candidate sorted run(s) size is 1% smaller than the next sorted run's size, then include next sorted run into this candidate set.</td>
         </tr>
+        <tr>
+            <td><h5>consumer-id</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Consumer id for recording the consumption offset in storage.</td>
+        </tr>
         <tr>
             <td><h5>continuous.discovery-interval</h5></td>
             <td style="word-wrap: break-word;">1 s</td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index dc2e310c8..7bb992cfa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -564,6 +564,13 @@ public class CoreOptions implements Serializable {
                     .defaultValue(1024)
                     .withDescription("Read batch size for orc and parquet.");
 
+    public static final ConfigOption<String> CONSUMER_ID =
+            key("consumer-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Consumer id for recording the offset of consumption in the storage.");
+
     @Deprecated
     @ExcludeFromDocumentation("For compatibility with older versions")
     public static final ConfigOption<Boolean> APPEND_ONLY_ASSERT_DISORDER =
@@ -877,6 +884,10 @@ public class CoreOptions implements Serializable {
         return options.get(READ_BATCH_SIZE);
     }
 
+    public String consumerId() {
+        return options.get(CONSUMER_ID);
+    }
+
     public static StreamingReadMode streamReadType(Options options) {
         return options.get(STREAMING_READ_MODE);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java b/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java
new file mode 100644
index 000000000..82a25c0af
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.consumer;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/** Consumer which contains next snapshot. */
+public class Consumer {
+
+    private static final String FIELD_NEXT_SNAPSHOT = "nextSnapshot";
+
+    private final long nextSnapshot;
+
+    @JsonCreator
+    public Consumer(@JsonProperty(FIELD_NEXT_SNAPSHOT) long nextSnapshot) {
+        this.nextSnapshot = nextSnapshot;
+    }
+
+    @JsonGetter(FIELD_NEXT_SNAPSHOT)
+    public long nextSnapshot() {
+        return nextSnapshot;
+    }
+
+    public String toJson() {
+        return JsonSerdeUtil.toJson(this);
+    }
+
+    public static Consumer fromJson(String json) {
+        return JsonSerdeUtil.fromJson(json, Consumer.class);
+    }
+
+    public static Optional<Consumer> fromPath(FileIO fileIO, Path path) {
+        try {
+            if (!fileIO.exists(path)) {
+                return Optional.empty();
+            }
+
+            String json = fileIO.readFileUtf8(path);
+            return Optional.of(Consumer.fromJson(json));
+        } catch (IOException e) {
+            throw new RuntimeException("Fails to read snapshot from path " + path, e);
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
new file mode 100644
index 000000000..273a32dfc
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.consumer;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+/** Manage consumer groups. */
+public class ConsumerManager implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String CONSUMER_PREFIX = "consumer-";
+
+    private final FileIO fileIO;
+    private final Path tablePath;
+
+    public ConsumerManager(FileIO fileIO, Path tablePath) {
+        this.fileIO = fileIO;
+        this.tablePath = tablePath;
+    }
+
+    public Optional<Consumer> consumer(String consumerId) {
+        return Consumer.fromPath(fileIO, consumerPath(consumerId));
+    }
+
+    public void recordConsumer(String consumerId, Consumer consumer) {
+        try (PositionOutputStream out = fileIO.newOutputStream(consumerPath(consumerId), true)) {
+            OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8);
+            writer.write(consumer.toJson());
+            writer.flush();
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    public OptionalLong minNextSnapshot() {
+        try {
+            Path directory = consumerDirectory();
+            if (!fileIO.exists(directory)) {
+                return OptionalLong.empty();
+            }
+
+            FileStatus[] statuses = fileIO.listStatus(directory);
+
+            if (statuses == null) {
+                throw new RuntimeException(
+                        String.format(
+                                "The return value is null of the listStatus for the '%s' directory.",
+                                directory));
+            }
+
+            return Arrays.stream(statuses)
+                    .map(FileStatus::getPath)
+                    .filter(path -> path.getName().startsWith(CONSUMER_PREFIX))
+                    .map(path -> Consumer.fromPath(fileIO, path))
+                    .filter(Optional::isPresent)
+                    .map(Optional::get)
+                    .mapToLong(Consumer::nextSnapshot)
+                    .reduce(Math::min);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private Path consumerDirectory() {
+        return new Path(tablePath + "/consumer");
+    }
+
+    private Path consumerPath(String consumerId) {
+        return new Path(tablePath + "/consumer/" + CONSUMER_PREFIX + consumerId);
+    }
+}
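
[Editor's note] A hedged usage sketch of the manager above; note that `recordConsumer` opens its file with the overwrite flag set, so each call simply replaces the previous consumer file:

```java
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;

import java.util.OptionalLong;

// LocalFileIO and a local path stand in for the table's real FileIO here.
ConsumerManager manager = new ConsumerManager(LocalFileIO.create(), new Path("/tmp/t"));
manager.recordConsumer("myid", new Consumer(5));
manager.recordConsumer("myid", new Consumer(6));  // overwrites consumer-myid
OptionalLong min = manager.minNextSnapshot();     // OptionalLong.of(6) if no other consumer exists
```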
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
index 8f0122e90..d31a1983a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -45,6 +46,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -71,6 +73,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
 
     private final FileStorePathFactory pathFactory;
     private final SnapshotManager snapshotManager;
+    private final ConsumerManager consumerManager;
     private final ManifestFile manifestFile;
     private final ManifestList manifestList;
 
@@ -91,6 +94,8 @@ public class FileStoreExpireImpl implements FileStoreExpire {
         this.millisRetained = millisRetained;
         this.pathFactory = pathFactory;
         this.snapshotManager = snapshotManager;
+        this.consumerManager =
+                new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath());
         this.manifestFile = manifestFileFactory.create();
         this.manifestList = manifestListFactory.create();
     }
@@ -135,6 +140,11 @@ public class FileStoreExpireImpl implements FileStoreExpire {
     }
 
     private void expireUntil(long earliestId, long endExclusiveId) {
+        OptionalLong minNextSnapshot = consumerManager.minNextSnapshot();
+        if (minNextSnapshot.isPresent()) {
+            endExclusiveId = Math.min(minNextSnapshot.getAsLong(), endExclusiveId);
+        }
+
         if (endExclusiveId <= earliestId) {
             // No expire happens:
             // write the hint file in order to see the earliest snapshot directly next time
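
[Editor's note] Concretely: if two consumers have recorded nextSnapshot 5 and 8, and retention alone would expire every snapshot below 9, the cap lowers `endExclusiveId` to 5, so snapshots 5 through 8 survive. A minimal sketch of the arithmetic, assuming those values:

```java
import java.util.OptionalLong;

long endExclusiveId = 9;                           // what retention alone would allow
OptionalLong minNextSnapshot = OptionalLong.of(5); // smallest progress across consumer files
if (minNextSnapshot.isPresent()) {
    endExclusiveId = Math.min(minNextSnapshot.getAsLong(), endExclusiveId);
}
// endExclusiveId == 5: snapshots >= 5 are still needed by a consumer and are kept.
```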
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 14066239f..250131467 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -21,6 +21,8 @@ package org.apache.paimon.table.source;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.consumer.Consumer;
+import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
 import org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
@@ -35,6 +37,8 @@ import org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner
 import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
 import org.apache.paimon.utils.Preconditions;
 
+import java.util.Optional;
+
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 
 /** An abstraction layer above {@link FileStoreScan} to provide input split generation. */
@@ -65,6 +69,16 @@ public abstract class AbstractInnerTableScan implements InnerTableScan {
             return new ContinuousCompactorStartingScanner();
         }
 
+        // read from consumer id
+        String consumerId = options.consumerId();
+        if (consumerId != null) {
+            ConsumerManager consumerManager = snapshotSplitReader.consumerManager();
+            Optional<Consumer> consumer = consumerManager.consumer(consumerId);
+            if (consumer.isPresent()) {
+                return new ContinuousFromSnapshotStartingScanner(consumer.get().nextSnapshot());
+            }
+        }
+
         CoreOptions.StartupMode startupMode = options.startupMode();
         switch (startupMode) {
             case LATEST_FULL:
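
[Editor's note] Note the precedence this introduces: once a consumer file exists for the configured `consumer-id`, its recorded `nextSnapshot` wins and the startup-mode switch below is never reached. The configured `scan.mode` therefore only takes effect on the very first run, before any progress has been recorded.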
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index 8c0f1ec31..dba177076 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.consumer.Consumer;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.snapshot.BoundedChecker;
 import org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
@@ -203,4 +204,18 @@ public class InnerStreamTableScanImpl extends AbstractInnerTableScan
     public void restore(@Nullable Long nextSnapshotId) {
         this.nextSnapshotId = nextSnapshotId;
     }
+
+    @Override
+    public void notifyCheckpointComplete(@Nullable Long nextSnapshot) {
+        if (nextSnapshot == null) {
+            return;
+        }
+
+        String consumerId = options.consumerId();
+        if (consumerId != null) {
+            snapshotSplitReader
+                    .consumerManager()
+                    .recordConsumer(consumerId, new Consumer(nextSnapshot));
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
index 44f0ce43a..f22f70058 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
@@ -21,6 +21,8 @@ package org.apache.paimon.table.source;
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.utils.Restorable;
 
+import javax.annotation.Nullable;
+
 /**
  * {@link TableScan} for streaming, supports {@link #checkpoint} and {@link #restore}.
  *
@@ -29,4 +31,17 @@ import org.apache.paimon.utils.Restorable;
  * @since 0.4.0
  */
 @Public
-public interface StreamTableScan extends TableScan, Restorable<Long> {}
+public interface StreamTableScan extends TableScan, Restorable<Long> {
+
+    /** Restore from checkpoint next snapshot id. */
+    @Override
+    void restore(@Nullable Long nextSnapshotId);
+
+    /** Checkpoint to return next snapshot id. */
+    @Nullable
+    @Override
+    Long checkpoint();
+
+    /** Notifies the checkpoint complete with next snapshot id. */
+    void notifyCheckpointComplete(@Nullable Long nextSnapshot);
+}
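
[Editor's note] The lifecycle these three methods imply, as a hedged sketch (the driver method is hypothetical; in this commit, Flink's checkpointing plays that role via `MonitorFunction`):

```java
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;

import java.util.List;

// `table` is assumed to be an existing Paimon Table with 'consumer-id' set.
void driveOneCycle(Table table, Long previousNextSnapshotId) throws Exception {
    StreamTableScan scan = table.newReadBuilder().newStreamScan();
    scan.restore(previousNextSnapshotId);      // null on a fresh start

    List<Split> splits = scan.plan().splits(); // read these downstream
    Long next = scan.checkpoint();             // next snapshot id, stored in checkpoint state
    // ... once the checkpoint is durable ...
    scan.notifyCheckpointComplete(next);       // persists progress to the consumer file
}
```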
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReader.java
index d813ffcc7..87def5fa1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReader.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.DataSplit;
@@ -29,6 +30,8 @@ import java.util.List;
 /** Read splits from specified {@link Snapshot} with given configuration. */
 public interface SnapshotSplitReader {
 
+    ConsumerManager consumerManager();
+
     SnapshotSplitReader withSnapshot(long snapshotId);
 
     SnapshotSplitReader withFilter(Predicate predicate);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReaderImpl.java
index 5b3176e50..6ab594501 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReaderImpl.java
@@ -23,6 +23,7 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileKind;
@@ -47,10 +48,12 @@ import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping
 
 /** Implementation of {@link SnapshotSplitReader}. */
 public class SnapshotSplitReaderImpl implements SnapshotSplitReader {
+
     private final FileStoreScan scan;
     private final TableSchema tableSchema;
     private final CoreOptions options;
     private final SnapshotManager snapshotManager;
+    private final ConsumerManager consumerManager;
     private final SplitGenerator splitGenerator;
     private final BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer;
 
@@ -68,10 +71,17 @@ public class SnapshotSplitReaderImpl implements SnapshotSplitReader {
         this.tableSchema = tableSchema;
         this.options = options;
         this.snapshotManager = snapshotManager;
+        this.consumerManager =
+                new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath());
         this.splitGenerator = splitGenerator;
         this.nonPartitionFilterConsumer = nonPartitionFilterConsumer;
     }
 
+    @Override
+    public ConsumerManager consumerManager() {
+        return consumerManager;
+    }
+
     @Override
     public SnapshotSplitReader withSnapshot(long snapshotId) {
         scan.withSnapshot(snapshotId);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 016e5b961..27d4efbb4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.system;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
@@ -184,6 +185,11 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
             this.snapshotSplitReader = snapshotSplitReader;
         }
 
+        @Override
+        public ConsumerManager consumerManager() {
+            return snapshotSplitReader.consumerManager();
+        }
+
         public SnapshotSplitReader withSnapshot(long snapshotId) {
             snapshotSplitReader.withSnapshot(snapshotId);
             return this;
@@ -267,6 +273,11 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
         public void restore(@Nullable Long nextSnapshotId) {
             streamScan.restore(nextSnapshotId);
         }
+
+        @Override
+        public void notifyCheckpointComplete(@Nullable Long nextSnapshot) {
+            streamScan.notifyCheckpointComplete(nextSnapshot);
+        }
     }
 
     private class AuditLogRead implements InnerTableRead {
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 077fe3cd0..f9f84fd55 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -55,6 +55,14 @@ public class SnapshotManager implements Serializable {
         this.tablePath = tablePath;
     }
 
+    public FileIO fileIO() {
+        return fileIO;
+    }
+
+    public Path tablePath() {
+        return tablePath;
+    }
+
     public Path snapshotDirectory() {
         return new Path(tablePath + "/snapshot");
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
new file mode 100644
index 000000000..572055f17
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.consumer;
+
+import org.apache.paimon.fs.local.LocalFileIO;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link ConsumerManager}. */
+public class ConsumerManagerTest {
+
+    @TempDir Path tempDir;
+
+    private ConsumerManager manager;
+
+    @BeforeEach
+    public void before() {
+        this.manager =
+                new ConsumerManager(
+                        LocalFileIO.create(), new org.apache.paimon.fs.Path(tempDir.toUri()));
+    }
+
+    @Test
+    public void test() {
+        Optional<Consumer> consumer = manager.consumer("id1");
+        assertThat(consumer).isEmpty();
+
+        assertThat(manager.minNextSnapshot()).isEmpty();
+
+        manager.recordConsumer("id1", new Consumer(5));
+        consumer = manager.consumer("id1");
+        assertThat(consumer).map(Consumer::nextSnapshot).get().isEqualTo(5L);
+
+        manager.recordConsumer("id2", new Consumer(8));
+        consumer = manager.consumer("id2");
+        assertThat(consumer).map(Consumer::nextSnapshot).get().isEqualTo(8L);
+
+        assertThat(manager.minNextSnapshot()).isEqualTo(OptionalLong.of(5L));
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index bc27cef46..d23132239 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -47,8 +47,11 @@ import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.InnerTableCommit;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -536,6 +539,66 @@ public abstract class FileStoreTableTestBase {
                                 "1|40|400|binary|varbinary|mapKey:mapVal|multiset|4000"));
     }
 
+    @Test
+    public void testConsumeId() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        options -> {
+                            options.set(CoreOptions.CONSUMER_ID, "my_id");
+                            options.set(SNAPSHOT_NUM_RETAINED_MIN, 3);
+                            options.set(SNAPSHOT_NUM_RETAINED_MAX, 3);
+                        });
+
+        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
+        StreamTableWrite write = writeBuilder.newWrite();
+        StreamTableCommit commit = writeBuilder.newCommit();
+
+        ReadBuilder readBuilder = table.newReadBuilder();
+        StreamTableScan scan = readBuilder.newStreamScan();
+        TableRead read = readBuilder.newRead();
+
+        write.write(rowData(1, 10, 100L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        List<String> result = getResult(read, scan.plan().splits(), STREAMING_ROW_TO_STRING);
+        assertThat(result)
+                .containsExactlyInAnyOrder("+1|10|100|binary|varbinary|mapKey:mapVal|multiset");
+
+        write.write(rowData(1, 20, 200L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        result = getResult(read, scan.plan().splits(), STREAMING_ROW_TO_STRING);
+        assertThat(result)
+                .containsExactlyInAnyOrder("+1|20|200|binary|varbinary|mapKey:mapVal|multiset");
+
+        // checkpoint and notifyCheckpointComplete
+        Long nextSnapshot = scan.checkpoint();
+        scan.notifyCheckpointComplete(nextSnapshot);
+
+        write.write(rowData(1, 30, 300L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        // read again using consumer id
+        scan = readBuilder.newStreamScan();
+        assertThat(scan.plan().splits()).isEmpty();
+        result = getResult(read, scan.plan().splits(), STREAMING_ROW_TO_STRING);
+        assertThat(result)
+                .containsExactlyInAnyOrder("+1|30|300|binary|varbinary|mapKey:mapVal|multiset");
+
+        // test snapshot expiration
+        for (int i = 3; i <= 8; i++) {
+            write.write(rowData(1, (i + 1) * 10, (i + 1) * 100L));
+            commit.commit(i, write.prepareCommit(true, i));
+        }
+
+        // not expire
+        result = getResult(read, scan.plan().splits(), STREAMING_ROW_TO_STRING);
+        assertThat(result)
+                .containsExactlyInAnyOrder("+1|40|400|binary|varbinary|mapKey:mapVal|multiset");
+
+        write.close();
+    }
+
     protected List<String> getResult(
             TableRead read,
             List<Split> splits,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java
index f57fd6349..6e71b3292 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java
@@ -39,12 +39,17 @@ import static org.apache.paimon.flink.FlinkRowWrapper.fromFlinkRowKind;
 /** Convert to Flink row data. */
 public class FlinkRowData implements RowData {
 
-    private final InternalRow row;
+    private InternalRow row;
 
     public FlinkRowData(InternalRow row) {
         this.row = row;
     }
 
+    public FlinkRowData replace(InternalRow row) {
+        this.row = row;
+        return this;
+    }
+
     @Override
     public int getArity() {
         return row.getFieldCount();
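
[Editor's note] This turns `FlinkRowData` into a mutable wrapper so the new `ReadOperator` (later in this commit) can reuse a single instance per emitted record. A hedged illustration of the aliasing that object reuse implies:

```java
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.flink.table.data.RowData;

FlinkRowData reuse = new FlinkRowData(GenericRow.of(1));
RowData first = reuse;
RowData second = reuse.replace(GenericRow.of(2));
// first == second: both alias the mutated wrapper, so first.getInt(0) now
// reads 2 — operators that buffer records must copy them first.
```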
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index b24967965..0002cb91b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -39,7 +39,7 @@ import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.Projection;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
@@ -208,7 +208,7 @@ public class DataTableSource extends FlinkTableSource
                 !streaming, env -> configureSource(sourceBuilder, env));
     }
 
-    private DataStreamSource<RowData> configureSource(
+    private DataStream<RowData> configureSource(
             FlinkSourceBuilder sourceBuilder, StreamExecutionEnvironment env) {
         Options options = Options.fromMap(this.table.options());
         Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 0895c7389..2159192b1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -24,16 +24,20 @@ import org.apache.paimon.CoreOptions.StreamingReadMode;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.Projection;
 import org.apache.paimon.flink.log.LogSourceProvider;
+import org.apache.paimon.flink.source.operator.MonitorFunction;
 import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.connector.base.source.hybrid.HybridSource;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -122,23 +126,55 @@ public class FlinkSourceBuilder {
         return this;
     }
 
-    private StaticFileStoreSource buildStaticFileSource() {
-        return new StaticFileStoreSource(
-                table.newReadBuilder().withProjection(projectedFields).withFilter(predicate),
-                limit,
-                Options.fromMap(table.options())
-                        .get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
-                splits);
+    private ReadBuilder createReadBuilder() {
+        return table.newReadBuilder().withProjection(projectedFields).withFilter(predicate);
     }
 
-    private ContinuousFileStoreSource buildContinuousFileSource() {
-        return new ContinuousFileStoreSource(
-                table.newReadBuilder().withProjection(projectedFields).withFilter(predicate),
-                table.options(),
-                limit);
+    private DataStream<RowData> buildStaticFileSource() {
+        return toDataStream(
+                new StaticFileStoreSource(
+                        createReadBuilder(),
+                        limit,
+                        Options.fromMap(table.options())
+                                .get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
+                        splits));
     }
 
-    public Source<RowData, ?, ?> buildSource() {
+    private DataStream<RowData> buildContinuousFileSource() {
+        return toDataStream(
+                new ContinuousFileStoreSource(createReadBuilder(), table.options(), limit));
+    }
+
+    private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
+        DataStreamSource<RowData> dataStream =
+                env.fromSource(
+                        source,
+                        watermarkStrategy == null
+                                ? WatermarkStrategy.noWatermarks()
+                                : watermarkStrategy,
+                        tableIdentifier.asSummaryString(),
+                        produceTypeInfo());
+        if (parallelism != null) {
+            dataStream.setParallelism(parallelism);
+        }
+        return dataStream;
+    }
+
+    private TypeInformation<RowData> produceTypeInfo() {
+        RowType rowType = toLogicalType(table.rowType());
+        LogicalType produceType =
+                Optional.ofNullable(projectedFields)
+                        .map(Projection::of)
+                        .map(p -> p.project(rowType))
+                        .orElse(rowType);
+        return InternalTypeInfo.of(produceType);
+    }
+
+    public DataStream<RowData> build() {
+        if (env == null) {
+            throw new IllegalArgumentException("StreamExecutionEnvironment should not be null.");
+        }
+
         if (isContinuous) {
             TableScanUtils.streamingReadingValidate(table);
 
@@ -148,44 +184,47 @@ public class FlinkSourceBuilder {
 
             if (logSourceProvider != null && streamingReadMode != FILE) {
                 if (startupMode != StartupMode.LATEST_FULL) {
-                    return logSourceProvider.createSource(null);
+                    return toDataStream(logSourceProvider.createSource(null));
+                } else {
+                    return toDataStream(
+                            HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
+                                            LogHybridSourceFactory.buildHybridFirstSource(
+                                                    table, projectedFields, predicate))
+                                    .addSource(
+                                            new LogHybridSourceFactory(logSourceProvider),
+                                            Boundedness.CONTINUOUS_UNBOUNDED)
+                                    .build());
                 }
-                return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
-                                LogHybridSourceFactory.buildHybridFirstSource(
-                                        table, projectedFields, predicate))
-                        .addSource(
-                                new LogHybridSourceFactory(logSourceProvider),
-                                Boundedness.CONTINUOUS_UNBOUNDED)
-                        .build();
             } else {
-                return buildContinuousFileSource();
+                if (conf.contains(CoreOptions.CONSUMER_ID)) {
+                    return buildContinuousStreamOperator();
+                } else {
+                    return buildContinuousFileSource();
+                }
             }
         } else {
             return buildStaticFileSource();
         }
     }
 
-    public DataStreamSource<RowData> build() {
-        if (env == null) {
-            throw new IllegalArgumentException("StreamExecutionEnvironment should not be null.");
+    private DataStream<RowData> buildContinuousStreamOperator() {
+        DataStream<RowData> dataStream;
+        if (limit != null) {
+            throw new IllegalArgumentException(
+                    "Cannot limit streaming source, please use batch execution mode.");
         }
-
-        RowType rowType = toLogicalType(table.rowType());
-        LogicalType produceType =
-                Optional.ofNullable(projectedFields)
-                        .map(Projection::of)
-                        .map(p -> p.project(rowType))
-                        .orElse(rowType);
-        DataStreamSource<RowData> dataStream =
-                env.fromSource(
-                        buildSource(),
-                        watermarkStrategy == null
-                                ? WatermarkStrategy.noWatermarks()
-                                : watermarkStrategy,
+        dataStream =
+                MonitorFunction.buildSource(
+                        env,
                         tableIdentifier.asSummaryString(),
-                        InternalTypeInfo.of(produceType));
+                        produceTypeInfo(),
+                        createReadBuilder(),
+                        conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis());
         if (parallelism != null) {
-            dataStream.setParallelism(parallelism);
+            dataStream.getTransformation().setParallelism(parallelism);
+        }
+        if (watermarkStrategy != null) {
+            dataStream = dataStream.assignTimestampsAndWatermarks(watermarkStrategy);
         }
         return dataStream;
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
new file mode 100644
index 000000000..0e8688ba9
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.operator;
+
+import org.apache.paimon.flink.utils.JavaTypeInfo;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamTableScan;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.OptionalLong;
+import java.util.TreeMap;
+
+/**
+ * This is the single (non-parallel) monitoring task which is responsible for:
+ *
+ * <ol>
+ *   <li>Monitoring snapshots of the Paimon table.
+ *   <li>Creating the {@link Split splits} corresponding to the incremental files.
+ *   <li>Assigning them to downstream tasks for further processing.
+ * </ol>
+ *
+ * <p>The splits to be read are forwarded to the downstream {@link ReadOperator} which can have
+ * parallelism greater than one.
+ */
+public class MonitorFunction extends RichSourceFunction<Split>
+        implements CheckpointedFunction, CheckpointListener {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(MonitorFunction.class);
+
+    private final ReadBuilder readBuilder;
+    private final long monitorInterval;
+
+    private volatile boolean isRunning = true;
+
+    private transient StreamTableScan scan;
+    private transient SourceContext<Split> ctx;
+
+    private transient ListState<Long> checkpointState;
+    private transient ListState<Tuple2<Long, Long>> nextSnapshotState;
+    private transient TreeMap<Long, Long> nextSnapshotPerCheckpoint;
+
+    public MonitorFunction(ReadBuilder readBuilder, long monitorInterval) {
+        this.readBuilder = readBuilder;
+        this.monitorInterval = monitorInterval;
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        this.scan = readBuilder.newStreamScan();
+
+        this.checkpointState =
+                context.getOperatorStateStore()
+                        .getListState(
+                                new ListStateDescriptor<>(
+                                        "next-snapshot", LongSerializer.INSTANCE));
+
+        @SuppressWarnings("unchecked")
+        final Class<Tuple2<Long, Long>> typedTuple =
+                (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class;
+        this.nextSnapshotState =
+                context.getOperatorStateStore()
+                        .getListState(
+                                new ListStateDescriptor<>(
+                                        "next-snapshot-per-checkpoint",
+                                        new TupleSerializer<>(
+                                                typedTuple,
+                                                new TypeSerializer[] {
+                                                    LongSerializer.INSTANCE, LongSerializer.INSTANCE
+                                                })));
+
+        this.nextSnapshotPerCheckpoint = new TreeMap<>();
+
+        if (context.isRestored()) {
+            LOG.info("Restoring state for the {}.", getClass().getSimpleName());
+
+            List<Long> retrievedStates = new ArrayList<>();
+            for (Long entry : this.checkpointState.get()) {
+                retrievedStates.add(entry);
+            }
+
+            // given that the parallelism of the function is 1, we can have at most 1 retrieved item.
+
+            Preconditions.checkArgument(
+                    retrievedStates.size() <= 1,
+                    getClass().getSimpleName() + " retrieved invalid state.");
+
+            if (retrievedStates.size() == 1) {
+                this.scan.restore(retrievedStates.get(0));
+            }
+
+            for (Tuple2<Long, Long> tuple2 : nextSnapshotState.get()) {
+                nextSnapshotPerCheckpoint.put(tuple2.f0, tuple2.f1);
+            }
+        } else {
+            LOG.info("No state to restore for the {}.", getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
+        this.checkpointState.clear();
+        Long nextSnapshot = this.scan.checkpoint();
+        if (nextSnapshot != null) {
+            this.checkpointState.add(nextSnapshot);
+            this.nextSnapshotPerCheckpoint.put(ctx.getCheckpointId(), nextSnapshot);
+        }
+
+        List<Tuple2<Long, Long>> nextSnapshots = new ArrayList<>();
+        this.nextSnapshotPerCheckpoint.forEach((k, v) -> nextSnapshots.add(new Tuple2<>(k, v)));
+        this.nextSnapshotState.update(nextSnapshots);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{} checkpoint {}.", getClass().getSimpleName(), nextSnapshot);
+        }
+    }
+
+    @SuppressWarnings("BusyWait")
+    @Override
+    public void run(SourceContext<Split> ctx) throws Exception {
+        this.ctx = ctx;
+        while (isRunning) {
+            boolean isEmpty;
+            synchronized (ctx.getCheckpointLock()) {
+                if (!isRunning) {
+                    return;
+                }
+                try {
+                    List<Split> splits = scan.plan().splits();
+                    isEmpty = splits.isEmpty();
+                    splits.forEach(ctx::collect);
+                } catch (EndOfScanException esf) {
+                    LOG.info("Catching EndOfStreamException, the stream is finished.");
+                    return;
+                }
+            }
+
+            if (isEmpty) {
+                Thread.sleep(monitorInterval);
+            }
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        NavigableMap<Long, Long> nextSnapshots =
+                nextSnapshotPerCheckpoint.headMap(checkpointId, true);
+        OptionalLong max = nextSnapshots.values().stream().mapToLong(Long::longValue).max();
+        max.ifPresent(scan::notifyCheckpointComplete);
+        nextSnapshots.clear();
+    }
+
+    @Override
+    public void cancel() {
+        // this is to cover the case where cancel() is called before the run()
+        if (ctx != null) {
+            synchronized (ctx.getCheckpointLock()) {
+                isRunning = false;
+            }
+        } else {
+            isRunning = false;
+        }
+    }
+
+    public static DataStream<RowData> buildSource(
+            StreamExecutionEnvironment env,
+            String name,
+            TypeInformation<RowData> typeInfo,
+            ReadBuilder readBuilder,
+            long monitorInterval) {
+        return env.addSource(
+                        new MonitorFunction(readBuilder, monitorInterval),
+                        name + "-Monitor",
+                        new JavaTypeInfo<>(Split.class))
+                .forceNonParallel()
+                .partitionCustom(
+                        (key, numPartitions) -> key % numPartitions,
+                        split -> ((DataSplit) split).bucket())
+                .transform(name + "-Reader", typeInfo, new ReadOperator(readBuilder));
+    }
+}
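
[Editor's note] A worked example of the checkpoint bookkeeping above, assuming checkpoint 1 ended at snapshot 3 and checkpoint 2 at snapshot 5: `headMap(checkpointId, true)` selects every completed checkpoint, the maximum of their next-snapshot ids is reported, and the reported entries are dropped:

```java
import java.util.NavigableMap;
import java.util.TreeMap;

public class ConsumerBookkeepingExample {
    public static void main(String[] args) {
        TreeMap<Long, Long> nextSnapshotPerCheckpoint = new TreeMap<>();
        nextSnapshotPerCheckpoint.put(1L, 3L); // checkpoint 1 -> next snapshot 3
        nextSnapshotPerCheckpoint.put(2L, 5L); // checkpoint 2 -> next snapshot 5

        // notifyCheckpointComplete(2L) does the equivalent of:
        NavigableMap<Long, Long> done = nextSnapshotPerCheckpoint.headMap(2L, true);
        long consumed = done.values().stream().mapToLong(Long::longValue).max().getAsLong();
        System.out.println(consumed); // 5 -> scan.notifyCheckpointComplete(5L)
        done.clear(); // completed entries are removed from the backing TreeMap
    }
}
```

Note also the topology in `buildSource`: the monitor runs non-parallel, and splits are routed to readers by `bucket % numPartitions`, so all splits of one bucket land on the same `ReadOperator` subtask.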
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
new file mode 100644
index 000000000..f33e31b5f
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.operator;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.utils.CloseableIterator;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * The operator that reads the {@link Split splits} received from the preceding {@link
+ * MonitorFunction}. Contrary to the {@link MonitorFunction} which has a parallelism of 1, this
+ * operator can have DOP > 1.
+ */
+public class ReadOperator extends AbstractStreamOperator<RowData>
+        implements OneInputStreamOperator<Split, RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final ReadBuilder readBuilder;
+
+    private transient TableRead read;
+    private transient StreamRecord<RowData> reuseRecord;
+    private transient FlinkRowData reuseRow;
+
+    public ReadOperator(ReadBuilder readBuilder) {
+        this.readBuilder = readBuilder;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.read = readBuilder.newRead();
+        this.reuseRow = new FlinkRowData(null);
+        this.reuseRecord = new StreamRecord<>(reuseRow);
+    }
+
+    @Override
+    public void processElement(StreamRecord<Split> record) throws Exception {
+        try (CloseableIterator<InternalRow> iterator =
+                read.createReader(record.getValue()).toCloseableIterator()) {
+            while (iterator.hasNext()) {
+                reuseRow.replace(iterator.next());
+                output.collect(reuseRecord);
+            }
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaSerializer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaSerializer.java
new file mode 100644
index 000000000..4fe1c82e8
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaSerializer.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.typeutils.GenericTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A type serializer that serializes its type using Java serialization. */
+public class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
+
+    private static final long serialVersionUID = 3L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(JavaSerializer.class);
+
+    private final Class<T> type;
+
+    public JavaSerializer(Class<T> type) {
+        this.type = checkNotNull(type);
+    }
+
+    @Override
+    public boolean isImmutableType() {
+        return false;
+    }
+
+    @Override
+    public JavaSerializer<T> duplicate() {
+        return new JavaSerializer<>(type);
+    }
+
+    @Override
+    public T createInstance() {
+        try {
+            return type.newInstance();
+        } catch (InstantiationException | IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public T copy(T from) {
+        try {
+            return InstantiationUtil.clone(from);
+        } catch (IOException | ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public T copy(T from, T reuse) {
+        return copy(from);
+    }
+
+    @Override
+    public int getLength() {
+        return -1;
+    }
+
+    @Override
+    public void serialize(T record, DataOutputView target) throws IOException {
+        InstantiationUtil.serializeObject(new DataOutputViewStream(target), record);
+    }
+
+    @Override
+    public T deserialize(DataInputView source) throws IOException {
+        try {
+            return InstantiationUtil.deserializeObject(
+                    new DataInputViewStream(source), getClass().getClassLoader());
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public T deserialize(T reuse, DataInputView source) throws IOException {
+        return deserialize(source);
+    }
+
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws IOException {
+        serialize(deserialize(source), target);
+    }
+
+    @Override
+    public int hashCode() {
+        return type.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof JavaSerializer) {
+            JavaSerializer<?> other = (JavaSerializer<?>) obj;
+
+            return type == other.type;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public TypeSerializerSnapshot<T> snapshotConfiguration() {
+        return new JavaSerializerSnapshot<>(type);
+    }
+
+    /** {@link JavaSerializer} snapshot class. */
+    public static final class JavaSerializerSnapshot<T extends Serializable>
+            extends GenericTypeSerializerSnapshot<T, JavaSerializer> {
+
+        @SuppressWarnings("unused")
+        public JavaSerializerSnapshot() {}
+
+        JavaSerializerSnapshot(Class<T> typeClass) {
+            super(typeClass);
+        }
+
+        @Override
+        protected TypeSerializer<T> createSerializer(Class<T> typeClass) {
+            return new JavaSerializer<>(typeClass);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        protected Class<T> getTypeClass(JavaSerializer serializer) {
+            return serializer.type;
+        }
+
+        @Override
+        protected Class<?> serializerClass() {
+            return JavaSerializer.class;
+        }
+    }
+}
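
Since JavaSerializer simply delegates to Java serialization, a round trip is
easy to sanity-check in isolation. A minimal sketch using Flink's in-memory
data views (`Wrapper` is a hypothetical Serializable class, not part of this
change):

    JavaSerializer<Wrapper> serializer = new JavaSerializer<>(Wrapper.class);
    DataOutputSerializer out = new DataOutputSerializer(64); // grows as needed
    serializer.serialize(new Wrapper(42), out);
    Wrapper copy =
            serializer.deserialize(new DataInputDeserializer(out.getCopyOfBuffer()));
    // `copy` equals the original but is a distinct object.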
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaTypeInfo.java
new file mode 100644
index 000000000..a36243c5b
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaTypeInfo.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link TypeInformation} which uses Java serialization. */
+public class JavaTypeInfo<T extends Serializable> extends TypeInformation<T>
+        implements AtomicType<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Class<T> typeClass;
+
+    public JavaTypeInfo(Class<T> typeClass) {
+        this.typeClass = checkNotNull(typeClass);
+    }
+
+    @Override
+    @PublicEvolving
+    public boolean isBasicType() {
+        return false;
+    }
+
+    @Override
+    @PublicEvolving
+    public boolean isTupleType() {
+        return false;
+    }
+
+    @Override
+    @PublicEvolving
+    public int getArity() {
+        return 1;
+    }
+
+    @Override
+    @PublicEvolving
+    public int getTotalFields() {
+        return 1;
+    }
+
+    @Override
+    @PublicEvolving
+    public Class<T> getTypeClass() {
+        return typeClass;
+    }
+
+    @Override
+    @PublicEvolving
+    public boolean isKeyType() {
+        return Comparable.class.isAssignableFrom(typeClass);
+    }
+
+    @Override
+    public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+        return new JavaSerializer<>(this.typeClass);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public TypeComparator<T> createComparator(
+            boolean sortOrderAscending, ExecutionConfig executionConfig) {
+        if (isKeyType()) {
+            @SuppressWarnings("rawtypes")
+            GenericTypeComparator comparator =
+                    new GenericTypeComparator(
+                            sortOrderAscending, createSerializer(executionConfig), this.typeClass);
+            return (TypeComparator<T>) comparator;
+        }
+
+        throw new UnsupportedOperationException(
+                "Types that do not implement java.lang.Comparable cannot be used as keys.");
+    }
+
+    @Override
+    public int hashCode() {
+        return typeClass.hashCode();
+    }
+
+    @Override
+    public boolean canEqual(Object obj) {
+        return obj instanceof JavaTypeInfo;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof JavaTypeInfo) {
+            @SuppressWarnings("unchecked")
+            JavaTypeInfo<T> genericTypeInfo = (JavaTypeInfo<T>) obj;
+
+            return typeClass == genericTypeInfo.typeClass;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "JavaType<" + typeClass.getCanonicalName() + ">";
+    }
+}
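
A hypothetical use of this type info, assuming an existing `env` and a scan
`plan` (Split is Serializable, so it qualifies):

    TypeInformation<Split> splitType = new JavaTypeInfo<>(Split.class);
    DataStream<Split> splits = env.fromCollection(plan.splits(), splitType);

Equality is class-based, so a restored job that constructs a fresh
JavaTypeInfo<>(Split.class) is considered compatible with the serializer
snapshot captured in state.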
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index df2fddbf7..c572d0309 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -92,6 +92,30 @@ public class ContinuousFileStoreITCase extends CatalogITCaseBase {
         testProjection("T2");
     }
 
+    @TestTemplate
+    public void testConsumerId() throws Exception {
+        String table = "T2";
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT * FROM %s /*+ OPTIONS('consumer-id'='me') */", table));
+
+        batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+
+        Thread.sleep(1000);
+        iterator.close();
+
+        iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT * FROM %s /*+ OPTIONS('consumer-id'='me') */", table));
+        batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
+        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", "8", "9"));
+        iterator.close();
+    }
+
     private void testSimple(String table) throws Exception {
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
index 1e17967e0..05070c5e9 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
@@ -37,9 +37,11 @@ import org.apache.paimon.utils.FailingFileIO;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.data.GenericRowData;
@@ -326,12 +328,15 @@ public class FileStoreITCase extends AbstractTestBase {
         table =
                 table.copy(
                         Collections.singletonMap(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), "1024"));
-        Source<RowData, ?, ?> source =
+        DataStream<RowData> source =
                 new FlinkSourceBuilder(IDENTIFIER, table)
                         .withContinuousMode(true)
                         .withEnv(env)
-                        .buildSource();
-        assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED);
+                        .build();
+        Transformation<RowData> transformation = source.getTransformation();
+        assertThat(transformation).isInstanceOf(SourceTransformation.class);
+        assertThat(((SourceTransformation<?, ?, ?>) transformation).getSource().getBoundedness())
+                .isEqualTo(Boundedness.BOUNDED);
     }
 
     private void innerTestContinuous(FileStoreTable table) throws Exception {
@@ -449,12 +454,12 @@ public class FileStoreITCase extends AbstractTestBase {
                         InternalTypeInfo.of(TABLE_TYPE));
     }
 
-    public static List<Row> executeAndCollect(DataStreamSource<RowData> source) throws Exception {
+    public static List<Row> executeAndCollect(DataStream<RowData> source) throws Exception {
         return executeAndCollect(source, CONVERTER);
     }
 
     public static List<Row> executeAndCollect(
-            DataStreamSource<RowData> source, DataStructureConverter<RowData, Row> converter)
+            DataStream<RowData> source, DataStructureConverter<RowData, Row> converter)
             throws Exception {
         CloseableIterator<RowData> iterator = source.executeAndCollect();
         List<Row> results = new ArrayList<>();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index 2ecd7f54e..a0ab1fc41 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -27,6 +27,7 @@ import org.apache.paimon.table.source.TableScan;
 
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
@@ -318,6 +319,9 @@ public class ContinuousFileSplitEnumeratorTest {
             return null;
         }
 
+        @Override
+        public void notifyCheckpointComplete(@Nullable Long nextSnapshot) {}
+
         @Override
         public void restore(Long state) {}
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
new file mode 100644
index 000000000..91e4d4e3a
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.operator;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.function.SupplierWithException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.paimon.CoreOptions.CONSUMER_ID;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link MonitorFunction} and {@link ReadOperator}. */
+public class OperatorSourceTest {
+
+    @TempDir Path tempDir;
+
+    private Table table;
+
+    @BeforeEach
+    public void before()
+            throws Catalog.TableAlreadyExistException, Catalog.DatabaseNotExistException,
+                    Catalog.TableNotExistException, Catalog.DatabaseAlreadyExistException {
+        Catalog catalog =
+                CatalogFactory.createCatalog(
+                        CatalogContext.create(new org.apache.paimon.fs.Path(tempDir.toUri())));
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .column("c", DataTypes.INT())
+                        .primaryKey("a")
+                        .options(Collections.singletonMap(CONSUMER_ID.key(), "my_consumer"))
+                        .build();
+        Identifier identifier = Identifier.create("default", "t");
+        catalog.createDatabase("default", false);
+        catalog.createTable(identifier, schema, false);
+        this.table = catalog.getTable(identifier);
+    }
+
+    private void writeToTable(int a, int b, int c) throws Exception {
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        BatchTableWrite write = writeBuilder.newWrite();
+        write.write(GenericRow.of(a, b, c));
+        writeBuilder.newCommit().commit(write.prepareCommit());
+        write.close();
+    }
+
+    private List<List<Integer>> readSplit(Split split) throws IOException {
+        TableRead read = table.newReadBuilder().newRead();
+        List<List<Integer>> result = new ArrayList<>();
+        read.createReader(split)
+                .forEachRemaining(
+                        row ->
+                                result.add(
+                                        Arrays.asList(
+                                                row.getInt(0), row.getInt(1), row.getInt(2))));
+        return result;
+    }
+
+    @Test
+    public void testMonitorFunction() throws Exception {
+        // 1. run first
+        OperatorSubtaskState snapshot;
+        {
+            MonitorFunction function = new MonitorFunction(table.newReadBuilder(), 10);
+            StreamSource<Split, MonitorFunction> src = new StreamSource<>(function);
+            AbstractStreamOperatorTestHarness<Split> testHarness =
+                    new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
+            testHarness.open();
+            snapshot = testReadSplit(function, () -> testHarness.snapshot(0, 0), 1, 1, 1);
+        }
+
+        // 2. restore from state
+        {
+            MonitorFunction functionCopy1 = new MonitorFunction(table.newReadBuilder(), 10);
+            StreamSource<Split, MonitorFunction> srcCopy1 = new StreamSource<>(functionCopy1);
+            AbstractStreamOperatorTestHarness<Split> testHarnessCopy1 =
+                    new AbstractStreamOperatorTestHarness<>(srcCopy1, 1, 1, 0);
+            testHarnessCopy1.initializeState(snapshot);
+            testHarnessCopy1.open();
+            testReadSplit(
+                    functionCopy1,
+                    () -> {
+                        testHarnessCopy1.snapshot(1, 1);
+                        testHarnessCopy1.notifyOfCompletedCheckpoint(1);
+                        return null;
+                    },
+                    2,
+                    2,
+                    2);
+        }
+
+        // 3. restore from consumer id
+        {
+            MonitorFunction functionCopy2 = new MonitorFunction(table.newReadBuilder(), 10);
+            StreamSource<Split, MonitorFunction> srcCopy2 = new StreamSource<>(functionCopy2);
+            AbstractStreamOperatorTestHarness<Split> testHarnessCopy2 =
+                    new AbstractStreamOperatorTestHarness<>(srcCopy2, 1, 1, 0);
+            testHarnessCopy2.open();
+            testReadSplit(functionCopy2, () -> null, 3, 3, 3);
+        }
+    }
+
+    @Test
+    public void testReadOperator() throws Exception {
+        ReadOperator readOperator = new ReadOperator(table.newReadBuilder());
+        OneInputStreamOperatorTestHarness<Split, RowData> harness =
+                new OneInputStreamOperatorTestHarness<>(readOperator);
+        harness.setup(
+                InternalSerializers.create(
+                        RowType.of(new IntType(), new IntType(), new IntType())));
+        writeToTable(1, 1, 1);
+        writeToTable(2, 2, 2);
+        List<Split> splits = table.newReadBuilder().newScan().plan().splits();
+        harness.open();
+        for (Split split : splits) {
+            harness.processElement(new StreamRecord<>(split));
+        }
+        ArrayList<Object> values = new ArrayList<>(harness.getOutput());
+        assertThat(values)
+                .containsExactlyInAnyOrder(
+                        new StreamRecord<>(GenericRowData.of(1, 1, 1)),
+                        new StreamRecord<>(GenericRowData.of(2, 2, 2)));
+    }
+
+    private <T> T testReadSplit(
+            MonitorFunction function,
+            SupplierWithException<T, Exception> beforeClose,
+            int a,
+            int b,
+            int c)
+            throws Exception {
+        Throwable[] error = new Throwable[1];
+        ArrayBlockingQueue<Split> queue = new ArrayBlockingQueue<>(10);
+
+        DummySourceContext sourceContext =
+                new DummySourceContext() {
+                    @Override
+                    public void collect(Split element) {
+                        queue.add(element);
+                    }
+                };
+
+        Thread runner =
+                new Thread(
+                        () -> {
+                            try {
+                                function.run(sourceContext);
+                            } catch (Throwable t) {
+                                t.printStackTrace();
+                                error[0] = t;
+                            }
+                        });
+        runner.start();
+
+        writeToTable(a, b, c);
+
+        Split split = queue.poll(1, TimeUnit.MINUTES);
+        assertThat(readSplit(split)).containsExactlyInAnyOrder(Arrays.asList(a, b, c));
+
+        T t = beforeClose.get();
+        function.cancel();
+        runner.join();
+
+        assertThat(error[0]).isNull();
+
+        return t;
+    }
+
+    private abstract static class DummySourceContext
+            implements SourceFunction.SourceContext<Split> {
+
+        private final Object lock = new Object();
+
+        @Override
+        public void collectWithTimestamp(Split element, long timestamp) {}
+
+        @Override
+        public void emitWatermark(Watermark mark) {}
+
+        @Override
+        public void markAsTemporarilyIdle() {}
+
+        @Override
+        public Object getCheckpointLock() {
+            return lock;
+        }
+
+        @Override
+        public void close() {}
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/JavaSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/JavaSerializerTest.java
new file mode 100644
index 000000000..08bc0c445
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/JavaSerializerTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.Random;
+
+/** Test for {@link JavaSerializer}. */
+public class JavaSerializerTest extends SerializerTestBase<JavaSerializerTest.TestClass> {
+
+    @Override
+    protected TypeSerializer<TestClass> createSerializer() {
+        return new JavaSerializer<>(TestClass.class);
+    }
+
+    @Override
+    protected int getLength() {
+        return -1;
+    }
+
+    @Override
+    protected Class<TestClass> getTypeClass() {
+        return TestClass.class;
+    }
+
+    @Override
+    protected TestClass[] getTestData() {
+        Random rnd = new Random();
+        int rndInt = rnd.nextInt();
+
+        Integer[] integers =
+                new Integer[] {0, 1, -1, Integer.MAX_VALUE, Integer.MIN_VALUE, rndInt, -rndInt};
+        TestClass[] testClasses = new TestClass[integers.length];
+        for (int i = 0; i < integers.length; i++) {
+            testClasses[i] = new TestClass(integers[i]);
+        }
+        return testClasses;
+    }
+
+    static class TestClass implements Serializable {
+        int v;
+
+        public TestClass() {}
+
+        public TestClass(int v) {
+            this.v = v;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TestClass testClass = (TestClass) o;
+            return v == testClass.v;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(v);
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/JavaTypeInfoTest.java
similarity index 58%
copy from paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
copy to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/JavaTypeInfoTest.java
index 44f0ce43a..5294f5bd9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/JavaTypeInfoTest.java
@@ -16,17 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.table.source;
+package org.apache.paimon.flink.utils;
 
-import org.apache.paimon.annotation.Public;
-import org.apache.paimon.utils.Restorable;
+import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
 
-/**
- * {@link TableScan} for streaming, supports {@link #checkpoint} and {@link #restore}.
- *
- * <p>NOTE: {@link #checkpoint} will return the next snapshot id.
- *
- * @since 0.4.0
- */
-@Public
-public interface StreamTableScan extends TableScan, Restorable<Long> {}
+import java.io.Serializable;
+
+/** Test for {@link JavaTypeInfo}. */
+class JavaTypeInfoTest extends TypeInformationTestBase<JavaTypeInfo<?>> {
+
+    @Override
+    protected JavaTypeInfo<?>[] getTestData() {
+        return new JavaTypeInfo<?>[] {
+            new JavaTypeInfo<>(TestClass.class), new JavaTypeInfo<>(AlternativeClass.class)
+        };
+    }
+
+    static class TestClass implements Serializable {}
+
+    static class AlternativeClass implements Serializable {}
+}