Posted to commits@flink.apache.org by cz...@apache.org on 2022/12/19 02:45:51 UTC

[flink-table-store] branch master updated: [FLINK-30212] Introduce a TableStoreCompactJob class for users to submit compact jobs in Table Store

This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 15af711a [FLINK-30212] Introduce a TableStoreCompactJob class for users to submit compact jobs in Table Store
15af711a is described below

commit 15af711a1bf50b2d612b439a27225c8ad42acb1d
Author: tsreaper <ts...@gmail.com>
AuthorDate: Mon Dec 19 10:45:47 2022 +0800

    [FLINK-30212] Introduce a TableStoreCompactJob class for users to submit compact jobs in Table Store
    
    This closes #437.
---
 .../store/connector/action/CompactAction.java      | 168 +++++++++++++
 .../table/store/connector/action/FlinkActions.java |  80 ++++++
 .../table/store/connector/sink/FlinkSink.java      |   3 +-
 .../sink/FullChangelogStoreSinkWrite.java          |  25 +-
 .../ChangelogWithKeyFileStoreTableITCase.java      |  51 ++++
 .../connector/action/CompactActionITCase.java      | 269 +++++++++++++++++++++
 .../table/store/table/FileDataFilterTestBase.java  |   1 -
 .../flink/table/store/tests/E2eTestBase.java       |   2 +-
 .../table/store/tests/FlinkActionsE2eTest.java     | 138 +++++++++++
 9 files changed, 724 insertions(+), 13 deletions(-)
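
For readers skimming the patch: the new entry points are the CompactAction class (package-private constructor) and the public FlinkActions facade added below. A minimal sketch of how the new Java API could be wired up from user code follows; the table path, partition values and job name here are placeholders for illustration, not values taken from this commit:

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.store.connector.action.CompactAction;
    import org.apache.flink.table.store.connector.action.FlinkActions;

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class CompactJobExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // BATCH compacts the current table state once; STREAMING keeps a continuous compactor running.
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);

            // Table paths follow the <warehouse>/<database>.db/<table> layout used in the tests below.
            CompactAction action =
                    FlinkActions.compact(new Path("hdfs:///path/to/warehouse/test_db.db/test_table"));

            // Optionally restrict compaction to specific partitions.
            Map<String, String> partition = new HashMap<>();
            partition.put("dt", "20221126");
            partition.put("hh", "08");
            action.withPartitions(Collections.singletonList(partition));

            action.build(env);
            env.execute("Table Store compact job");
        }
    }

Whether the compactor runs one-shot or continuously is derived from ExecutionOptions.RUNTIME_MODE inside CompactAction#build, so setting the runtime mode on the environment is all that is needed.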

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java
new file mode 100644
index 00000000..7b31df19
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.action;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.connector.sink.CompactorSinkBuilder;
+import org.apache.flink.table.store.connector.source.CompactorSourceBuilder;
+import org.apache.flink.table.store.connector.utils.StreamExecutionEnvironmentUtils;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Table compact action for Flink. */
+public class CompactAction {
+
+    private final CompactorSourceBuilder sourceBuilder;
+    private final CompactorSinkBuilder sinkBuilder;
+
+    CompactAction(Path tablePath) {
+        Configuration tableOptions = new Configuration();
+        tableOptions.set(CoreOptions.PATH, tablePath.toString());
+        tableOptions.set(CoreOptions.WRITE_COMPACTION_SKIP, false);
+        FileStoreTable table = FileStoreTableFactory.create(tableOptions);
+
+        sourceBuilder = new CompactorSourceBuilder(tablePath.toString(), table);
+        sinkBuilder = new CompactorSinkBuilder(table);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Java API
+    // ------------------------------------------------------------------------
+
+    public CompactAction withPartitions(List<Map<String, String>> partitions) {
+        sourceBuilder.withPartitions(partitions);
+        return this;
+    }
+
+    public void build(StreamExecutionEnvironment env) {
+        ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
+        boolean isStreaming =
+                conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
+
+        DataStreamSource<RowData> source =
+                sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();
+        sinkBuilder.withInput(source).build();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Flink run methods
+    // ------------------------------------------------------------------------
+
+    public static Optional<CompactAction> create(MultipleParameterTool params) {
+        if (params.has("help")) {
+            printHelp();
+            return Optional.empty();
+        }
+
+        String warehouse = params.get("warehouse");
+        String database = params.get("database");
+        String table = params.get("table");
+        String path = params.get("path");
+
+        Path tablePath = null;
+        int count = 0;
+        if (warehouse != null || database != null || table != null) {
+            if (warehouse == null || database == null || table == null) {
+                System.err.println(
+                        "Warehouse, database and table must be specified all at once.\n"
+                                + "Run compact --help for help.");
+                return Optional.empty();
+            }
+            tablePath = new Path(new Path(warehouse, database + ".db"), table);
+            count++;
+        }
+        if (path != null) {
+            tablePath = new Path(path);
+            count++;
+        }
+
+        if (count != 1) {
+            System.err.println(
+                    "Please specify either \"warehouse, database and table\" or \"path\".\n"
+                            + "Run compact --help for help.");
+            return Optional.empty();
+        }
+
+        CompactAction action = new CompactAction(tablePath);
+
+        if (params.has("partition")) {
+            List<Map<String, String>> partitions = new ArrayList<>();
+            for (String partition : params.getMultiParameter("partition")) {
+                Map<String, String> kvs = new HashMap<>();
+                for (String kvString : partition.split(",")) {
+                    String[] kv = kvString.split("=");
+                    if (kv.length != 2) {
+                        System.err.print(
+                                "Invalid key-value pair \""
+                                        + kvString
+                                        + "\".\n"
+                                        + "Run compact --help for help.");
+                        return Optional.empty();
+                    }
+                    kvs.put(kv[0], kv[1]);
+                }
+                partitions.add(kvs);
+            }
+            action.withPartitions(partitions);
+        }
+
+        return Optional.of(action);
+    }
+
+    public static void printHelp() {
+        System.out.println(
+                "Action \"compact\" runs a dedicated job for compacting the specified table.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  compact --warehouse <warehouse-path> --database <database-name> "
+                        + "--table <table-name> [--partition <partition-name>]");
+        System.out.println("  compact --path <table-path> [--partition <partition-name>]");
+        System.out.println();
+
+        System.out.println("Partition name syntax:");
+        System.out.println("  key1=value1,key2=value2,...");
+        System.out.println();
+
+        System.out.println("Examples:");
+        System.out.println(
+                "  compact --warehouse hdfs:///path/to/warehouse --database test_db --table test_table");
+        System.out.println(
+                "  compact --path hdfs:///path/to/warehouse/test_db.db/test_table --partition dt=20221126,hh=08");
+        System.out.println(
+                "  compact --warehouse hdfs:///path/to/warehouse --database test_db --table test_table "
+                        + "--partition dt=20221126,hh=08 --partition dt=20221127,hh=09");
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/FlinkActions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/FlinkActions.java
new file mode 100644
index 00000000..ab907784
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/FlinkActions.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.action;
+
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+/** Table maintenance actions for Flink. */
+public class FlinkActions {
+
+    // ------------------------------------------------------------------------
+    //  Java API
+    // ------------------------------------------------------------------------
+
+    public static CompactAction compact(Path tablePath) {
+        return new CompactAction(tablePath);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Flink run methods
+    // ------------------------------------------------------------------------
+
+    public static void main(String[] args) throws Exception {
+        if (args.length < 1) {
+            printHelp();
+            System.exit(1);
+        }
+
+        String action = args[0].toLowerCase();
+        if ("compact".equals(action)) {
+            runCompact(Arrays.copyOfRange(args, 1, args.length));
+        } else {
+            System.err.println("Unknown action \"" + action + "\"");
+            printHelp();
+            System.exit(1);
+        }
+    }
+
+    private static void runCompact(String[] args) throws Exception {
+        Optional<CompactAction> action = CompactAction.create(MultipleParameterTool.fromArgs(args));
+        if (action.isPresent()) {
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            action.get().build(env);
+            env.execute("Compact job: " + String.join(" ", args));
+        } else {
+            System.exit(1);
+        }
+    }
+
+    private static void printHelp() {
+        System.out.println("Usage: <action> [OPTIONS]");
+        System.out.println();
+
+        System.out.println("Available actions:");
+        System.out.println("  compact");
+        System.out.println();
+
+        System.out.println("For detailed options of each action, run <action> --help");
+    }
+}
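
FlinkActions#main above is the class users point `flink run` at when submitting the compactor as a standalone job. The FlinkActionsE2eTest added at the end of this patch drives it roughly as follows; the warehouse path below is a placeholder, while the jar name, checkpoint interval and --detached flag mirror the test setup:

    bin/flink run \
        -c org.apache.flink.table.store.connector.action.FlinkActions \
        -D execution.checkpointing.interval=1s \
        --detached \
        lib/flink-table-store.jar \
        compact \
        --warehouse hdfs:///path/to/warehouse \
        --database default \
        --table ts_table \
        --partition dt=20221205 \
        --partition dt=20221206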
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
index 3e9ee855..bad47854 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
@@ -57,7 +57,8 @@ public abstract class FlinkSink implements Serializable {
     }
 
     protected StoreSinkWrite.Provider createWriteProvider(String initialCommitUser) {
-        if (table.options().changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
+        if (table.options().changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION
+                && !table.options().writeCompactionSkip()) {
             long fullCompactionThresholdMs =
                     table.options().changelogProducerFullCompactionTriggerInterval().toMillis();
             return (table, context, ioManager) ->
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
index 58827b4f..e7e9a148 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java
@@ -207,6 +207,20 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
             return Optional.empty();
         }
 
+        if (earliestId > identifierToCheck) {
+            throw new RuntimeException(
+                    String.format(
+                            "Can't find snapshot with identifier %d from user %s. "
+                                    + "Earliest snapshot from all users has identifier %d. "
+                                    + "This is rare but it might be that snapshots are expiring too fast.\n"
+                                    + "If this exception happens continuously, please consider increasing %s or %s.",
+                            identifierToCheck,
+                            commitUser,
+                            earliestId,
+                            CoreOptions.SNAPSHOT_TIME_RETAINED.key(),
+                            CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key()));
+        }
+
         // We must find the snapshot whose identifier is exactly `identifierToCheck`.
         // We can't just compare with the latest snapshot identifier by this user (like what we do
         // in `AbstractFileStoreWrite`), because even if the latest snapshot identifier is newer,
@@ -225,16 +239,7 @@ public class FullChangelogStoreSinkWrite extends StoreSinkWriteImpl {
             }
         }
 
-        throw new RuntimeException(
-                String.format(
-                        "All snapshot from user %s has identifier larger than %d. "
-                                + "This is rare but it might be that snapshots are expiring too fast.\n"
-                                + "If this exception happens continuously, please consider increasing "
-                                + CoreOptions.SNAPSHOT_TIME_RETAINED.key()
-                                + " or "
-                                + CoreOptions.SNAPSHOT_NUM_RETAINED_MIN,
-                        commitUser,
-                        identifierToCheck));
+        return Optional.empty();
     }
 
     @Override
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
index 607d703f..b759040e 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
@@ -21,10 +21,13 @@ package org.apache.flink.table.store.connector;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.store.connector.action.FlinkActions;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
@@ -115,6 +118,14 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
         return sEnv;
     }
 
+    private StreamExecutionEnvironment createStreamExecutionEnvironment() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+        env.getCheckpointConfig()
+                .setCheckpointInterval(ThreadLocalRandom.current().nextInt(900) + 100);
+        return env;
+    }
+
     // ------------------------------------------------------------------------
     //  Constructed Tests
     // ------------------------------------------------------------------------
@@ -225,6 +236,13 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
         testFullCompactionChangelogProducerRandom(sEnv, random.nextInt(1, 3), random.nextBoolean());
     }
 
+    @Test
+    public void testStandAloneFullCompactJobRandom() throws Exception {
+        TableEnvironment sEnv = createStreamingTableEnvironment();
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        testStandAloneFullCompactJobRandom(sEnv, random.nextInt(1, 3), random.nextBoolean());
+    }
+
     private static final int NUM_PARTS = 4;
     private static final int NUM_KEYS = 64;
     private static final int NUM_VALUES = 1024;
@@ -253,6 +271,39 @@ public class ChangelogWithKeyFileStoreTableITCase extends TestBaseUtils {
         // if we can first read complete records then read incremental records correctly
         Thread.sleep(ThreadLocalRandom.current().nextInt(5000));
 
+        checkFullCompactionTestResult(numProducers);
+    }
+
+    private void testStandAloneFullCompactJobRandom(
+            TableEnvironment tEnv, int numProducers, boolean enableConflicts) throws Exception {
+        testRandom(
+                tEnv,
+                numProducers,
+                false,
+                "'bucket' = '4',"
+                        + "'changelog-producer' = 'full-compaction',"
+                        + "'changelog-producer.compaction-interval' = '1s',"
+                        + "'write.compaction-skip' = 'true'");
+
+        // sleep for a random amount of time to check
+        // if stand-alone compactor job can find first snapshot to compact correctly
+        Thread.sleep(ThreadLocalRandom.current().nextInt(2500));
+
+        for (int i = enableConflicts ? 2 : 1; i > 0; i--) {
+            StreamExecutionEnvironment env = createStreamExecutionEnvironment();
+            env.setParallelism(2);
+            FlinkActions.compact(new Path(path + "/default.db/T")).build(env);
+            env.executeAsync();
+        }
+
+        // sleep for a random amount of time to check
+        // if we can first read complete records then read incremental records correctly
+        Thread.sleep(ThreadLocalRandom.current().nextInt(2500));
+
+        checkFullCompactionTestResult(numProducers);
+    }
+
+    private void checkFullCompactionTestResult(int numProducers) throws Exception {
         TableEnvironment sEnv = createStreamingTableEnvironment();
         sEnv.getConfig()
                 .getConfiguration()
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
new file mode 100644
index 00000000..332d04c7
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.action;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataSplit;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/** IT cases for {@link CompactAction}. */
+public class CompactActionITCase extends AbstractTestBase {
+
+    private static final RowType ROW_TYPE =
+            RowType.of(
+                    new LogicalType[] {
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.INT().getLogicalType(),
+                        DataTypes.INT().getLogicalType()
+                    },
+                    new String[] {"dt", "hh", "k", "v"});
+
+    private Path tablePath;
+    private String commitUser;
+
+    @Before
+    public void before() throws IOException {
+        tablePath = new Path(TEMPORARY_FOLDER.newFolder().toString());
+        commitUser = UUID.randomUUID().toString();
+    }
+
+    @Test(timeout = 60000)
+    public void testBatchCompact() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.WRITE_COMPACTION_SKIP.key(), "true");
+
+        FileStoreTable table = createFileStoreTable(options);
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(20221208, 15, 1, 100));
+        write.write(rowData(20221208, 16, 1, 100));
+        write.write(rowData(20221209, 15, 1, 100));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(20221208, 15, 2, 200));
+        write.write(rowData(20221208, 16, 2, 200));
+        write.write(rowData(20221209, 15, 2, 200));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+        Assert.assertEquals(2, snapshot.id());
+        Assert.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
+
+        write.close();
+        commit.close();
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        new CompactAction(tablePath).withPartitions(getSpecifiedPartitions()).build(env);
+        env.execute();
+
+        snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+        Assert.assertEquals(3, snapshot.id());
+        Assert.assertEquals(Snapshot.CommitKind.COMPACT, snapshot.commitKind());
+
+        DataTableScan.DataFilePlan plan = table.newScan().plan();
+        Assert.assertEquals(3, plan.splits().size());
+        for (DataSplit split : plan.splits) {
+            if (split.partition().getInt(1) == 15) {
+                // compacted
+                Assert.assertEquals(1, split.files().size());
+            } else {
+                // not compacted
+                Assert.assertEquals(2, split.files().size());
+            }
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testStreamingCompact() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
+        options.put(CoreOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL.key(), "1s");
+        options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
+        options.put(CoreOptions.WRITE_COMPACTION_SKIP.key(), "true");
+
+        FileStoreTable table = createFileStoreTable(options);
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TableWrite write = table.newWrite(commitUser);
+        TableCommit commit = table.newCommit(commitUser);
+
+        // base records
+        write.write(rowData(20221208, 15, 1, 100));
+        write.write(rowData(20221208, 16, 1, 100));
+        write.write(rowData(20221209, 15, 1, 100));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+        Assert.assertEquals(1, snapshot.id());
+        Assert.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
+
+        // no full compaction has happened, so plan should be empty
+        SnapshotEnumerator snapshotEnumerator =
+                ContinuousDataFileSnapshotEnumerator.create(table, table.newScan(), null);
+        DataTableScan.DataFilePlan plan = snapshotEnumerator.enumerate();
+        Assert.assertEquals(1, (long) plan.snapshotId);
+        Assert.assertTrue(plan.splits().isEmpty());
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+        env.getCheckpointConfig().setCheckpointInterval(500);
+        new CompactAction(tablePath).withPartitions(getSpecifiedPartitions()).build(env);
+        env.executeAsync();
+
+        while (true) {
+            plan = snapshotEnumerator.enumerate();
+            if (plan != null) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+
+        // first full compaction
+        Assert.assertEquals(2, (long) plan.snapshotId);
+        List<String> actual = getResult(table.newRead(), plan.splits());
+        actual.sort(String::compareTo);
+        Assert.assertEquals(Arrays.asList("+I 20221208|15|1|100", "+I 20221209|15|1|100"), actual);
+
+        // incremental records
+        write.write(rowData(20221208, 15, 1, 101));
+        write.write(rowData(20221208, 16, 1, 101));
+        write.write(rowData(20221209, 15, 1, 101));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        write.close();
+        commit.close();
+
+        while (true) {
+            plan = snapshotEnumerator.enumerate();
+            if (plan != null) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+
+        // second full compaction
+        Assert.assertEquals(4, (long) plan.snapshotId);
+        actual = getResult(table.newRead(), plan.splits());
+        actual.sort(String::compareTo);
+        Assert.assertEquals(
+                Arrays.asList(
+                        "+U 20221208|15|1|101",
+                        "+U 20221209|15|1|101",
+                        "-U 20221208|15|1|100",
+                        "-U 20221209|15|1|100"),
+                actual);
+    }
+
+    private List<Map<String, String>> getSpecifiedPartitions() {
+        Map<String, String> partition1 = new HashMap<>();
+        partition1.put("dt", "20221208");
+        partition1.put("hh", "15");
+
+        Map<String, String> partition2 = new HashMap<>();
+        partition2.put("dt", "20221209");
+        partition2.put("hh", "15");
+
+        return Arrays.asList(partition1, partition2);
+    }
+
+    private GenericRowData rowData(Object... values) {
+        return GenericRowData.of(values);
+    }
+
+    private List<String> getResult(TableRead read, List<Split> splits) throws Exception {
+        List<ConcatRecordReader.ReaderSupplier<RowData>> readers = new ArrayList<>();
+        for (Split split : splits) {
+            readers.add(() -> read.createReader(split));
+        }
+        RecordReader<RowData> recordReader = ConcatRecordReader.create(readers);
+        RecordReaderIterator<RowData> iterator = new RecordReaderIterator<>(recordReader);
+        List<String> result = new ArrayList<>();
+        while (iterator.hasNext()) {
+            RowData rowData = iterator.next();
+            result.add(rowDataToString(rowData));
+        }
+        iterator.close();
+        return result;
+    }
+
+    private String rowDataToString(RowData rowData) {
+        return String.format(
+                "%s %d|%d|%d|%d",
+                rowData.getRowKind().shortString(),
+                rowData.getInt(0),
+                rowData.getInt(1),
+                rowData.getInt(2),
+                rowData.getInt(3));
+    }
+
+    private FileStoreTable createFileStoreTable(Map<String, String> options) throws Exception {
+        SchemaManager schemaManager = new SchemaManager(tablePath);
+        TableSchema tableSchema =
+                schemaManager.commitNewVersion(
+                        new UpdateSchema(
+                                ROW_TYPE,
+                                Arrays.asList("dt", "hh"),
+                                Arrays.asList("dt", "hh", "k"),
+                                options,
+                                ""));
+        return FileStoreTableFactory.create(tablePath, tableSchema);
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileDataFilterTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileDataFilterTestBase.java
index 47dadf70..5697ff7c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileDataFilterTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileDataFilterTestBase.java
@@ -258,7 +258,6 @@ public abstract class FileDataFilterTestBase extends SchemaEvolutionTableTestBas
                     // schema0, read all data
                     TableRead read1 =
                             table.newRead().withFilter(PredicateBuilder.or(predicateList));
-                    System.out.println(getResult(read1, splits, SCHEMA_1_ROW_TO_STRING));
                     assertThat(getResult(read1, splits, SCHEMA_1_ROW_TO_STRING))
                             .hasSameElementsAs(
                                     Arrays.asList(
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
index fe09cdb1..2abae148 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
@@ -77,7 +77,7 @@ public abstract class E2eTestBase {
     private final List<String> currentResults = new ArrayList<>();
 
     protected DockerComposeContainer<?> environment;
-    private ContainerState jobManager;
+    protected ContainerState jobManager;
 
     @BeforeEach
     public void before() throws Exception {
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
new file mode 100644
index 00000000..ada923b2
--- /dev/null
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.tests;
+
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+
+import java.util.UUID;
+
+/** Tests for {@link org.apache.flink.table.store.connector.action.FlinkActions}. */
+public class FlinkActionsE2eTest extends E2eTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkActionsE2eTest.class);
+
+    public FlinkActionsE2eTest() {
+        super(true, false);
+    }
+
+    @Test
+    public void testCompact() throws Exception {
+        String topicName = "ts-topic-" + UUID.randomUUID();
+        createKafkaTopic(topicName, 1);
+        // prepare first part of test data
+        sendKafkaMessage("1.csv", "20221205,1,100\n20221206,1,100\n20221207,1,100", topicName);
+
+        String testDataSourceDdl =
+                String.format(
+                        "CREATE TEMPORARY TABLE test_source (\n"
+                                + "    dt STRING,\n"
+                                + "    k INT,\n"
+                                + "    v INT"
+                                + ") WITH (\n"
+                                + "    'connector' = 'kafka',\n"
+                                + "    'properties.bootstrap.servers' = 'kafka:9092',\n"
+                                + "    'properties.group.id' = 'testGroup',\n"
+                                + "    'scan.startup.mode' = 'earliest-offset',\n"
+                                + "    'topic' = '%s',\n"
+                                + "    'format' = 'csv'\n"
+                                + ");",
+                        topicName);
+
+        String warehousePath = TEST_DATA_DIR + "/" + UUID.randomUUID() + ".store";
+        String catalogDdl =
+                String.format(
+                        "CREATE CATALOG ts_catalog WITH (\n"
+                                + "    'type' = 'table-store',\n"
+                                + "    'warehouse' = '%s'\n"
+                                + ");",
+                        warehousePath);
+
+        String useCatalogCmd = "USE CATALOG ts_catalog;";
+
+        String tableDdl =
+                "CREATE TABLE IF NOT EXISTS ts_table (\n"
+                        + "    dt STRING,\n"
+                        + "    k INT,\n"
+                        + "    v INT,\n"
+                        + "    PRIMARY KEY (dt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (dt) WITH (\n"
+                        + "    'changelog-producer' = 'full-compaction',\n"
+                        + "    'changelog-producer.compaction-interval' = '1s',\n"
+                        + "    'write.compaction-skip' = 'true'\n"
+                        + ");";
+
+        // insert data into table store
+        runSql(
+                "SET 'execution.checkpointing.interval' = '1s';\n"
+                        + "INSERT INTO ts_table SELECT * FROM test_source;",
+                catalogDdl,
+                useCatalogCmd,
+                tableDdl,
+                testDataSourceDdl);
+
+        // run stand-alone compact job
+        Container.ExecResult execResult =
+                jobManager.execInContainer(
+                        "bin/flink",
+                        "run",
+                        "-c",
+                        "org.apache.flink.table.store.connector.action.FlinkActions",
+                        "-D",
+                        "execution.checkpointing.interval=1s",
+                        "--detached",
+                        "lib/flink-table-store.jar",
+                        "compact",
+                        "--warehouse",
+                        warehousePath,
+                        "--database",
+                        "default",
+                        "--table",
+                        "ts_table",
+                        "--partition",
+                        "dt=20221205",
+                        "--partition",
+                        "dt=20221206");
+        LOG.info(execResult.getStdout());
+        LOG.info(execResult.getStderr());
+
+        // read all data from table store
+        runSql(
+                "INSERT INTO result1 SELECT * FROM ts_table;",
+                catalogDdl,
+                useCatalogCmd,
+                tableDdl,
+                createResultSink("result1", "dt STRING, k INT, v INT"));
+
+        // check that first part of test data are compacted
+        checkResult("20221205, 1, 100", "20221206, 1, 100");
+
+        // prepare second part of test data
+        sendKafkaMessage("2.csv", "20221205,1,101\n20221206,1,101\n20221207,1,101", topicName);
+
+        // check that second part of test data are compacted
+        checkResult("20221205, 1, 101", "20221206, 1, 101");
+    }
+
+    private void runSql(String sql, String... ddls) throws Exception {
+        runSql(String.join("\n", ddls) + "\n" + sql);
+    }
+}