You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/12/02 11:01:19 UTC
(incubator-paimon) branch master updated: [flink] All action names and argument keys use underline instead of hyphen (#2437)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0b7d4e44f [flink] All action names and argument keys use underline instead of hyphen (#2437)
0b7d4e44f is described below
commit 0b7d4e44f9d05acb7eee0b8b1f04e47301b037bb
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Sat Dec 2 19:01:15 2023 +0800
[flink] All action names and argument keys use underline instead of hyphen (#2437)
---
.../flink/action/cdc/CdcActionCommonUtils.java | 14 ++
.../cdc/kafka/KafkaSyncDatabaseActionFactory.java | 42 +++---
.../cdc/kafka/KafkaSyncTableActionFactory.java | 36 +++--
.../mongodb/MongoDBSyncDatabaseActionFactory.java | 35 +++--
.../cdc/mongodb/MongoDBSyncTableActionFactory.java | 26 ++--
.../cdc/mysql/MySqlSyncDatabaseActionFactory.java | 54 ++++---
.../cdc/mysql/MySqlSyncTableActionFactory.java | 42 +++---
.../pulsar/PulsarSyncDatabaseActionFactory.java | 42 +++---
.../cdc/pulsar/PulsarSyncTableActionFactory.java | 36 +++--
.../action/cdc/kafka/KafkaActionITCaseBase.java | 23 ++-
.../cdc/mongodb/MongoDBActionITCaseBase.java | 23 ++-
.../action/cdc/mysql/MySqlActionITCaseBase.java | 23 ++-
.../action/cdc/pulsar/PulsarActionITCaseBase.java | 23 ++-
.../apache/paimon/flink/action/ActionFactory.java | 39 +++--
.../paimon/flink/action/CompactActionFactory.java | 20 +--
.../flink/action/CompactDatabaseActionFactory.java | 25 ++--
.../flink/action/CreateTagActionFactory.java | 18 ++-
.../paimon/flink/action/DeleteActionFactory.java | 9 +-
.../flink/action/DeleteTagActionFactory.java | 13 +-
.../flink/action/DropPartitionActionFactory.java | 9 +-
.../flink/action/MergeIntoActionFactory.java | 63 +++++---
.../flink/action/MigrateTableActionFactory.java | 17 ++-
.../flink/action/MultipleParameterToolAdapter.java | 51 +++++++
.../action/RemoveOrphanFilesActionFactory.java | 13 +-
.../flink/action/ResetConsumerActionFactory.java | 18 ++-
.../flink/action/RollbackToActionFactory.java | 13 +-
.../paimon/flink/action/ActionITCaseBase.java | 25 ++++
.../paimon/flink/action/CompactActionITCase.java | 50 ++++---
.../flink/action/CompactDatabaseActionITCase.java | 78 ++++++----
.../paimon/flink/action/ConsumerActionITCase.java | 21 ++-
.../paimon/flink/action/DeleteActionITCase.java | 24 ++-
.../flink/action/DropPartitionActionITCase.java | 25 +++-
.../paimon/flink/action/MergeIntoActionITCase.java | 163 ++++++++++++++++++---
.../action/RemoveOrphanFilesActionITCase.java | 25 +++-
.../flink/action/RollbackToActionITCase.java | 25 +++-
.../SortCompactActionForDynamicBucketITCase.java | 37 +++--
.../SortCompactActionForUnawareBucketITCase.java | 36 ++---
.../paimon/flink/action/TagActionITCase.java | 26 +++-
38 files changed, 834 insertions(+), 428 deletions(-)
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index ced474cb7..97d4c9a10 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -47,6 +47,20 @@ public class CdcActionCommonUtils {
private static final Logger LOG = LoggerFactory.getLogger(CdcActionCommonUtils.class);
+ public static final String KAFKA_CONF = "kafka_conf";
+ public static final String MONGODB_CONF = "mongodb_conf";
+ public static final String MYSQL_CONF = "mysql_conf";
+ public static final String PULSAR_CONF = "pulsar_conf";
+ public static final String TABLE_PREFIX = "table_prefix";
+ public static final String TABLE_SUFFIX = "table_suffix";
+ public static final String INCLUDING_TABLES = "including_tables";
+ public static final String EXCLUDING_TABLES = "excluding_tables";
+ public static final String TYPE_MAPPING = "type_mapping";
+ public static final String PARTITION_KEYS = "partition_keys";
+ public static final String PRIMARY_KEYS = "primary_keys";
+ public static final String COMPUTED_COLUMN = "computed_column";
+ public static final String METADATA_COLUMN = "metadata_column";
+
public static void assertSchemaCompatible(
TableSchema paimonSchema, List<DataField> sourceTableFields) {
if (!schemaCompatible(paimonSchema, sourceTableFields)) {
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
index c20d925fe..8c55c0bfe 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
@@ -20,16 +20,22 @@ package org.apache.paimon.flink.action.cdc.kafka;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
-
import java.util.Optional;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.KAFKA_CONF;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;
+
/** Factory to create {@link KafkaSyncDatabaseAction}. */
public class KafkaSyncDatabaseActionFactory implements ActionFactory {
- public static final String IDENTIFIER = "kafka-sync-database";
+ public static final String IDENTIFIER = "kafka_sync_database";
@Override
public String identifier() {
@@ -37,24 +43,24 @@ public class KafkaSyncDatabaseActionFactory implements ActionFactory {
}
@Override
- public Optional<Action> create(MultipleParameterTool params) {
- checkRequiredArgument(params, "kafka-conf");
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
+ checkRequiredArgument(params, KAFKA_CONF);
KafkaSyncDatabaseAction action =
new KafkaSyncDatabaseAction(
- getRequiredValue(params, "warehouse"),
- getRequiredValue(params, "database"),
- optionalConfigMap(params, "catalog-conf"),
- optionalConfigMap(params, "kafka-conf"));
-
- action.withTableConfig(optionalConfigMap(params, "table-conf"))
- .withTablePrefix(params.get("table-prefix"))
- .withTableSuffix(params.get("table-suffix"))
- .includingTables(params.get("including-tables"))
- .excludingTables(params.get("excluding-tables"));
-
- if (params.has("type-mapping")) {
- String[] options = params.get("type-mapping").split(",");
+ getRequiredValue(params, WAREHOUSE),
+ getRequiredValue(params, DATABASE),
+ optionalConfigMap(params, CATALOG_CONF),
+ optionalConfigMap(params, KAFKA_CONF));
+
+ action.withTableConfig(optionalConfigMap(params, TABLE_CONF))
+ .withTablePrefix(params.get(TABLE_PREFIX))
+ .withTableSuffix(params.get(TABLE_SUFFIX))
+ .includingTables(params.get(INCLUDING_TABLES))
+ .excludingTables(params.get(EXCLUDING_TABLES));
+
+ if (params.has(TYPE_MAPPING)) {
+ String[] options = params.get(TYPE_MAPPING).split(",");
action.withTypeMapping(TypeMapping.parse(options));
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java
index e9367b7f0..8a1efcd52 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java
@@ -20,18 +20,24 @@ package org.apache.paimon.flink.action.cdc.kafka;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
import java.util.ArrayList;
import java.util.Optional;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.KAFKA_CONF;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;
+
/** Factory to create {@link KafkaSyncTableAction}. */
public class KafkaSyncTableActionFactory implements ActionFactory {
- public static final String IDENTIFIER = "kafka-sync-table";
+ public static final String IDENTIFIER = "kafka_sync_table";
@Override
public String identifier() {
@@ -39,34 +45,34 @@ public class KafkaSyncTableActionFactory implements ActionFactory {
}
@Override
- public Optional<Action> create(MultipleParameterTool params) {
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
Tuple3<String, String, String> tablePath = getTablePath(params);
- checkRequiredArgument(params, "kafka-conf");
+ checkRequiredArgument(params, KAFKA_CONF);
KafkaSyncTableAction action =
new KafkaSyncTableAction(
tablePath.f0,
tablePath.f1,
tablePath.f2,
- optionalConfigMap(params, "catalog-conf"),
- optionalConfigMap(params, "kafka-conf"));
- action.withTableConfig(optionalConfigMap(params, "table-conf"));
+ optionalConfigMap(params, CATALOG_CONF),
+ optionalConfigMap(params, KAFKA_CONF));
+ action.withTableConfig(optionalConfigMap(params, TABLE_CONF));
- if (params.has("partition-keys")) {
- action.withPartitionKeys(params.get("partition-keys").split(","));
+ if (params.has(PARTITION_KEYS)) {
+ action.withPartitionKeys(params.get(PARTITION_KEYS).split(","));
}
- if (params.has("primary-keys")) {
- action.withPrimaryKeys(params.get("primary-keys").split(","));
+ if (params.has(PRIMARY_KEYS)) {
+ action.withPrimaryKeys(params.get(PRIMARY_KEYS).split(","));
}
- if (params.has("computed-column")) {
+ if (params.has(COMPUTED_COLUMN)) {
action.withComputedColumnArgs(
- new ArrayList<>(params.getMultiParameter("computed-column")));
+ new ArrayList<>(params.getMultiParameter(COMPUTED_COLUMN)));
}
- if (params.has("type-mapping")) {
- String[] options = params.get("type-mapping").split(",");
+ if (params.has(TYPE_MAPPING)) {
+ String[] options = params.get(TYPE_MAPPING).split(",");
action.withTypeMapping(TypeMapping.parse(options));
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java
index 2232807fa..b97a73552 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java
@@ -20,36 +20,41 @@ package org.apache.paimon.flink.action.cdc.mongodb;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionFactory;
-
-import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
import java.util.Optional;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MONGODB_CONF;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX;
+
/** Factory to create {@link MongoDBSyncDatabaseAction}. */
public class MongoDBSyncDatabaseActionFactory implements ActionFactory {
- public static final String IDENTIFIER = "mongodb-sync-database";
+ public static final String IDENTIFIER = "mongodb_sync_database";
@Override
public String identifier() {
return IDENTIFIER;
}
- public Optional<Action> create(MultipleParameterTool params) {
- checkRequiredArgument(params, "mongodb-conf");
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
+ checkRequiredArgument(params, MONGODB_CONF);
MongoDBSyncDatabaseAction action =
new MongoDBSyncDatabaseAction(
- getRequiredValue(params, "warehouse"),
- getRequiredValue(params, "database"),
- optionalConfigMap(params, "catalog-conf"),
- optionalConfigMap(params, "mongodb-conf"));
-
- action.withTableConfig(optionalConfigMap(params, "table-conf"))
- .withTablePrefix(params.get("table-prefix"))
- .withTableSuffix(params.get("table-suffix"))
- .includingTables(params.get("including-tables"))
- .excludingTables(params.get("excluding-tables"));
+ getRequiredValue(params, WAREHOUSE),
+ getRequiredValue(params, DATABASE),
+ optionalConfigMap(params, CATALOG_CONF),
+ optionalConfigMap(params, MONGODB_CONF));
+
+ action.withTableConfig(optionalConfigMap(params, TABLE_CONF))
+ .withTablePrefix(params.get(TABLE_PREFIX))
+ .withTableSuffix(params.get(TABLE_SUFFIX))
+ .includingTables(params.get(INCLUDING_TABLES))
+ .excludingTables(params.get(EXCLUDING_TABLES));
return Optional.of(action);
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
index 0a2b8acc3..819bf7815 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java
@@ -20,17 +20,21 @@ package org.apache.paimon.flink.action.cdc.mongodb;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
import java.util.ArrayList;
import java.util.Optional;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MONGODB_CONF;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
+
/** Factory to create {@link MongoDBSyncTableAction}. */
public class MongoDBSyncTableActionFactory implements ActionFactory {
- public static final String IDENTIFIER = "mongodb-sync-table";
+ public static final String IDENTIFIER = "mongodb_sync_table";
@Override
public String identifier() {
@@ -38,27 +42,27 @@ public class MongoDBSyncTableActionFactory implements ActionFactory {
}
@Override
- public Optional<Action> create(MultipleParameterTool params) {
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
Tuple3<String, String, String> tablePath = getTablePath(params);
- checkRequiredArgument(params, "mongodb-conf");
+ checkRequiredArgument(params, MONGODB_CONF);
MongoDBSyncTableAction action =
new MongoDBSyncTableAction(
tablePath.f0,
tablePath.f1,
tablePath.f2,
- optionalConfigMap(params, "catalog-conf"),
- optionalConfigMap(params, "mongodb-conf"));
+ optionalConfigMap(params, CATALOG_CONF),
+ optionalConfigMap(params, MONGODB_CONF));
- action.withTableConfig(optionalConfigMap(params, "table-conf"));
+ action.withTableConfig(optionalConfigMap(params, TABLE_CONF));
- if (params.has("partition-keys")) {
- action.withPartitionKeys(params.get("partition-keys").split(","));
+ if (params.has(PARTITION_KEYS)) {
+ action.withPartitionKeys(params.get(PARTITION_KEYS).split(","));
}
- if (params.has("computed-column")) {
+ if (params.has(COMPUTED_COLUMN)) {
action.withComputedColumnArgs(
- new ArrayList<>(params.getMultiParameter("computed-column")));
+ new ArrayList<>(params.getMultiParameter(COMPUTED_COLUMN)));
}
return Optional.of(action);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
index 93ed27def..785083589 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
@@ -21,17 +21,28 @@ package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
+import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
-
import java.util.Arrays;
import java.util.Optional;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MYSQL_CONF;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;
+
/** Factory to create {@link MySqlSyncDatabaseAction}. */
public class MySqlSyncDatabaseActionFactory implements ActionFactory {
- public static final String IDENTIFIER = "mysql-sync-database";
+ public static final String IDENTIFIER = "mysql_sync_database";
+
+ private static final String IGNORE_INCOMPATIBLE = "ignore_incompatible";
+ private static final String MERGE_SHARDS = "merge_shards";
+ private static final String MODE = "mode";
@Override
public String identifier() {
@@ -39,32 +50,31 @@ public class MySqlSyncDatabaseActionFactory implements ActionFactory {
}
@Override
- public Optional<Action> create(MultipleParameterTool params) {
- checkRequiredArgument(params, "mysql-conf");
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
+ checkRequiredArgument(params, MYSQL_CONF);
MySqlSyncDatabaseAction action =
new MySqlSyncDatabaseAction(
- getRequiredValue(params, "warehouse"),
- getRequiredValue(params, "database"),
- optionalConfigMap(params, "catalog-conf"),
- optionalConfigMap(params, "mysql-conf"));
+ getRequiredValue(params, WAREHOUSE),
+ getRequiredValue(params, DATABASE),
+ optionalConfigMap(params, CATALOG_CONF),
+ optionalConfigMap(params, MYSQL_CONF));
- action.withTableConfig(optionalConfigMap(params, "table-conf"))
- .ignoreIncompatible(Boolean.parseBoolean(params.get("ignore-incompatible")))
+ action.withTableConfig(optionalConfigMap(params, TABLE_CONF))
+ .ignoreIncompatible(Boolean.parseBoolean(params.get(IGNORE_INCOMPATIBLE)))
.mergeShards(
- !params.has("merge-shards")
- || Boolean.parseBoolean(params.get("merge-shards")))
- .withTablePrefix(params.get("table-prefix"))
- .withTableSuffix(params.get("table-suffix"))
- .includingTables(params.get("including-tables"))
- .excludingTables(params.get("excluding-tables"))
- .withMode(MultiTablesSinkMode.fromString(params.get("mode")));
- if (params.has("metadata-column")) {
- action.withMetadataKeys(Arrays.asList(params.get("metadata-column").split(",")));
+ !params.has(MERGE_SHARDS) || Boolean.parseBoolean(params.get(MERGE_SHARDS)))
+ .withTablePrefix(params.get(TABLE_PREFIX))
+ .withTableSuffix(params.get(TABLE_SUFFIX))
+ .includingTables(params.get(INCLUDING_TABLES))
+ .excludingTables(params.get(EXCLUDING_TABLES))
+ .withMode(MultiTablesSinkMode.fromString(params.get(MODE)));
+ if (params.has(METADATA_COLUMN)) {
+ action.withMetadataKeys(Arrays.asList(params.get(METADATA_COLUMN).split(",")));
}
- if (params.has("type-mapping")) {
- String[] options = params.get("type-mapping").split(",");
+ if (params.has(TYPE_MAPPING)) {
+ String[] options = params.get(TYPE_MAPPING).split(",");
action.withTypeMapping(TypeMapping.parse(options));
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
index 3f518c948..293fdd6bb 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
@@ -20,18 +20,25 @@ package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
import java.util.ArrayList;
import java.util.Optional;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MYSQL_CONF;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;
+
/** Factory to create {@link MySqlSyncTableAction}. */
public class MySqlSyncTableActionFactory implements ActionFactory {
- public static final String IDENTIFIER = "mysql-sync-table";
+ public static final String IDENTIFIER = "mysql_sync_table";
@Override
public String identifier() {
@@ -39,40 +46,39 @@ public class MySqlSyncTableActionFactory implements ActionFactory {
}
@Override
- public Optional<Action> create(MultipleParameterTool params) {
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
Tuple3<String, String, String> tablePath = getTablePath(params);
- checkRequiredArgument(params, "mysql-conf");
+ checkRequiredArgument(params, MYSQL_CONF);
MySqlSyncTableAction action =
new MySqlSyncTableAction(
tablePath.f0,
tablePath.f1,
tablePath.f2,
- optionalConfigMap(params, "catalog-conf"),
- optionalConfigMap(params, "mysql-conf"));
+ optionalConfigMap(params, CATALOG_CONF),
+ optionalConfigMap(params, MYSQL_CONF));
- action.withTableConfig(optionalConfigMap(params, "table-conf"));
+ action.withTableConfig(optionalConfigMap(params, TABLE_CONF));
- if (params.has("partition-keys")) {
- action.withPartitionKeys(params.get("partition-keys").split(","));
+ if (params.has(PARTITION_KEYS)) {
+ action.withPartitionKeys(params.get(PARTITION_KEYS).split(","));
}
- if (params.has("primary-keys")) {
- action.withPrimaryKeys(params.get("primary-keys").split(","));
+ if (params.has(PRIMARY_KEYS)) {
+ action.withPrimaryKeys(params.get(PRIMARY_KEYS).split(","));
}
- if (params.has("computed-column")) {
+ if (params.has(COMPUTED_COLUMN)) {
action.withComputedColumnArgs(
- new ArrayList<>(params.getMultiParameter("computed-column")));
+ new ArrayList<>(params.getMultiParameter(COMPUTED_COLUMN)));
}
- if (params.has("metadata-column")) {
- action.withMetadataColumns(
- new ArrayList<>(params.getMultiParameter("metadata-column")));
+ if (params.has(METADATA_COLUMN)) {
+ action.withMetadataColumns(new ArrayList<>(params.getMultiParameter(METADATA_COLUMN)));
}
- if (params.has("type-mapping")) {
- String[] options = params.get("type-mapping").split(",");
+ if (params.has(TYPE_MAPPING)) {
+ String[] options = params.get(TYPE_MAPPING).split(",");
action.withTypeMapping(TypeMapping.parse(options));
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionFactory.java
index b7f0d3ce0..023f139a8 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionFactory.java
@@ -20,16 +20,22 @@ package org.apache.paimon.flink.action.cdc.pulsar;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
-
import java.util.Optional;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PULSAR_CONF;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;
+
/** Factory to create {@link PulsarSyncDatabaseAction}. */
public class PulsarSyncDatabaseActionFactory implements ActionFactory {
- public static final String IDENTIFIER = "pulsar-sync-database";
+ public static final String IDENTIFIER = "pulsar_sync_database";
@Override
public String identifier() {
@@ -37,24 +43,24 @@ public class PulsarSyncDatabaseActionFactory implements ActionFactory {
}
@Override
- public Optional<Action> create(MultipleParameterTool params) {
- checkRequiredArgument(params, "pulsar-conf");
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
+ checkRequiredArgument(params, PULSAR_CONF);
PulsarSyncDatabaseAction action =
new PulsarSyncDatabaseAction(
- getRequiredValue(params, "warehouse"),
- getRequiredValue(params, "database"),
- optionalConfigMap(params, "catalog-conf"),
- optionalConfigMap(params, "pulsar-conf"));
-
- action.withTableConfig(optionalConfigMap(params, "table-conf"))
- .withTablePrefix(params.get("table-prefix"))
- .withTableSuffix(params.get("table-suffix"))
- .includingTables(params.get("including-tables"))
- .excludingTables(params.get("excluding-tables"));
-
- if (params.has("type-mapping")) {
- String[] options = params.get("type-mapping").split(",");
+ getRequiredValue(params, WAREHOUSE),
+ getRequiredValue(params, DATABASE),
+ optionalConfigMap(params, CATALOG_CONF),
+ optionalConfigMap(params, PULSAR_CONF));
+
+ action.withTableConfig(optionalConfigMap(params, TABLE_CONF))
+ .withTablePrefix(params.get(TABLE_PREFIX))
+ .withTableSuffix(params.get(TABLE_SUFFIX))
+ .includingTables(params.get(INCLUDING_TABLES))
+ .excludingTables(params.get(EXCLUDING_TABLES));
+
+ if (params.has(TYPE_MAPPING)) {
+ String[] options = params.get(TYPE_MAPPING).split(",");
action.withTypeMapping(TypeMapping.parse(options));
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionFactory.java
index 4876086da..6929687aa 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionFactory.java
@@ -20,18 +20,24 @@ package org.apache.paimon.flink.action.cdc.pulsar;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
import java.util.ArrayList;
import java.util.Optional;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PULSAR_CONF;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;
+
/** Factory to create {@link PulsarSyncTableAction}. */
public class PulsarSyncTableActionFactory implements ActionFactory {
- public static final String IDENTIFIER = "pulsar-sync-table";
+ public static final String IDENTIFIER = "pulsar_sync_table";
@Override
public String identifier() {
@@ -39,34 +45,34 @@ public class PulsarSyncTableActionFactory implements ActionFactory {
}
@Override
- public Optional<Action> create(MultipleParameterTool params) {
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
Tuple3<String, String, String> tablePath = getTablePath(params);
- checkRequiredArgument(params, "pulsar-conf");
+ checkRequiredArgument(params, PULSAR_CONF);
PulsarSyncTableAction action =
new PulsarSyncTableAction(
tablePath.f0,
tablePath.f1,
tablePath.f2,
- optionalConfigMap(params, "catalog-conf"),
- optionalConfigMap(params, "pulsar-conf"));
- action.withTableConfig(optionalConfigMap(params, "table-conf"));
+ optionalConfigMap(params, CATALOG_CONF),
+ optionalConfigMap(params, PULSAR_CONF));
+ action.withTableConfig(optionalConfigMap(params, TABLE_CONF));
- if (params.has("partition-keys")) {
- action.withPartitionKeys(params.get("partition-keys").split(","));
+ if (params.has(PARTITION_KEYS)) {
+ action.withPartitionKeys(params.get(PARTITION_KEYS).split(","));
}
- if (params.has("primary-keys")) {
- action.withPrimaryKeys(params.get("primary-keys").split(","));
+ if (params.has(PRIMARY_KEYS)) {
+ action.withPrimaryKeys(params.get(PRIMARY_KEYS).split(","));
}
- if (params.has("computed-column")) {
+ if (params.has(COMPUTED_COLUMN)) {
action.withComputedColumnArgs(
- new ArrayList<>(params.getMultiParameter("computed-column")));
+ new ArrayList<>(params.getMultiParameter(COMPUTED_COLUMN)));
}
- if (params.has("type-mapping")) {
- String[] options = params.get("type-mapping").split(",");
+ if (params.has(TYPE_MAPPING)) {
+ String[] options = params.get(TYPE_MAPPING).split(",");
action.withTypeMapping(TypeMapping.parse(options));
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
index 321e71579..d044b38dd 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
@@ -24,7 +24,6 @@ import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.util.DockerImageVersions;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
@@ -247,6 +246,7 @@ public abstract class KafkaActionITCaseBase extends CdcActionITCaseBase {
List<String> args =
new ArrayList<>(
Arrays.asList(
+ "kafka_sync_table",
"--warehouse",
warehouse,
"--database",
@@ -264,12 +264,7 @@ public abstract class KafkaActionITCaseBase extends CdcActionITCaseBase {
args.addAll(listToMultiArgs("--computed-column", computedColumnArgs));
- MultipleParameterTool params =
- MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0])));
- return (KafkaSyncTableAction)
- new KafkaSyncTableActionFactory()
- .create(params)
- .orElseThrow(RuntimeException::new);
+ return createAction(KafkaSyncTableAction.class, args);
}
}
@@ -296,7 +291,12 @@ public abstract class KafkaActionITCaseBase extends CdcActionITCaseBase {
public KafkaSyncDatabaseAction build() {
List<String> args =
new ArrayList<>(
- Arrays.asList("--warehouse", warehouse, "--database", database));
+ Arrays.asList(
+ "kafka_sync_database",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database));
args.addAll(mapToArgs("--kafka-conf", sourceConfig));
args.addAll(mapToArgs("--catalog-conf", catalogConfig));
@@ -309,12 +309,7 @@ public abstract class KafkaActionITCaseBase extends CdcActionITCaseBase {
args.addAll(listToArgs("--type-mapping", typeMappingModes));
- MultipleParameterTool params =
- MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0])));
- return (KafkaSyncDatabaseAction)
- new KafkaSyncDatabaseActionFactory()
- .create(params)
- .orElseThrow(RuntimeException::new);
+ return createAction(KafkaSyncDatabaseAction.class, args);
}
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
index 4d2fec69d..b4c5eed77 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
@@ -24,7 +24,6 @@ import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,6 +110,7 @@ public abstract class MongoDBActionITCaseBase extends CdcActionITCaseBase {
List<String> args =
new ArrayList<>(
Arrays.asList(
+ "mongodb_sync_table",
"--warehouse",
warehouse,
"--database",
@@ -125,12 +125,7 @@ public abstract class MongoDBActionITCaseBase extends CdcActionITCaseBase {
args.addAll(listToArgs("--partition-keys", partitionKeys));
- MultipleParameterTool params =
- MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0])));
- return (MongoDBSyncTableAction)
- new MongoDBSyncTableActionFactory()
- .create(params)
- .orElseThrow(RuntimeException::new);
+ return createAction(MongoDBSyncTableAction.class, args);
}
}
@@ -161,7 +156,12 @@ public abstract class MongoDBActionITCaseBase extends CdcActionITCaseBase {
public MongoDBSyncDatabaseAction build() {
List<String> args =
new ArrayList<>(
- Arrays.asList("--warehouse", warehouse, "--database", database));
+ Arrays.asList(
+ "mongodb_sync_database",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database));
args.addAll(mapToArgs("--mongodb-conf", sourceConfig));
args.addAll(mapToArgs("--catalog-conf", catalogConfig));
@@ -172,12 +172,7 @@ public abstract class MongoDBActionITCaseBase extends CdcActionITCaseBase {
args.addAll(nullableToArgs("--including-tables", includingTables));
args.addAll(nullableToArgs("--excluding-tables", excludingTables));
- MultipleParameterTool params =
- MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0])));
- return (MongoDBSyncDatabaseAction)
- new MongoDBSyncDatabaseActionFactory()
- .create(params)
- .orElseThrow(RuntimeException::new);
+ return createAction(MongoDBSyncDatabaseAction.class, args);
}
}
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
index d6b09f0a3..b9f91b3b4 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.junit.jupiter.api.AfterAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,6 +111,7 @@ public class MySqlActionITCaseBase extends CdcActionITCaseBase {
List<String> args =
new ArrayList<>(
Arrays.asList(
+ "mysql_sync_table",
"--warehouse",
warehouse,
"--database",
@@ -130,12 +130,7 @@ public class MySqlActionITCaseBase extends CdcActionITCaseBase {
args.addAll(listToMultiArgs("--computed-column", computedColumnArgs));
args.addAll(listToMultiArgs("--metadata-column", metadataColumns));
- MultipleParameterTool params =
- MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0])));
- return (MySqlSyncTableAction)
- new MySqlSyncTableActionFactory()
- .create(params)
- .orElseThrow(RuntimeException::new);
+ return createAction(MySqlSyncTableAction.class, args);
}
}
@@ -150,7 +145,12 @@ public class MySqlActionITCaseBase extends CdcActionITCaseBase {
public MySqlSyncDatabaseAction build() {
List<String> args =
new ArrayList<>(
- Arrays.asList("--warehouse", warehouse, "--database", database));
+ Arrays.asList(
+ "mysql_sync_database",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database));
args.addAll(mapToArgs("--mysql-conf", sourceConfig));
args.addAll(mapToArgs("--catalog-conf", catalogConfig));
@@ -167,12 +167,7 @@ public class MySqlActionITCaseBase extends CdcActionITCaseBase {
args.addAll(listToArgs("--type-mapping", typeMappingModes));
args.addAll(listToArgs("--metadata-column", metadataColumn));
- MultipleParameterTool params =
- MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0])));
- return (MySqlSyncDatabaseAction)
- new MySqlSyncDatabaseActionFactory()
- .create(params)
- .orElseThrow(RuntimeException::new);
+ return createAction(MySqlSyncDatabaseAction.class, args);
}
}
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java
index c696124c8..5e96b3a5f 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java
@@ -23,7 +23,6 @@ import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
@@ -277,6 +276,7 @@ public class PulsarActionITCaseBase extends CdcActionITCaseBase {
List<String> args =
new ArrayList<>(
Arrays.asList(
+ "pulsar_sync_table",
"--warehouse",
warehouse,
"--database",
@@ -294,12 +294,7 @@ public class PulsarActionITCaseBase extends CdcActionITCaseBase {
args.addAll(listToMultiArgs("--computed-column", computedColumnArgs));
- MultipleParameterTool params =
- MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0])));
- return (PulsarSyncTableAction)
- new PulsarSyncTableActionFactory()
- .create(params)
- .orElseThrow(RuntimeException::new);
+ return createAction(PulsarSyncTableAction.class, args);
}
}
@@ -326,7 +321,12 @@ public class PulsarActionITCaseBase extends CdcActionITCaseBase {
public PulsarSyncDatabaseAction build() {
List<String> args =
new ArrayList<>(
- Arrays.asList("--warehouse", warehouse, "--database", database));
+ Arrays.asList(
+ "pulsar_sync_database",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database));
args.addAll(mapToArgs("--pulsar-conf", sourceConfig));
args.addAll(mapToArgs("--catalog-conf", catalogConfig));
@@ -339,12 +339,7 @@ public class PulsarActionITCaseBase extends CdcActionITCaseBase {
args.addAll(listToArgs("--type-mapping", typeMappingModes));
- MultipleParameterTool params =
- MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0])));
- return (PulsarSyncDatabaseAction)
- new PulsarSyncDatabaseActionFactory()
- .create(params)
- .orElseThrow(RuntimeException::new);
+ return createAction(PulsarSyncDatabaseAction.class, args);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
index 8d37d16db..e6ac1dfaa 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
@@ -45,10 +45,20 @@ public interface ActionFactory extends Factory {
Logger LOG = LoggerFactory.getLogger(ActionFactory.class);
- Optional<Action> create(MultipleParameterTool params);
+ String HELP = "help";
+ String WAREHOUSE = "warehouse";
+ String DATABASE = "database";
+ String TABLE = "table";
+ String PATH = "path";
+ String CATALOG_CONF = "catalog_conf";
+ String TABLE_CONF = "table_conf";
+ String PARTITION = "partition";
+
+ Optional<Action> create(MultipleParameterToolAdapter params);
static Optional<Action> createAction(String[] args) {
- String action = args[0].toLowerCase();
+ // to be compatible with old usage
+ String action = args[0].toLowerCase().replaceAll("-", "_");
String[] actionArgs = Arrays.copyOfRange(args, 1, args.length);
ActionFactory actionFactory;
try {
@@ -62,8 +72,9 @@ public interface ActionFactory extends Factory {
LOG.info("{} job args: {}", actionFactory.identifier(), String.join(" ", actionArgs));
- MultipleParameterTool params = MultipleParameterTool.fromArgs(actionArgs);
- if (params.has("help")) {
+ MultipleParameterToolAdapter params =
+ new MultipleParameterToolAdapter(MultipleParameterTool.fromArgs(actionArgs));
+ if (params.has(HELP)) {
actionFactory.printHelp();
return Optional.empty();
}
@@ -85,11 +96,11 @@ public interface ActionFactory extends Factory {
System.out.println("For detailed options of each action, run <action> --help");
}
- default Tuple3<String, String, String> getTablePath(MultipleParameterTool params) {
- String warehouse = params.get("warehouse");
- String database = params.get("database");
- String table = params.get("table");
- String path = params.get("path");
+ default Tuple3<String, String, String> getTablePath(MultipleParameterToolAdapter params) {
+ String warehouse = params.get(WAREHOUSE);
+ String database = params.get(DATABASE);
+ String table = params.get(TABLE);
+ String path = params.get(PATH);
Tuple3<String, String, String> tablePath = null;
int count = 0;
@@ -118,9 +129,9 @@ public interface ActionFactory extends Factory {
return tablePath;
}
- default List<Map<String, String>> getPartitions(MultipleParameterTool params) {
+ default List<Map<String, String>> getPartitions(MultipleParameterToolAdapter params) {
List<Map<String, String>> partitions = new ArrayList<>();
- for (String partition : params.getMultiParameter("partition")) {
+ for (String partition : params.getMultiParameter(PARTITION)) {
Map<String, String> kvs = parseCommaSeparatedKeyValues(partition);
partitions.add(kvs);
}
@@ -128,7 +139,7 @@ public interface ActionFactory extends Factory {
return partitions;
}
- default Map<String, String> optionalConfigMap(MultipleParameterTool params, String key) {
+ default Map<String, String> optionalConfigMap(MultipleParameterToolAdapter params, String key) {
if (!params.has(key)) {
return Collections.emptyMap();
}
@@ -140,12 +151,12 @@ public interface ActionFactory extends Factory {
return config;
}
- default void checkRequiredArgument(MultipleParameterTool params, String key) {
+ default void checkRequiredArgument(MultipleParameterToolAdapter params, String key) {
Preconditions.checkArgument(
params.has(key), "Argument '%s' is required. Run '<action> --help' for help.", key);
}
- default String getRequiredValue(MultipleParameterTool params, String key) {
+ default String getRequiredValue(MultipleParameterToolAdapter params, String key) {
checkRequiredArgument(params, key);
return params.get(key);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
index 0e0c4791d..08c1215df 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
import java.util.List;
import java.util.Map;
@@ -30,28 +29,31 @@ public class CompactActionFactory implements ActionFactory {
public static final String IDENTIFIER = "compact";
+ private static final String ORDER_STRATEGY = "order_strategy";
+ private static final String ORDER_BY = "order_by";
+
@Override
public String identifier() {
return IDENTIFIER;
}
@Override
- public Optional<Action> create(MultipleParameterTool params) {
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
Tuple3<String, String, String> tablePath = getTablePath(params);
- Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
+ Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
CompactAction action;
- if (params.has("order-strategy")) {
+ if (params.has(ORDER_STRATEGY)) {
action =
new SortCompactAction(
tablePath.f0,
tablePath.f1,
tablePath.f2,
catalogConfig,
- optionalConfigMap(params, "table-conf"))
- .withOrderStrategy(params.get("order-strategy"))
- .withOrderColumns(getRequiredValue(params, "order-by").split(","));
+ optionalConfigMap(params, TABLE_CONF))
+ .withOrderStrategy(params.get(ORDER_STRATEGY))
+ .withOrderColumns(getRequiredValue(params, ORDER_BY).split(","));
} else {
action =
new CompactAction(
@@ -59,10 +61,10 @@ public class CompactActionFactory implements ActionFactory {
tablePath.f1,
tablePath.f2,
catalogConfig,
- optionalConfigMap(params, "table-conf"));
+ optionalConfigMap(params, TABLE_CONF));
}
- if (params.has("partition")) {
+ if (params.has(PARTITION)) {
List<Map<String, String>> partitions = getPartitions(params);
action.withPartitions(partitions);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
index b7a1a776a..590f5283a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
@@ -18,14 +18,17 @@
package org.apache.paimon.flink.action;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
-
import java.util.Optional;
/** Factory to create {@link CompactDatabaseAction}. */
public class CompactDatabaseActionFactory implements ActionFactory {
- public static final String IDENTIFIER = "compact-database";
+ public static final String IDENTIFIER = "compact_database";
+
+ private static final String INCLUDING_DATABASES = "including_databases";
+ private static final String INCLUDING_TABLES = "including_tables";
+ private static final String EXCLUDING_TABLES = "excluding_tables";
+ private static final String MODE = "mode";
@Override
public String identifier() {
@@ -33,17 +36,17 @@ public class CompactDatabaseActionFactory implements ActionFactory {
}
@Override
- public Optional<Action> create(MultipleParameterTool params) {
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
CompactDatabaseAction action =
new CompactDatabaseAction(
- getRequiredValue(params, "warehouse"),
- optionalConfigMap(params, "catalog-conf"));
+ getRequiredValue(params, WAREHOUSE),
+ optionalConfigMap(params, CATALOG_CONF));
- action.includingDatabases(params.get("including-databases"))
- .includingTables(params.get("including-tables"))
- .excludingTables(params.get("excluding-tables"))
- .withDatabaseCompactMode(params.get("mode"))
- .withTableOptions(optionalConfigMap(params, "table-conf"));
+ action.includingDatabases(params.get(INCLUDING_DATABASES))
+ .includingTables(params.get(INCLUDING_TABLES))
+ .excludingTables(params.get(EXCLUDING_TABLES))
+ .withDatabaseCompactMode(params.get(MODE))
+ .withTableOptions(optionalConfigMap(params, TABLE_CONF));
return Optional.of(action);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
index eb3ecf619..45b9597df 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
import java.util.Map;
import java.util.Optional;
@@ -27,7 +26,10 @@ import java.util.Optional;
/** Factory to create {@link CreateTagAction}. */
public class CreateTagActionFactory implements ActionFactory {
- public static final String IDENTIFIER = "create-tag";
+ public static final String IDENTIFIER = "create_tag";
+
+ private static final String TAG_NAME = "tag_name";
+ private static final String SNAPSHOT = "snapshot";
@Override
public String identifier() {
@@ -35,14 +37,14 @@ public class CreateTagActionFactory implements ActionFactory {
}
@Override
- public Optional<Action> create(MultipleParameterTool params) {
- checkRequiredArgument(params, "tag-name");
- checkRequiredArgument(params, "snapshot");
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
+ checkRequiredArgument(params, TAG_NAME);
+ checkRequiredArgument(params, SNAPSHOT);
Tuple3<String, String, String> tablePath = getTablePath(params);
- Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
- String tagName = params.get("tag-name");
- long snapshot = Long.parseLong(params.get("snapshot"));
+ Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
+ String tagName = params.get(TAG_NAME);
+ long snapshot = Long.parseLong(params.get(SNAPSHOT));
CreateTagAction action =
new CreateTagAction(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteActionFactory.java
index 4b5348664..dee3a68de 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteActionFactory.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
import java.util.Map;
import java.util.Optional;
@@ -29,22 +28,24 @@ public class DeleteActionFactory implements ActionFactory {
public static final String IDENTIFIER = "delete";
+ private static final String WHERE = "where";
+
@Override
public String identifier() {
return IDENTIFIER;
}
@Override
- public Optional<Action> create(MultipleParameterTool params) {
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
Tuple3<String, String, String> tablePath = getTablePath(params);
- String filter = params.get("where");
+ String filter = params.get(WHERE);
if (filter == null) {
throw new IllegalArgumentException(
"Please specify deletion filter. If you want to delete all records, please use overwrite (see doc).");
}
- Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
+ Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
DeleteAction action =
new DeleteAction(tablePath.f0, tablePath.f1, tablePath.f2, filter, catalogConfig);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagActionFactory.java
index f95716a21..8f0c51482 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagActionFactory.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
import java.util.Map;
import java.util.Optional;
@@ -27,7 +26,9 @@ import java.util.Optional;
/** Factory to create {@link DeleteTagAction}. */
public class DeleteTagActionFactory implements ActionFactory {
- public static final String IDENTIFIER = "delete-tag";
+ public static final String IDENTIFIER = "delete_tag";
+
+ private static final String TAG_NAME = "tag_name";
@Override
public String identifier() {
@@ -35,12 +36,12 @@ public class DeleteTagActionFactory implements ActionFactory {
}
@Override
- public Optional<Action> create(MultipleParameterTool params) {
- checkRequiredArgument(params, "tag-name");
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
+ checkRequiredArgument(params, TAG_NAME);
Tuple3<String, String, String> tablePath = getTablePath(params);
- Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
- String tagName = params.get("tag-name");
+ Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
+ String tagName = params.get(TAG_NAME);
DeleteTagAction action =
new DeleteTagAction(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionActionFactory.java
index d06fd2bf8..b177caadd 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionActionFactory.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
import java.util.List;
import java.util.Map;
@@ -28,7 +27,7 @@ import java.util.Optional;
/** Factory to create {@link DropPartitionAction}. */
public class DropPartitionActionFactory implements ActionFactory {
- public static final String IDENTIFIER = "drop-partition";
+ public static final String IDENTIFIER = "drop_partition";
@Override
public String identifier() {
@@ -36,13 +35,13 @@ public class DropPartitionActionFactory implements ActionFactory {
}
@Override
- public Optional<Action> create(MultipleParameterTool params) {
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
Tuple3<String, String, String> tablePath = getTablePath(params);
- checkRequiredArgument(params, "partition");
+ checkRequiredArgument(params, PARTITION);
List<Map<String, String>> partitions = getPartitions(params);
- Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
+ Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
return Optional.of(
new DropPartitionAction(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
index 27fe6069b..2b87b7293 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
import java.util.Arrays;
import java.util.Collection;
@@ -31,7 +30,7 @@ import java.util.stream.Collectors;
/** Factory to create {@link MergeIntoAction}. */
public class MergeIntoActionFactory implements ActionFactory {
- public static final String IDENTIFIER = "merge-into";
+ public static final String IDENTIFIER = "merge_into";
public static final String MATCHED_UPSERT = "matched-upsert";
public static final String NOT_MATCHED_BY_SOURCE_UPSERT = "not-matched-by-source-upsert";
@@ -39,62 +38,78 @@ public class MergeIntoActionFactory implements ActionFactory {
public static final String NOT_MATCHED_BY_SOURCE_DELETE = "not-matched-by-source-delete";
public static final String NOT_MATCHED_INSERT = "not-matched-insert";
+ private static final String TARGET_AS = "target_as";
+ private static final String SOURCE_SQL = "source_sql";
+ private static final String SOURCE_TABLE = "source_table";
+ private static final String ON = "on";
+ private static final String MERGE_ACTIONS = "merge_actions";
+ private static final String MATCHED_UPSERT_SET = "matched_upsert_set";
+ private static final String MATCHED_UPSERT_CONDITION = "matched_upsert_condition";
+ private static final String NOT_MATCHED_BY_SOURCE_UPSERT_SET =
+ "not_matched_by_source_upsert_set";
+ private static final String NOT_MATCHED_BY_SOURCE_UPSERT_CONDITION =
+ "not_matched_by_source_upsert_condition";
+ private static final String MATCHED_DELETE_CONDITION = "matched_delete_condition";
+ private static final String NOT_MATCHED_BY_SOURCE_DELETE_CONDITION =
+ "not_matched_by_source_delete_condition";
+ private static final String NOT_MATCHED_INSERT_VALUES = "not_matched_insert_values";
+ private static final String NOT_MATCHED_INSERT_CONDITION = "not_matched_insert_condition";
+
@Override
public String identifier() {
return IDENTIFIER;
}
@Override
- public Optional<Action> create(MultipleParameterTool params) {
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
Tuple3<String, String, String> tablePath = getTablePath(params);
- Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
+ Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
MergeIntoAction action =
new MergeIntoAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig);
- if (params.has("target-as")) {
- action.withTargetAlias(params.get("target-as"));
+ if (params.has(TARGET_AS)) {
+ action.withTargetAlias(params.get(TARGET_AS));
}
- if (params.has("source-sql")) {
- Collection<String> sourceSqls = params.getMultiParameter("source-sql");
+ if (params.has(SOURCE_SQL)) {
+ Collection<String> sourceSqls = params.getMultiParameter(SOURCE_SQL);
action.withSourceSqls(sourceSqls.toArray(new String[0]));
}
- checkRequiredArgument(params, "source-table");
- action.withSourceTable(params.get("source-table"));
+ checkRequiredArgument(params, SOURCE_TABLE);
+ action.withSourceTable(params.get(SOURCE_TABLE));
- checkRequiredArgument(params, "on");
- action.withMergeCondition(params.get("on"));
+ checkRequiredArgument(params, ON);
+ action.withMergeCondition(params.get(ON));
List<String> actions =
- Arrays.stream(params.get("merge-actions").split(","))
+ Arrays.stream(params.get(MERGE_ACTIONS).split(","))
.map(String::trim)
.collect(Collectors.toList());
if (actions.contains(MATCHED_UPSERT)) {
- checkRequiredArgument(params, "matched-upsert-set");
+ checkRequiredArgument(params, MATCHED_UPSERT_SET);
action.withMatchedUpsert(
- params.get("matched-upsert-condition"), params.get("matched-upsert-set"));
+ params.get(MATCHED_UPSERT_CONDITION), params.get(MATCHED_UPSERT_SET));
}
if (actions.contains(NOT_MATCHED_BY_SOURCE_UPSERT)) {
- checkRequiredArgument(params, "not-matched-by-source-upsert-set");
+ checkRequiredArgument(params, NOT_MATCHED_BY_SOURCE_UPSERT_SET);
action.withNotMatchedBySourceUpsert(
- params.get("not-matched-by-source-upsert-condition"),
- params.get("not-matched-by-source-upsert-set"));
+ params.get(NOT_MATCHED_BY_SOURCE_UPSERT_CONDITION),
+ params.get(NOT_MATCHED_BY_SOURCE_UPSERT_SET));
}
if (actions.contains(MATCHED_DELETE)) {
- action.withMatchedDelete(params.get("matched-delete-condition"));
+ action.withMatchedDelete(params.get(MATCHED_DELETE_CONDITION));
}
if (actions.contains(NOT_MATCHED_BY_SOURCE_DELETE)) {
- action.withNotMatchedBySourceDelete(
- params.get("not-matched-by-source-delete-condition"));
+ action.withNotMatchedBySourceDelete(params.get(NOT_MATCHED_BY_SOURCE_DELETE_CONDITION));
}
if (actions.contains(NOT_MATCHED_INSERT)) {
- checkRequiredArgument(params, "not-matched-insert-values");
+ checkRequiredArgument(params, NOT_MATCHED_INSERT_VALUES);
action.withNotMatchedInsert(
- params.get("not-matched-insert-condition"),
- params.get("not-matched-insert-values"));
+ params.get(NOT_MATCHED_INSERT_CONDITION),
+ params.get(NOT_MATCHED_INSERT_VALUES));
}
validate(action);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java
index a9dd8a512..5ab1fb259 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java
@@ -18,8 +18,6 @@
package org.apache.paimon.flink.action;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
-
import java.util.Map;
import java.util.Optional;
@@ -28,18 +26,21 @@ public class MigrateTableActionFactory implements ActionFactory {
public static final String IDENTIFIER = "migrate_table";
+ private static final String SOURCE_TYPE = "source_type";
+ private static final String OPTIONS = "options";
+
@Override
public String identifier() {
return IDENTIFIER;
}
@Override
- public Optional<Action> create(MultipleParameterTool params) {
- String warehouse = params.get("warehouse");
- String connector = params.get("source_type");
- String sourceHiveTable = params.get("table");
- Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
- String tableConf = params.get("options");
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
+ String warehouse = params.get(WAREHOUSE);
+ String connector = params.get(SOURCE_TYPE);
+ String sourceHiveTable = params.get(TABLE);
+ Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
+ String tableConf = params.get(OPTIONS);
MigrateTableAction migrateTableAction =
new MigrateTableAction(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
new file mode 100644
index 000000000..e03b8cd69
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+
+import java.util.Collection;
+
+/** An adapter of {@link MultipleParameterTool} that can deal with old-style keys. */
+public class MultipleParameterToolAdapter {
+
+ private final MultipleParameterTool params;
+
+ public MultipleParameterToolAdapter(MultipleParameterTool params) {
+ this.params = params;
+ }
+
+ public boolean has(String key) {
+ return params.has(key) || params.has(fallback(key));
+ }
+
+ public String get(String key) {
+ String result = params.get(key);
+ return result == null ? params.get(fallback(key)) : result;
+ }
+
+ public Collection<String> getMultiParameter(String key) {
+ Collection<String> result = params.getMultiParameter(key);
+ return result == null ? params.getMultiParameter(fallback(key)) : result;
+ }
+
+ public String fallback(String key) {
+ return key.replaceAll("_", "-");
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java
index 4e0ca0316..b5d1f290d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
import java.util.Map;
import java.util.Optional;
@@ -27,7 +26,9 @@ import java.util.Optional;
/** Factory to create {@link RemoveOrphanFilesAction}. */
public class RemoveOrphanFilesActionFactory implements ActionFactory {
- public static final String IDENTIFIER = "remove-orphan-files";
+ public static final String IDENTIFIER = "remove_orphan_files";
+
+ private static final String OLDER_THAN = "older_than";
@Override
public String identifier() {
@@ -35,16 +36,16 @@ public class RemoveOrphanFilesActionFactory implements ActionFactory {
}
@Override
- public Optional<Action> create(MultipleParameterTool params) {
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
Tuple3<String, String, String> tablePath = getTablePath(params);
- Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
+ Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
RemoveOrphanFilesAction action =
new RemoveOrphanFilesAction(
tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig);
- if (params.has("older-than")) {
- action.olderThan(params.get("olderThan"));
+ if (params.has(OLDER_THAN)) {
+ action.olderThan(params.get(OLDER_THAN));
}
return Optional.of(action);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java
index d8e04bf0e..a6e16b7e8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
import java.util.Map;
import java.util.Optional;
@@ -27,7 +26,10 @@ import java.util.Optional;
/** Factory to create {@link ResetConsumerAction}. */
public class ResetConsumerActionFactory implements ActionFactory {
- public static final String IDENTIFIER = "reset-consumer";
+ public static final String IDENTIFIER = "reset_consumer";
+
+ private static final String CONSUMER_ID = "consumer_id";
+ private static final String NEXT_SNAPSHOT = "next_snapshot";
@Override
public String identifier() {
@@ -35,19 +37,19 @@ public class ResetConsumerActionFactory implements ActionFactory {
}
@Override
- public Optional<Action> create(MultipleParameterTool params) {
- checkRequiredArgument(params, "consumer-id");
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
+ checkRequiredArgument(params, CONSUMER_ID);
Tuple3<String, String, String> tablePath = getTablePath(params);
- Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
- String consumerId = params.get("consumer-id");
+ Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
+ String consumerId = params.get(CONSUMER_ID);
ResetConsumerAction action =
new ResetConsumerAction(
tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig, consumerId);
- if (params.has("next-snapshot")) {
- action.withNextSnapshotIds(Long.parseLong(params.get("next-snapshot")));
+ if (params.has(NEXT_SNAPSHOT)) {
+ action.withNextSnapshotIds(Long.parseLong(params.get(NEXT_SNAPSHOT)));
}
return Optional.of(action);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToActionFactory.java
index 43ea7a406..222d7d569 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToActionFactory.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
import java.util.Map;
import java.util.Optional;
@@ -27,7 +26,9 @@ import java.util.Optional;
/** Factory to create {@link RollbackToAction}. */
public class RollbackToActionFactory implements ActionFactory {
- public static final String IDENTIFIER = "rollback-to";
+ public static final String IDENTIFIER = "rollback_to";
+
+ private static final String VERSION = "version";
@Override
public String identifier() {
@@ -35,13 +36,13 @@ public class RollbackToActionFactory implements ActionFactory {
}
@Override
- public Optional<Action> create(MultipleParameterTool params) {
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
Tuple3<String, String, String> tablePath = getTablePath(params);
- checkRequiredArgument(params, "version");
- String version = params.get("version");
+ checkRequiredArgument(params, VERSION);
+ String version = params.get(VERSION);
- Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
+ Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
RollbackToAction action =
new RollbackToAction(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index fe48881d4..476727699 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -159,6 +159,31 @@ public abstract class ActionITCaseBase extends AbstractTestBase {
return env;
}
+ protected <T extends ActionBase> T createAction(Class<T> clazz, List<String> args) {
+ return createAction(clazz, args.toArray(new String[0]));
+ }
+
+ protected <T extends ActionBase> T createAction(Class<T> clazz, String... args) {
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ confuseArgs(args, "_", "-");
+ } else {
+ confuseArgs(args, "-", "_");
+ }
+ return ActionFactory.createAction(args)
+ .filter(clazz::isInstance)
+ .map(clazz::cast)
+ .orElseThrow(() -> new RuntimeException("Failed to create action"));
+ }
+
+ // to test compatibility with old usage
+ private void confuseArgs(String[] args, String regex, String replacement) {
+ args[0] = args[0].replaceAll(regex, replacement);
+ for (int i = 1; i < args.length; i += 2) {
+ String arg = args[i].substring(2);
+ args[i] = "--" + arg.replaceAll(regex, replacement);
+ }
+ }
+
protected void callProcedure(String procedureStatement) {
// default execution mode
callProcedure(procedureStatement, true, false);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index 9206cc95f..d40d1aeea 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -250,14 +250,17 @@ public class CompactActionITCase extends CompactActionITCaseBase {
Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), Collections.emptyMap());
CompactAction compactAction =
- new CompactAction(
- warehouse,
- database,
- tableName,
- Collections.emptyMap(),
- Collections.singletonMap(
- FlinkConnectorOptions.SCAN_PARALLELISM.key(), "6"))
- .withPartitions(getSpecifiedPartitions());
+ createAction(
+ CompactAction.class,
+ "compact",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--table_conf",
+ FlinkConnectorOptions.SCAN_PARALLELISM.key() + "=6");
assertThat(compactAction.table.options().get(FlinkConnectorOptions.SCAN_PARALLELISM.key()))
.isEqualTo("6");
@@ -288,10 +291,21 @@ public class CompactActionITCase extends CompactActionITCaseBase {
private void runAction(boolean isStreaming) throws Exception {
StreamExecutionEnvironment env = buildDefaultEnv(isStreaming);
- new CompactAction(warehouse, database, tableName)
- .withPartitions(getSpecifiedPartitions())
- .withStreamExecutionEnvironment(env)
- .build();
+ CompactAction action =
+ createAction(
+ CompactAction.class,
+ "compact",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--partition",
+ "dt=20221208,hh=15",
+ "--partition",
+ "dt=20221209,hh=15");
+ action.withStreamExecutionEnvironment(env).build();
if (isStreaming) {
env.executeAsync();
} else {
@@ -307,16 +321,4 @@ public class CompactActionITCase extends CompactActionITCaseBase {
isStreaming,
!isStreaming);
}
-
- private List<Map<String, String>> getSpecifiedPartitions() {
- Map<String, String> partition1 = new HashMap<>();
- partition1.put("dt", "20221208");
- partition1.put("hh", "15");
-
- Map<String, String> partition2 = new HashMap<>();
- partition2.put("dt", "20221209");
- partition2.put("hh", "15");
-
- return Arrays.asList(partition1, partition2);
- }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
index c0c0d4337..705826bfa 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
@@ -132,8 +132,13 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
if (ThreadLocalRandom.current().nextBoolean()) {
StreamExecutionEnvironment env = buildDefaultEnv(false);
- new CompactDatabaseAction(warehouse, Collections.emptyMap())
- .withDatabaseCompactMode(mode)
+ createAction(
+ CompactDatabaseAction.class,
+ "compact_database",
+ "--warehouse",
+ warehouse,
+ "--mode",
+ mode)
.withStreamExecutionEnvironment(env)
.build();
env.execute();
@@ -207,22 +212,30 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
}
if (ThreadLocalRandom.current().nextBoolean()) {
- StreamExecutionEnvironment env = buildDefaultEnv(true);
+ CompactDatabaseAction action;
if (mode.equals("divided")) {
- new CompactDatabaseAction(warehouse, new HashMap<>())
- .withStreamExecutionEnvironment(env)
- .build();
+ action =
+ createAction(
+ CompactDatabaseAction.class,
+ "compact_database",
+ "--warehouse",
+ warehouse);
} else {
// if CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() use default value, the cost
// time in combined mode will be over 1 min
- new CompactDatabaseAction(warehouse, new HashMap<>())
- .withDatabaseCompactMode("combined")
- .withTableOptions(
- Collections.singletonMap(
- CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s"))
- .withStreamExecutionEnvironment(env)
- .build();
+ action =
+ createAction(
+ CompactDatabaseAction.class,
+ "compact_database",
+ "--warehouse",
+ warehouse,
+ "--mode",
+ "combined",
+ "--table_conf",
+ CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() + "=1s");
}
+ StreamExecutionEnvironment env = buildDefaultEnv(true);
+ action.withStreamExecutionEnvironment(env).build();
env.executeAsync();
} else {
if (mode.equals("divided")) {
@@ -426,8 +439,8 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
private void includingAndExcludingTablesImpl(
String mode,
- String includingPattern,
- String excludesPattern,
+ @Nullable String includingPattern,
+ @Nullable String excludesPattern,
List<Identifier> includeTables,
List<Identifier> excludeTables)
throws Exception {
@@ -477,18 +490,29 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
}
if (ThreadLocalRandom.current().nextBoolean()) {
- StreamExecutionEnvironment env = buildDefaultEnv(false);
- CompactDatabaseAction action =
- new CompactDatabaseAction(warehouse, Collections.emptyMap())
- .includingTables(includingPattern)
- .excludingTables(excludesPattern)
- .withDatabaseCompactMode(mode);
+ List<String> args = new ArrayList<>();
+ args.add("compact_database");
+ args.add("--warehouse");
+ args.add(warehouse);
+ if (includingPattern != null) {
+ args.add("--including_tables");
+ args.add(includingPattern);
+ }
+ if (excludesPattern != null) {
+ args.add("--excluding_tables");
+ args.add(excludesPattern);
+ }
+ args.add("--mode");
+ args.add(mode);
if (mode.equals("combined")) {
- action.withTableOptions(
- Collections.singletonMap(
- CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s"));
+ args.add("--table_conf");
+ args.add(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() + "=1s");
}
- action.withStreamExecutionEnvironment(env).build();
+
+ StreamExecutionEnvironment env = buildDefaultEnv(false);
+ createAction(CompactDatabaseAction.class, args)
+ .withStreamExecutionEnvironment(env)
+ .build();
env.execute();
} else {
if (mode.equals("divided")) {
@@ -588,7 +612,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
if (ThreadLocalRandom.current().nextBoolean()) {
StreamExecutionEnvironment env = buildDefaultEnv(true);
- new CompactDatabaseAction(warehouse, new HashMap<>())
+ createAction(CompactDatabaseAction.class, "compact_database", "--warehouse", warehouse)
.withStreamExecutionEnvironment(env)
.build();
env.executeAsync();
@@ -661,7 +685,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
if (ThreadLocalRandom.current().nextBoolean()) {
StreamExecutionEnvironment env = buildDefaultEnv(false);
- new CompactDatabaseAction(warehouse, new HashMap<>())
+ createAction(CompactDatabaseAction.class, "compact_database", "--warehouse", warehouse)
.withStreamExecutionEnvironment(env)
.build();
env.execute();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
index 771a9e59d..2710cd6d9 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
@@ -80,11 +81,22 @@ public class ConsumerActionITCase extends ActionITCaseBase {
assertThat(consumer1).isPresent();
assertThat(consumer1.get().nextSnapshot()).isEqualTo(4);
+ List<String> args =
+ Arrays.asList(
+ "reset_consumer",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--consumer_id",
+ "myid",
+ "--next_snapshot",
+ "1");
// reset consumer
if (ThreadLocalRandom.current().nextBoolean()) {
- new ResetConsumerAction(warehouse, database, tableName, Collections.emptyMap(), "myid")
- .withNextSnapshotIds(1L)
- .run();
+ createAction(ResetConsumerAction.class, args).run();
} else {
callProcedure(
String.format(
@@ -96,8 +108,7 @@ public class ConsumerActionITCase extends ActionITCaseBase {
// delete consumer
if (ThreadLocalRandom.current().nextBoolean()) {
- new ResetConsumerAction(warehouse, database, tableName, Collections.emptyMap(), "myid")
- .run();
+ createAction(ResetConsumerAction.class, args.subList(0, 9)).run();
} else {
callProcedure(
String.format("CALL sys.reset_consumer('%s.%s', 'myid')", database, tableName));
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
index 7550d5e81..da1cf763a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
@@ -73,7 +73,17 @@ public class DeleteActionITCase extends ActionITCaseBase {
prepareTable();
DeleteAction action =
- new DeleteAction(warehouse, database, tableName, "k = 1", Collections.emptyMap());
+ createAction(
+ DeleteAction.class,
+ "delete",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--where",
+ "k=1");
BlockingIterator<Row, Row> iterator =
testStreamingRead(buildSimpleQuery(tableName), initialRecords);
@@ -112,7 +122,17 @@ public class DeleteActionITCase extends ActionITCaseBase {
});
DeleteAction action =
- new DeleteAction(warehouse, database, tableName, "k < 3", Collections.emptyMap());
+ createAction(
+ DeleteAction.class,
+ "delete",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--where",
+ "k<3");
insertInto(
tableName, "(1, 'Say', 'A'), (2, 'Hi', 'B'), (3, 'To', 'C'), (4, 'Paimon', 'D')");
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
index 36358a0e0..d489a2843 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
@@ -56,12 +56,18 @@ public class DropPartitionActionITCase extends ActionITCaseBase {
FileStoreTable table = prepareTable(hasPk);
if (ThreadLocalRandom.current().nextBoolean()) {
- new DropPartitionAction(
+
+ createAction(
+ DropPartitionAction.class,
+ "drop_partition",
+ "--warehouse",
warehouse,
+ "--database",
database,
+ "--table",
tableName,
- Collections.singletonList(Collections.singletonMap("partKey0", "0")),
- Collections.emptyMap())
+ "--partition",
+ "partKey0=0")
.run();
} else {
callProcedure(
@@ -115,12 +121,19 @@ public class DropPartitionActionITCase extends ActionITCaseBase {
partitions1.put("partKey1", "0");
if (ThreadLocalRandom.current().nextBoolean()) {
- new DropPartitionAction(
+ createAction(
+ DropPartitionAction.class,
+ "drop_partition",
+ "--warehouse",
warehouse,
+ "--database",
database,
+ "--table",
tableName,
- Arrays.asList(partitions0, partitions1),
- Collections.emptyMap())
+ "--partition",
+ "partKey0=0,partKey1=1",
+ "--partition",
+ "partKey0=1,partKey1=0")
.run();
} else {
callProcedure(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index 7d6ce1f33..5012a1732 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -32,6 +32,9 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -40,6 +43,11 @@ import java.util.concurrent.ThreadLocalRandom;
import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
+import static org.apache.paimon.flink.action.MergeIntoActionFactory.MATCHED_DELETE;
+import static org.apache.paimon.flink.action.MergeIntoActionFactory.MATCHED_UPSERT;
+import static org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_BY_SOURCE_DELETE;
+import static org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_BY_SOURCE_UPSERT;
+import static org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_INSERT;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildDdl;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildSimpleQuery;
@@ -99,7 +107,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
// WHEN NOT MATCHED BY SOURCE AND (dt < '02-28') THEN UPDATE
// SET v = v || '_nmu', last_action = 'not_matched_upsert'
// WHEN NOT MATCHED BY SOURCE AND (dt >= '02-28') THEN DELETE
- MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+ MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T");
// here test if it works when table S is in default and qualified both
action.withSourceTable("default.S")
.withMergeCondition("T.k = S.k AND T.dt = S.dt")
@@ -112,7 +120,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
.withNotMatchedBySourceDelete("dt >= '02-28'");
validateActionRunResult(
- action,
+ action.build(),
expected,
Arrays.asList(
changelogRow("+I", 1, "v_1", "creation", "02-27"),
@@ -150,7 +158,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
@ParameterizedTest(name = "in-default = {0}")
@ValueSource(booleans = {true, false})
public void testTargetAlias(boolean inDefault) throws Exception {
- MergeIntoAction action;
+ MergeIntoActionBuilder action;
if (!inDefault) {
// create target table in a new database
@@ -160,9 +168,9 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
bEnv.executeSql("USE test_db");
prepareTargetTable(CoreOptions.ChangelogProducer.NONE);
- action = new MergeIntoAction(warehouse, "test_db", "T");
+ action = new MergeIntoActionBuilder(warehouse, "test_db", "T");
} else {
- action = new MergeIntoAction(warehouse, database, "T");
+ action = new MergeIntoActionBuilder(warehouse, database, "T");
}
action.withTargetAlias("TT")
@@ -192,7 +200,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
changelogRow("+I", 10, "v_10", "creation", "02-28"));
if (ThreadLocalRandom.current().nextBoolean()) {
- validateActionRunResult(action, streamingExpected, batchExpected);
+ validateActionRunResult(action.build(), streamingExpected, batchExpected);
} else {
validateProcedureResult(procedureStatement, streamingExpected, batchExpected);
}
@@ -201,7 +209,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
@ParameterizedTest(name = "in-default = {0}")
@ValueSource(booleans = {true, false})
public void testSourceName(boolean inDefault) throws Exception {
- MergeIntoAction action = new MergeIntoAction(warehouse, "default", "T");
+ MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, "default", "T");
String sourceTableName = "S";
if (!inDefault) {
@@ -245,7 +253,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
changelogRow("+I", 10, "v_10", "creation", "02-28"));
if (ThreadLocalRandom.current().nextBoolean()) {
- validateActionRunResult(action, streamingExpected, batchExpected);
+ validateActionRunResult(action.build(), streamingExpected, batchExpected);
} else {
validateProcedureResult(procedureStatement, streamingExpected, batchExpected);
}
@@ -275,7 +283,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
useCatalog ? "S" : "test_cat.`default`.S", id);
String escapeDdl = ddl.replaceAll("'", "''");
- MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+ MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T");
if (useCatalog) {
action.withSourceSqls(catalog, "USE CATALOG test_cat", ddl);
@@ -316,7 +324,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
changelogRow("+I", 10, "v_10", "creation", "02-28"));
if (ThreadLocalRandom.current().nextBoolean()) {
- validateActionRunResult(action, streamingExpected, batchExpected);
+ validateActionRunResult(action.build(), streamingExpected, batchExpected);
} else {
validateProcedureResult(procedureStatement, streamingExpected, batchExpected);
}
@@ -326,7 +334,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
@ValueSource(booleans = {true, false})
public void testMatchedUpsertSetAll(boolean qualified) throws Exception {
// build MergeIntoAction
- MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+ MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T");
action.withSourceSqls("CREATE TEMPORARY VIEW SS AS SELECT k, v, 'unknown', dt FROM S")
.withSourceTable(qualified ? "default.SS" : "SS")
.withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
@@ -364,7 +372,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
changelogRow("+I", 10, "v_10", "creation", "02-28"));
if (ThreadLocalRandom.current().nextBoolean()) {
- validateActionRunResult(action, streamingExpected, batchExpected);
+ validateActionRunResult(action.build(), streamingExpected, batchExpected);
} else {
validateProcedureResult(procedureStatement, streamingExpected, batchExpected);
}
@@ -374,7 +382,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
@ValueSource(booleans = {true, false})
public void testNotMatchedInsertAll(boolean qualified) throws Exception {
// build MergeIntoAction
- MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+ MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T");
action.withSourceSqls("CREATE TEMPORARY VIEW SS AS SELECT k, v, 'unknown', dt FROM S")
.withSourceTable(qualified ? "default.SS" : "SS")
.withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
@@ -408,7 +416,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
changelogRow("+I", 11, "v_11", "unknown", "02-29"));
if (ThreadLocalRandom.current().nextBoolean()) {
- validateActionRunResult(action, streamingExpected, batchExpected);
+ validateActionRunResult(action.build(), streamingExpected, batchExpected);
} else {
validateProcedureResult(procedureStatement, streamingExpected, batchExpected);
}
@@ -449,7 +457,8 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
Collections.emptyList(),
Collections.emptyList());
- assertThatThrownBy(() -> new MergeIntoAction(warehouse, database, nonPkTable))
+ assertThatThrownBy(
+ () -> new MergeIntoActionBuilder(warehouse, database, nonPkTable).build())
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage(
"merge-into action doesn't support table with no primary keys defined.");
@@ -458,12 +467,12 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
@Test
public void testIncompatibleSchema() {
// build MergeIntoAction
- MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+ MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T");
action.withSourceTable("S")
.withMergeCondition("T.k = S.k AND T.dt = S.dt")
.withNotMatchedInsert(null, "S.k, S.v, 0, S.dt");
- assertThatThrownBy(action::run)
+ assertThatThrownBy(() -> action.build().run())
.isInstanceOf(IllegalStateException.class)
.hasMessage(
"The schema of result in action 'not-matched-insert' is invalid.\n"
@@ -479,13 +488,13 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
sEnv.executeSql("USE test_db");
prepareSourceTable();
- MergeIntoAction action = new MergeIntoAction(warehouse, "default", "T");
+ MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, "default", "T");
// the qualified path of source table is absent
action.withSourceTable("S")
.withMergeCondition("T.k = S.k AND T.dt = S.dt")
.withMatchedDelete("S.v IS NULL");
- assertThatThrownBy(action::run)
+ assertThatThrownBy(() -> action.build().run())
.satisfies(
AssertionUtils.anyCauseMatches(
ValidationException.class, "Object 'S' not found"));
@@ -496,7 +505,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
// drop table S
sEnv.executeSql("DROP TABLE S");
- MergeIntoAction action = new MergeIntoAction(warehouse, "default", "T");
+ MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, "default", "T");
action.withSourceSqls(
"CREATE DATABASE test_db",
"CREATE TEMPORARY TABLE test_db.S (k INT, v STRING, dt STRING) WITH ('connector' = 'values', 'bounded' = 'true')")
@@ -505,7 +514,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
.withMergeCondition("T.k = S.k AND T.dt = S.dt")
.withMatchedDelete("S.v IS NULL");
- assertThatThrownBy(action::run)
+ assertThatThrownBy(() -> action.build().run())
.satisfies(
AssertionUtils.anyCauseMatches(
ValidationException.class, "Object 'S' not found"));
@@ -645,4 +654,116 @@ public class MergeIntoActionITCase extends ActionITCaseBase {
changelogRow("-D", 9, "v_9", "creation", "02-28"),
changelogRow("-D", 10, "v_10", "creation", "02-28"))));
}
+
+ private class MergeIntoActionBuilder {
+
+ private final List<String> args;
+ private final List<String> mergeActions;
+
+ public MergeIntoActionBuilder(String warehouse, String database, String table) {
+ this.args =
+ new ArrayList<>(
+ Arrays.asList(
+ "merge_into",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ table));
+ this.mergeActions = new ArrayList<>();
+ }
+
+ public MergeIntoActionBuilder withTargetAlias(String targetAlias) {
+ if (targetAlias != null) {
+ args.add("--target_as");
+ args.add(targetAlias);
+ }
+ return this;
+ }
+
+ public MergeIntoActionBuilder withSourceTable(String sourceTable) {
+ args.add("--source_table");
+ args.add(sourceTable);
+ return this;
+ }
+
+ public MergeIntoActionBuilder withSourceSqls(String... sourceSqls) {
+ if (sourceSqls != null) {
+ for (String sql : sourceSqls) {
+ args.add("--source_sql");
+ args.add(sql);
+ }
+ }
+ return this;
+ }
+
+ public MergeIntoActionBuilder withMergeCondition(String mergeCondition) {
+ args.add("--on");
+ args.add(mergeCondition);
+ return this;
+ }
+
+ public MergeIntoActionBuilder withMatchedUpsert(
+ @Nullable String matchedUpsertCondition, String matchedUpsertSet) {
+ mergeActions.add(MATCHED_UPSERT);
+ args.add("--matched_upsert_set");
+ args.add(matchedUpsertSet);
+ if (matchedUpsertCondition != null) {
+ args.add("--matched_upsert_condition");
+ args.add(matchedUpsertCondition);
+ }
+ return this;
+ }
+
+ public MergeIntoActionBuilder withNotMatchedBySourceUpsert(
+ @Nullable String notMatchedBySourceUpsertCondition,
+ String notMatchedBySourceUpsertSet) {
+ mergeActions.add(NOT_MATCHED_BY_SOURCE_UPSERT);
+ args.add("--not_matched_by_source_upsert_set");
+ args.add(notMatchedBySourceUpsertSet);
+ if (notMatchedBySourceUpsertCondition != null) {
+ args.add("--not_matched_by_source_upsert_condition");
+ args.add(notMatchedBySourceUpsertCondition);
+ }
+ return this;
+ }
+
+ public MergeIntoActionBuilder withMatchedDelete(@Nullable String matchedDeleteCondition) {
+ mergeActions.add(MATCHED_DELETE);
+ if (matchedDeleteCondition != null) {
+ args.add("--matched_delete_condition");
+ args.add(matchedDeleteCondition);
+ }
+ return this;
+ }
+
+ public MergeIntoActionBuilder withNotMatchedBySourceDelete(
+ @Nullable String notMatchedBySourceDeleteCondition) {
+ mergeActions.add(NOT_MATCHED_BY_SOURCE_DELETE);
+ if (notMatchedBySourceDeleteCondition != null) {
+ args.add("--not_matched_by_source_delete_condition");
+ args.add(notMatchedBySourceDeleteCondition);
+ }
+ return this;
+ }
+
+ public MergeIntoActionBuilder withNotMatchedInsert(
+ @Nullable String notMatchedInsertCondition, String notMatchedInsertValues) {
+ mergeActions.add(NOT_MATCHED_INSERT);
+ args.add("--not_matched_insert_values");
+ args.add(notMatchedInsertValues);
+ if (notMatchedInsertCondition != null) {
+ args.add("--not_matched_insert_condition");
+ args.add(notMatchedInsertCondition);
+ }
+ return this;
+ }
+
+ MergeIntoAction build() {
+ args.add("--merge_actions");
+ args.add(String.join(",", mergeActions));
+ return createAction(MergeIntoAction.class, args);
+ }
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
index 1c7af1f0f..2eba8261f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
@@ -27,7 +27,10 @@ import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import static org.assertj.core.api.Assertions.assertThatCode;
@@ -54,11 +57,23 @@ public class RemoveOrphanFilesActionITCase extends ActionITCaseBase {
writeData(rowData(1L, BinaryString.fromString("Hi")));
- RemoveOrphanFilesAction action =
- new RemoveOrphanFilesAction(warehouse, database, tableName, Collections.emptyMap());
- assertThatCode(action::run).doesNotThrowAnyException();
- assertThatCode(() -> action.olderThan("2023-12-31 23:59:59").run())
- .doesNotThrowAnyException();
+ List<String> args =
+ new ArrayList<>(
+ Arrays.asList(
+ "remove_orphan_files",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName));
+ RemoveOrphanFilesAction action1 = createAction(RemoveOrphanFilesAction.class, args);
+ assertThatCode(action1::run).doesNotThrowAnyException();
+
+ args.add("--older_than");
+ args.add("2023-12-31 23:59:59");
+ RemoveOrphanFilesAction action2 = createAction(RemoveOrphanFilesAction.class, args);
+ assertThatCode(action2::run).doesNotThrowAnyException();
String withoutOlderThan =
String.format("CALL sys.remove_orphan_files('%s.%s')", database, tableName);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
index 6ecac880d..14c8d9f5f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
@@ -67,7 +67,18 @@ public class RollbackToActionITCase extends ActionITCaseBase {
writeData(rowData(2L, BinaryString.fromString("Flink")));
if (ThreadLocalRandom.current().nextBoolean()) {
- new RollbackToAction(warehouse, database, tableName, "2", Collections.emptyMap()).run();
+ createAction(
+ RollbackToAction.class,
+ "rollback_to",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--version",
+ "2")
+ .run();
} else {
callProcedure(String.format("CALL sys.rollback_to('%s.%s', 2)", database, tableName));
}
@@ -98,7 +109,17 @@ public class RollbackToActionITCase extends ActionITCaseBase {
table.createTag("tag3", 3);
if (ThreadLocalRandom.current().nextBoolean()) {
- new RollbackToAction(warehouse, database, tableName, "tag2", Collections.emptyMap())
+ createAction(
+ RollbackToAction.class,
+ "rollback_to",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--version",
+ "tag2")
.run();
} else {
callProcedure(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
index 2c079f379..45dbfcd9d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
@@ -41,7 +41,6 @@ import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Random;
@@ -155,15 +154,7 @@ public class SortCompactActionForDynamicBucketITCase extends ActionITCaseBase {
private void zorder(List<String> columns) throws Exception {
if (RANDOM.nextBoolean()) {
- new SortCompactAction(
- warehouse,
- database,
- tableName,
- Collections.emptyMap(),
- Collections.emptyMap())
- .withOrderStrategy("zorder")
- .withOrderColumns(columns)
- .run();
+ createAction("zorder", columns).run();
} else {
callProcedure("zorder", columns);
}
@@ -171,20 +162,28 @@ public class SortCompactActionForDynamicBucketITCase extends ActionITCaseBase {
private void order(List<String> columns) throws Exception {
if (RANDOM.nextBoolean()) {
- new SortCompactAction(
- warehouse,
- database,
- tableName,
- Collections.emptyMap(),
- Collections.emptyMap())
- .withOrderStrategy("order")
- .withOrderColumns(columns)
- .run();
+ createAction("order", columns).run();
} else {
callProcedure("order", columns);
}
}
+ private SortCompactAction createAction(String orderStrategy, List<String> columns) {
+ return createAction(
+ SortCompactAction.class,
+ "compact",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--order_strategy",
+ orderStrategy,
+ "--order_by",
+ String.join(",", columns));
+ }
+
private void callProcedure(String orderStrategy, List<String> orderByColumns) {
callProcedure(
String.format(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
index da524c5a7..7801b472c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
@@ -232,15 +232,7 @@ public class SortCompactActionForUnawareBucketITCase extends ActionITCaseBase {
private void zorder(List<String> columns) throws Exception {
if (RANDOM.nextBoolean()) {
- new SortCompactAction(
- warehouse,
- database,
- tableName,
- Collections.emptyMap(),
- Collections.emptyMap())
- .withOrderStrategy("zorder")
- .withOrderColumns(columns)
- .run();
+ createAction("zorder", columns).run();
} else {
callProcedure("zorder", columns);
}
@@ -248,20 +240,28 @@ public class SortCompactActionForUnawareBucketITCase extends ActionITCaseBase {
private void order(List<String> columns) throws Exception {
if (RANDOM.nextBoolean()) {
- new SortCompactAction(
- warehouse,
- database,
- tableName,
- Collections.emptyMap(),
- Collections.emptyMap())
- .withOrderStrategy("order")
- .withOrderColumns(columns)
- .run();
+ createAction("order", columns).run();
} else {
callProcedure("order", columns);
}
}
+ private SortCompactAction createAction(String orderStrategy, List<String> columns) {
+ return createAction(
+ SortCompactAction.class,
+ "compact",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--order_strategy",
+ orderStrategy,
+ "--order_by",
+ String.join(",", columns));
+ }
+
private void callProcedure(String orderStrategy, List<String> orderByColumns) {
callProcedure(
String.format(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
index 9691e66fe..b5c616707 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
@@ -67,7 +67,19 @@ public class TagActionITCase extends ActionITCaseBase {
TagManager tagManager = new TagManager(table.fileIO(), table.location());
if (ThreadLocalRandom.current().nextBoolean()) {
- new CreateTagAction(warehouse, database, tableName, Collections.emptyMap(), "tag2", 2)
+ createAction(
+ CreateTagAction.class,
+ "create_tag",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--tag_name",
+ "tag2",
+ "--snapshot",
+ "2")
.run();
} else {
callProcedure(
@@ -81,7 +93,17 @@ public class TagActionITCase extends ActionITCaseBase {
Arrays.asList(Row.of(1L, "Hi"), Row.of(2L, "Hello")));
if (ThreadLocalRandom.current().nextBoolean()) {
- new DeleteTagAction(warehouse, database, tableName, Collections.emptyMap(), "tag2")
+ createAction(
+ DeleteTagAction.class,
+ "delete_tag",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--tag_name",
+ "tag2")
.run();
} else {
callProcedure(