You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/01/07 07:35:56 UTC

[incubator-seatunnel] branch dev updated: [Improve][Connector-V2][Doris] Change Doris Config Prefix (#3856)

This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 16e39a506 [Improve][Connector-V2][Doris] Change Doris Config Prefix (#3856)
16e39a506 is described below

commit 16e39a506b61ca2bf3c0ec53fb77beed43133a7f
Author: TaoZex <45...@users.noreply.github.com>
AuthorDate: Sat Jan 7 15:35:49 2023 +0800

    [Improve][Connector-V2][Doris] Change Doris Config Prefix (#3856)
    
    * [Improve][Connector-V2][Doris] Change Doris Config Prefix
---
 docs/en/connector-v2/sink/Doris.md                 | 26 ++++++++++++++--------
 .../connectors/doris/config/SinkConfig.java        | 20 ++++++++---------
 .../connectors/doris/sink/DorisSinkFactory.java    |  2 +-
 .../src/test/resources/doris-jdbc-to-doris.conf    |  6 +++--
 4 files changed, 32 insertions(+), 22 deletions(-)

diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md
index df75b5ee7..1886fd523 100644
--- a/docs/en/connector-v2/sink/Doris.md
+++ b/docs/en/connector-v2/sink/Doris.md
@@ -25,7 +25,7 @@ The internal implementation of Doris sink connector is cached and imported by st
 | max_retries                 | int                          | no       | 1               |
 | retry_backoff_multiplier_ms | int                          | no       | -               |
 | max_retry_backoff_ms        | int                          | no       | -               |
-| sink.properties.*           | doris stream load config     | no       | -               |
+| doris.config                | map                          | no       | -               |
 
 ### node_urls [list]
 
@@ -75,11 +75,11 @@ Using as a multiplier for generating the next delay for backoff
 
 The amount of time to wait before attempting to retry a request to `Doris`
 
-### sink.properties.*  [doris stream load config]
+### doris.config [map]
 
-The parameter of the stream load `data_desc`
-The way to specify the parameter is to add the prefix `sink.properties.` to the original stream load parameter name 
-For example, the way to specify `strip_outer_array` is: `sink.properties.strip_outer_array`
+The parameter of the stream load `data_desc`, you can get more detail at this link:
+
+https://doris.apache.org/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD/
 
 #### Supported import data formats
 
@@ -98,8 +98,10 @@ sink {
         database = "test"
         table = "e2e_table_sink"
         batch_max_rows = 100
-        sink.properties.format = "JSON"
-        sink.properties.strip_outer_array = true
+        doris.config = {
+          format = "JSON"
+          strip_outer_array = true
+        }
     }
 }
 
@@ -118,12 +120,18 @@ sink {
         batch_max_rows = 100
         sink.properties.format = "CSV"
         sink.properties.column_separator = ","
+        doris.config = {
+          format = "CSV"
+          column_separator = ","
+        }
     }
 }
 ```
 
 ## Changelog
 
-### next version
+### 2.3.0-beta 2022-10-20
+- Add Doris Sink Connector
 
-- Add Doris Sink Connector
\ No newline at end of file
+### Next version
+- [Improve] Change Doris Config Prefix [3856](https://github.com/apache/incubator-seatunnel/pull/3856)
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/SinkConfig.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/SinkConfig.java
index d1f88cbc6..dab49a8c0 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/SinkConfig.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/SinkConfig.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.connectors.doris.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -70,11 +70,11 @@ public class SinkConfig {
             .noDefaultValue()
             .withDescription("The name of Doris table");
 
-    public static final Option<String> DORIS_SINK_CONFIG_PREFIX = Options.key("sink.properties.")
-            .stringType()
+    public static final Option<Map<String, String>> DORIS_CONFIG = Options.key("doris.config")
+            .mapType()
             .noDefaultValue()
             .withDescription("The parameter of the stream load data_desc. " +
-                    "The way to specify the parameter is to add the prefix `sink.properties.` to the original stream load parameter name ");
+                    "The way to specify the parameter is to add the original stream load parameter into map");
 
     public static final Option<Integer> BATCH_MAX_SIZE = Options.key("batch_max_rows")
             .intType()
@@ -178,11 +178,11 @@ public class SinkConfig {
     }
 
     private static void parseSinkStreamLoadProperties(Config pluginConfig, SinkConfig sinkConfig) {
-        Config dorisConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig,
-                DORIS_SINK_CONFIG_PREFIX.key(), false);
-        dorisConfig.entrySet().forEach(entry -> {
-            final String configKey = entry.getKey().toLowerCase();
-            sinkConfig.streamLoadProps.put(configKey, entry.getValue().unwrapped().toString());
-        });
+        if (CheckConfigUtil.isValidParam(pluginConfig, DORIS_CONFIG.key())) {
+            pluginConfig.getObject(DORIS_CONFIG.key()).forEach((key, value) -> {
+                final String configKey = key.toLowerCase();
+                sinkConfig.streamLoadProps.put(configKey, value.unwrapped().toString());
+            });
+        }
     }
 }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
index 5fb721e7e..6d111378f 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
@@ -37,7 +37,7 @@ public class DorisSinkFactory implements TableSinkFactory {
                 .required(SinkConfig.NODE_URLS, SinkConfig.USERNAME, SinkConfig.PASSWORD, SinkConfig.DATABASE, SinkConfig.TABLE)
                 .optional(SinkConfig.LABEL_PREFIX, SinkConfig.BATCH_MAX_SIZE, SinkConfig.BATCH_MAX_BYTES,
                         SinkConfig.BATCH_INTERVAL_MS, SinkConfig.MAX_RETRIES, SinkConfig.MAX_RETRY_BACKOFF_MS,
-                        SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS, SinkConfig.DORIS_SINK_CONFIG_PREFIX)
+                        SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS, SinkConfig.DORIS_CONFIG)
                 .build();
     }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris-jdbc-to-doris.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris-jdbc-to-doris.conf
index ca6fb0437..3d2ea5f9b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris-jdbc-to-doris.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris-jdbc-to-doris.conf
@@ -41,8 +41,10 @@ sink {
         database = "test"
         table = "e2e_table_sink"
         batch_max_rows = 100
-        sink.properties.format = "JSON"
-        sink.properties.strip_outer_array = true
         max_retries = 3
+        doris.config = {
+          format = "JSON"
+          strip_outer_array = true
+        }
     }
 }
\ No newline at end of file