Posted to commits@flink.apache.org by cz...@apache.org on 2022/12/12 12:24:50 UTC

[flink-table-store] branch master updated (23b523cd -> 20dd568f)

This is an automated email from the ASF dual-hosted git repository.

czweng pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


    from 23b523cd [FLINK-30341] Document audit log table
     new e367d987 [hotfix] Set checkpointing.randomization to false to avoid random unaligned checkpoints in tests and also delete useless TableEnvironmentTestUtils
     new 20dd568f [FLINK-30209] Introduce StaticCompactorSource and ContinuousCompactorSource for stand-alone compact jobs

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../connector/source/CompactorSourceBuilder.java   | 132 +++++++++
 .../source/ContinuousFileStoreSource.java          |  20 +-
 .../store/connector/source/FlinkSourceBuilder.java |   5 +-
 .../connector/source/StaticFileStoreSource.java    |  16 +-
 .../table/store/connector/CatalogITCaseBase.java   |   8 +-
 .../ChangelogWithKeyFileStoreTableITCase.java      |   5 +-
 .../CompositePkAndMultiPartitionedTableITCase.java |  19 +-
 .../store/connector/FileStoreTableITCase.java      |   8 +-
 .../table/store/connector/LookupJoinITCase.java    |   4 +-
 .../table/store/connector/MappingTableITCase.java  |   4 +-
 .../store/connector/ReadWriteTableITCase.java      |  17 +-
 .../store/connector/ReadWriteTableTestBase.java    |   5 +-
 .../store/connector/StreamingWarehouseITCase.java  |   4 +-
 .../store/connector/TableEnvironmentTestUtils.java |  54 ----
 .../table/store/connector/TableStoreTestBase.java  |   3 +-
 .../store/connector/sink/SinkSavepointITCase.java  |   5 +-
 .../connector/source/CompactorSourceITCase.java    | 294 +++++++++++++++++++++
 ...ava => ContinuousCompactorFollowUpScanner.java} |  15 +-
 ...ava => ContinuousCompactorStartingScanner.java} |  29 +-
 .../ContinuousDataFileSnapshotEnumerator.java      |  12 +
 .../snapshot/StaticDataFileSnapshotEnumerator.java |  14 +-
 .../table/store/table/system/BucketsTable.java     | 133 ++++++++++
 ...=> ContinuousCompactorFollowUpScannerTest.java} |  64 +++--
 ...=> ContinuousCompactorStartingScannerTest.java} |  34 ++-
 .../snapshot/SnapshotEnumeratorTestBase.java       |  17 +-
 pom.xml                                            |   2 +-
 26 files changed, 753 insertions(+), 170 deletions(-)
 create mode 100644 flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/CompactorSourceBuilder.java
 delete mode 100644 flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableEnvironmentTestUtils.java
 create mode 100644 flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/CompactorSourceITCase.java
 copy flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/{DeltaFollowUpScanner.java => ContinuousCompactorFollowUpScanner.java} (74%)
 copy flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/{ContinuousLatestStartingScanner.java => ContinuousCompactorStartingScanner.java} (54%)
 create mode 100644 flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/BucketsTable.java
 copy flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/{InputChangelogFollowUpScannerTest.java => ContinuousCompactorFollowUpScannerTest.java} (61%)
 copy flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/{FullStartingScannerTest.java => ContinuousCompactorStartingScannerTest.java} (67%)
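
For context, the headline change here (FLINK-30209) adds a dedicated source builder for stand-alone compact jobs. A minimal usage sketch, mirroring the CompactorSourceITCase included further down in this mail (tablePath, tableSchema and the table identifier are placeholder values, assumed to be set up as in that test):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // the table whose buckets should be compacted
    FileStoreTable table = FileStoreTableFactory.create(tablePath, tableSchema);
    DataStreamSource<RowData> compactorSource =
            new CompactorSourceBuilder("my_db.my_table", table) // identifier only names the source operator
                    .withContinuousMode(true) // false builds a bounded source over the current snapshot
                    .withEnv(env)
                    .build();
    // optionally restrict the scan with withPartition(...) / withPartitions(...);
    // each emitted RowData identifies a partition/bucket to compact, e.g. "+I 20221208|15|0" in the tests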


[flink-table-store] 01/02: [hotfix] Set checkpointing.randomization to false to avoid random unaligned checkpoints in tests and also delete useless TableEnvironmentTestUtils

Posted by cz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit e367d98740fbccf6b58730adc79bda338ffd60c5
Author: tsreaper <ts...@gmail.com>
AuthorDate: Thu Dec 8 17:04:05 2022 +0800

    [hotfix] Set checkpointing.randomization to false to avoid random unaligned checkpoints in tests and also delete useless TableEnvironmentTestUtils
---
 .../table/store/connector/CatalogITCaseBase.java   |  8 +---
 .../ChangelogWithKeyFileStoreTableITCase.java      |  5 +-
 .../CompositePkAndMultiPartitionedTableITCase.java | 19 ++++----
 .../store/connector/FileStoreTableITCase.java      |  8 +---
 .../table/store/connector/LookupJoinITCase.java    |  4 +-
 .../table/store/connector/MappingTableITCase.java  |  4 +-
 .../store/connector/ReadWriteTableITCase.java      | 17 ++++---
 .../store/connector/ReadWriteTableTestBase.java    |  5 +-
 .../store/connector/StreamingWarehouseITCase.java  |  4 +-
 .../store/connector/TableEnvironmentTestUtils.java | 54 ----------------------
 .../table/store/connector/TableStoreTestBase.java  |  3 +-
 .../store/connector/sink/SinkSavepointITCase.java  |  5 +-
 pom.xml                                            |  2 +-
 13 files changed, 33 insertions(+), 105 deletions(-)

diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
index 17d066cd..8893464d 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
@@ -49,9 +49,7 @@ public abstract class CatalogITCaseBase extends AbstractTestBase {
 
     @Before
     public void before() throws IOException {
-        tEnv =
-                TableEnvironmentTestUtils.create(
-                        EnvironmentSettings.newInstance().inBatchMode().build());
+        tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
         String catalog = "TABLE_STORE";
         tEnv.executeSql(
                 String.format(
@@ -59,9 +57,7 @@ public abstract class CatalogITCaseBase extends AbstractTestBase {
                         catalog, TEMPORARY_FOLDER.newFolder().toURI()));
         tEnv.useCatalog(catalog);
 
-        sEnv =
-                TableEnvironmentTestUtils.create(
-                        EnvironmentSettings.newInstance().inStreamingMode().build());
+        sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
         sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
         sEnv.registerCatalog(catalog, tEnv.getCatalog(catalog).get());
         sEnv.useCatalog(catalog);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
index 42c34e40..607d703f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
@@ -99,13 +99,12 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
     }
 
     private TableEnvironment createBatchTableEnvironment() {
-        return TableEnvironmentTestUtils.create(
-                EnvironmentSettings.newInstance().inBatchMode().build());
+        return TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
     }
 
     private TableEnvironment createStreamingTableEnvironment() {
         TableEnvironment sEnv =
-                TableEnvironmentTestUtils.create(
+                TableEnvironment.create(
                         EnvironmentSettings.newInstance().inStreamingMode().build());
         // set checkpoint interval to a random number to emulate different speed of commit
         sEnv.getConfig()
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CompositePkAndMultiPartitionedTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CompositePkAndMultiPartitionedTableITCase.java
index f0886d05..a0a6b3f6 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CompositePkAndMultiPartitionedTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CompositePkAndMultiPartitionedTableITCase.java
@@ -87,7 +87,7 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
 
         // test streaming read
         final StreamTableEnvironment streamTableEnv =
-                TableEnvironmentTestUtils.create(buildStreamEnv());
+                StreamTableEnvironment.create(buildStreamEnv());
         registerTable(streamTableEnv, managedTable);
         BlockingIterator<Row, Row> streamIter =
                 collectAndCheck(
@@ -273,7 +273,7 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
 
         // test streaming read
         final StreamTableEnvironment streamTableEnv =
-                TableEnvironmentTestUtils.create(buildStreamEnv());
+                StreamTableEnvironment.create(buildStreamEnv());
         registerTable(streamTableEnv, managedTable);
         BlockingIterator<Row, Row> streamIter =
                 collectAndCheck(
@@ -402,7 +402,7 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
 
         // test streaming read
         final StreamTableEnvironment streamTableEnv =
-                TableEnvironmentTestUtils.create(buildStreamEnv());
+                StreamTableEnvironment.create(buildStreamEnv());
         registerTable(streamTableEnv, managedTable);
         BlockingIterator<Row, Row> streamIter =
                 collectAndCheck(
@@ -495,7 +495,7 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
 
         // test streaming read
         final StreamTableEnvironment streamTableEnv =
-                TableEnvironmentTestUtils.create(buildStreamEnv());
+                StreamTableEnvironment.create(buildStreamEnv());
         registerTable(streamTableEnv, managedTable);
         BlockingIterator<Row, Row> streamIter =
                 collectAndCheck(
@@ -598,7 +598,7 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
 
         // test streaming read
         final StreamTableEnvironment streamTableEnv =
-                TableEnvironmentTestUtils.create(buildStreamEnv());
+                StreamTableEnvironment.create(buildStreamEnv());
         registerTable(streamTableEnv, managedTable);
         BlockingIterator<Row, Row> streamIter =
                 collectAndCheck(
@@ -737,8 +737,7 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
 
         // batch read to check data refresh
         final StreamTableEnvironment batchTableEnv =
-                TableEnvironmentTestUtils.create(
-                        buildBatchEnv(), EnvironmentSettings.inBatchMode());
+                StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
         registerTable(batchTableEnv, managedTable);
         collectAndCheck(
                 batchTableEnv,
@@ -843,8 +842,7 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
 
         // batch read to check data refresh
         final StreamTableEnvironment batchTableEnv =
-                TableEnvironmentTestUtils.create(
-                        buildBatchEnv(), EnvironmentSettings.inBatchMode());
+                StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
         registerTable(batchTableEnv, managedTable);
         collectAndCheck(
                 batchTableEnv,
@@ -925,8 +923,7 @@ public class CompositePkAndMultiPartitionedTableITCase extends ReadWriteTableTes
 
         // batch read to check data refresh
         final StreamTableEnvironment batchTableEnv =
-                TableEnvironmentTestUtils.create(
-                        buildBatchEnv(), EnvironmentSettings.inBatchMode());
+                StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
         registerTable(batchTableEnv, managedTable);
         collectAndCheck(
                 batchTableEnv,
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
index d5ff515d..3bb66a62 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
@@ -65,12 +65,8 @@ public abstract class FileStoreTableITCase extends AbstractTestBase {
 
     @Before
     public void before() throws IOException {
-        bEnv =
-                TableEnvironmentTestUtils.create(
-                        EnvironmentSettings.newInstance().inBatchMode().build());
-        sEnv =
-                TableEnvironmentTestUtils.create(
-                        EnvironmentSettings.newInstance().inStreamingMode().build());
+        bEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
         sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
         path = TEMPORARY_FOLDER.newFolder().toURI().toString();
         prepareConfiguration(bEnv, path);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
index a0f502b9..54c7c34f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
@@ -43,9 +43,7 @@ public class LookupJoinITCase extends AbstractTestBase {
 
     @Before
     public void before() throws Exception {
-        env =
-                TableEnvironmentTestUtils.create(
-                        EnvironmentSettings.newInstance().inStreamingMode().build());
+        env = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
         env.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
         env.getConfig()
                 .getConfiguration()
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/MappingTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/MappingTableITCase.java
index d68ace6d..7f909390 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/MappingTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/MappingTableITCase.java
@@ -44,9 +44,7 @@ public class MappingTableITCase extends AbstractTestBase {
 
     @Before
     public void before() throws IOException {
-        tEnv =
-                TableEnvironmentTestUtils.create(
-                        EnvironmentSettings.newInstance().inBatchMode().build());
+        tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
         path = TEMPORARY_FOLDER.newFolder().toURI().toString();
     }
 
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index b4a56c39..0cc8fc07 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -92,7 +92,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
 
         // test streaming read
         final StreamTableEnvironment streamTableEnv =
-                TableEnvironmentTestUtils.create(buildStreamEnv());
+                StreamTableEnvironment.create(buildStreamEnv());
         registerTable(streamTableEnv, managedTable);
         BlockingIterator<Row, Row> streamIter =
                 collectAndCheck(
@@ -211,7 +211,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
 
         // test streaming read
         final StreamTableEnvironment streamTableEnv =
-                TableEnvironmentTestUtils.create(buildStreamEnv());
+                StreamTableEnvironment.create(buildStreamEnv());
         registerTable(streamTableEnv, managedTable);
         collectAndCheck(
                         streamTableEnv,
@@ -428,8 +428,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
 
         // batch read to check data refresh
         final StreamTableEnvironment batchTableEnv =
-                TableEnvironmentTestUtils.create(
-                        buildBatchEnv(), EnvironmentSettings.inBatchMode());
+                StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
         registerTable(batchTableEnv, managedTable);
         collectAndCheck(
                 batchTableEnv,
@@ -1208,7 +1207,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
     @Test
     public void testQueryContainsDefaultFieldName() throws Exception {
         rootPath = TEMPORARY_FOLDER.newFolder().getPath();
-        tEnv = TableEnvironmentTestUtils.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+        tEnv = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
         String id = registerData(Collections.singletonList(changelogRow("+I", 1, "abc")));
         tEnv.executeSql(
                 String.format(
@@ -1234,7 +1233,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
     @Test
     public void testLike() throws Exception {
         rootPath = TEMPORARY_FOLDER.newFolder().getPath();
-        tEnv = TableEnvironmentTestUtils.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+        tEnv = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
         List<Row> input =
                 Arrays.asList(
                         changelogRow("+I", 1, "test_1"),
@@ -1335,7 +1334,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
     @Test
     public void testIn() throws Exception {
         rootPath = TEMPORARY_FOLDER.newFolder().getPath();
-        tEnv = TableEnvironmentTestUtils.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+        tEnv = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
         List<Row> input =
                 Arrays.asList(
                         changelogRow("+I", 1, "aaa"),
@@ -1416,7 +1415,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
     @Test
     public void testChangeBucketNumber() throws Exception {
         rootPath = TEMPORARY_FOLDER.newFolder().getPath();
-        tEnv = TableEnvironmentTestUtils.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+        tEnv = StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
         tEnv.executeSql(
                 String.format(
                         "CREATE TABLE IF NOT EXISTS rates (\n"
@@ -1484,7 +1483,7 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
     public void testStreamingInsertOverwrite() throws Exception {
         rootPath = TEMPORARY_FOLDER.newFolder().getPath();
         tEnv =
-                TableEnvironmentTestUtils.create(
+                StreamTableEnvironment.create(
                         buildStreamEnv(), EnvironmentSettings.inStreamingMode());
         tEnv.executeSql(
                 String.format(
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
index 41867850..db9170f7 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java
@@ -313,7 +313,7 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
             env = buildBatchEnv();
             builder.inBatchMode();
         }
-        tEnv = TableEnvironmentTestUtils.create(env, builder.build());
+        tEnv = StreamTableEnvironment.create(env, builder.build());
         tEnv.executeSql(helperTableDdl);
 
         String managedTableDdl;
@@ -406,8 +406,7 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase {
 
     protected void prepareEnvAndOverwrite(String managedTable, String query) throws Exception {
         final StreamTableEnvironment batchEnv =
-                TableEnvironmentTestUtils.create(
-                        buildBatchEnv(), EnvironmentSettings.inBatchMode());
+                StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
         registerTable(batchEnv, managedTable);
         batchEnv.executeSql(query).await();
     }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java
index 7516df9d..faa8eef7 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java
@@ -36,9 +36,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class StreamingWarehouseITCase extends ReadWriteTableTestBase {
 
     private final StreamTableEnvironment streamTableEnv =
-            TableEnvironmentTestUtils.create(buildStreamEnv(1));
+            StreamTableEnvironment.create(buildStreamEnv(1));
     private final StreamTableEnvironment batchTableEnv =
-            TableEnvironmentTestUtils.create(buildBatchEnv(1), EnvironmentSettings.inBatchMode());
+            StreamTableEnvironment.create(buildBatchEnv(1), EnvironmentSettings.inBatchMode());
 
     @Test
     public void testUserStory() throws Exception {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableEnvironmentTestUtils.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableEnvironmentTestUtils.java
deleted file mode 100644
index f8588ec2..00000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableEnvironmentTestUtils.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector;
-
-import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-
-/** Utility class for creating {@link TableEnvironment} in tests. */
-public class TableEnvironmentTestUtils {
-
-    public static TableEnvironment create(EnvironmentSettings settings) {
-        TableEnvironment tEnv = TableEnvironment.create(settings);
-        disableUnalignedCheckpoint(tEnv);
-        return tEnv;
-    }
-
-    public static StreamTableEnvironment create(StreamExecutionEnvironment env) {
-        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-        disableUnalignedCheckpoint(tEnv);
-        return tEnv;
-    }
-
-    public static StreamTableEnvironment create(
-            StreamExecutionEnvironment env, EnvironmentSettings settings) {
-        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
-        disableUnalignedCheckpoint(tEnv);
-        return tEnv;
-    }
-
-    private static void disableUnalignedCheckpoint(TableEnvironment tEnv) {
-        tEnv.getConfig()
-                .getConfiguration()
-                .set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false);
-    }
-}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
index 0d5868fa..f8728183 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
@@ -90,7 +91,7 @@ public abstract class TableStoreTestBase extends KafkaTableTestBase {
             env.enableCheckpointing(100);
             builder.inStreamingMode();
         }
-        tEnv = TableEnvironmentTestUtils.create(env, builder.build());
+        tEnv = StreamTableEnvironment.create(env, builder.build());
         ((TableEnvironmentImpl) tEnv)
                 .getCatalogManager()
                 .registerCatalog(
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
index ca4614da..1aa4546f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/SinkSavepointITCase.java
@@ -33,7 +33,6 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.store.connector.TableEnvironmentTestUtils;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
@@ -134,7 +133,7 @@ public class SinkSavepointITCase extends AbstractTestBase {
 
         EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
-        StreamTableEnvironment tEnv = TableEnvironmentTestUtils.create(env, settings);
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
         tEnv.getConfig()
                 .getConfiguration()
                 .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(500));
@@ -206,7 +205,7 @@ public class SinkSavepointITCase extends AbstractTestBase {
 
     private void checkRecoverFromSavepointResult(String failingPath) throws Exception {
         EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
-        TableEnvironment tEnv = TableEnvironmentTestUtils.create(settings);
+        TableEnvironment tEnv = TableEnvironment.create(settings);
         // no failure should occur when checking for answer
         FailingAtomicRenameFileSystem.reset(failingName, 0, 1);
 
diff --git a/pom.xml b/pom.xml
index f067f1be..40ee8dac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -444,7 +444,7 @@ under the License.
                     <trimStackTrace>false</trimStackTrace>
                     <systemPropertyVariables>
                         <forkNumber>0${surefire.forkNumber}</forkNumber>
-                        <checkpointing.randomization>true</checkpointing.randomization>
+                        <checkpointing.randomization>false</checkpointing.randomization>
                         <project.basedir>${project.basedir}</project.basedir>
                         <!--suppress MavenModelInspection -->
                         <test.randomization.seed>${test.randomization.seed}</test.randomization.seed>
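
In practice, tests now build their table environments with the stock Flink factory methods, and determinism comes from the checkpointing.randomization surefire property rather than from the deleted helper class. A rough before/after sketch (variable names are illustrative):

    // before: the helper disabled unaligned checkpoints on every environment it created
    TableEnvironment tEnv = TableEnvironmentTestUtils.create(
            EnvironmentSettings.newInstance().inBatchMode().build());

    // after: plain factory calls; checkpointing.randomization=false in pom.xml stops the test
    // harness from randomly turning on unaligned checkpoints
    TableEnvironment bEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
    StreamTableEnvironment sEnv =
            StreamTableEnvironment.create(
                    StreamExecutionEnvironment.getExecutionEnvironment(),
                    EnvironmentSettings.inStreamingMode());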


[flink-table-store] 02/02: [FLINK-30209] Introduce StaticCompactorSource and ContinuousCompactorSource for stand-alone compact jobs

Posted by cz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 20dd568f3c2a9b2afd4cb5eb512cd24976602e27
Author: tsreaper <ts...@gmail.com>
AuthorDate: Mon Dec 12 16:58:45 2022 +0800

    [FLINK-30209] Introduce StaticCompactorSource and ContinuousCompactorSource for stand-alone compact jobs
---
 .../connector/source/CompactorSourceBuilder.java   | 132 +++++++++
 .../source/ContinuousFileStoreSource.java          |  20 +-
 .../store/connector/source/FlinkSourceBuilder.java |   5 +-
 .../connector/source/StaticFileStoreSource.java    |  16 +-
 .../connector/source/CompactorSourceITCase.java    | 294 +++++++++++++++++++++
 .../ContinuousCompactorFollowUpScanner.java        |  52 ++++
 .../ContinuousCompactorStartingScanner.java        |  58 ++++
 .../ContinuousDataFileSnapshotEnumerator.java      |  12 +
 .../snapshot/StaticDataFileSnapshotEnumerator.java |  14 +-
 .../table/store/table/system/BucketsTable.java     | 133 ++++++++++
 .../ContinuousCompactorFollowUpScannerTest.java    | 117 ++++++++
 .../ContinuousCompactorStartingScannerTest.java    |  83 ++++++
 .../snapshot/SnapshotEnumeratorTestBase.java       |  17 +-
 13 files changed, 939 insertions(+), 14 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/CompactorSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/CompactorSourceBuilder.java
new file mode 100644
index 00000000..649daa7e
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/CompactorSourceBuilder.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.snapshot.ContinuousCompactorFollowUpScanner;
+import org.apache.flink.table.store.table.source.snapshot.ContinuousCompactorStartingScanner;
+import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.FullStartingScanner;
+import org.apache.flink.table.store.table.source.snapshot.StaticDataFileSnapshotEnumerator;
+import org.apache.flink.table.store.table.system.BucketsTable;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Source builder to build a Flink {@link StaticFileStoreSource} or {@link
+ * ContinuousFileStoreSource}. This is for stand-alone compactor jobs.
+ */
+public class CompactorSourceBuilder {
+
+    private final String tableIdentifier;
+    private final BucketsTable bucketsTable;
+
+    private boolean isContinuous = false;
+    private StreamExecutionEnvironment env;
+    @Nullable private List<Map<String, String>> specifiedPartitions = null;
+
+    public CompactorSourceBuilder(String tableIdentifier, FileStoreTable table) {
+        this.tableIdentifier = tableIdentifier;
+        this.bucketsTable = new BucketsTable(table);
+    }
+
+    public CompactorSourceBuilder withContinuousMode(boolean isContinuous) {
+        this.isContinuous = isContinuous;
+        return this;
+    }
+
+    public CompactorSourceBuilder withEnv(StreamExecutionEnvironment env) {
+        this.env = env;
+        return this;
+    }
+
+    public CompactorSourceBuilder withPartition(Map<String, String> partition) {
+        return withPartitions(Collections.singletonList(partition));
+    }
+
+    public CompactorSourceBuilder withPartitions(List<Map<String, String>> partitions) {
+        this.specifiedPartitions = partitions;
+        return this;
+    }
+
+    private Source<RowData, ?, ?> buildSource() {
+        Predicate partitionPredicate = null;
+        if (specifiedPartitions != null) {
+            partitionPredicate =
+                    PredicateBuilder.or(
+                            specifiedPartitions.stream()
+                                    .map(p -> PredicateConverter.fromMap(p, bucketsTable.rowType()))
+                                    .toArray(Predicate[]::new));
+        }
+
+        if (isContinuous) {
+            return new ContinuousFileStoreSource(
+                    bucketsTable,
+                    null,
+                    partitionPredicate,
+                    null,
+                    (table, scan, nextSnapshotId) ->
+                            new ContinuousDataFileSnapshotEnumerator(
+                                    table.location(),
+                                    scan,
+                                    new ContinuousCompactorStartingScanner(),
+                                    new ContinuousCompactorFollowUpScanner(),
+                                    nextSnapshotId));
+        } else {
+            return new StaticFileStoreSource(
+                    bucketsTable,
+                    null,
+                    partitionPredicate,
+                    null,
+                    (table, scan) ->
+                            new StaticDataFileSnapshotEnumerator(
+                                    table.location(),
+                                    scan,
+                                    // static compactor source will compact all current files
+                                    new FullStartingScanner()));
+        }
+    }
+
+    public DataStreamSource<RowData> build() {
+        if (env == null) {
+            throw new IllegalArgumentException("StreamExecutionEnvironment should not be null.");
+        }
+
+        LogicalType produceType = bucketsTable.rowType();
+        return env.fromSource(
+                buildSource(),
+                WatermarkStrategy.noWatermarks(),
+                tableIdentifier + "-compact-source",
+                InternalTypeInfo.of(produceType));
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
index 00c52d16..0ab529fb 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java
@@ -34,17 +34,33 @@ import java.util.Collection;
 /** Unbounded {@link FlinkSource} for reading records. It continuously monitors new snapshots. */
 public class ContinuousFileStoreSource extends FlinkSource {
 
-    private static final long serialVersionUID = 2L;
+    private static final long serialVersionUID = 3L;
 
     private final DataTable table;
+    private final ContinuousDataFileSnapshotEnumerator.Factory enumeratorFactory;
 
     public ContinuousFileStoreSource(
             DataTable table,
             @Nullable int[][] projectedFields,
             @Nullable Predicate predicate,
             @Nullable Long limit) {
+        this(
+                table,
+                projectedFields,
+                predicate,
+                limit,
+                ContinuousDataFileSnapshotEnumerator::create);
+    }
+
+    public ContinuousFileStoreSource(
+            DataTable table,
+            @Nullable int[][] projectedFields,
+            @Nullable Predicate predicate,
+            @Nullable Long limit,
+            ContinuousDataFileSnapshotEnumerator.Factory enumeratorFactory) {
         super(table, projectedFields, predicate, limit);
         this.table = table;
+        this.enumeratorFactory = enumeratorFactory;
     }
 
     @Override
@@ -73,6 +89,6 @@ public class ContinuousFileStoreSource extends FlinkSource {
                 splits,
                 nextSnapshotId,
                 table.options().continuousDiscoveryInterval().toMillis(),
-                ContinuousDataFileSnapshotEnumerator.create(table, scan, nextSnapshotId));
+                enumeratorFactory.create(table, scan, nextSnapshotId));
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index bb007e26..99aa81cb 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -44,7 +44,10 @@ import java.util.Optional;
 
 import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED;
 
-/** Source builder to build a Flink {@link Source}. */
+/**
+ * Source builder to build a Flink {@link StaticFileStoreSource} or {@link
+ * ContinuousFileStoreSource}. This is for normal read/write jobs.
+ */
 public class FlinkSourceBuilder {
 
     private final ObjectIdentifier tableIdentifier;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
index c479c322..fe3da842 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSource.java
@@ -37,17 +37,28 @@ import java.util.Collection;
 /** Bounded {@link FlinkSource} for reading records. It does not monitor new snapshots. */
 public class StaticFileStoreSource extends FlinkSource {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 3L;
 
     private final DataTable table;
+    private final StaticDataFileSnapshotEnumerator.Factory enumeratorFactory;
 
     public StaticFileStoreSource(
             DataTable table,
             @Nullable int[][] projectedFields,
             @Nullable Predicate predicate,
             @Nullable Long limit) {
+        this(table, projectedFields, predicate, limit, StaticDataFileSnapshotEnumerator::create);
+    }
+
+    public StaticFileStoreSource(
+            DataTable table,
+            @Nullable int[][] projectedFields,
+            @Nullable Predicate predicate,
+            @Nullable Long limit,
+            StaticDataFileSnapshotEnumerator.Factory enumeratorFactory) {
         super(table, projectedFields, predicate, limit);
         this.table = table;
+        this.enumeratorFactory = enumeratorFactory;
     }
 
     @Override
@@ -72,8 +83,7 @@ public class StaticFileStoreSource extends FlinkSource {
             FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
 
             // read all splits from the enumerator in one go
-            SnapshotEnumerator snapshotEnumerator =
-                    StaticDataFileSnapshotEnumerator.create(table, table.newScan());
+            SnapshotEnumerator snapshotEnumerator = enumeratorFactory.create(table, scan);
             while (true) {
                 DataTableScan.DataFilePlan plan = snapshotEnumerator.enumerate();
                 if (plan == null) {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/CompactorSourceITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/CompactorSourceITCase.java
new file mode 100644
index 00000000..ed68276c
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/CompactorSourceITCase.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CompactorSourceBuilder}. */
+public class CompactorSourceITCase extends AbstractTestBase {
+
+    private static final RowType ROW_TYPE =
+            RowType.of(
+                    new LogicalType[] {
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.INT().getLogicalType()
+                    },
+                    new String[] {"dt", "hh", "k", "v"});
+
+    private Path tablePath;
+    private String commitUser;
+
+    @Before
+    public void before() throws IOException {
+        tablePath = new Path(TEMPORARY_FOLDER.newFolder().toString());
+        commitUser = UUID.randomUUID().toString();
+    }
+
+    @Test
+    public void testBatchRead() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(20221208, 15, 1, 1510));
+        write.write(rowData(20221208, 16, 2, 1620));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(20221208, 15, 1, 1511));
+        write.write(rowData(20221209, 15, 1, 1510));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStreamSource<RowData> compactorSource =
+                new CompactorSourceBuilder("test", table)
+                        .withContinuousMode(false)
+                        .withEnv(env)
+                        .build();
+        CloseableIterator<RowData> it = compactorSource.executeAndCollect();
+
+        List<String> actual = new ArrayList<>();
+        while (it.hasNext()) {
+            actual.add(toString(it.next()));
+        }
+        assertThat(actual)
+                .hasSameElementsAs(
+                        Arrays.asList("+I 20221208|15|0", "+I 20221208|16|0", "+I 20221209|15|0"));
+
+        write.close();
+        commit.close();
+        it.close();
+    }
+
+    @Test
+    public void testStreamingRead() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(20221208, 15, 1, 1510));
+        write.write(rowData(20221208, 16, 2, 1620));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(20221208, 15, 1, 1511));
+        write.write(rowData(20221209, 15, 1, 1510));
+        write.compact(binaryRow(20221208, 15), 0, true);
+        write.compact(binaryRow(20221209, 15), 0, true);
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        write.write(rowData(20221208, 15, 2, 1520));
+        write.write(rowData(20221208, 16, 2, 1621));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        write.write(rowData(20221208, 15, 1, 1512));
+        write.write(rowData(20221209, 16, 2, 1620));
+        commit.commit(3, write.prepareCommit(true, 3));
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStreamSource<RowData> compactorSource =
+                new CompactorSourceBuilder("test", table)
+                        .withContinuousMode(true)
+                        .withEnv(env)
+                        .build();
+        CloseableIterator<RowData> it = compactorSource.executeAndCollect();
+
+        List<String> actual = new ArrayList<>();
+        for (int i = 0; i < 4; i++) {
+            actual.add(toString(it.next()));
+        }
+        assertThat(actual)
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                "+I 20221208|15|0",
+                                "+I 20221208|16|0",
+                                "+I 20221208|15|0",
+                                "+I 20221209|16|0"));
+
+        write.write(rowData(20221209, 15, 2, 1520));
+        write.write(rowData(20221208, 16, 1, 1510));
+        write.write(rowData(20221209, 15, 1, 1511));
+        commit.commit(4, write.prepareCommit(true, 4));
+
+        actual.clear();
+        for (int i = 0; i < 2; i++) {
+            actual.add(toString(it.next()));
+        }
+        assertThat(actual).hasSameElementsAs(Arrays.asList("+I 20221208|16|0", "+I 20221209|15|0"));
+
+        write.close();
+        commit.close();
+
+        write = table.newWrite(commitUser).withOverwrite(true);
+        Map<String, String> partitionMap = new HashMap<>();
+        partitionMap.put("dt", "20221209");
+        partitionMap.put("hh", "16");
+        commit = table.newCommit(commitUser).withOverwritePartition(partitionMap);
+        write.write(rowData(20221209, 16, 1, 1512));
+        write.write(rowData(20221209, 16, 2, 1622));
+        commit.commit(5, write.prepareCommit(true, 5));
+
+        assertThat(toString(it.next())).isEqualTo("+I 20221209|16|0");
+
+        write.close();
+        commit.close();
+        it.close();
+    }
+
+    @Test
+    public void testStreamingPartitionSpec() throws Exception {
+        testPartitionSpec(
+                true,
+                getSpecifiedPartitions(),
+                Arrays.asList(
+                        "+I 20221208|16|0",
+                        "+I 20221209|15|0",
+                        "+I 20221208|16|0",
+                        "+I 20221209|15|0"));
+    }
+
+    @Test
+    public void testBatchPartitionSpec() throws Exception {
+        testPartitionSpec(
+                false,
+                getSpecifiedPartitions(),
+                Arrays.asList("+I 20221208|16|0", "+I 20221209|15|0"));
+    }
+
+    private List<Map<String, String>> getSpecifiedPartitions() {
+        Map<String, String> partition1 = new HashMap<>();
+        partition1.put("dt", "20221208");
+        partition1.put("hh", "16");
+
+        Map<String, String> partition2 = new HashMap<>();
+        partition2.put("dt", "20221209");
+        partition2.put("hh", "15");
+
+        return Arrays.asList(partition1, partition2);
+    }
+
+    private void testPartitionSpec(
+            boolean isStreaming,
+            List<Map<String, String>> specifiedPartitions,
+            List<String> expected)
+            throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(20221208, 15, 1, 1510));
+        write.write(rowData(20221208, 16, 2, 1620));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(20221208, 15, 2, 1520));
+        write.write(rowData(20221209, 15, 2, 1520));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        write.write(rowData(20221208, 15, 1, 1511));
+        write.write(rowData(20221208, 16, 1, 1610));
+        write.write(rowData(20221209, 15, 1, 1510));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStreamSource<RowData> compactorSource =
+                new CompactorSourceBuilder("test", table)
+                        .withContinuousMode(isStreaming)
+                        .withEnv(env)
+                        .withPartitions(specifiedPartitions)
+                        .build();
+        CloseableIterator<RowData> it = compactorSource.executeAndCollect();
+
+        List<String> actual = new ArrayList<>();
+        for (int i = 0; i < expected.size(); i++) {
+            actual.add(toString(it.next()));
+        }
+        assertThat(actual).hasSameElementsAs(expected);
+
+        write.close();
+        commit.close();
+        it.close();
+    }
+
+    private String toString(RowData rowData) {
+        return String.format(
+                "%s %d|%d|%d",
+                rowData.getRowKind().shortString(),
+                rowData.getInt(0),
+                rowData.getInt(1),
+                rowData.getInt(2));
+    }
+
+    private GenericRowData rowData(Object... values) {
+        return GenericRowData.of(values);
+    }
+
+    private BinaryRowData binaryRow(int dt, int hh) {
+        BinaryRowData b = new BinaryRowData(2);
+        BinaryRowWriter writer = new BinaryRowWriter(b);
+        writer.writeInt(0, dt);
+        writer.writeInt(1, hh);
+        writer.complete();
+        return b;
+    }
+
+    private FileStoreTable createFileStoreTable() throws Exception {
+        SchemaManager schemaManager = new SchemaManager(tablePath);
+        TableSchema tableSchema =
+                schemaManager.commitNewVersion(
+                        new UpdateSchema(
+                                ROW_TYPE,
+                                Arrays.asList("dt", "hh"),
+                                Arrays.asList("dt", "hh", "k"),
+                                Collections.emptyMap(),
+                                ""));
+        return FileStoreTableFactory.create(tablePath, tableSchema);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
new file mode 100644
index 00000000..74e0f141
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScanner.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** {@link FollowUpScanner} used internally for stand-alone streaming compact job sources. */
+public class ContinuousCompactorFollowUpScanner implements FollowUpScanner {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ContinuousCompactorFollowUpScanner.class);
+
+    @Override
+    public boolean shouldScanSnapshot(Snapshot snapshot) {
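+        // Only APPEND and OVERWRITE snapshots contain newly written data files to compact;
+        // COMPACT snapshots are the results of compaction and are skipped.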
+        if (snapshot.commitKind() == Snapshot.CommitKind.APPEND
+                || snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE) {
+            return true;
+        }
+
+        LOG.debug(
+                "Next snapshot id {} is neither APPEND nor OVERWRITE, but is {}, check next one.",
+                snapshot.id(),
+                snapshot.commitKind());
+        return false;
+    }
+
+    @Override
+    public DataTableScan.DataFilePlan getPlan(long snapshotId, DataTableScan scan) {
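+        // Plan only the incremental (delta) files of the given snapshot.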
+        return scan.withKind(ScanKind.DELTA).withSnapshot(snapshotId).plan();
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorStartingScanner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorStartingScanner.java
new file mode 100644
index 00000000..ec59d523
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorStartingScanner.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+/** {@link StartingScanner} used internally for stand-alone streaming compact job sources. */
+public class ContinuousCompactorStartingScanner implements StartingScanner {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ContinuousCompactorStartingScanner.class);
+
+    @Override
+    public DataTableScan.DataFilePlan getPlan(SnapshotManager snapshotManager, DataTableScan scan) {
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
+        Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+        if (latestSnapshotId == null || earliestSnapshotId == null) {
+            LOG.debug("There is currently no snapshot. Wait for the snapshot generation.");
+            return null;
+        }
+
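+        // Search backwards from the latest snapshot for the most recent COMPACT snapshot;
+        // reading then continues from the snapshot right after it.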
+        for (long id = latestSnapshotId; id >= earliestSnapshotId; id--) {
+            Snapshot snapshot = snapshotManager.snapshot(id);
+            if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+                LOG.debug("Found latest compact snapshot {}, reading from the next snapshot.", id);
+                return new DataTableScan.DataFilePlan(id, Collections.emptyList());
+            }
+        }
+
+        LOG.debug(
+                "No compact snapshot found, reading from the earliest snapshot {}.",
+                earliestSnapshotId);
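+        // Plan snapshot id is (earliestSnapshotId - 1) so that reading continues from the earliest snapshot.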
+        return new DataTableScan.DataFilePlan(earliestSnapshotId - 1, Collections.emptyList());
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
index bb9debfa..c53a4244 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/ContinuousDataFileSnapshotEnumerator.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
 import java.util.HashMap;
 
 import static org.apache.flink.table.store.CoreOptions.ChangelogProducer.FULL_COMPACTION;
@@ -186,4 +187,15 @@ public class ContinuousDataFileSnapshotEnumerator implements SnapshotEnumerator
                             + "You can use full compaction changelog producer to support streaming reading.");
         }
     }
+
+    // ------------------------------------------------------------------------
+    //  factory interface
+    // ------------------------------------------------------------------------
+
+    /** Factory to create {@link ContinuousDataFileSnapshotEnumerator}. */
+    public interface Factory extends Serializable {
+
+        ContinuousDataFileSnapshotEnumerator create(
+                DataTable table, DataTableScan scan, @Nullable Long nextSnapshotId);
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
index 582ba9c2..251ad1bf 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/StaticDataFileSnapshotEnumerator.java
@@ -26,6 +26,8 @@ import org.apache.flink.table.store.table.source.DataTableScan;
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
+
 /** {@link SnapshotEnumerator} for batch read. */
 public class StaticDataFileSnapshotEnumerator implements SnapshotEnumerator {
 
@@ -35,7 +37,7 @@ public class StaticDataFileSnapshotEnumerator implements SnapshotEnumerator {
 
     private boolean hasNext;
 
-    private StaticDataFileSnapshotEnumerator(
+    public StaticDataFileSnapshotEnumerator(
             Path tablePath, DataTableScan scan, StartingScanner startingScanner) {
         this.snapshotManager = new SnapshotManager(tablePath);
         this.scan = scan;
@@ -73,4 +75,14 @@ public class StaticDataFileSnapshotEnumerator implements SnapshotEnumerator {
 
         return new StaticDataFileSnapshotEnumerator(table.location(), scan, startingScanner);
     }
+
+    // ------------------------------------------------------------------------
+    //  factory interface
+    // ------------------------------------------------------------------------
+
+    /** Factory to create {@link StaticDataFileSnapshotEnumerator}. */
+    public interface Factory extends Serializable {
+
+        StaticDataFileSnapshotEnumerator create(DataTable table, DataTableScan scan);
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/BucketsTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/BucketsTable.java
new file mode 100644
index 00000000..8f6a22fb
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/BucketsTable.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.system;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.utils.IteratorRecordReader;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.DataTable;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.Table;
+import org.apache.flink.table.store.table.source.DataSplit;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A table that produces the modified partitions and buckets of each snapshot.
+ *
+ * <p>Only used internally by stand-alone compact job sources.
+ */
+public class BucketsTable implements DataTable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FileStoreTable wrapped;
+    private final RowType rowType;
+
+    public BucketsTable(FileStoreTable wrapped) {
+        this.wrapped = wrapped;
+
+        RowType partitionType = wrapped.schema().logicalPartitionType();
+        List<RowType.RowField> fields = new ArrayList<>(partitionType.getFields());
+        // same as ManifestEntry.schema
+        fields.add(new RowType.RowField("_BUCKET", new IntType()));
+        this.rowType = new RowType(fields);
+    }
+
+    @Override
+    public Path location() {
+        return wrapped.location();
+    }
+
+    @Override
+    public SnapshotManager snapshotManager() {
+        return wrapped.snapshotManager();
+    }
+
+    @Override
+    public String name() {
+        return "__internal_buckets_" + wrapped.location().getName();
+    }
+
+    @Override
+    public RowType rowType() {
+        return rowType;
+    }
+
+    @Override
+    public DataTableScan newScan() {
+        return wrapped.newScan();
+    }
+
+    @Override
+    public CoreOptions options() {
+        return wrapped.options();
+    }
+
+    @Override
+    public TableRead newRead() {
+        return new BucketsRead();
+    }
+
+    @Override
+    public Table copy(Map<String, String> dynamicOptions) {
+        return new BucketsTable(wrapped.copy(dynamicOptions));
+    }
+
+    private static class BucketsRead implements TableRead {
+
+        @Override
+        public TableRead withFilter(Predicate predicate) {
+            return this;
+        }
+
+        @Override
+        public TableRead withProjection(int[][] projection) {
+            throw new UnsupportedOperationException("BucketsRead does not support projection");
+        }
+
+        @Override
+        public RecordReader<RowData> createReader(Split split) throws IOException {
+            if (!(split instanceof DataSplit)) {
+                throw new IllegalArgumentException("Unsupported split: " + split.getClass());
+            }
+
+            DataSplit dataSplit = (DataSplit) split;
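+            // Each split yields exactly one row: the partition fields joined with the bucket id.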
+            RowData row =
+                    new JoinedRowData()
+                            .replace(dataSplit.partition(), GenericRowData.of(dataSplit.bucket()));
+            return new IteratorRecordReader<>(Collections.singletonList(row).iterator());
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
new file mode 100644
index 00000000..66a21e3b
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.table.system.BucketsTable;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ContinuousCompactorFollowUpScanner}. */
+public class ContinuousCompactorFollowUpScannerTest extends SnapshotEnumeratorTestBase {
+
+    @Test
+    public void testGetPlan() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(2, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(2, 30, 300L));
+        write.write(rowDataWithKind(RowKind.DELETE, 2, 40, 400L));
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        write.close();
+        commit.close();
+
+        Map<String, String> overwritePartition = new HashMap<>();
+        overwritePartition.put("pt", "1");
+        write = table.newWrite(commitUser).withOverwrite(true);
+        commit = table.newCommit(commitUser).withOverwritePartition(overwritePartition);
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 20, 201L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        write.close();
+        commit.close();
+
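+        // The three commits produce 4 snapshots: snapshot 1 is APPEND, the second commit creates
+        // snapshot 2 (APPEND) and snapshot 3 (COMPACT, from write.compact), and the overwrite
+        // commit creates snapshot 4 (OVERWRITE).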
+        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
+
+        BucketsTable bucketsTable = new BucketsTable(table);
+        DataTableScan scan = bucketsTable.newScan();
+        TableRead read = bucketsTable.newRead();
+        ContinuousCompactorFollowUpScanner scanner = new ContinuousCompactorFollowUpScanner();
+
+        Snapshot snapshot = snapshotManager.snapshot(1);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
+        DataTableScan.DataFilePlan plan = scanner.getPlan(1, scan);
+        assertThat(plan.snapshotId).isEqualTo(1);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Arrays.asList("+I 1|0", "+I 2|0"));
+
+        snapshot = snapshotManager.snapshot(2);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
+        plan = scanner.getPlan(2, scan);
+        assertThat(plan.snapshotId).isEqualTo(2);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Collections.singletonList("+I 2|0"));
+
+        snapshot = snapshotManager.snapshot(3);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+        assertThat(scanner.shouldScanSnapshot(snapshot)).isFalse();
+
+        snapshot = snapshotManager.snapshot(4);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
+        assertThat(scanner.shouldScanSnapshot(snapshot)).isTrue();
+        plan = scanner.getPlan(4, scan);
+        assertThat(plan.snapshotId).isEqualTo(4);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Collections.singletonList("+I 1|0"));
+    }
+
+    @Override
+    protected String rowDataToString(RowData rowData) {
+        return String.format(
+                "%s %d|%d",
+                rowData.getRowKind().shortString(), rowData.getInt(0), rowData.getInt(1));
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorStartingScannerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
new file mode 100644
index 00000000..ef89f24e
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorStartingScannerTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.system.BucketsTable;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ContinuousCompactorStartingScanner}. */
+public class ContinuousCompactorStartingScannerTest extends SnapshotEnumeratorTestBase {
+
+    @Test
+    public void testGetPlan() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 10, 101L));
+        write.write(rowData(1, 30, 300L));
+        write.write(rowData(1, 10, 102L));
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 400L));
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        write.write(rowData(1, 10, 103L));
+        write.write(rowData(1, 30, 301L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        write.write(rowData(1, 20, 201L));
+        write.write(rowData(1, 40, 401L));
+        commit.commit(3, write.prepareCommit(true, 3));
+
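+        // The four commits produce 5 snapshots: the second commit creates both an APPEND and a
+        // COMPACT snapshot (snapshot 3) because of write.compact, so the scanner should pick
+        // snapshot 3 as the latest compact snapshot.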
+        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(5);
+
+        BucketsTable bucketsTable = new BucketsTable(table);
+        ContinuousCompactorStartingScanner scanner = new ContinuousCompactorStartingScanner();
+        DataTableScan.DataFilePlan plan = scanner.getPlan(snapshotManager, bucketsTable.newScan());
+        assertThat(plan.snapshotId).isEqualTo(3);
+        assertThat(plan.splits()).isEmpty();
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testNoSnapshot() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        BucketsTable bucketsTable = new BucketsTable(table);
+        ContinuousCompactorStartingScanner scanner = new ContinuousCompactorStartingScanner();
+        assertThat(scanner.getPlan(snapshotManager, bucketsTable.newScan())).isNull();
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumeratorTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumeratorTestBase.java
index 9a7f628b..7958a9ed 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumeratorTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumeratorTestBase.java
@@ -102,18 +102,21 @@ public abstract class SnapshotEnumeratorTestBase {
         List<String> result = new ArrayList<>();
         while (iterator.hasNext()) {
             RowData rowData = iterator.next();
-            result.add(
-                    String.format(
-                            "%s %d|%d|%d",
-                            rowData.getRowKind().shortString(),
-                            rowData.getInt(0),
-                            rowData.getInt(1),
-                            rowData.getLong(2)));
+            result.add(rowDataToString(rowData));
         }
         iterator.close();
         return result;
     }
 
+    protected String rowDataToString(RowData rowData) {
+        return String.format(
+                "%s %d|%d|%d",
+                rowData.getRowKind().shortString(),
+                rowData.getInt(0),
+                rowData.getInt(1),
+                rowData.getLong(2));
+    }
+
     protected FileStoreTable createFileStoreTable() throws Exception {
         return createFileStoreTable(new Configuration());
     }