You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by yu...@apache.org on 2024/01/08 07:16:14 UTC
(incubator-paimon) branch master updated: [hotfix] Use the tool convertToPropertiesPrefixKey to optimize obtaining configuration by specified prefix (#2649)
This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3bd1212d2 [hotfix] Use the tool convertToPropertiesPrefixKey to optimize obtaining configuration by specified prefix (#2649)
3bd1212d2 is described below
commit 3bd1212d2a916edc79e6de0c692f156e7dcfaddd
Author: Kerwin <37...@users.noreply.github.com>
AuthorDate: Mon Jan 8 15:16:10 2024 +0800
[hotfix] Use the tool convertToPropertiesPrefixKey to optimize obtaining configuration by specified prefix (#2649)
---
.../java/org/apache/paimon/options/Options.java | 12 +++--------
.../flink/action/cdc/kafka/KafkaActionUtils.java | 9 ++-------
.../flink/action/cdc/mysql/MySqlActionUtils.java | 23 +++++-----------------
.../java/org/apache/paimon/hive/HiveCatalog.java | 11 ++---------
4 files changed, 12 insertions(+), 43 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/Options.java b/paimon-common/src/main/java/org/apache/paimon/options/Options.java
index 4510dc8a0..d658ce123 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/Options.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/Options.java
@@ -33,6 +33,7 @@ import java.util.function.BiFunction;
import static org.apache.paimon.options.OptionsUtils.canBePrefixMap;
import static org.apache.paimon.options.OptionsUtils.containsPrefixMap;
+import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixed;
import static org.apache.paimon.options.OptionsUtils.removePrefixMap;
@@ -138,14 +139,7 @@ public class Options implements Serializable {
}
public synchronized Options removePrefix(String prefix) {
- Map<String, String> newData = new HashMap<>();
- data.forEach(
- (k, v) -> {
- if (k.startsWith(prefix)) {
- newData.put(k.substring(prefix.length()), v);
- }
- });
- return new Options(newData);
+ return new Options(convertToPropertiesPrefixKey(data, prefix));
}
public synchronized void remove(String key) {
@@ -156,7 +150,7 @@ public class Options implements Serializable {
return data.containsKey(key);
}
- /** Adds all entries in this options to the given {@link Properties}. */
+ /** Adds all entries in these options to the given {@link Properties}. */
public synchronized void addAllToProperties(Properties props) {
props.putAll(this.data);
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 63702e6be..fcd0eeb88 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -59,6 +59,7 @@ import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
+import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
/** Utils for Kafka Action. */
public class KafkaActionUtils {
@@ -284,13 +285,7 @@ public class KafkaActionUtils {
private static Properties createKafkaProperties(Configuration kafkaConfig) {
Properties props = new Properties();
- for (Map.Entry<String, String> entry : kafkaConfig.toMap().entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
- if (key.startsWith(PROPERTIES_PREFIX)) {
- props.put(key.substring(PROPERTIES_PREFIX.length()), value);
- }
- }
+ props.putAll(convertToPropertiesPrefixKey(kafkaConfig.toMap(), PROPERTIES_PREFIX));
return props;
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index 26f6afc2a..88a733a8f 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -54,6 +54,7 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TINYINT1_NOT_BOOL;
+import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
/** Utils for MySQL Action. */
public class MySqlActionUtils {
@@ -208,14 +209,9 @@ public class MySqlActionUtils {
sourceBuilder.jdbcProperties(jdbcProperties);
Properties debeziumProperties = new Properties();
- for (Map.Entry<String, String> entry : mySqlConfig.toMap().entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
- if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
- debeziumProperties.put(
- key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value);
- }
- }
+ debeziumProperties.putAll(
+ convertToPropertiesPrefixKey(
+ mySqlConfig.toMap(), DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX));
sourceBuilder.debeziumProperties(debeziumProperties);
Map<String, Object> customConverterConfigs = new HashMap<>();
@@ -238,16 +234,7 @@ public class MySqlActionUtils {
private static Map<String, String> getJdbcProperties(
TypeMapping typeMapping, Configuration mySqlConfig) {
Map<String, String> jdbcProperties =
- mySqlConfig.toMap().entrySet().stream()
- .filter(e -> e.getKey().startsWith(JdbcUrlUtils.PROPERTIES_PREFIX))
- .collect(
- Collectors.toMap(
- e ->
- e.getKey()
- .substring(
- JdbcUrlUtils.PROPERTIES_PREFIX
- .length()),
- Map.Entry::getValue));
+ convertToPropertiesPrefixKey(mySqlConfig.toMap(), JdbcUrlUtils.PROPERTIES_PREFIX);
if (typeMapping.containsMode(TINYINT1_NOT_BOOL)) {
String tinyInt1isBit = jdbcProperties.get("tinyInt1isBit");
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 42cc831e6..16a73498e 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -85,6 +85,7 @@ import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
+import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
@@ -106,7 +107,6 @@ public class HiveCatalog extends AbstractCatalog {
private static final String STORAGE_HANDLER_CLASS_NAME =
"org.apache.paimon.hive.PaimonStorageHandler";
private static final String HIVE_PREFIX = "hive.";
- private static final int HIVE_PREFIX_LENGTH = HIVE_PREFIX.length();
public static final String HIVE_SITE_FILE = "hive-site.xml";
private final HiveConf hiveConf;
@@ -353,14 +353,7 @@ public class HiveCatalog extends AbstractCatalog {
Table table =
newHmsTable(
identifier,
- tableSchema.options().entrySet().stream()
- .filter(entry -> entry.getKey().startsWith(HIVE_PREFIX))
- .collect(
- Collectors.toMap(
- entry ->
- entry.getKey()
- .substring(HIVE_PREFIX_LENGTH),
- Map.Entry::getValue)));
+ convertToPropertiesPrefixKey(tableSchema.options(), HIVE_PREFIX));
try {
updateHmsTable(table, identifier, tableSchema);
client.createTable(table);