You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2021/12/29 07:06:50 UTC

[incubator-seatunnel] branch dev updated: fix param (#843)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5aa7998  fix param (#843)
5aa7998 is described below

commit 5aa79989fc3e618083e57b2f500f5b5e8b52846c
Author: wudi <67...@qq.com>
AuthorDate: Wed Dec 29 15:06:46 2021 +0800

    fix param (#843)
    
    Co-authored-by: wudi <wu...@shuhaisc.com>
---
 docs/en/configuration/sink-plugins/Doris-Flink.md  | 71 ++++++++++++++++++++++
 docs/en/configuration/sink-plugins/Doris.md        |  3 +-
 .../configuration/sink-plugins/Doris-Flink.md      | 70 +++++++++++++++++++++
 docs/zh-CN/configuration/sink-plugins/Doris.md     | 68 ---------------------
 .../org/apache/seatunnel/flink/sink/DorisSink.java | 28 ++++-----
 5 files changed, 156 insertions(+), 84 deletions(-)

diff --git a/docs/en/configuration/sink-plugins/Doris-Flink.md b/docs/en/configuration/sink-plugins/Doris-Flink.md
new file mode 100644
index 0000000..236890f
--- /dev/null
+++ b/docs/en/configuration/sink-plugins/Doris-Flink.md
@@ -0,0 +1,71 @@
+# Sink plugin: Doris [Flink]
+
+### Description
+
+Write Data to a Doris Table.
+
+### Options
+
+| name | type | required | default value | engine |
+| --- | --- | --- | --- | --- |
+| fenodes | string | yes | - | Flink |
+| database | string | yes | - | Flink  |
+| table | string | yes | - | Flink  |
+| user	 | string | yes | - | Flink  |
+| password	 | string | yes | - | Flink  |
+| batch_size	 | int | no |  100 | Flink  |
+| interval	 | int | no |1000 | Flink |
+| max_retries	 | int | no | 1 | Flink|
+| doris.*	 | - | no | - | Flink  |
+
+##### fenodes [string]
+
+Doris FE http address
+
+##### database [string]
+
+Doris database name
+
+##### table [string]
+
+Doris table name
+
+##### user [string]
+
+Doris username
+
+##### password [string]
+
+Doris password
+
+##### batch_size [int]
+
+Maximum number of rows written to Doris in a single batch; the default value is 100.
+
+##### interval [int]
+
+The flush interval in milliseconds, after which the asynchronous thread will write the data in the cache to Doris. Set to 0 to turn off periodic writing.
+
+##### max_retries [int]
+
+Number of retries after a write to Doris fails.
+
+##### doris.* [string]
+
+The Doris stream load parameters. You can use the 'doris.' prefix + stream_load properties, e.g. 'doris.column_separator' = ','
+[More Doris stream_load Configurations](https://doris.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html)
+
+### Examples
+
+```
+DorisSink {
+	 fenodes = "127.0.0.1:8030"
+	 database = database
+	 table = table
+	 user = root
+	 password = password
+	 batch_size = 1
+	 doris.column_separator="\t"
+     doris.columns="id,user_name,user_name_cn,create_time,last_login_time"
+}
+ ```
\ No newline at end of file
diff --git a/docs/en/configuration/sink-plugins/Doris.md b/docs/en/configuration/sink-plugins/Doris.md
index 2a1ee26..0b23039 100644
--- a/docs/en/configuration/sink-plugins/Doris.md
+++ b/docs/en/configuration/sink-plugins/Doris.md
@@ -1,4 +1,4 @@
-# Sink plugin: Doirs
+# Sink plugin: Doris [Spark]
 
 ### Description:
 Use Spark Batch Engine ETL Data to Doris.
@@ -29,7 +29,6 @@ Doris user's password
 Doris number of submissions per batch
 ##### doris. [string]
 Doris stream_load properties,you can use 'doris.' prefix + stream_load properties
-
 [More Doris stream_load Configurations](https://doris.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html)
 
 ### Examples
diff --git a/docs/zh-CN/configuration/sink-plugins/Doris-Flink.md b/docs/zh-CN/configuration/sink-plugins/Doris-Flink.md
new file mode 100644
index 0000000..4b0df1b
--- /dev/null
+++ b/docs/zh-CN/configuration/sink-plugins/Doris-Flink.md
@@ -0,0 +1,70 @@
+# Sink plugin: Doris [Flink]
+
+### 描述
+
+向Doris表中写数据
+
+### 配置
+
+| 配置项 | 类型 | 必填 | 默认值 | 支持引擎 |
+| --- | --- | --- | --- | --- |
+| fenodes | string | yes | - | Flink |
+| database | string | yes | - | Flink  |
+| table | string | yes | - | Flink  |
+| user	 | string | yes | - | Flink  |
+| password	 | string | yes | - | Flink  |
+| batch_size	 | int | no |  100 | Flink  |
+| interval	 | int | no |1000 | Flink |
+| max_retries	 | int | no | 1 | Flink|
+| doris.*	 | - | no | - | Flink  |
+
+##### fenodes [string]
+
+Doris FE http 地址
+
+##### database [string]
+
+Doris 数据库名称
+
+##### table [string]
+
+Doris 表名称
+
+##### user [string]
+
+Doris 用户名
+
+##### password [string]
+
+Doris 密码
+
+##### batch_size [int]
+
+单次写Doris的最大行数,默认值100
+
+##### interval [int]
+
+flush 间隔时间(毫秒),超过该时间后异步线程将 缓存中数据写入Doris。设置为0表示关闭定期写入。
+
+##### max_retries [int]
+
+写Doris失败之后的重试次数
+
+##### doris.* [string]
+
+Stream load 的导入参数。例如:'doris.column_separator' = ', ' 定义列分隔符
+
+### Examples
+
+```
+DorisSink {
+	 fenodes = "127.0.0.1:8030"
+	 database = database
+	 table = table
+	 user = root
+	 password = password
+	 batch_size = 1
+	 doris.column_separator="\t"
+     doris.columns="id,user_name,user_name_cn,create_time,last_login_time"
+}
+ ```
diff --git a/docs/zh-CN/configuration/sink-plugins/Doris.md b/docs/zh-CN/configuration/sink-plugins/Doris.md
deleted file mode 100644
index f0dab04..0000000
--- a/docs/zh-CN/configuration/sink-plugins/Doris.md
+++ /dev/null
@@ -1,68 +0,0 @@
-# Sink plugin: Doris
-
-### 描述
-
-向Doris表中写数据
-
-### 配置
-
-| 配置项 | 类型 | 必填 | 默认值 | 支持引擎 |
-| --- | --- | --- | --- | --- |
-| fenodes | string | yes | - | Flink |
-| db_name | string | yes | - | Flink |
-| table_name | string | yes | - | Flink |
-| username	 | string | yes | - | Flink |
-| password	 | string | yes | - | Flink |
-| doris_sink_batch_size	 | int | no |  100 | Flink |
-| doris_sink_interval	 | int | no |1000 | Flink |
-| doris_sink_max_retries	 | int | no | 1 | Flink |
-| doris_sink_properties.*	 | - | no | - | Flink |
-
-##### fenodes [string]
-
-Doris FE http 地址
-
-##### db_name [string]
-
-Doris 数据库名称
-
-##### table_name [string]
-
-Doris 表名称
-
-##### username [string]
-
-Doris 用户名
-
-##### password [string]
-
-Doris 密码
-
-##### doris_sink_batch_size [int]
-
-单次写Doris的最大行数,默认值100
-
-##### doris_sink_interval [int]
-
-flush 间隔时间(毫秒),超过该时间后异步线程将 缓存中数据写入Doris。设置为0表示关闭定期写入。
-
-##### doris_sink_max_retries [int]
-
-写Doris失败之后的重试次数
-
-##### doris_sink_properties.* [string]
-
-Stream load 的导入参数。例如:'doris_sink_properties.column_separator' = ', ' 定义列分隔符
-
-### Examples
-
-```
-DorisSink {
-	 fenodes = "127.0.0.1:8030"
-	 db_name = database
-	 table_name = table
-	 username = root
-	 password = password
-	 doris_sink_size = 1
-}
-```
diff --git a/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java b/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
index 829ec60..bd8a49b 100644
--- a/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
+++ b/seatunnel-connectors/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/sink/DorisSink.java
@@ -61,30 +61,30 @@ public class DorisSink implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row,
 
     @Override
     public CheckResult checkConfig() {
-        return CheckConfigUtil.check(config, "fenodes", "username", "password", "table_name", "db_name");
+        return CheckConfigUtil.check(config, "fenodes", "user", "password", "table", "database");
     }
 
     @Override
     public void prepare(FlinkEnvironment prepareEnv) {
         fenodes = config.getString("fenodes");
-        username = config.getString("username");
-        tableName = config.getString("table_name");
+        username = config.getString("user");
+        tableName = config.getString("table");
         password = config.getString("password");
-        dbName = config.getString("db_name");
-        if (config.hasPath("doris_sink_batch_size")) {
-            batchSize = config.getInt("doris_sink_batch_size");
-            Preconditions.checkArgument(batchSize > 0, "doris_sink_batch_size must be greater than 0");
+        dbName = config.getString("database");
+        if (config.hasPath("batch_size")) {
+            batchSize = config.getInt("batch_size");
+            Preconditions.checkArgument(batchSize > 0, "batch_size must be greater than 0");
         }
-        if (config.hasPath("doris_sink_interval")) {
-            batchIntervalMs = config.getInt("doris_sink_interval");
-            Preconditions.checkArgument(batchIntervalMs > 0, "doris_sink_interval must be greater than 0");
+        if (config.hasPath("interval")) {
+            batchIntervalMs = config.getInt("interval");
+            Preconditions.checkArgument(batchIntervalMs > 0, "interval must be greater than 0");
         }
-        if (config.hasPath("doris_sink_max_retries")) {
-            maxRetries = config.getInt("doris_sink_max_retries");
-            Preconditions.checkArgument(maxRetries > 0, "doris_sink_max_retries must be greater than 0");
+        if (config.hasPath("max_retries")) {
+            maxRetries = config.getInt("max_retries");
+            Preconditions.checkArgument(maxRetries > 0, "max_retries must be greater than 0");
         }
 
-        String producerPrefix = "doris_sink_properties.";
+        String producerPrefix = "doris.";
         PropertiesUtil.setProperties(config, streamLoadProp, producerPrefix, false);
     }