You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/26 06:12:45 UTC

[doris] branch branch-1.2-lts updated: [feature](load) stream load trim double quotes for csv (#15241)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 681155c68a [feature](load) stream load trim double quotes for csv (#15241)
681155c68a is described below

commit 681155c68a92c705792e0f1bf1bf4a99af863255
Author: Xin Liao <li...@126.com>
AuthorDate: Mon Dec 26 11:45:54 2022 +0800

    [feature](load) stream load trim double quotes for csv (#15241)
---
 be/src/http/action/stream_load.cpp                 |  7 +++
 be/src/http/http_common.h                          |  1 +
 be/src/vec/exec/format/csv/csv_reader.cpp          | 14 +++++
 be/src/vec/exec/format/csv/csv_reader.h            |  1 +
 .../Load/STREAM-LOAD.md                            |  4 +-
 .../Load/STREAM-LOAD.md                            |  4 +-
 .../org/apache/doris/analysis/DataDescription.java |  6 ++
 .../org/apache/doris/load/BrokerFileGroup.java     |  6 ++
 .../apache/doris/planner/StreamLoadScanNode.java   |  1 +
 .../doris/planner/external/LoadScanProvider.java   |  1 +
 .../java/org/apache/doris/task/LoadTaskInfo.java   |  4 ++
 .../java/org/apache/doris/task/StreamLoadTask.java |  9 +++
 gensrc/thrift/FrontendService.thrift               |  1 +
 gensrc/thrift/PlanNodes.thrift                     |  4 ++
 .../load_p0/stream_load/csv_with_double_quotes.csv |  8 +++
 .../stream_load/test_csv_with_double_quotes.out    | 21 +++++++
 .../stream_load/test_csv_with_double_quotes.groovy | 64 ++++++++++++++++++++++
 17 files changed, 154 insertions(+), 2 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index b2acc0cedb..7a1b9636ec 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -565,6 +565,13 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
     if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) {
         request.__set_hidden_columns(http_req->header(HTTP_HIDDEN_COLUMNS));
     }
+    if (!http_req->header(HTTP_TRIM_DOUBLE_QUOTES).empty()) {
+        if (iequal(http_req->header(HTTP_TRIM_DOUBLE_QUOTES), "true")) {
+            request.__set_trim_double_quotes(true);
+        } else {
+            request.__set_trim_double_quotes(false);
+        }
+    }
 
 #ifndef BE_TEST
     // plan this load
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index 1ae254b062..e61c828fb4 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -51,6 +51,7 @@ static const std::string HTTP_COMPRESS_TYPE = "compress_type";
 static const std::string HTTP_SEND_BATCH_PARALLELISM = "send_batch_parallelism";
 static const std::string HTTP_LOAD_TO_SINGLE_TABLET = "load_to_single_tablet";
 static const std::string HTTP_HIDDEN_COLUMNS = "hidden_columns";
+static const std::string HTTP_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
 
 static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit";
 static const std::string HTTP_TXN_ID_KEY = "txn_id";
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index 48f8d84dc7..1eaef7ad87 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -124,6 +124,10 @@ Status CsvReader::init_reader(bool is_load) {
     _line_delimiter = _params.file_attributes.text_params.line_delimiter;
     _line_delimiter_length = _line_delimiter.size();
 
+    if (_params.file_attributes.__isset.trim_double_quotes) {
+        _trim_double_quotes = _params.file_attributes.trim_double_quotes;
+    }
+
     // create decompressor.
     // _decompressor may be nullptr if this is not a compressed file
     RETURN_IF_ERROR(_create_decompressor());
@@ -412,6 +416,11 @@ void CsvReader::_split_line(const Slice& line) {
                             non_space--;
                         }
                     }
+                    if (_trim_double_quotes && (non_space - 1) > start &&
+                        *(value + start) == '\"' && *(value + non_space - 1) == '\"') {
+                        start++;
+                        non_space--;
+                    }
                     _split_values.emplace_back(value + start, non_space - start);
                     start = curpos + _value_separator_length;
                     curpos = start;
@@ -428,6 +437,11 @@ void CsvReader::_split_line(const Slice& line) {
                 non_space--;
             }
         }
+        if (_trim_double_quotes && (non_space - 1) > start && *(value + start) == '\"' &&
+            *(value + non_space - 1) == '\"') {
+            start++;
+            non_space--;
+        }
         _split_values.emplace_back(value + start, non_space - start);
     }
 }
diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h
index 5bb14523e3..972c87da3c 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -107,6 +107,7 @@ private:
     std::string _line_delimiter;
     int _value_separator_length;
     int _line_delimiter_length;
+    bool _trim_double_quotes = false;
 
     // save source text which have been splitted.
     std::vector<Slice> _split_values;
diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
index 560cc80ea3..fcb2b92b31 100644
--- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
@@ -177,10 +177,12 @@ ERRORS:
 
     where url is the url given by ErrorURL.
 
-23: compress_type
+24. compress_type
 
     Specify compress type file. Only support compressed csv file now. Support gz, lzo, bz2, lz4, lzop, deflate.
 
