You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/11/01 04:49:06 UTC

[incubator-doris] 03/07: support use char like \x01 in flink-doris-sink column & line delimiter (#6937)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit b58ae3e02bf7ac0937cbfe502222002a91abb8d8
Author: wunan1210 <wu...@gmail.com>
AuthorDate: Fri Oct 29 13:56:52 2021 +0800

    support use char like \x01 in flink-doris-sink column & line delimiter (#6937)
    
    * support use char like \x01 in flink-doris-sink column & line delimiter
    
    * extend imports
    
    * add docs
---
 docs/en/extending-doris/flink-doris-connector.md   |  2 +-
 .../zh-CN/extending-doris/flink-doris-connector.md |  2 +-
 .../doris/flink/rest/models/RespContent.java       |  4 ++
 .../flink/table/DorisDynamicOutputFormat.java      | 44 +++++++++++++++++++---
 .../apache/doris/flink/table/DorisStreamLoad.java  |  3 +-
 5 files changed, 46 insertions(+), 9 deletions(-)

diff --git a/docs/en/extending-doris/flink-doris-connector.md b/docs/en/extending-doris/flink-doris-connector.md
index c42d237..961da90 100644
--- a/docs/en/extending-doris/flink-doris-connector.md
+++ b/docs/en/extending-doris/flink-doris-connector.md
@@ -257,7 +257,7 @@ outputFormat.close();
 | sink.batch.size                        | 100            | Maximum number of lines in a single write BE                                             |
 | sink.max-retries                        | 1            | Number of retries after writing BE failed                                              |
 | sink.batch.interval                         | 1s            | The flush interval, after which the asynchronous thread will write the data in the cache to BE. The default value is 1 second, and the time units are ms, s, min, h, and d. Set to 0 to turn off periodic writing. |
-| sink.properties.*     | --               | The stream load parameters.eg:sink.properties.column_separator' = ','.<br /> Support JSON format import, you need to enable both 'sink.properties.format' ='json' and 'sink.properties.strip_outer_array' ='true'|
+| sink.properties.*     | --               | The stream load parameters, e.g. 'sink.properties.column_separator' = ','. Set 'sink.properties.escape_delimiters' = 'true' if you want to use a control character as a separator, so that an escape such as '\\x01' is translated to the binary byte 0x01.<br /> JSON format import is also supported; you need to enable both 'sink.properties.format' ='json' and 'sink.properties.strip_outer_array' ='true'|
 
 
 ## Doris & Flink Column Type Mapping
diff --git a/docs/zh-CN/extending-doris/flink-doris-connector.md b/docs/zh-CN/extending-doris/flink-doris-connector.md
index 9ea1eba..4a0a8cb 100644
--- a/docs/zh-CN/extending-doris/flink-doris-connector.md
+++ b/docs/zh-CN/extending-doris/flink-doris-connector.md
@@ -260,7 +260,7 @@ outputFormat.close();
 | sink.batch.size     | 100                | 单次写BE的最大行数        |
 | sink.max-retries     | 1                | 写BE失败之后的重试次数       |
 | sink.batch.interval     | 1s                | flush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。|
-| sink.properties.*     | --               | Stream load 的导入参数。例如:'sink.properties.column_separator' = ','等。<br /> 支持JSON格式导入,需要同时开启'sink.properties.format' = 'json'和'sink.properties.strip_outer_array' = 'true' |
+| sink.properties.*     | --               | Stream load 的导入参数。例如:'sink.properties.column_separator' = ','等。如果需要特殊字符作为分隔符, 可以加上参数'sink.properties.escape_delimiters' = 'true', '\\x01'会被转换为二进制的0x01<br /> 支持JSON格式导入,需要同时开启'sink.properties.format' = 'json'和'sink.properties.strip_outer_array' = 'true' |
 
 
 
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
index b86b3dd..07a356c 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
@@ -93,4 +93,8 @@ public class RespContent {
         }
 
     }
+
+    public String getErrorURL() {
+        return ErrorURL;
+    }
 }
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index 0fd154a..f4f49bd 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -38,11 +38,14 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.StringJoiner;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static org.apache.flink.table.data.RowData.createFieldGetter;
 
@@ -62,9 +65,11 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> {
     private static final String FORMAT_KEY = "format";
     private static final String FORMAT_JSON_VALUE = "json";
     private static final String NULL_VALUE = "\\N";
+    private static final String ESCAPE_DELIMITERS_KEY = "escape_delimiters";
+    private static final String ESCAPE_DELIMITERS_DEFAULT = "false";
 
-    private final String fieldDelimiter;
-    private final String lineDelimiter;
+    private String fieldDelimiter;
+    private String lineDelimiter;
     private final String[] fieldNames;
     private final boolean jsonFormat;
     private DorisOptions options;
@@ -88,10 +93,26 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> {
         this.options = option;
         this.readOptions = readOptions;
         this.executionOptions = executionOptions;
-        this.fieldDelimiter = executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY,
-                FIELD_DELIMITER_DEFAULT);
-        this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY,
-                LINE_DELIMITER_DEFAULT);
+
+        Properties streamLoadProp=executionOptions.getStreamLoadProp();
+
+        boolean ifEscape = Boolean.parseBoolean(streamLoadProp.getProperty(ESCAPE_DELIMITERS_KEY, ESCAPE_DELIMITERS_DEFAULT));
+        if (ifEscape) {
+            this.fieldDelimiter = escapeString(streamLoadProp.getProperty(FIELD_DELIMITER_KEY,
+                    FIELD_DELIMITER_DEFAULT));
+            this.lineDelimiter = escapeString(streamLoadProp.getProperty(LINE_DELIMITER_KEY,
+                    LINE_DELIMITER_DEFAULT));
+
+            if (streamLoadProp.contains(ESCAPE_DELIMITERS_KEY)) {
+                streamLoadProp.remove(ESCAPE_DELIMITERS_KEY);
+            }
+        } else {
+            this.fieldDelimiter = streamLoadProp.getProperty(FIELD_DELIMITER_KEY,
+                    FIELD_DELIMITER_DEFAULT);
+            this.lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY,
+                    LINE_DELIMITER_DEFAULT);
+        }
+
         this.fieldNames = fieldNames;
         this.jsonFormat = FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY));
         this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
@@ -100,6 +121,17 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> {
         }
     }
 
+    /** Decodes each "\xHH" escape (HH = two hex digits) in {@code s} into the char with that code; other text is kept. */
+    private String escapeString(String s) {
+        Matcher m = Pattern.compile("\\\\x([0-9a-fA-F]{2})").matcher(s);
+        StringBuffer buf = new StringBuffer();
+        while (m.find()) {
+            // Radix 16 so "\x0A" is LF; quoteReplacement keeps a decoded '$' or '\' from acting as replacement syntax.
+            m.appendReplacement(buf, Matcher.quoteReplacement(String.valueOf((char) Integer.parseInt(m.group(1), 16))));
+        }
+        m.appendTail(buf);
+        return buf.toString();
+    }
 
     @Override
     public void configure(Configuration configuration) {
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
index c37e640..b897ff2 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
@@ -94,7 +94,8 @@ public class DorisStreamLoad implements Serializable {
             try {
                 RespContent respContent = OBJECT_MAPPER.readValue(loadResponse.respContent, RespContent.class);
                 if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
-                    throw new StreamLoadException("stream load error: " + respContent.getMessage());
+                    String errMsg=String.format("stream load error: %s, see more in %s",respContent.getMessage(),respContent.getErrorURL());
+                    throw new StreamLoadException(errMsg);
                 }
             } catch (IOException e) {
                 throw new StreamLoadException(e);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org