You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/12/02 11:01:19 UTC

(incubator-paimon) branch master updated: [flink] All action names and argument keys use underline instead of hyphen (#2437)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b7d4e44f [flink] All action names and argument keys use underline instead of hyphen (#2437)
0b7d4e44f is described below

commit 0b7d4e44f9d05acb7eee0b8b1f04e47301b037bb
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Sat Dec 2 19:01:15 2023 +0800

    [flink] All action names and argument keys use underline instead of hyphen (#2437)
---
 .../flink/action/cdc/CdcActionCommonUtils.java     |  14 ++
 .../cdc/kafka/KafkaSyncDatabaseActionFactory.java  |  42 +++---
 .../cdc/kafka/KafkaSyncTableActionFactory.java     |  36 +++--
 .../mongodb/MongoDBSyncDatabaseActionFactory.java  |  35 +++--
 .../cdc/mongodb/MongoDBSyncTableActionFactory.java |  26 ++--
 .../cdc/mysql/MySqlSyncDatabaseActionFactory.java  |  54 ++++---
 .../cdc/mysql/MySqlSyncTableActionFactory.java     |  42 +++---
 .../pulsar/PulsarSyncDatabaseActionFactory.java    |  42 +++---
 .../cdc/pulsar/PulsarSyncTableActionFactory.java   |  36 +++--
 .../action/cdc/kafka/KafkaActionITCaseBase.java    |  23 ++-
 .../cdc/mongodb/MongoDBActionITCaseBase.java       |  23 ++-
 .../action/cdc/mysql/MySqlActionITCaseBase.java    |  23 ++-
 .../action/cdc/pulsar/PulsarActionITCaseBase.java  |  23 ++-
 .../apache/paimon/flink/action/ActionFactory.java  |  39 +++--
 .../paimon/flink/action/CompactActionFactory.java  |  20 +--
 .../flink/action/CompactDatabaseActionFactory.java |  25 ++--
 .../flink/action/CreateTagActionFactory.java       |  18 ++-
 .../paimon/flink/action/DeleteActionFactory.java   |   9 +-
 .../flink/action/DeleteTagActionFactory.java       |  13 +-
 .../flink/action/DropPartitionActionFactory.java   |   9 +-
 .../flink/action/MergeIntoActionFactory.java       |  63 +++++---
 .../flink/action/MigrateTableActionFactory.java    |  17 ++-
 .../flink/action/MultipleParameterToolAdapter.java |  51 +++++++
 .../action/RemoveOrphanFilesActionFactory.java     |  13 +-
 .../flink/action/ResetConsumerActionFactory.java   |  18 ++-
 .../flink/action/RollbackToActionFactory.java      |  13 +-
 .../paimon/flink/action/ActionITCaseBase.java      |  25 ++++
 .../paimon/flink/action/CompactActionITCase.java   |  50 ++++---
 .../flink/action/CompactDatabaseActionITCase.java  |  78 ++++++----
 .../paimon/flink/action/ConsumerActionITCase.java  |  21 ++-
 .../paimon/flink/action/DeleteActionITCase.java    |  24 ++-
 .../flink/action/DropPartitionActionITCase.java    |  25 +++-
 .../paimon/flink/action/MergeIntoActionITCase.java | 163 ++++++++++++++++++---
 .../action/RemoveOrphanFilesActionITCase.java      |  25 +++-
 .../flink/action/RollbackToActionITCase.java       |  25 +++-
 .../SortCompactActionForDynamicBucketITCase.java   |  37 +++--
 .../SortCompactActionForUnawareBucketITCase.java   |  36 ++---
 .../paimon/flink/action/TagActionITCase.java       |  26 +++-
 38 files changed, 834 insertions(+), 428 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index ced474cb7..97d4c9a10 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -47,6 +47,20 @@ public class CdcActionCommonUtils {
 
     private static final Logger LOG = LoggerFactory.getLogger(CdcActionCommonUtils.class);
 
+    public static final String KAFKA_CONF = "kafka_conf";
+    public static final String MONGODB_CONF = "mongodb_conf";
+    public static final String MYSQL_CONF = "mysql_conf";
+    public static final String PULSAR_CONF = "pulsar_conf";
+    public static final String TABLE_PREFIX = "table_prefix";
+    public static final String TABLE_SUFFIX = "table_suffix";
+    public static final String INCLUDING_TABLES = "including_tables";
+    public static final String EXCLUDING_TABLES = "excluding_tables";
+    public static final String TYPE_MAPPING = "type_mapping";
+    public static final String PARTITION_KEYS = "partition_keys";
+    public static final String PRIMARY_KEYS = "primary_keys";
+    public static final String COMPUTED_COLUMN = "computed_column";
+    public static final String METADATA_COLUMN = "metadata_column";
+
     public static void assertSchemaCompatible(
             TableSchema paimonSchema, List<DataField> sourceTableFields) {
         if (!schemaCompatible(paimonSchema, sourceTableFields)) {
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
index c20d925fe..8c55c0bfe 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
@@ -20,16 +20,22 @@ package org.apache.paimon.flink.action.cdc.kafka;
 
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 
-import org.apache.flink.api.java.utils.MultipleParameterTool;
-
 import java.util.Optional;
 
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.KAFKA_CONF;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;
+
 /** Factory to create {@link KafkaSyncDatabaseAction}. */
 public class KafkaSyncDatabaseActionFactory implements ActionFactory {
 
-    public static final String IDENTIFIER = "kafka-sync-database";
+    public static final String IDENTIFIER = "kafka_sync_database";
 
     @Override
     public String identifier() {
@@ -37,24 +43,24 @@ public class KafkaSyncDatabaseActionFactory implements ActionFactory {
     }
 
     @Override
-    public Optional<Action> create(MultipleParameterTool params) {
-        checkRequiredArgument(params, "kafka-conf");
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        checkRequiredArgument(params, KAFKA_CONF);
 
         KafkaSyncDatabaseAction action =
                 new KafkaSyncDatabaseAction(
-                        getRequiredValue(params, "warehouse"),
-                        getRequiredValue(params, "database"),
-                        optionalConfigMap(params, "catalog-conf"),
-                        optionalConfigMap(params, "kafka-conf"));
-
-        action.withTableConfig(optionalConfigMap(params, "table-conf"))
-                .withTablePrefix(params.get("table-prefix"))
-                .withTableSuffix(params.get("table-suffix"))
-                .includingTables(params.get("including-tables"))
-                .excludingTables(params.get("excluding-tables"));
-
-        if (params.has("type-mapping")) {
-            String[] options = params.get("type-mapping").split(",");
+                        getRequiredValue(params, WAREHOUSE),
+                        getRequiredValue(params, DATABASE),
+                        optionalConfigMap(params, CATALOG_CONF),
+                        optionalConfigMap(params, KAFKA_CONF));
+
+        action.withTableConfig(optionalConfigMap(params, TABLE_CONF))
+                .withTablePrefix(params.get(TABLE_PREFIX))
+                .withTableSuffix(params.get(TABLE_SUFFIX))
+                .includingTables(params.get(INCLUDING_TABLES))
+                .excludingTables(params.get(EXCLUDING_TABLES));
+
+        if (params.has(TYPE_MAPPING)) {
+            String[] options = params.get(TYPE_MAPPING).split(",");
             action.withTypeMapping(TypeMapping.parse(options));
         }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java
index e9367b7f0..8a1efcd52 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java
@@ -20,18 +20,24 @@ package org.apache.paimon.flink.action.cdc.kafka;
 
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
 
 import java.util.ArrayList;
 import java.util.Optional;
 
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.KAFKA_CONF;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;
+
 /** Factory to create {@link KafkaSyncTableAction}. */
 public class KafkaSyncTableActionFactory implements ActionFactory {
 
-    public static final String IDENTIFIER = "kafka-sync-table";
+    public static final String IDENTIFIER = "kafka_sync_table";
 
     @Override
     public String identifier() {
@@ -39,34 +45,34 @@ public class KafkaSyncTableActionFactory implements ActionFactory {
     }
 
     @Override
-    public Optional<Action> create(MultipleParameterTool params) {
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
         Tuple3<String, String, String> tablePath = getTablePath(params);
-        checkRequiredArgument(params, "kafka-conf");
+        checkRequiredArgument(params, KAFKA_CONF);
 
         KafkaSyncTableAction action =
                 new KafkaSyncTableAction(
                         tablePath.f0,
                         tablePath.f1,
                         tablePath.f2,
-                        optionalConfigMap(params, "catalog-conf"),
-                        optionalConfigMap(params, "kafka-conf"));
-        action.withTableConfig(optionalConfigMap(params, "table-conf"));
+                        optionalConfigMap(params, CATALOG_CONF),
+                        optionalConfigMap(params, KAFKA_CONF));
+        action.withTableConfig(optionalConfigMap(params, TABLE_CONF));
 
-        if (params.has("partition-keys")) {
-            action.withPartitionKeys(params.get("partition-keys").split(","));
+        if (params.has(PARTITION_KEYS)) {
+            action.withPartitionKeys(params.get(PARTITION_KEYS).split(","));
         }
 
-        if (params.has("primary-keys")) {
-            action.withPrimaryKeys(params.get("primary-keys").split(","));
+        if (params.has(PRIMARY_KEYS)) {
+            action.withPrimaryKeys(params.get(PRIMARY_KEYS).split(","));
         }
 
-        if (params.has("computed-column")) {
+        if (params.has(COMPUTED_COLUMN)) {
             action.withComputedColumnArgs(
-                    new ArrayList<>(params.getMultiParameter("computed-column")));
+                    new ArrayList<>(params.getMultiParameter(COMPUTED_COLUMN)));
         }
 
-        if (params.has("type-mapping")) {
-            String[] options = params.get("type-mapping").split(",");
+        if (params.has(TYPE_MAPPING)) {
+            String[] options = params.get(TYPE_MAPPING).split(",");
             action.withTypeMapping(TypeMapping.parse(options));
         }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java
index 2232807fa..b97a73552 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java
@@ -20,36 +20,41 @@ package org.apache.paimon.flink.action.cdc.mongodb;
 
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionFactory;
-
-import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
 
 import java.util.Optional;
 
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MONGODB_CONF;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX;
+
 /** Factory to create {@link MongoDBSyncDatabaseAction}. */
 public class MongoDBSyncDatabaseActionFactory implements ActionFactory {
 
-    public static final String IDENTIFIER = "mongodb-sync-database";
+    public static final String IDENTIFIER = "mongodb_sync_database";
 
     @Override
     public String identifier() {
         return IDENTIFIER;
     }
 
-    public Optional<Action> create(MultipleParameterTool params) {
-        checkRequiredArgument(params, "mongodb-conf");
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        checkRequiredArgument(params, MONGODB_CONF);
 
         MongoDBSyncDatabaseAction action =
                 new MongoDBSyncDatabaseAction(
-                        getRequiredValue(params, "warehouse"),
-                        getRequiredValue(params, "database"),
-                        optionalConfigMap(params, "catalog-conf"),
-                        optionalConfigMap(params, "mongodb-conf"));
-
-        action.withTableConfig(optionalConfigMap(params, "table-conf"))
-                .withTablePrefix(params.get("table-prefix"))
-                .withTableSuffix(params.get("table-suffix"))
-                .includingTables(params.get("including-tables"))
-                .excludingTables(params.get("excluding-tables"));
+                        getRequiredValue(params, WAREHOUSE),
+                        getRequiredValue(params, DATABASE),
+                        optionalConfigMap(params, CATALOG_CONF),
+                        optionalConfigMap(params, MONGODB_CONF));
+
+        action.withTableConfig(optionalConfigMap(params, TABLE_CONF))
+                .withTablePrefix(params.get(TABLE_PREFIX))
+                .withTableSuffix(params.get(TABLE_SUFFIX))
+                .includingTables(params.get(INCLUDING_TABLES))
+                .excludingTables(params.get(EXCLUDING_TABLES));
 
         return Optional.of(action);
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
index 0a2b8acc3..819bf7815 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
@@ -20,17 +20,21 @@ package org.apache.paimon.flink.action.cdc.mongodb;
 
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
 
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
 
 import java.util.ArrayList;
 import java.util.Optional;
 
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MONGODB_CONF;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
+
 /** Factory to create {@link MongoDBSyncTableAction}. */
 public class MongoDBSyncTableActionFactory implements ActionFactory {
 
-    public static final String IDENTIFIER = "mongodb-sync-table";
+    public static final String IDENTIFIER = "mongodb_sync_table";
 
     @Override
     public String identifier() {
@@ -38,27 +42,27 @@ public class MongoDBSyncTableActionFactory implements ActionFactory {
     }
 
     @Override
-    public Optional<Action> create(MultipleParameterTool params) {
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
         Tuple3<String, String, String> tablePath = getTablePath(params);
-        checkRequiredArgument(params, "mongodb-conf");
+        checkRequiredArgument(params, MONGODB_CONF);
 
         MongoDBSyncTableAction action =
                 new MongoDBSyncTableAction(
                         tablePath.f0,
                         tablePath.f1,
                         tablePath.f2,
-                        optionalConfigMap(params, "catalog-conf"),
-                        optionalConfigMap(params, "mongodb-conf"));
+                        optionalConfigMap(params, CATALOG_CONF),
+                        optionalConfigMap(params, MONGODB_CONF));
 
-        action.withTableConfig(optionalConfigMap(params, "table-conf"));
+        action.withTableConfig(optionalConfigMap(params, TABLE_CONF));
 
-        if (params.has("partition-keys")) {
-            action.withPartitionKeys(params.get("partition-keys").split(","));
+        if (params.has(PARTITION_KEYS)) {
+            action.withPartitionKeys(params.get(PARTITION_KEYS).split(","));
         }
 
-        if (params.has("computed-column")) {
+        if (params.has(COMPUTED_COLUMN)) {
             action.withComputedColumnArgs(
-                    new ArrayList<>(params.getMultiParameter("computed-column")));
+                    new ArrayList<>(params.getMultiParameter(COMPUTED_COLUMN)));
         }
 
         return Optional.of(action);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
index 93ed27def..785083589 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
@@ -21,17 +21,28 @@ package org.apache.paimon.flink.action.cdc.mysql;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionFactory;
 import org.apache.paimon.flink.action.MultiTablesSinkMode;
+import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 
-import org.apache.flink.api.java.utils.MultipleParameterTool;
-
 import java.util.Arrays;
 import java.util.Optional;
 
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MYSQL_CONF;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;
+
 /** Factory to create {@link MySqlSyncDatabaseAction}. */
 public class MySqlSyncDatabaseActionFactory implements ActionFactory {
 
-    public static final String IDENTIFIER = "mysql-sync-database";
+    public static final String IDENTIFIER = "mysql_sync_database";
+
+    private static final String IGNORE_INCOMPATIBLE = "ignore_incompatible";
+    private static final String MERGE_SHARDS = "merge_shards";
+    private static final String MODE = "mode";
 
     @Override
     public String identifier() {
@@ -39,32 +50,31 @@ public class MySqlSyncDatabaseActionFactory implements ActionFactory {
     }
 
     @Override
-    public Optional<Action> create(MultipleParameterTool params) {
-        checkRequiredArgument(params, "mysql-conf");
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        checkRequiredArgument(params, MYSQL_CONF);
 
         MySqlSyncDatabaseAction action =
                 new MySqlSyncDatabaseAction(
-                        getRequiredValue(params, "warehouse"),
-                        getRequiredValue(params, "database"),
-                        optionalConfigMap(params, "catalog-conf"),
-                        optionalConfigMap(params, "mysql-conf"));
+                        getRequiredValue(params, WAREHOUSE),
+                        getRequiredValue(params, DATABASE),
+                        optionalConfigMap(params, CATALOG_CONF),
+                        optionalConfigMap(params, MYSQL_CONF));
 
-        action.withTableConfig(optionalConfigMap(params, "table-conf"))
-                .ignoreIncompatible(Boolean.parseBoolean(params.get("ignore-incompatible")))
+        action.withTableConfig(optionalConfigMap(params, TABLE_CONF))
+                .ignoreIncompatible(Boolean.parseBoolean(params.get(IGNORE_INCOMPATIBLE)))
                 .mergeShards(
-                        !params.has("merge-shards")
-                                || Boolean.parseBoolean(params.get("merge-shards")))
-                .withTablePrefix(params.get("table-prefix"))
-                .withTableSuffix(params.get("table-suffix"))
-                .includingTables(params.get("including-tables"))
-                .excludingTables(params.get("excluding-tables"))
-                .withMode(MultiTablesSinkMode.fromString(params.get("mode")));
-        if (params.has("metadata-column")) {
-            action.withMetadataKeys(Arrays.asList(params.get("metadata-column").split(",")));
+                        !params.has(MERGE_SHARDS) || Boolean.parseBoolean(params.get(MERGE_SHARDS)))
+                .withTablePrefix(params.get(TABLE_PREFIX))
+                .withTableSuffix(params.get(TABLE_SUFFIX))
+                .includingTables(params.get(INCLUDING_TABLES))
+                .excludingTables(params.get(EXCLUDING_TABLES))
+                .withMode(MultiTablesSinkMode.fromString(params.get(MODE)));
+        if (params.has(METADATA_COLUMN)) {
+            action.withMetadataKeys(Arrays.asList(params.get(METADATA_COLUMN).split(",")));
         }
 
-        if (params.has("type-mapping")) {
-            String[] options = params.get("type-mapping").split(",");
+        if (params.has(TYPE_MAPPING)) {
+            String[] options = params.get(TYPE_MAPPING).split(",");
             action.withTypeMapping(TypeMapping.parse(options));
         }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
index 3f518c948..293fdd6bb 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
@@ -20,18 +20,25 @@ package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
 
 import java.util.ArrayList;
 import java.util.Optional;
 
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MYSQL_CONF;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;
+
 /** Factory to create {@link MySqlSyncTableAction}. */
 public class MySqlSyncTableActionFactory implements ActionFactory {
 
-    public static final String IDENTIFIER = "mysql-sync-table";
+    public static final String IDENTIFIER = "mysql_sync_table";
 
     @Override
     public String identifier() {
@@ -39,40 +46,39 @@ public class MySqlSyncTableActionFactory implements ActionFactory {
     }
 
     @Override
-    public Optional<Action> create(MultipleParameterTool params) {
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
         Tuple3<String, String, String> tablePath = getTablePath(params);
-        checkRequiredArgument(params, "mysql-conf");
+        checkRequiredArgument(params, MYSQL_CONF);
 
         MySqlSyncTableAction action =
                 new MySqlSyncTableAction(
                         tablePath.f0,
                         tablePath.f1,
                         tablePath.f2,
-                        optionalConfigMap(params, "catalog-conf"),
-                        optionalConfigMap(params, "mysql-conf"));
+                        optionalConfigMap(params, CATALOG_CONF),
+                        optionalConfigMap(params, MYSQL_CONF));
 
-        action.withTableConfig(optionalConfigMap(params, "table-conf"));
+        action.withTableConfig(optionalConfigMap(params, TABLE_CONF));
 
-        if (params.has("partition-keys")) {
-            action.withPartitionKeys(params.get("partition-keys").split(","));
+        if (params.has(PARTITION_KEYS)) {
+            action.withPartitionKeys(params.get(PARTITION_KEYS).split(","));
         }
 
-        if (params.has("primary-keys")) {
-            action.withPrimaryKeys(params.get("primary-keys").split(","));
+        if (params.has(PRIMARY_KEYS)) {
+            action.withPrimaryKeys(params.get(PRIMARY_KEYS).split(","));
         }
 
-        if (params.has("computed-column")) {
+        if (params.has(COMPUTED_COLUMN)) {
             action.withComputedColumnArgs(
-                    new ArrayList<>(params.getMultiParameter("computed-column")));
+                    new ArrayList<>(params.getMultiParameter(COMPUTED_COLUMN)));
         }
 
-        if (params.has("metadata-column")) {
-            action.withMetadataColumns(
-                    new ArrayList<>(params.getMultiParameter("metadata-column")));
+        if (params.has(METADATA_COLUMN)) {
+            action.withMetadataColumns(new ArrayList<>(params.getMultiParameter(METADATA_COLUMN)));
         }
 
-        if (params.has("type-mapping")) {
-            String[] options = params.get("type-mapping").split(",");
+        if (params.has(TYPE_MAPPING)) {
+            String[] options = params.get(TYPE_MAPPING).split(",");
             action.withTypeMapping(TypeMapping.parse(options));
         }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionFactory.java
index b7f0d3ce0..023f139a8 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionFactory.java
@@ -20,16 +20,22 @@ package org.apache.paimon.flink.action.cdc.pulsar;
 
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 
-import org.apache.flink.api.java.utils.MultipleParameterTool;
-
 import java.util.Optional;
 
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PULSAR_CONF;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;
+
 /** Factory to create {@link PulsarSyncDatabaseAction}. */
 public class PulsarSyncDatabaseActionFactory implements ActionFactory {
 
-    public static final String IDENTIFIER = "pulsar-sync-database";
+    public static final String IDENTIFIER = "pulsar_sync_database";
 
     @Override
     public String identifier() {
@@ -37,24 +43,24 @@ public class PulsarSyncDatabaseActionFactory implements ActionFactory {
     }
 
     @Override
-    public Optional<Action> create(MultipleParameterTool params) {
-        checkRequiredArgument(params, "pulsar-conf");
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        checkRequiredArgument(params, PULSAR_CONF);
 
         PulsarSyncDatabaseAction action =
                 new PulsarSyncDatabaseAction(
-                        getRequiredValue(params, "warehouse"),
-                        getRequiredValue(params, "database"),
-                        optionalConfigMap(params, "catalog-conf"),
-                        optionalConfigMap(params, "pulsar-conf"));
-
-        action.withTableConfig(optionalConfigMap(params, "table-conf"))
-                .withTablePrefix(params.get("table-prefix"))
-                .withTableSuffix(params.get("table-suffix"))
-                .includingTables(params.get("including-tables"))
-                .excludingTables(params.get("excluding-tables"));
-
-        if (params.has("type-mapping")) {
-            String[] options = params.get("type-mapping").split(",");
+                        getRequiredValue(params, WAREHOUSE),
+                        getRequiredValue(params, DATABASE),
+                        optionalConfigMap(params, CATALOG_CONF),
+                        optionalConfigMap(params, PULSAR_CONF));
+
+        action.withTableConfig(optionalConfigMap(params, TABLE_CONF))
+                .withTablePrefix(params.get(TABLE_PREFIX))
+                .withTableSuffix(params.get(TABLE_SUFFIX))
+                .includingTables(params.get(INCLUDING_TABLES))
+                .excludingTables(params.get(EXCLUDING_TABLES));
+
+        if (params.has(TYPE_MAPPING)) {
+            String[] options = params.get(TYPE_MAPPING).split(",");
             action.withTypeMapping(TypeMapping.parse(options));
         }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionFactory.java
index 4876086da..6929687aa 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionFactory.java
@@ -20,18 +20,24 @@ package org.apache.paimon.flink.action.cdc.pulsar;
 
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
 
 import java.util.ArrayList;
 import java.util.Optional;
 
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PULSAR_CONF;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;
+
 /** Factory to create {@link PulsarSyncTableAction}. */
 public class PulsarSyncTableActionFactory implements ActionFactory {
 
-    public static final String IDENTIFIER = "pulsar-sync-table";
+    public static final String IDENTIFIER = "pulsar_sync_table";
 
     @Override
     public String identifier() {
@@ -39,34 +45,34 @@ public class PulsarSyncTableActionFactory implements ActionFactory {
     }
 
     @Override
-    public Optional<Action> create(MultipleParameterTool params) {
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
         Tuple3<String, String, String> tablePath = getTablePath(params);
-        checkRequiredArgument(params, "pulsar-conf");
+        checkRequiredArgument(params, PULSAR_CONF);
 
         PulsarSyncTableAction action =
                 new PulsarSyncTableAction(
                         tablePath.f0,
                         tablePath.f1,
                         tablePath.f2,
-                        optionalConfigMap(params, "catalog-conf"),
-                        optionalConfigMap(params, "pulsar-conf"));
-        action.withTableConfig(optionalConfigMap(params, "table-conf"));
+                        optionalConfigMap(params, CATALOG_CONF),
+                        optionalConfigMap(params, PULSAR_CONF));
+        action.withTableConfig(optionalConfigMap(params, TABLE_CONF));
 
-        if (params.has("partition-keys")) {
-            action.withPartitionKeys(params.get("partition-keys").split(","));
+        if (params.has(PARTITION_KEYS)) {
+            action.withPartitionKeys(params.get(PARTITION_KEYS).split(","));
         }
 
-        if (params.has("primary-keys")) {
-            action.withPrimaryKeys(params.get("primary-keys").split(","));
+        if (params.has(PRIMARY_KEYS)) {
+            action.withPrimaryKeys(params.get(PRIMARY_KEYS).split(","));
         }
 
-        if (params.has("computed-column")) {
+        if (params.has(COMPUTED_COLUMN)) {
             action.withComputedColumnArgs(
-                    new ArrayList<>(params.getMultiParameter("computed-column")));
+                    new ArrayList<>(params.getMultiParameter(COMPUTED_COLUMN)));
         }
 
-        if (params.has("type-mapping")) {
-            String[] options = params.get("type-mapping").split(",");
+        if (params.has(TYPE_MAPPING)) {
+            String[] options = params.get(TYPE_MAPPING).split(",");
             action.withTypeMapping(TypeMapping.parse(options));
         }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
index 321e71579..d044b38dd 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
@@ -24,7 +24,6 @@ import org.apache.paimon.utils.StringUtils;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import org.apache.flink.api.java.utils.MultipleParameterTool;
 import org.apache.flink.util.DockerImageVersions;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AdminClient;
@@ -247,6 +246,7 @@ public abstract class KafkaActionITCaseBase extends CdcActionITCaseBase {
             List<String> args =
                     new ArrayList<>(
                             Arrays.asList(
+                                    "kafka_sync_table",
                                     "--warehouse",
                                     warehouse,
                                     "--database",
@@ -264,12 +264,7 @@ public abstract class KafkaActionITCaseBase extends CdcActionITCaseBase {
 
             args.addAll(listToMultiArgs("--computed-column", computedColumnArgs));
 
-            MultipleParameterTool params =
-                    MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0])));
-            return (KafkaSyncTableAction)
-                    new KafkaSyncTableActionFactory()
-                            .create(params)
-                            .orElseThrow(RuntimeException::new);
+            return createAction(KafkaSyncTableAction.class, args);
         }
     }
 
@@ -296,7 +291,12 @@ public abstract class KafkaActionITCaseBase extends CdcActionITCaseBase {
         public KafkaSyncDatabaseAction build() {
             List<String> args =
                     new ArrayList<>(
-                            Arrays.asList("--warehouse", warehouse, "--database", database));
+                            Arrays.asList(
+                                    "kafka_sync_database",
+                                    "--warehouse",
+                                    warehouse,
+                                    "--database",
+                                    database));
 
             args.addAll(mapToArgs("--kafka-conf", sourceConfig));
             args.addAll(mapToArgs("--catalog-conf", catalogConfig));
@@ -309,12 +309,7 @@ public abstract class KafkaActionITCaseBase extends CdcActionITCaseBase {
 
             args.addAll(listToArgs("--type-mapping", typeMappingModes));
 
-            MultipleParameterTool params =
-                    MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0])));
-            return (KafkaSyncDatabaseAction)
-                    new KafkaSyncDatabaseActionFactory()
-                            .create(params)
-                            .orElseThrow(RuntimeException::new);
+            return createAction(KafkaSyncDatabaseAction.class, args);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
index 4d2fec69d..b4c5eed77 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
@@ -24,7 +24,6 @@ import com.mongodb.ConnectionString;
 import com.mongodb.MongoClientSettings;
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
 import org.junit.jupiter.api.BeforeAll;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,6 +110,7 @@ public abstract class MongoDBActionITCaseBase extends CdcActionITCaseBase {
             List<String> args =
                     new ArrayList<>(
                             Arrays.asList(
+                                    "mongodb_sync_table",
                                     "--warehouse",
                                     warehouse,
                                     "--database",
@@ -125,12 +125,7 @@ public abstract class MongoDBActionITCaseBase extends CdcActionITCaseBase {
 
             args.addAll(listToArgs("--partition-keys", partitionKeys));
 
-            MultipleParameterTool params =
-                    MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0])));
-            return (MongoDBSyncTableAction)
-                    new MongoDBSyncTableActionFactory()
-                            .create(params)
-                            .orElseThrow(RuntimeException::new);
+            return createAction(MongoDBSyncTableAction.class, args);
         }
     }
 
@@ -161,7 +156,12 @@ public abstract class MongoDBActionITCaseBase extends CdcActionITCaseBase {
         public MongoDBSyncDatabaseAction build() {
             List<String> args =
                     new ArrayList<>(
-                            Arrays.asList("--warehouse", warehouse, "--database", database));
+                            Arrays.asList(
+                                    "mongodb_sync_database",
+                                    "--warehouse",
+                                    warehouse,
+                                    "--database",
+                                    database));
 
             args.addAll(mapToArgs("--mongodb-conf", sourceConfig));
             args.addAll(mapToArgs("--catalog-conf", catalogConfig));
@@ -172,12 +172,7 @@ public abstract class MongoDBActionITCaseBase extends CdcActionITCaseBase {
             args.addAll(nullableToArgs("--including-tables", includingTables));
             args.addAll(nullableToArgs("--excluding-tables", excludingTables));
 
-            MultipleParameterTool params =
-                    MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0])));
-            return (MongoDBSyncDatabaseAction)
-                    new MongoDBSyncDatabaseActionFactory()
-                            .create(params)
-                            .orElseThrow(RuntimeException::new);
+            return createAction(MongoDBSyncDatabaseAction.class, args);
         }
     }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
index d6b09f0a3..b9f91b3b4 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;
 
-import org.apache.flink.api.java.utils.MultipleParameterTool;
 import org.junit.jupiter.api.AfterAll;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -112,6 +111,7 @@ public class MySqlActionITCaseBase extends CdcActionITCaseBase {
             List<String> args =
                     new ArrayList<>(
                             Arrays.asList(
+                                    "mysql_sync_table",
                                     "--warehouse",
                                     warehouse,
                                     "--database",
@@ -130,12 +130,7 @@ public class MySqlActionITCaseBase extends CdcActionITCaseBase {
             args.addAll(listToMultiArgs("--computed-column", computedColumnArgs));
             args.addAll(listToMultiArgs("--metadata-column", metadataColumns));
 
-            MultipleParameterTool params =
-                    MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0])));
-            return (MySqlSyncTableAction)
-                    new MySqlSyncTableActionFactory()
-                            .create(params)
-                            .orElseThrow(RuntimeException::new);
+            return createAction(MySqlSyncTableAction.class, args);
         }
     }
 
@@ -150,7 +145,12 @@ public class MySqlActionITCaseBase extends CdcActionITCaseBase {
         public MySqlSyncDatabaseAction build() {
             List<String> args =
                     new ArrayList<>(
-                            Arrays.asList("--warehouse", warehouse, "--database", database));
+                            Arrays.asList(
+                                    "mysql_sync_database",
+                                    "--warehouse",
+                                    warehouse,
+                                    "--database",
+                                    database));
 
             args.addAll(mapToArgs("--mysql-conf", sourceConfig));
             args.addAll(mapToArgs("--catalog-conf", catalogConfig));
@@ -167,12 +167,7 @@ public class MySqlActionITCaseBase extends CdcActionITCaseBase {
             args.addAll(listToArgs("--type-mapping", typeMappingModes));
             args.addAll(listToArgs("--metadata-column", metadataColumn));
 
-            MultipleParameterTool params =
-                    MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0])));
-            return (MySqlSyncDatabaseAction)
-                    new MySqlSyncDatabaseActionFactory()
-                            .create(params)
-                            .orElseThrow(RuntimeException::new);
+            return createAction(MySqlSyncDatabaseAction.class, args);
         }
     }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java
index c696124c8..5e96b3a5f 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java
@@ -23,7 +23,6 @@ import org.apache.paimon.utils.StringUtils;
 
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import org.apache.flink.api.java.utils.MultipleParameterTool;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Producer;
@@ -277,6 +276,7 @@ public class PulsarActionITCaseBase extends CdcActionITCaseBase {
             List<String> args =
                     new ArrayList<>(
                             Arrays.asList(
+                                    "pulsar_sync_table",
                                     "--warehouse",
                                     warehouse,
                                     "--database",
@@ -294,12 +294,7 @@ public class PulsarActionITCaseBase extends CdcActionITCaseBase {
 
             args.addAll(listToMultiArgs("--computed-column", computedColumnArgs));
 
-            MultipleParameterTool params =
-                    MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0])));
-            return (PulsarSyncTableAction)
-                    new PulsarSyncTableActionFactory()
-                            .create(params)
-                            .orElseThrow(RuntimeException::new);
+            return createAction(PulsarSyncTableAction.class, args);
         }
     }
 
@@ -326,7 +321,12 @@ public class PulsarActionITCaseBase extends CdcActionITCaseBase {
         public PulsarSyncDatabaseAction build() {
             List<String> args =
                     new ArrayList<>(
-                            Arrays.asList("--warehouse", warehouse, "--database", database));
+                            Arrays.asList(
+                                    "pulsar_sync_database",
+                                    "--warehouse",
+                                    warehouse,
+                                    "--database",
+                                    database));
 
             args.addAll(mapToArgs("--pulsar-conf", sourceConfig));
             args.addAll(mapToArgs("--catalog-conf", catalogConfig));
@@ -339,12 +339,7 @@ public class PulsarActionITCaseBase extends CdcActionITCaseBase {
 
             args.addAll(listToArgs("--type-mapping", typeMappingModes));
 
-            MultipleParameterTool params =
-                    MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0])));
-            return (PulsarSyncDatabaseAction)
-                    new PulsarSyncDatabaseActionFactory()
-                            .create(params)
-                            .orElseThrow(RuntimeException::new);
+            return createAction(PulsarSyncDatabaseAction.class, args);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
index 8d37d16db..e6ac1dfaa 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
@@ -45,10 +45,20 @@ public interface ActionFactory extends Factory {
 
     Logger LOG = LoggerFactory.getLogger(ActionFactory.class);
 
-    Optional<Action> create(MultipleParameterTool params);
+    String HELP = "help";
+    String WAREHOUSE = "warehouse";
+    String DATABASE = "database";
+    String TABLE = "table";
+    String PATH = "path";
+    String CATALOG_CONF = "catalog_conf";
+    String TABLE_CONF = "table_conf";
+    String PARTITION = "partition";
+
+    Optional<Action> create(MultipleParameterToolAdapter params);
 
     static Optional<Action> createAction(String[] args) {
-        String action = args[0].toLowerCase();
+        // to be compatible with old usage
+        String action = args[0].toLowerCase().replaceAll("-", "_");
         String[] actionArgs = Arrays.copyOfRange(args, 1, args.length);
         ActionFactory actionFactory;
         try {
@@ -62,8 +72,9 @@ public interface ActionFactory extends Factory {
 
         LOG.info("{} job args: {}", actionFactory.identifier(), String.join(" ", actionArgs));
 
-        MultipleParameterTool params = MultipleParameterTool.fromArgs(actionArgs);
-        if (params.has("help")) {
+        MultipleParameterToolAdapter params =
+                new MultipleParameterToolAdapter(MultipleParameterTool.fromArgs(actionArgs));
+        if (params.has(HELP)) {
             actionFactory.printHelp();
             return Optional.empty();
         }
@@ -85,11 +96,11 @@ public interface ActionFactory extends Factory {
         System.out.println("For detailed options of each action, run <action> --help");
     }
 
-    default Tuple3<String, String, String> getTablePath(MultipleParameterTool params) {
-        String warehouse = params.get("warehouse");
-        String database = params.get("database");
-        String table = params.get("table");
-        String path = params.get("path");
+    default Tuple3<String, String, String> getTablePath(MultipleParameterToolAdapter params) {
+        String warehouse = params.get(WAREHOUSE);
+        String database = params.get(DATABASE);
+        String table = params.get(TABLE);
+        String path = params.get(PATH);
 
         Tuple3<String, String, String> tablePath = null;
         int count = 0;
@@ -118,9 +129,9 @@ public interface ActionFactory extends Factory {
         return tablePath;
     }
 
-    default List<Map<String, String>> getPartitions(MultipleParameterTool params) {
+    default List<Map<String, String>> getPartitions(MultipleParameterToolAdapter params) {
         List<Map<String, String>> partitions = new ArrayList<>();
-        for (String partition : params.getMultiParameter("partition")) {
+        for (String partition : params.getMultiParameter(PARTITION)) {
             Map<String, String> kvs = parseCommaSeparatedKeyValues(partition);
             partitions.add(kvs);
         }
@@ -128,7 +139,7 @@ public interface ActionFactory extends Factory {
         return partitions;
     }
 
-    default Map<String, String> optionalConfigMap(MultipleParameterTool params, String key) {
+    default Map<String, String> optionalConfigMap(MultipleParameterToolAdapter params, String key) {
         if (!params.has(key)) {
             return Collections.emptyMap();
         }
@@ -140,12 +151,12 @@ public interface ActionFactory extends Factory {
         return config;
     }
 
-    default void checkRequiredArgument(MultipleParameterTool params, String key) {
+    default void checkRequiredArgument(MultipleParameterToolAdapter params, String key) {
         Preconditions.checkArgument(
                 params.has(key), "Argument '%s' is required. Run '<action> --help' for help.", key);
     }
 
-    default String getRequiredValue(MultipleParameterTool params, String key) {
+    default String getRequiredValue(MultipleParameterToolAdapter params, String key) {
         checkRequiredArgument(params, key);
         return params.get(key);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
index 0e0c4791d..08c1215df 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action;
 
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
 
 import java.util.List;
 import java.util.Map;
@@ -30,28 +29,31 @@ public class CompactActionFactory implements ActionFactory {
 
     public static final String IDENTIFIER = "compact";
 
+    private static final String ORDER_STRATEGY = "order_strategy";
+    private static final String ORDER_BY = "order_by";
+
     @Override
     public String identifier() {
         return IDENTIFIER;
     }
 
     @Override
-    public Optional<Action> create(MultipleParameterTool params) {
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
         Tuple3<String, String, String> tablePath = getTablePath(params);
 
-        Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
+        Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
 
         CompactAction action;
-        if (params.has("order-strategy")) {
+        if (params.has(ORDER_STRATEGY)) {
             action =
                     new SortCompactAction(
                                     tablePath.f0,
                                     tablePath.f1,
                                     tablePath.f2,
                                     catalogConfig,
-                                    optionalConfigMap(params, "table-conf"))
-                            .withOrderStrategy(params.get("order-strategy"))
-                            .withOrderColumns(getRequiredValue(params, "order-by").split(","));
+                                    optionalConfigMap(params, TABLE_CONF))
+                            .withOrderStrategy(params.get(ORDER_STRATEGY))
+                            .withOrderColumns(getRequiredValue(params, ORDER_BY).split(","));
         } else {
             action =
                     new CompactAction(
@@ -59,10 +61,10 @@ public class CompactActionFactory implements ActionFactory {
                             tablePath.f1,
                             tablePath.f2,
                             catalogConfig,
-                            optionalConfigMap(params, "table-conf"));
+                            optionalConfigMap(params, TABLE_CONF));
         }
 
-        if (params.has("partition")) {
+        if (params.has(PARTITION)) {
             List<Map<String, String>> partitions = getPartitions(params);
             action.withPartitions(partitions);
         }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
index b7a1a776a..590f5283a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
@@ -18,14 +18,17 @@
 
 package org.apache.paimon.flink.action;
 
-import org.apache.flink.api.java.utils.MultipleParameterTool;
-
 import java.util.Optional;
 
 /** Factory to create {@link CompactDatabaseAction}. */
 public class CompactDatabaseActionFactory implements ActionFactory {
 
-    public static final String IDENTIFIER = "compact-database";
+    public static final String IDENTIFIER = "compact_database";
+
+    private static final String INCLUDING_DATABASES = "including_databases";
+    private static final String INCLUDING_TABLES = "including_tables";
+    private static final String EXCLUDING_TABLES = "excluding_tables";
+    private static final String MODE = "mode";
 
     @Override
     public String identifier() {
@@ -33,17 +36,17 @@ public class CompactDatabaseActionFactory implements ActionFactory {
     }
 
     @Override
-    public Optional<Action> create(MultipleParameterTool params) {
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
         CompactDatabaseAction action =
                 new CompactDatabaseAction(
-                        getRequiredValue(params, "warehouse"),
-                        optionalConfigMap(params, "catalog-conf"));
+                        getRequiredValue(params, WAREHOUSE),
+                        optionalConfigMap(params, CATALOG_CONF));
 
-        action.includingDatabases(params.get("including-databases"))
-                .includingTables(params.get("including-tables"))
-                .excludingTables(params.get("excluding-tables"))
-                .withDatabaseCompactMode(params.get("mode"))
-                .withTableOptions(optionalConfigMap(params, "table-conf"));
+        action.includingDatabases(params.get(INCLUDING_DATABASES))
+                .includingTables(params.get(INCLUDING_TABLES))
+                .excludingTables(params.get(EXCLUDING_TABLES))
+                .withDatabaseCompactMode(params.get(MODE))
+                .withTableOptions(optionalConfigMap(params, TABLE_CONF));
 
         return Optional.of(action);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
index eb3ecf619..45b9597df 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action;
 
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
 
 import java.util.Map;
 import java.util.Optional;
@@ -27,7 +26,10 @@ import java.util.Optional;
 /** Factory to create {@link CreateTagAction}. */
 public class CreateTagActionFactory implements ActionFactory {
 
-    public static final String IDENTIFIER = "create-tag";
+    public static final String IDENTIFIER = "create_tag";
+
+    private static final String TAG_NAME = "tag_name";
+    private static final String SNAPSHOT = "snapshot";
 
     @Override
     public String identifier() {
@@ -35,14 +37,14 @@ public class CreateTagActionFactory implements ActionFactory {
     }
 
     @Override
-    public Optional<Action> create(MultipleParameterTool params) {
-        checkRequiredArgument(params, "tag-name");
-        checkRequiredArgument(params, "snapshot");
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        checkRequiredArgument(params, TAG_NAME);
+        checkRequiredArgument(params, SNAPSHOT);
 
         Tuple3<String, String, String> tablePath = getTablePath(params);
-        Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
-        String tagName = params.get("tag-name");
-        long snapshot = Long.parseLong(params.get("snapshot"));
+        Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
+        String tagName = params.get(TAG_NAME);
+        long snapshot = Long.parseLong(params.get(SNAPSHOT));
 
         CreateTagAction action =
                 new CreateTagAction(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteActionFactory.java
index 4b5348664..dee3a68de 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteActionFactory.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action;
 
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
 
 import java.util.Map;
 import java.util.Optional;
@@ -29,22 +28,24 @@ public class DeleteActionFactory implements ActionFactory {
 
     public static final String IDENTIFIER = "delete";
 
+    private static final String WHERE = "where";
+
     @Override
     public String identifier() {
         return IDENTIFIER;
     }
 
     @Override
-    public Optional<Action> create(MultipleParameterTool params) {
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
         Tuple3<String, String, String> tablePath = getTablePath(params);
 
-        String filter = params.get("where");
+        String filter = params.get(WHERE);
         if (filter == null) {
             throw new IllegalArgumentException(
                     "Please specify deletion filter. If you want to delete all records, please use overwrite (see doc).");
         }
 
-        Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
+        Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
 
         DeleteAction action =
                 new DeleteAction(tablePath.f0, tablePath.f1, tablePath.f2, filter, catalogConfig);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagActionFactory.java
index f95716a21..8f0c51482 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagActionFactory.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action;
 
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
 
 import java.util.Map;
 import java.util.Optional;
@@ -27,7 +26,9 @@ import java.util.Optional;
 /** Factory to create {@link DeleteTagAction}. */
 public class DeleteTagActionFactory implements ActionFactory {
 
-    public static final String IDENTIFIER = "delete-tag";
+    public static final String IDENTIFIER = "delete_tag";
+
+    private static final String TAG_NAME = "tag_name";
 
     @Override
     public String identifier() {
@@ -35,12 +36,12 @@ public class DeleteTagActionFactory implements ActionFactory {
     }
 
     @Override
-    public Optional<Action> create(MultipleParameterTool params) {
-        checkRequiredArgument(params, "tag-name");
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        checkRequiredArgument(params, TAG_NAME);
 
         Tuple3<String, String, String> tablePath = getTablePath(params);
-        Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
-        String tagName = params.get("tag-name");
+        Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
+        String tagName = params.get(TAG_NAME);
 
         DeleteTagAction action =
                 new DeleteTagAction(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionActionFactory.java
index d06fd2bf8..b177caadd 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionActionFactory.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action;
 
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
 
 import java.util.List;
 import java.util.Map;
@@ -28,7 +27,7 @@ import java.util.Optional;
 /** Factory to create {@link DropPartitionAction}. */
 public class DropPartitionActionFactory implements ActionFactory {
 
-    public static final String IDENTIFIER = "drop-partition";
+    public static final String IDENTIFIER = "drop_partition";
 
     @Override
     public String identifier() {
@@ -36,13 +35,13 @@ public class DropPartitionActionFactory implements ActionFactory {
     }
 
     @Override
-    public Optional<Action> create(MultipleParameterTool params) {
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
         Tuple3<String, String, String> tablePath = getTablePath(params);
 
-        checkRequiredArgument(params, "partition");
+        checkRequiredArgument(params, PARTITION);
         List<Map<String, String>> partitions = getPartitions(params);
 
-        Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
+        Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
 
         return Optional.of(
                 new DropPartitionAction(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
index 27fe6069b..2b87b7293 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action;
 
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -31,7 +30,7 @@ import java.util.stream.Collectors;
 /** Factory to create {@link MergeIntoAction}. */
 public class MergeIntoActionFactory implements ActionFactory {
 
-    public static final String IDENTIFIER = "merge-into";
+    public static final String IDENTIFIER = "merge_into";
 
     public static final String MATCHED_UPSERT = "matched-upsert";
     public static final String NOT_MATCHED_BY_SOURCE_UPSERT = "not-matched-by-source-upsert";
@@ -39,62 +38,78 @@ public class MergeIntoActionFactory implements ActionFactory {
     public static final String NOT_MATCHED_BY_SOURCE_DELETE = "not-matched-by-source-delete";
     public static final String NOT_MATCHED_INSERT = "not-matched-insert";
 
+    private static final String TARGET_AS = "target_as";
+    private static final String SOURCE_SQL = "source_sql";
+    private static final String SOURCE_TABLE = "source_table";
+    private static final String ON = "on";
+    private static final String MERGE_ACTIONS = "merge_actions";
+    private static final String MATCHED_UPSERT_SET = "matched_upsert_set";
+    private static final String MATCHED_UPSERT_CONDITION = "matched_upsert_condition";
+    private static final String NOT_MATCHED_BY_SOURCE_UPSERT_SET =
+            "not_matched_by_source_upsert_set";
+    private static final String NOT_MATCHED_BY_SOURCE_UPSERT_CONDITION =
+            "not_matched_by_source_upsert_condition";
+    private static final String MATCHED_DELETE_CONDITION = "matched_delete_condition";
+    private static final String NOT_MATCHED_BY_SOURCE_DELETE_CONDITION =
+            "not_matched_by_source_delete_condition";
+    private static final String NOT_MATCHED_INSERT_VALUES = "not_matched_insert_values";
+    private static final String NOT_MATCHED_INSERT_CONDITION = "not_matched_insert_condition";
+
     @Override
     public String identifier() {
         return IDENTIFIER;
     }
 
     @Override
-    public Optional<Action> create(MultipleParameterTool params) {
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
         Tuple3<String, String, String> tablePath = getTablePath(params);
 
-        Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
+        Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
 
         MergeIntoAction action =
                 new MergeIntoAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig);
 
-        if (params.has("target-as")) {
-            action.withTargetAlias(params.get("target-as"));
+        if (params.has(TARGET_AS)) {
+            action.withTargetAlias(params.get(TARGET_AS));
         }
 
-        if (params.has("source-sql")) {
-            Collection<String> sourceSqls = params.getMultiParameter("source-sql");
+        if (params.has(SOURCE_SQL)) {
+            Collection<String> sourceSqls = params.getMultiParameter(SOURCE_SQL);
             action.withSourceSqls(sourceSqls.toArray(new String[0]));
         }
 
-        checkRequiredArgument(params, "source-table");
-        action.withSourceTable(params.get("source-table"));
+        checkRequiredArgument(params, SOURCE_TABLE);
+        action.withSourceTable(params.get(SOURCE_TABLE));
 
-        checkRequiredArgument(params, "on");
-        action.withMergeCondition(params.get("on"));
+        checkRequiredArgument(params, ON);
+        action.withMergeCondition(params.get(ON));
 
         List<String> actions =
-                Arrays.stream(params.get("merge-actions").split(","))
+                Arrays.stream(params.get(MERGE_ACTIONS).split(","))
                         .map(String::trim)
                         .collect(Collectors.toList());
         if (actions.contains(MATCHED_UPSERT)) {
-            checkRequiredArgument(params, "matched-upsert-set");
+            checkRequiredArgument(params, MATCHED_UPSERT_SET);
             action.withMatchedUpsert(
-                    params.get("matched-upsert-condition"), params.get("matched-upsert-set"));
+                    params.get(MATCHED_UPSERT_CONDITION), params.get(MATCHED_UPSERT_SET));
         }
         if (actions.contains(NOT_MATCHED_BY_SOURCE_UPSERT)) {
-            checkRequiredArgument(params, "not-matched-by-source-upsert-set");
+            checkRequiredArgument(params, NOT_MATCHED_BY_SOURCE_UPSERT_SET);
             action.withNotMatchedBySourceUpsert(
-                    params.get("not-matched-by-source-upsert-condition"),
-                    params.get("not-matched-by-source-upsert-set"));
+                    params.get(NOT_MATCHED_BY_SOURCE_UPSERT_CONDITION),
+                    params.get(NOT_MATCHED_BY_SOURCE_UPSERT_SET));
         }
         if (actions.contains(MATCHED_DELETE)) {
-            action.withMatchedDelete(params.get("matched-delete-condition"));
+            action.withMatchedDelete(params.get(MATCHED_DELETE_CONDITION));
         }
         if (actions.contains(NOT_MATCHED_BY_SOURCE_DELETE)) {
-            action.withNotMatchedBySourceDelete(
-                    params.get("not-matched-by-source-delete-condition"));
+            action.withNotMatchedBySourceDelete(params.get(NOT_MATCHED_BY_SOURCE_DELETE_CONDITION));
         }
         if (actions.contains(NOT_MATCHED_INSERT)) {
-            checkRequiredArgument(params, "not-matched-insert-values");
+            checkRequiredArgument(params, NOT_MATCHED_INSERT_VALUES);
             action.withNotMatchedInsert(
-                    params.get("not-matched-insert-condition"),
-                    params.get("not-matched-insert-values"));
+                    params.get(NOT_MATCHED_INSERT_CONDITION),
+                    params.get(NOT_MATCHED_INSERT_VALUES));
         }
 
         validate(action);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java
index a9dd8a512..5ab1fb259 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java
@@ -18,8 +18,6 @@
 
 package org.apache.paimon.flink.action;
 
-import org.apache.flink.api.java.utils.MultipleParameterTool;
-
 import java.util.Map;
 import java.util.Optional;
 
@@ -28,18 +26,21 @@ public class MigrateTableActionFactory implements ActionFactory {
 
     public static final String IDENTIFIER = "migrate_table";
 
+    private static final String SOURCE_TYPE = "source_type";
+    private static final String OPTIONS = "options";
+
     @Override
     public String identifier() {
         return IDENTIFIER;
     }
 
     @Override
-    public Optional<Action> create(MultipleParameterTool params) {
-        String warehouse = params.get("warehouse");
-        String connector = params.get("source_type");
-        String sourceHiveTable = params.get("table");
-        Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
-        String tableConf = params.get("options");
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        String warehouse = params.get(WAREHOUSE);
+        String connector = params.get(SOURCE_TYPE);
+        String sourceHiveTable = params.get(TABLE);
+        Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
+        String tableConf = params.get(OPTIONS);
 
         MigrateTableAction migrateTableAction =
                 new MigrateTableAction(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
new file mode 100644
index 000000000..e03b8cd69
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+
+import java.util.Collection;
+
+/** An adapter of {@link MultipleParameterTool} that can deal old style key. */
+public class MultipleParameterToolAdapter {
+
+    private final MultipleParameterTool params;
+
+    public MultipleParameterToolAdapter(MultipleParameterTool params) {
+        this.params = params;
+    }
+
+    public boolean has(String key) {
+        return params.has(key) || params.has(fallback(key));
+    }
+
+    public String get(String key) {
+        String result = params.get(key);
+        return result == null ? params.get(fallback(key)) : result;
+    }
+
+    public Collection<String> getMultiParameter(String key) {
+        Collection<String> result = params.getMultiParameter(key);
+        return result == null ? params.getMultiParameter(fallback(key)) : result;
+    }
+
+    public String fallback(String key) {
+        return key.replaceAll("_", "-");
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java
index 4e0ca0316..b5d1f290d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action;
 
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
 
 import java.util.Map;
 import java.util.Optional;
@@ -27,7 +26,9 @@ import java.util.Optional;
 /** Factory to create {@link RemoveOrphanFilesAction}. */
 public class RemoveOrphanFilesActionFactory implements ActionFactory {
 
-    public static final String IDENTIFIER = "remove-orphan-files";
+    public static final String IDENTIFIER = "remove_orphan_files";
+
+    private static final String OLDER_THAN = "older_than";
 
     @Override
     public String identifier() {
@@ -35,16 +36,16 @@ public class RemoveOrphanFilesActionFactory implements ActionFactory {
     }
 
     @Override
-    public Optional<Action> create(MultipleParameterTool params) {
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
         Tuple3<String, String, String> tablePath = getTablePath(params);
-        Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
+        Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
 
         RemoveOrphanFilesAction action =
                 new RemoveOrphanFilesAction(
                         tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig);
 
-        if (params.has("older-than")) {
-            action.olderThan(params.get("olderThan"));
+        if (params.has(OLDER_THAN)) {
+            action.olderThan(params.get(OLDER_THAN));
         }
 
         return Optional.of(action);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java
index d8e04bf0e..a6e16b7e8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action;
 
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
 
 import java.util.Map;
 import java.util.Optional;
@@ -27,7 +26,10 @@ import java.util.Optional;
 /** Factory to create {@link ResetConsumerAction}. */
 public class ResetConsumerActionFactory implements ActionFactory {
 
-    public static final String IDENTIFIER = "reset-consumer";
+    public static final String IDENTIFIER = "reset_consumer";
+
+    private static final String CONSUMER_ID = "consumer_id";
+    private static final String NEXT_SNAPSHOT = "next_snapshot";
 
     @Override
     public String identifier() {
@@ -35,19 +37,19 @@ public class ResetConsumerActionFactory implements ActionFactory {
     }
 
     @Override
-    public Optional<Action> create(MultipleParameterTool params) {
-        checkRequiredArgument(params, "consumer-id");
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        checkRequiredArgument(params, CONSUMER_ID);
 
         Tuple3<String, String, String> tablePath = getTablePath(params);
-        Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
-        String consumerId = params.get("consumer-id");
+        Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
+        String consumerId = params.get(CONSUMER_ID);
 
         ResetConsumerAction action =
                 new ResetConsumerAction(
                         tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig, consumerId);
 
-        if (params.has("next-snapshot")) {
-            action.withNextSnapshotIds(Long.parseLong(params.get("next-snapshot")));
+        if (params.has(NEXT_SNAPSHOT)) {
+            action.withNextSnapshotIds(Long.parseLong(params.get(NEXT_SNAPSHOT)));
         }
 
         return Optional.of(action);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToActionFactory.java
index 43ea7a406..222d7d569 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToActionFactory.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action;
 
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
 
 import java.util.Map;
 import java.util.Optional;
@@ -27,7 +26,9 @@ import java.util.Optional;
 /** Factory to create {@link RollbackToAction}. */
 public class RollbackToActionFactory implements ActionFactory {
 
-    public static final String IDENTIFIER = "rollback-to";
+    public static final String IDENTIFIER = "rollback_to";
+
+    private static final String VERSION = "version";
 
     @Override
     public String identifier() {
@@ -35,13 +36,13 @@ public class RollbackToActionFactory implements ActionFactory {
     }
 
     @Override
-    public Optional<Action> create(MultipleParameterTool params) {
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
         Tuple3<String, String, String> tablePath = getTablePath(params);
 
-        checkRequiredArgument(params, "version");
-        String version = params.get("version");
+        checkRequiredArgument(params, VERSION);
+        String version = params.get(VERSION);
 
-        Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
+        Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
 
         RollbackToAction action =
                 new RollbackToAction(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index fe48881d4..476727699 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -159,6 +159,31 @@ public abstract class ActionITCaseBase extends AbstractTestBase {
         return env;
     }
 
+    protected <T extends ActionBase> T createAction(Class<T> clazz, List<String> args) {
+        return createAction(clazz, args.toArray(new String[0]));
+    }
+
+    protected <T extends ActionBase> T createAction(Class<T> clazz, String... args) {
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            confuseArgs(args, "_", "-");
+        } else {
+            confuseArgs(args, "-", "_");
+        }
+        return ActionFactory.createAction(args)
+                .filter(clazz::isInstance)
+                .map(clazz::cast)
+                .orElseThrow(() -> new RuntimeException("Failed to create action"));
+    }
+
+    // to test compatibility with old usage
+    private void confuseArgs(String[] args, String regex, String replacement) {
+        args[0] = args[0].replaceAll(regex, replacement);
+        for (int i = 1; i < args.length; i += 2) {
+            String arg = args[i].substring(2);
+            args[i] = "--" + arg.replaceAll(regex, replacement);
+        }
+    }
+
     protected void callProcedure(String procedureStatement) {
         // default execution mode
         callProcedure(procedureStatement, true, false);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index 9206cc95f..d40d1aeea 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -250,14 +250,17 @@ public class CompactActionITCase extends CompactActionITCaseBase {
                 Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), Collections.emptyMap());
 
         CompactAction compactAction =
-                new CompactAction(
-                                warehouse,
-                                database,
-                                tableName,
-                                Collections.emptyMap(),
-                                Collections.singletonMap(
-                                        FlinkConnectorOptions.SCAN_PARALLELISM.key(), "6"))
-                        .withPartitions(getSpecifiedPartitions());
+                createAction(
+                        CompactAction.class,
+                        "compact",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--table_conf",
+                        FlinkConnectorOptions.SCAN_PARALLELISM.key() + "=6");
 
         assertThat(compactAction.table.options().get(FlinkConnectorOptions.SCAN_PARALLELISM.key()))
                 .isEqualTo("6");
@@ -288,10 +291,21 @@ public class CompactActionITCase extends CompactActionITCaseBase {
     private void runAction(boolean isStreaming) throws Exception {
         StreamExecutionEnvironment env = buildDefaultEnv(isStreaming);
 
-        new CompactAction(warehouse, database, tableName)
-                .withPartitions(getSpecifiedPartitions())
-                .withStreamExecutionEnvironment(env)
-                .build();
+        CompactAction action =
+                createAction(
+                        CompactAction.class,
+                        "compact",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--partition",
+                        "dt=20221208,hh=15",
+                        "--partition",
+                        "dt=20221209,hh=15");
+        action.withStreamExecutionEnvironment(env).build();
         if (isStreaming) {
             env.executeAsync();
         } else {
@@ -307,16 +321,4 @@ public class CompactActionITCase extends CompactActionITCaseBase {
                 isStreaming,
                 !isStreaming);
     }
-
-    private List<Map<String, String>> getSpecifiedPartitions() {
-        Map<String, String> partition1 = new HashMap<>();
-        partition1.put("dt", "20221208");
-        partition1.put("hh", "15");
-
-        Map<String, String> partition2 = new HashMap<>();
-        partition2.put("dt", "20221209");
-        partition2.put("hh", "15");
-
-        return Arrays.asList(partition1, partition2);
-    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
index c0c0d4337..705826bfa 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
@@ -132,8 +132,13 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
 
         if (ThreadLocalRandom.current().nextBoolean()) {
             StreamExecutionEnvironment env = buildDefaultEnv(false);
-            new CompactDatabaseAction(warehouse, Collections.emptyMap())
-                    .withDatabaseCompactMode(mode)
+            createAction(
+                            CompactDatabaseAction.class,
+                            "compact_database",
+                            "--warehouse",
+                            warehouse,
+                            "--mode",
+                            mode)
                     .withStreamExecutionEnvironment(env)
                     .build();
             env.execute();
@@ -207,22 +212,30 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
         }
 
         if (ThreadLocalRandom.current().nextBoolean()) {
-            StreamExecutionEnvironment env = buildDefaultEnv(true);
+            CompactDatabaseAction action;
             if (mode.equals("divided")) {
-                new CompactDatabaseAction(warehouse, new HashMap<>())
-                        .withStreamExecutionEnvironment(env)
-                        .build();
+                action =
+                        createAction(
+                                CompactDatabaseAction.class,
+                                "compact_database",
+                                "--warehouse",
+                                warehouse);
             } else {
                 // if CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() use default value, the cost
                 // time in combined mode will be over 1 min
-                new CompactDatabaseAction(warehouse, new HashMap<>())
-                        .withDatabaseCompactMode("combined")
-                        .withTableOptions(
-                                Collections.singletonMap(
-                                        CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s"))
-                        .withStreamExecutionEnvironment(env)
-                        .build();
+                action =
+                        createAction(
+                                CompactDatabaseAction.class,
+                                "compact_database",
+                                "--warehouse",
+                                warehouse,
+                                "--mode",
+                                "combined",
+                                "--table_conf",
+                                CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() + "=1s");
             }
+            StreamExecutionEnvironment env = buildDefaultEnv(true);
+            action.withStreamExecutionEnvironment(env).build();
             env.executeAsync();
         } else {
             if (mode.equals("divided")) {
@@ -426,8 +439,8 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
 
     private void includingAndExcludingTablesImpl(
             String mode,
-            String includingPattern,
-            String excludesPattern,
+            @Nullable String includingPattern,
+            @Nullable String excludesPattern,
             List<Identifier> includeTables,
             List<Identifier> excludeTables)
             throws Exception {
@@ -477,18 +490,29 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
         }
 
         if (ThreadLocalRandom.current().nextBoolean()) {
-            StreamExecutionEnvironment env = buildDefaultEnv(false);
-            CompactDatabaseAction action =
-                    new CompactDatabaseAction(warehouse, Collections.emptyMap())
-                            .includingTables(includingPattern)
-                            .excludingTables(excludesPattern)
-                            .withDatabaseCompactMode(mode);
+            List<String> args = new ArrayList<>();
+            args.add("compact_database");
+            args.add("--warehouse");
+            args.add(warehouse);
+            if (includingPattern != null) {
+                args.add("--including_tables");
+                args.add(includingPattern);
+            }
+            if (excludesPattern != null) {
+                args.add("--excluding_tables");
+                args.add(excludesPattern);
+            }
+            args.add("--mode");
+            args.add(mode);
             if (mode.equals("combined")) {
-                action.withTableOptions(
-                        Collections.singletonMap(
-                                CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s"));
+                args.add("--table_conf");
+                args.add(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() + "=1s");
             }
-            action.withStreamExecutionEnvironment(env).build();
+
+            StreamExecutionEnvironment env = buildDefaultEnv(false);
+            createAction(CompactDatabaseAction.class, args)
+                    .withStreamExecutionEnvironment(env)
+                    .build();
             env.execute();
         } else {
             if (mode.equals("divided")) {
@@ -588,7 +612,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
 
         if (ThreadLocalRandom.current().nextBoolean()) {
             StreamExecutionEnvironment env = buildDefaultEnv(true);
-            new CompactDatabaseAction(warehouse, new HashMap<>())
+            createAction(CompactDatabaseAction.class, "compact_database", "--warehouse", warehouse)
                     .withStreamExecutionEnvironment(env)
                     .build();
             env.executeAsync();
@@ -661,7 +685,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
 
         if (ThreadLocalRandom.current().nextBoolean()) {
             StreamExecutionEnvironment env = buildDefaultEnv(false);
-            new CompactDatabaseAction(warehouse, new HashMap<>())
+            createAction(CompactDatabaseAction.class, "compact_database", "--warehouse", warehouse)
                     .withStreamExecutionEnvironment(env)
                     .build();
             env.execute();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
index 771a9e59d..2710cd6d9 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -80,11 +81,22 @@ public class ConsumerActionITCase extends ActionITCaseBase {
         assertThat(consumer1).isPresent();
         assertThat(consumer1.get().nextSnapshot()).isEqualTo(4);
 
+        List<String> args =
+                Arrays.asList(
+                        "reset_consumer",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--consumer_id",
+                        "myid",
+                        "--next_snapshot",
+                        "1");
         // reset consumer
         if (ThreadLocalRandom.current().nextBoolean()) {
-            new ResetConsumerAction(warehouse, database, tableName, Collections.emptyMap(), "myid")
-                    .withNextSnapshotIds(1L)
-                    .run();
+            createAction(ResetConsumerAction.class, args).run();
         } else {
             callProcedure(
                     String.format(
@@ -96,8 +108,7 @@ public class ConsumerActionITCase extends ActionITCaseBase {
 
         // delete consumer
         if (ThreadLocalRandom.current().nextBoolean()) {
-            new ResetConsumerAction(warehouse, database, tableName, Collections.emptyMap(), "myid")
-                    .run();
+            createAction(ResetConsumerAction.class, args.subList(0, 9)).run();
         } else {
             callProcedure(
                     String.format("CALL sys.reset_consumer('%s.%s', 'myid')", database, tableName));
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
index 7550d5e81..da1cf763a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
@@ -73,7 +73,17 @@ public class DeleteActionITCase extends ActionITCaseBase {
         prepareTable();
 
         DeleteAction action =
-                new DeleteAction(warehouse, database, tableName, "k = 1", Collections.emptyMap());
+                createAction(
+                        DeleteAction.class,
+                        "delete",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--where",
+                        "k=1");
 
         BlockingIterator<Row, Row> iterator =
                 testStreamingRead(buildSimpleQuery(tableName), initialRecords);
@@ -112,7 +122,17 @@ public class DeleteActionITCase extends ActionITCaseBase {
                 });
 
         DeleteAction action =
-                new DeleteAction(warehouse, database, tableName, "k < 3", Collections.emptyMap());
+                createAction(
+                        DeleteAction.class,
+                        "delete",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--where",
+                        "k<3");
 
         insertInto(
                 tableName, "(1, 'Say', 'A'), (2, 'Hi', 'B'), (3, 'To', 'C'), (4, 'Paimon', 'D')");
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
index 36358a0e0..d489a2843 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
@@ -56,12 +56,18 @@ public class DropPartitionActionITCase extends ActionITCaseBase {
         FileStoreTable table = prepareTable(hasPk);
 
         if (ThreadLocalRandom.current().nextBoolean()) {
-            new DropPartitionAction(
+
+            createAction(
+                            DropPartitionAction.class,
+                            "drop_partition",
+                            "--warehouse",
                             warehouse,
+                            "--database",
                             database,
+                            "--table",
                             tableName,
-                            Collections.singletonList(Collections.singletonMap("partKey0", "0")),
-                            Collections.emptyMap())
+                            "--partition",
+                            "partKey0=0")
                     .run();
         } else {
             callProcedure(
@@ -115,12 +121,19 @@ public class DropPartitionActionITCase extends ActionITCaseBase {
         partitions1.put("partKey1", "0");
 
         if (ThreadLocalRandom.current().nextBoolean()) {
-            new DropPartitionAction(
+            createAction(
+                            DropPartitionAction.class,
+                            "drop_partition",
+                            "--warehouse",
                             warehouse,
+                            "--database",
                             database,
+                            "--table",
                             tableName,
-                            Arrays.asList(partitions0, partitions1),
-                            Collections.emptyMap())
+                            "--partition",
+                            "partKey0=0,partKey1=1",
+                            "--partition",
+                            "partKey0=1,partKey1=0")
                     .run();
         } else {
             callProcedure(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index 7d6ce1f33..5012a1732 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -32,6 +32,9 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -40,6 +43,11 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
+import static org.apache.paimon.flink.action.MergeIntoActionFactory.MATCHED_DELETE;
+import static org.apache.paimon.flink.action.MergeIntoActionFactory.MATCHED_UPSERT;
+import static org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_BY_SOURCE_DELETE;
+import static org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_BY_SOURCE_UPSERT;
+import static org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_INSERT;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildDdl;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildSimpleQuery;
@@ -99,7 +107,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
         // WHEN NOT MATCHED BY SOURCE AND (dt < '02-28') THEN UPDATE
         //   SET v = v || '_nmu', last_action = 'not_matched_upsert'
         // WHEN NOT MATCHED BY SOURCE AND (dt >= '02-28') THEN DELETE
-        MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+        MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T");
         // here test if it works when table S is in default and qualified both
         action.withSourceTable("default.S")
                 .withMergeCondition("T.k = S.k AND T.dt = S.dt")
@@ -112,7 +120,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
                 .withNotMatchedBySourceDelete("dt >= '02-28'");
 
         validateActionRunResult(
-                action,
+                action.build(),
                 expected,
                 Arrays.asList(
                         changelogRow("+I", 1, "v_1", "creation", "02-27"),
@@ -150,7 +158,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
     @ParameterizedTest(name = "in-default = {0}")
     @ValueSource(booleans = {true, false})
     public void testTargetAlias(boolean inDefault) throws Exception {
-        MergeIntoAction action;
+        MergeIntoActionBuilder action;
 
         if (!inDefault) {
             // create target table in a new database
@@ -160,9 +168,9 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
             bEnv.executeSql("USE test_db");
             prepareTargetTable(CoreOptions.ChangelogProducer.NONE);
 
-            action = new MergeIntoAction(warehouse, "test_db", "T");
+            action = new MergeIntoActionBuilder(warehouse, "test_db", "T");
         } else {
-            action = new MergeIntoAction(warehouse, database, "T");
+            action = new MergeIntoActionBuilder(warehouse, database, "T");
         }
 
         action.withTargetAlias("TT")
@@ -192,7 +200,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
                         changelogRow("+I", 10, "v_10", "creation", "02-28"));
 
         if (ThreadLocalRandom.current().nextBoolean()) {
-            validateActionRunResult(action, streamingExpected, batchExpected);
+            validateActionRunResult(action.build(), streamingExpected, batchExpected);
         } else {
             validateProcedureResult(procedureStatement, streamingExpected, batchExpected);
         }
@@ -201,7 +209,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
     @ParameterizedTest(name = "in-default = {0}")
     @ValueSource(booleans = {true, false})
     public void testSourceName(boolean inDefault) throws Exception {
-        MergeIntoAction action = new MergeIntoAction(warehouse, "default", "T");
+        MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, "default", "T");
         String sourceTableName = "S";
 
         if (!inDefault) {
@@ -245,7 +253,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
                         changelogRow("+I", 10, "v_10", "creation", "02-28"));
 
         if (ThreadLocalRandom.current().nextBoolean()) {
-            validateActionRunResult(action, streamingExpected, batchExpected);
+            validateActionRunResult(action.build(), streamingExpected, batchExpected);
         } else {
             validateProcedureResult(procedureStatement, streamingExpected, batchExpected);
         }
@@ -275,7 +283,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
                         useCatalog ? "S" : "test_cat.`default`.S", id);
         String escapeDdl = ddl.replaceAll("'", "''");
 
-        MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+        MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T");
 
         if (useCatalog) {
             action.withSourceSqls(catalog, "USE CATALOG test_cat", ddl);
@@ -316,7 +324,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
                         changelogRow("+I", 10, "v_10", "creation", "02-28"));
 
         if (ThreadLocalRandom.current().nextBoolean()) {
-            validateActionRunResult(action, streamingExpected, batchExpected);
+            validateActionRunResult(action.build(), streamingExpected, batchExpected);
         } else {
             validateProcedureResult(procedureStatement, streamingExpected, batchExpected);
         }
@@ -326,7 +334,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
     @ValueSource(booleans = {true, false})
     public void testMatchedUpsertSetAll(boolean qualified) throws Exception {
         // build MergeIntoAction
-        MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+        MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T");
         action.withSourceSqls("CREATE TEMPORARY VIEW SS AS SELECT k, v, 'unknown', dt FROM S")
                 .withSourceTable(qualified ? "default.SS" : "SS")
                 .withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
@@ -364,7 +372,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
                         changelogRow("+I", 10, "v_10", "creation", "02-28"));
 
         if (ThreadLocalRandom.current().nextBoolean()) {
-            validateActionRunResult(action, streamingExpected, batchExpected);
+            validateActionRunResult(action.build(), streamingExpected, batchExpected);
         } else {
             validateProcedureResult(procedureStatement, streamingExpected, batchExpected);
         }
@@ -374,7 +382,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
     @ValueSource(booleans = {true, false})
     public void testNotMatchedInsertAll(boolean qualified) throws Exception {
         // build MergeIntoAction
-        MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+        MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T");
         action.withSourceSqls("CREATE TEMPORARY VIEW SS AS SELECT k, v, 'unknown', dt FROM S")
                 .withSourceTable(qualified ? "default.SS" : "SS")
                 .withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
@@ -408,7 +416,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
                         changelogRow("+I", 11, "v_11", "unknown", "02-29"));
 
         if (ThreadLocalRandom.current().nextBoolean()) {
-            validateActionRunResult(action, streamingExpected, batchExpected);
+            validateActionRunResult(action.build(), streamingExpected, batchExpected);
         } else {
             validateProcedureResult(procedureStatement, streamingExpected, batchExpected);
         }
@@ -449,7 +457,8 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
                         Collections.emptyList(),
                         Collections.emptyList());
 
-        assertThatThrownBy(() -> new MergeIntoAction(warehouse, database, nonPkTable))
+        assertThatThrownBy(
+                        () -> new MergeIntoActionBuilder(warehouse, database, nonPkTable).build())
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessage(
                         "merge-into action doesn't support table with no primary keys defined.");
@@ -458,12 +467,12 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
     @Test
     public void testIncompatibleSchema() {
         // build MergeIntoAction
-        MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+        MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T");
         action.withSourceTable("S")
                 .withMergeCondition("T.k = S.k AND T.dt = S.dt")
                 .withNotMatchedInsert(null, "S.k, S.v, 0, S.dt");
 
-        assertThatThrownBy(action::run)
+        assertThatThrownBy(() -> action.build().run())
                 .isInstanceOf(IllegalStateException.class)
                 .hasMessage(
                         "The schema of result in action 'not-matched-insert' is invalid.\n"
@@ -479,13 +488,13 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
         sEnv.executeSql("USE test_db");
         prepareSourceTable();
 
-        MergeIntoAction action = new MergeIntoAction(warehouse, "default", "T");
+        MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, "default", "T");
         // the qualified path of source table is absent
         action.withSourceTable("S")
                 .withMergeCondition("T.k = S.k AND T.dt = S.dt")
                 .withMatchedDelete("S.v IS NULL");
 
-        assertThatThrownBy(action::run)
+        assertThatThrownBy(() -> action.build().run())
                 .satisfies(
                         AssertionUtils.anyCauseMatches(
                                 ValidationException.class, "Object 'S' not found"));
@@ -496,7 +505,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
         // drop table S
         sEnv.executeSql("DROP TABLE S");
 
-        MergeIntoAction action = new MergeIntoAction(warehouse, "default", "T");
+        MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, "default", "T");
         action.withSourceSqls(
                         "CREATE DATABASE test_db",
                         "CREATE TEMPORARY TABLE test_db.S (k INT, v STRING, dt STRING) WITH ('connector' = 'values', 'bounded' = 'true')")
@@ -505,7 +514,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
                 .withMergeCondition("T.k = S.k AND T.dt = S.dt")
                 .withMatchedDelete("S.v IS NULL");
 
-        assertThatThrownBy(action::run)
+        assertThatThrownBy(() -> action.build().run())
                 .satisfies(
                         AssertionUtils.anyCauseMatches(
                                 ValidationException.class, "Object 'S' not found"));
@@ -645,4 +654,116 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
                                 changelogRow("-D", 9, "v_9", "creation", "02-28"),
                                 changelogRow("-D", 10, "v_10", "creation", "02-28"))));
     }
+
+    private class MergeIntoActionBuilder {
+
+        private final List<String> args;
+        private final List<String> mergeActions;
+
+        public MergeIntoActionBuilder(String warehouse, String database, String table) {
+            this.args =
+                    new ArrayList<>(
+                            Arrays.asList(
+                                    "merge_into",
+                                    "--warehouse",
+                                    warehouse,
+                                    "--database",
+                                    database,
+                                    "--table",
+                                    table));
+            this.mergeActions = new ArrayList<>();
+        }
+
+        public MergeIntoActionBuilder withTargetAlias(String targetAlias) {
+            if (targetAlias != null) {
+                args.add("--target_as");
+                args.add(targetAlias);
+            }
+            return this;
+        }
+
+        public MergeIntoActionBuilder withSourceTable(String sourceTable) {
+            args.add("--source_table");
+            args.add(sourceTable);
+            return this;
+        }
+
+        public MergeIntoActionBuilder withSourceSqls(String... sourceSqls) {
+            if (sourceSqls != null) {
+                for (String sql : sourceSqls) {
+                    args.add("--source_sql");
+                    args.add(sql);
+                }
+            }
+            return this;
+        }
+
+        public MergeIntoActionBuilder withMergeCondition(String mergeCondition) {
+            args.add("--on");
+            args.add(mergeCondition);
+            return this;
+        }
+
+        public MergeIntoActionBuilder withMatchedUpsert(
+                @Nullable String matchedUpsertCondition, String matchedUpsertSet) {
+            mergeActions.add(MATCHED_UPSERT);
+            args.add("--matched_upsert_set");
+            args.add(matchedUpsertSet);
+            if (matchedUpsertCondition != null) {
+                args.add("--matched_upsert_condition");
+                args.add(matchedUpsertCondition);
+            }
+            return this;
+        }
+
+        public MergeIntoActionBuilder withNotMatchedBySourceUpsert(
+                @Nullable String notMatchedBySourceUpsertCondition,
+                String notMatchedBySourceUpsertSet) {
+            mergeActions.add(NOT_MATCHED_BY_SOURCE_UPSERT);
+            args.add("--not_matched_by_source_upsert_set");
+            args.add(notMatchedBySourceUpsertSet);
+            if (notMatchedBySourceUpsertCondition != null) {
+                args.add("--not_matched_by_source_upsert_condition");
+                args.add(notMatchedBySourceUpsertCondition);
+            }
+            return this;
+        }
+
+        public MergeIntoActionBuilder withMatchedDelete(@Nullable String matchedDeleteCondition) {
+            mergeActions.add(MATCHED_DELETE);
+            if (matchedDeleteCondition != null) {
+                args.add("--matched_delete_condition");
+                args.add(matchedDeleteCondition);
+            }
+            return this;
+        }
+
+        public MergeIntoActionBuilder withNotMatchedBySourceDelete(
+                @Nullable String notMatchedBySourceDeleteCondition) {
+            mergeActions.add(NOT_MATCHED_BY_SOURCE_DELETE);
+            if (notMatchedBySourceDeleteCondition != null) {
+                args.add("--not_matched_by_source_delete_condition");
+                args.add(notMatchedBySourceDeleteCondition);
+            }
+            return this;
+        }
+
+        public MergeIntoActionBuilder withNotMatchedInsert(
+                @Nullable String notMatchedInsertCondition, String notMatchedInsertValues) {
+            mergeActions.add(NOT_MATCHED_INSERT);
+            args.add("--not_matched_insert_values");
+            args.add(notMatchedInsertValues);
+            if (notMatchedInsertCondition != null) {
+                args.add("--not_matched_insert_condition");
+                args.add(notMatchedInsertCondition);
+            }
+            return this;
+        }
+
+        MergeIntoAction build() {
+            args.add("--merge_actions");
+            args.add(String.join(",", mergeActions));
+            return createAction(MergeIntoAction.class, args);
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
index 1c7af1f0f..2eba8261f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
@@ -27,7 +27,10 @@ import org.apache.paimon.types.RowType;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThatCode;
 
@@ -54,11 +57,23 @@ public class RemoveOrphanFilesActionITCase extends ActionITCaseBase {
 
         writeData(rowData(1L, BinaryString.fromString("Hi")));
 
-        RemoveOrphanFilesAction action =
-                new RemoveOrphanFilesAction(warehouse, database, tableName, Collections.emptyMap());
-        assertThatCode(action::run).doesNotThrowAnyException();
-        assertThatCode(() -> action.olderThan("2023-12-31 23:59:59").run())
-                .doesNotThrowAnyException();
+        List<String> args =
+                new ArrayList<>(
+                        Arrays.asList(
+                                "remove_orphan_files",
+                                "--warehouse",
+                                warehouse,
+                                "--database",
+                                database,
+                                "--table",
+                                tableName));
+        RemoveOrphanFilesAction action1 = createAction(RemoveOrphanFilesAction.class, args);
+        assertThatCode(action1::run).doesNotThrowAnyException();
+
+        args.add("--older_than");
+        args.add("2023-12-31 23:59:59");
+        RemoveOrphanFilesAction action2 = createAction(RemoveOrphanFilesAction.class, args);
+        assertThatCode(action2::run).doesNotThrowAnyException();
 
         String withoutOlderThan =
                 String.format("CALL sys.remove_orphan_files('%s.%s')", database, tableName);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
index 6ecac880d..14c8d9f5f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
@@ -67,7 +67,18 @@ public class RollbackToActionITCase extends ActionITCaseBase {
         writeData(rowData(2L, BinaryString.fromString("Flink")));
 
         if (ThreadLocalRandom.current().nextBoolean()) {
-            new RollbackToAction(warehouse, database, tableName, "2", Collections.emptyMap()).run();
+            createAction(
+                            RollbackToAction.class,
+                            "rollback_to",
+                            "--warehouse",
+                            warehouse,
+                            "--database",
+                            database,
+                            "--table",
+                            tableName,
+                            "--version",
+                            "2")
+                    .run();
         } else {
             callProcedure(String.format("CALL sys.rollback_to('%s.%s', 2)", database, tableName));
         }
@@ -98,7 +109,17 @@ public class RollbackToActionITCase extends ActionITCaseBase {
         table.createTag("tag3", 3);
 
         if (ThreadLocalRandom.current().nextBoolean()) {
-            new RollbackToAction(warehouse, database, tableName, "tag2", Collections.emptyMap())
+            createAction(
+                            RollbackToAction.class,
+                            "rollback_to",
+                            "--warehouse",
+                            warehouse,
+                            "--database",
+                            database,
+                            "--table",
+                            tableName,
+                            "--version",
+                            "tag2")
                     .run();
         } else {
             callProcedure(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
index 2c079f379..45dbfcd9d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
@@ -41,7 +41,6 @@ import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 
@@ -155,15 +154,7 @@ public class SortCompactActionForDynamicBucketITCase extends ActionITCaseBase {
 
     private void zorder(List<String> columns) throws Exception {
         if (RANDOM.nextBoolean()) {
-            new SortCompactAction(
-                            warehouse,
-                            database,
-                            tableName,
-                            Collections.emptyMap(),
-                            Collections.emptyMap())
-                    .withOrderStrategy("zorder")
-                    .withOrderColumns(columns)
-                    .run();
+            createAction("zorder", columns).run();
         } else {
             callProcedure("zorder", columns);
         }
@@ -171,20 +162,28 @@ public class SortCompactActionForDynamicBucketITCase extends ActionITCaseBase {
 
     private void order(List<String> columns) throws Exception {
         if (RANDOM.nextBoolean()) {
-            new SortCompactAction(
-                            warehouse,
-                            database,
-                            tableName,
-                            Collections.emptyMap(),
-                            Collections.emptyMap())
-                    .withOrderStrategy("order")
-                    .withOrderColumns(columns)
-                    .run();
+            createAction("order", columns).run();
         } else {
             callProcedure("order", columns);
         }
     }
 
+    private SortCompactAction createAction(String orderStrategy, List<String> columns) {
+        return createAction(
+                SortCompactAction.class,
+                "compact",
+                "--warehouse",
+                warehouse,
+                "--database",
+                database,
+                "--table",
+                tableName,
+                "--order_strategy",
+                orderStrategy,
+                "--order_by",
+                String.join(",", columns));
+    }
+
     private void callProcedure(String orderStrategy, List<String> orderByColumns) {
         callProcedure(
                 String.format(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
index da524c5a7..7801b472c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
@@ -232,15 +232,7 @@ public class SortCompactActionForUnawareBucketITCase extends ActionITCaseBase {
 
     private void zorder(List<String> columns) throws Exception {
         if (RANDOM.nextBoolean()) {
-            new SortCompactAction(
-                            warehouse,
-                            database,
-                            tableName,
-                            Collections.emptyMap(),
-                            Collections.emptyMap())
-                    .withOrderStrategy("zorder")
-                    .withOrderColumns(columns)
-                    .run();
+            createAction("zorder", columns).run();
         } else {
             callProcedure("zorder", columns);
         }
@@ -248,20 +240,28 @@ public class SortCompactActionForUnawareBucketITCase extends ActionITCaseBase {
 
     private void order(List<String> columns) throws Exception {
         if (RANDOM.nextBoolean()) {
-            new SortCompactAction(
-                            warehouse,
-                            database,
-                            tableName,
-                            Collections.emptyMap(),
-                            Collections.emptyMap())
-                    .withOrderStrategy("order")
-                    .withOrderColumns(columns)
-                    .run();
+            createAction("order", columns).run();
         } else {
             callProcedure("order", columns);
         }
     }
 
+    private SortCompactAction createAction(String orderStrategy, List<String> columns) {
+        return createAction(
+                SortCompactAction.class,
+                "compact",
+                "--warehouse",
+                warehouse,
+                "--database",
+                database,
+                "--table",
+                tableName,
+                "--order_strategy",
+                orderStrategy,
+                "--order_by",
+                String.join(",", columns));
+    }
+
     private void callProcedure(String orderStrategy, List<String> orderByColumns) {
         callProcedure(
                 String.format(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
index 9691e66fe..b5c616707 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
@@ -67,7 +67,19 @@ public class TagActionITCase extends ActionITCaseBase {
         TagManager tagManager = new TagManager(table.fileIO(), table.location());
 
         if (ThreadLocalRandom.current().nextBoolean()) {
-            new CreateTagAction(warehouse, database, tableName, Collections.emptyMap(), "tag2", 2)
+            createAction(
+                            CreateTagAction.class,
+                            "create_tag",
+                            "--warehouse",
+                            warehouse,
+                            "--database",
+                            database,
+                            "--table",
+                            tableName,
+                            "--tag_name",
+                            "tag2",
+                            "--snapshot",
+                            "2")
                     .run();
         } else {
             callProcedure(
@@ -81,7 +93,17 @@ public class TagActionITCase extends ActionITCaseBase {
                 Arrays.asList(Row.of(1L, "Hi"), Row.of(2L, "Hello")));
 
         if (ThreadLocalRandom.current().nextBoolean()) {
-            new DeleteTagAction(warehouse, database, tableName, Collections.emptyMap(), "tag2")
+            createAction(
+                            DeleteTagAction.class,
+                            "delete_tag",
+                            "--warehouse",
+                            warehouse,
+                            "--database",
+                            database,
+                            "--table",
+                            tableName,
+                            "--tag_name",
+                            "tag2")
                     .run();
         } else {
             callProcedure(