You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/01/07 07:35:56 UTC
[incubator-seatunnel] branch dev updated: [Improve][Connector-V2][Doris] Change Doris Config Prefix (#3856)
This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 16e39a506 [Improve][Connector-V2][Doris] Change Doris Config Prefix (#3856)
16e39a506 is described below
commit 16e39a506b61ca2bf3c0ec53fb77beed43133a7f
Author: TaoZex <45...@users.noreply.github.com>
AuthorDate: Sat Jan 7 15:35:49 2023 +0800
[Improve][Connector-V2][Doris] Change Doris Config Prefix (#3856)
* [Improve][Connector-V2][Doris] Change Doris Config Prefix
---
docs/en/connector-v2/sink/Doris.md | 26 ++++++++++++++--------
.../connectors/doris/config/SinkConfig.java | 20 ++++++++---------
.../connectors/doris/sink/DorisSinkFactory.java | 2 +-
.../src/test/resources/doris-jdbc-to-doris.conf | 6 +++--
4 files changed, 32 insertions(+), 22 deletions(-)
diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md
index df75b5ee7..1886fd523 100644
--- a/docs/en/connector-v2/sink/Doris.md
+++ b/docs/en/connector-v2/sink/Doris.md
@@ -25,7 +25,7 @@ The internal implementation of Doris sink connector is cached and imported by st
| max_retries | int | no | 1 |
| retry_backoff_multiplier_ms | int | no | - |
| max_retry_backoff_ms | int | no | - |
-| sink.properties.* | doris stream load config | no | - |
+| doris.config | map | no | - |
### node_urls [list]
@@ -75,11 +75,11 @@ Using as a multiplier for generating the next delay for backoff
The amount of time to wait before attempting to retry a request to `Doris`
-### sink.properties.* [doris stream load config]
+### doris.config [map]
-The parameter of the stream load `data_desc`
-The way to specify the parameter is to add the prefix `sink.properties.` to the original stream load parameter name
-For example, the way to specify `strip_outer_array` is: `sink.properties.strip_outer_array`
+The parameters of the stream load `data_desc`. For more details, see this link:
+
+https://doris.apache.org/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD/
#### Supported import data formats
@@ -98,8 +98,10 @@ sink {
database = "test"
table = "e2e_table_sink"
batch_max_rows = 100
- sink.properties.format = "JSON"
- sink.properties.strip_outer_array = true
+ doris.config = {
+ format = "JSON"
+ strip_outer_array = true
+ }
}
}
@@ -118,12 +120,18 @@ sink {
batch_max_rows = 100
sink.properties.format = "CSV"
sink.properties.column_separator = ","
+ doris.config = {
+ format = "CSV"
+ column_separator = ","
+ }
}
}
```
## Changelog
-### next version
+### 2.3.0-beta 2022-10-20
+- Add Doris Sink Connector
-- Add Doris Sink Connector
\ No newline at end of file
+### Next version
+- [Improve] Change Doris Config Prefix [3856](https://github.com/apache/incubator-seatunnel/pull/3856)
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/SinkConfig.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/SinkConfig.java
index d1f88cbc6..dab49a8c0 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/SinkConfig.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/SinkConfig.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.connectors.doris.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -70,11 +70,11 @@ public class SinkConfig {
.noDefaultValue()
.withDescription("The name of Doris table");
- public static final Option<String> DORIS_SINK_CONFIG_PREFIX = Options.key("sink.properties.")
- .stringType()
+ public static final Option<Map<String, String>> DORIS_CONFIG = Options.key("doris.config")
+ .mapType()
.noDefaultValue()
.withDescription("The parameter of the stream load data_desc. " +
- "The way to specify the parameter is to add the prefix `sink.properties.` to the original stream load parameter name ");
+ "The way to specify the parameter is to add the original stream load parameter into map");
public static final Option<Integer> BATCH_MAX_SIZE = Options.key("batch_max_rows")
.intType()
@@ -178,11 +178,11 @@ public class SinkConfig {
}
private static void parseSinkStreamLoadProperties(Config pluginConfig, SinkConfig sinkConfig) {
- Config dorisConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig,
- DORIS_SINK_CONFIG_PREFIX.key(), false);
- dorisConfig.entrySet().forEach(entry -> {
- final String configKey = entry.getKey().toLowerCase();
- sinkConfig.streamLoadProps.put(configKey, entry.getValue().unwrapped().toString());
- });
+ if (CheckConfigUtil.isValidParam(pluginConfig, DORIS_CONFIG.key())) {
+ pluginConfig.getObject(DORIS_CONFIG.key()).forEach((key, value) -> {
+ final String configKey = key.toLowerCase();
+ sinkConfig.streamLoadProps.put(configKey, value.unwrapped().toString());
+ });
+ }
}
}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
index 5fb721e7e..6d111378f 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
@@ -37,7 +37,7 @@ public class DorisSinkFactory implements TableSinkFactory {
.required(SinkConfig.NODE_URLS, SinkConfig.USERNAME, SinkConfig.PASSWORD, SinkConfig.DATABASE, SinkConfig.TABLE)
.optional(SinkConfig.LABEL_PREFIX, SinkConfig.BATCH_MAX_SIZE, SinkConfig.BATCH_MAX_BYTES,
SinkConfig.BATCH_INTERVAL_MS, SinkConfig.MAX_RETRIES, SinkConfig.MAX_RETRY_BACKOFF_MS,
- SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS, SinkConfig.DORIS_SINK_CONFIG_PREFIX)
+ SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS, SinkConfig.DORIS_CONFIG)
.build();
}
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris-jdbc-to-doris.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris-jdbc-to-doris.conf
index ca6fb0437..3d2ea5f9b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris-jdbc-to-doris.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris-jdbc-to-doris.conf
@@ -41,8 +41,10 @@ sink {
database = "test"
table = "e2e_table_sink"
batch_max_rows = 100
- sink.properties.format = "JSON"
- sink.properties.strip_outer_array = true
max_retries = 3
+ doris.config = {
+ format = "JSON"
+ strip_outer_array = true
+ }
}
}
\ No newline at end of file