+25. trim_double_quotes: Boolean type, The default value is false. True means that the outermost double quotes of each field in the csv file are trimmed.
+
 ### Example
 
 1. Import the data in the local file 'testData' into the table 'testTbl' in the database 'testDb', and use Label for deduplication. Specify a timeout of 100 seconds
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
index 97473b3d1d..5dadc7f0e7 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
@@ -174,10 +174,12 @@ ERRORS:
 
     其中 url 为 ErrorURL 给出的 url。
 
-23: compress_type
+24. compress_type
 
     指定文件的压缩格式。目前只支持 csv 文件的压缩。支持 gz, lzo, bz2, lz4, lzop, deflate 压缩格式。
 
+25. trim_double_quotes: 布尔类型,默认值为 false,为 true 时表示裁剪掉 csv 文件每个字段最外层的双引号。
+
 ### Example
 
 1. 将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表,使用Label用于去重。指定超时时间为 100 秒
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index b67574e743..72b494a89d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -145,6 +145,7 @@ public class DataDescription {
     private LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND;
     private final Expr deleteCondition;
     private final Map<String, String> properties;
+    private boolean trimDoubleQuotes = false;
 
     public DataDescription(String tableName,
                            PartitionNames partitionNames,
@@ -250,6 +251,7 @@ public class DataDescription {
         this.readJsonByLine = taskInfo.isReadJsonByLine();
         this.numAsString = taskInfo.isNumAsString();
         this.properties = Maps.newHashMap();
+        this.trimDoubleQuotes = taskInfo.getTrimDoubleQuotes();
     }
 
     private void getFileFormatAndCompressType(LoadTaskInfo taskInfo) {
@@ -641,6 +643,10 @@ public class DataDescription {
         return readJsonByLine;
     }
 
+    public boolean getTrimDoubleQuotes() {
+        return trimDoubleQuotes;
+    }
+
     /*
      * Analyze parsedExprMap and columnToHadoopFunction from columns, columns from path and columnMappingList
      * Example:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index 497f0c2d23..887062bda1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -105,6 +105,7 @@ public class BrokerFileGroup implements Writable {
     private boolean fuzzyParse = true;
     private boolean readJsonByLine = false;
     private boolean numAsString = false;
+    private boolean trimDoubleQuotes = false;
 
     // for unit test and edit log persistence
     private BrokerFileGroup() {
@@ -262,6 +263,7 @@ public class BrokerFileGroup implements Writable {
             readJsonByLine = dataDescription.isReadJsonByLine();
             numAsString = dataDescription.isNumAsString();
         }
+        trimDoubleQuotes = dataDescription.getTrimDoubleQuotes();
     }
 
     public long getTableId() {
@@ -416,6 +418,10 @@ public class BrokerFileGroup implements Writable {
         return fileFormat.equalsIgnoreCase("parquet") || fileFormat.equalsIgnoreCase("orc");
     }
 
+    public boolean getTrimDoubleQuotes() {
+        return trimDoubleQuotes;
+    }
+
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
index 96e8410aef..621c7a9306 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
@@ -179,6 +179,7 @@ public class StreamLoadScanNode extends LoadScanNode {
             params.setLineDelimiter((byte) '\n');
             params.setLineDelimiterLength(1);
         }
+        params.setTrimDoubleQuotes(taskInfo.getTrimDoubleQuotes());
         params.setDestTupleId(desc.getId().asInt());
         brokerScanRange.setParams(params);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index 8f7ea2bb6e..ae2bf5d4ff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -131,6 +131,7 @@ public class LoadScanProvider implements FileScanProviderIf {
         fileAttributes.setReadJsonByLine(fileGroup.isReadJsonByLine());
         fileAttributes.setReadByColumnDef(true);
         fileAttributes.setHeaderType(getHeaderType(fileGroup.getFileFormat()));
+        fileAttributes.setTrimDoubleQuotes(fileGroup.getTrimDoubleQuotes());
     }
 
     private String getHeaderType(String formatType) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index 642597246f..e384394653 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -97,6 +97,10 @@ public interface LoadTaskInfo {
 
     List<String> getHiddenColumns();
 
+    default boolean getTrimDoubleQuotes() {
+        return false;
+    }
+
     class ImportColumnDescs {
         public List<ImportColumnDesc> descs = Lists.newArrayList();
         public boolean isColumnDescsRewrited = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 59393b7d01..2a80ceef90 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -82,6 +82,7 @@ public class StreamLoadTask implements LoadTaskInfo {
     private boolean loadToSingleTablet = false;
     private String headerType = "";
     private List<String> hiddenColumns;
+    private boolean trimDoubleQuotes = false;
 
     public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType,
             TFileCompressType compressType) {
@@ -251,6 +252,11 @@ public class StreamLoadTask implements LoadTaskInfo {
         return hiddenColumns;
     }
 
+    @Override
+    public boolean getTrimDoubleQuotes() {
+        return trimDoubleQuotes;
+    }
+
     public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException {
         StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(),
                 request.getFileType(), request.getFormatType(),
@@ -350,6 +356,9 @@ public class StreamLoadTask implements LoadTaskInfo {
         if (request.isSetHiddenColumns()) {
             hiddenColumns = Arrays.asList(request.getHiddenColumns().replaceAll("\\s+", "").split(","));
         }
+        if (request.isSetTrimDoubleQuotes()) {
+            trimDoubleQuotes = request.isTrimDoubleQuotes();
+        }
     }
 
     // used for stream load
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 3036c1d53f..4c7d062727 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -546,6 +546,7 @@ struct TStreamLoadPutRequest {
     39: optional string hidden_columns
     40: optional PlanNodes.TFileCompressType compress_type
     41: optional i64 file_size // only for stream load with parquet or orc
+    42: optional bool trim_double_quotes // trim double quotes for csv
 }
 
 struct TStreamLoadPutResult {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index b14610587a..a7a21a0499 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -211,6 +211,8 @@ struct TBrokerScanRangeParams {
     12: optional i32 line_delimiter_length = 1;
     13: optional string column_separator_str;
     14: optional string line_delimiter_str;
+    // trim double quotes for csv
+    15: optional bool trim_double_quotes;
 
 }
 
@@ -255,6 +257,8 @@ struct TFileAttributes {
     8: optional bool read_by_column_def;
     // csv with header type
     9: optional string header_type;
+    // trim double quotes for csv
+    10: optional bool trim_double_quotes;
 }
 
 struct TIcebergDeleteFileDesc {
diff --git a/regression-test/data/load_p0/stream_load/csv_with_double_quotes.csv b/regression-test/data/load_p0/stream_load/csv_with_double_quotes.csv
new file mode 100644
index 0000000000..44ae3f4550
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/csv_with_double_quotes.csv
@@ -0,0 +1,8 @@
+1,2,3,abc,2022-12-01,2022-12-01:09:30:31
+2,3,3,abc,2022-12-01,2022-12-01:09:30:31
+3,4,3,abc,2022-12-01,2022-12-01:09:30:31
+4,5,3,abc,2022-12-01,2022-12-01:09:30:31
+"5","6","3","abc","2022-12-01","2022-12-01:09:30:31"
+"6","7","3","abc","2022-12-01","2022-12-01:09:30:31"
+"7","8","3","abc","2022-12-01","2022-12-01:09:30:31"
+"8","9","3","abc","2022-12-01","2022-12-01:09:30:31"
diff --git a/regression-test/data/load_p0/stream_load/test_csv_with_double_quotes.out b/regression-test/data/load_p0/stream_load/test_csv_with_double_quotes.out
new file mode 100644
index 0000000000..0ae5ebe7f7
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_csv_with_double_quotes.out
@@ -0,0 +1,21 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+\N	\N	\N	"abc"	\N	\N
+\N	\N	\N	"abc"	\N	\N
+\N	\N	\N	"abc"	\N	\N
+\N	\N	\N	"abc"	\N	\N
+1	2	3	abc	2022-12-01	2022-12-01T09:30:31
+2	3	3	abc	2022-12-01	2022-12-01T09:30:31
+3	4	3	abc	2022-12-01	2022-12-01T09:30:31
+4	5	3	abc	2022-12-01	2022-12-01T09:30:31
+
+-- !sql --
+1	2	3	abc	2022-12-01	2022-12-01T09:30:31
+2	3	3	abc	2022-12-01	2022-12-01T09:30:31
+3	4	3	abc	2022-12-01	2022-12-01T09:30:31
+4	5	3	abc	2022-12-01	2022-12-01T09:30:31
+5	6	3	abc	2022-12-01	2022-12-01T09:30:31
+6	7	3	abc	2022-12-01	2022-12-01T09:30:31
+7	8	3	abc	2022-12-01	2022-12-01T09:30:31
+8	9	3	abc	2022-12-01	2022-12-01T09:30:31
+
diff --git a/regression-test/suites/load_p0/stream_load/test_csv_with_double_quotes.groovy b/regression-test/suites/load_p0/stream_load/test_csv_with_double_quotes.groovy
new file mode 100644
index 0000000000..429e8c88fd
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_csv_with_double_quotes.groovy
@@ -0,0 +1,64 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_csv_with_double_quotes", "p0") {
+    def tableName = "test_csv_with_double_quotes"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` int(20) NULL,
+            `k2` bigint(20) NULL,
+            `v1` tinyint(4)  NULL,
+            `v2` string  NULL,
+            `v3` date  NULL,
+            `v4` datetime  NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`, `k2`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+
+        file 'csv_with_double_quotes.csv'
+        time 10000 // limit inflight 10s
+    }
+
+    sql "sync"
+    qt_sql "select * from ${tableName} order by k1, k2"
+
+    sql """truncate table ${tableName}"""
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'trim_double_quotes', 'true'
+
+        file 'csv_with_double_quotes.csv'
+        time 10000 // limit inflight 10s
+    }
+
+    sql "sync"
+    qt_sql "select * from ${tableName} order by k1, k2"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org