You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/26 06:12:45 UTC
[doris] branch branch-1.2-lts updated: [feature](load) stream load trim double quotes for csv (#15241)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new 681155c68a [feature](load) stream load trim double quotes for csv (#15241)
681155c68a is described below
commit 681155c68a92c705792e0f1bf1bf4a99af863255
Author: Xin Liao <li...@126.com>
AuthorDate: Mon Dec 26 11:45:54 2022 +0800
[feature](load) stream load trim double quotes for csv (#15241)
---
be/src/http/action/stream_load.cpp | 7 +++
be/src/http/http_common.h | 1 +
be/src/vec/exec/format/csv/csv_reader.cpp | 14 +++++
be/src/vec/exec/format/csv/csv_reader.h | 1 +
.../Load/STREAM-LOAD.md | 4 +-
.../Load/STREAM-LOAD.md | 4 +-
.../org/apache/doris/analysis/DataDescription.java | 6 ++
.../org/apache/doris/load/BrokerFileGroup.java | 6 ++
.../apache/doris/planner/StreamLoadScanNode.java | 1 +
.../doris/planner/external/LoadScanProvider.java | 1 +
.../java/org/apache/doris/task/LoadTaskInfo.java | 4 ++
.../java/org/apache/doris/task/StreamLoadTask.java | 9 +++
gensrc/thrift/FrontendService.thrift | 1 +
gensrc/thrift/PlanNodes.thrift | 4 ++
.../load_p0/stream_load/csv_with_double_quotes.csv | 8 +++
.../stream_load/test_csv_with_double_quotes.out | 21 +++++++
.../stream_load/test_csv_with_double_quotes.groovy | 64 ++++++++++++++++++++++
17 files changed, 154 insertions(+), 2 deletions(-)
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index b2acc0cedb..7a1b9636ec 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -565,6 +565,13 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) {
request.__set_hidden_columns(http_req->header(HTTP_HIDDEN_COLUMNS));
}
+ if (!http_req->header(HTTP_TRIM_DOUBLE_QUOTES).empty()) {
+ if (iequal(http_req->header(HTTP_TRIM_DOUBLE_QUOTES), "true")) {
+ request.__set_trim_double_quotes(true);
+ } else {
+ request.__set_trim_double_quotes(false);
+ }
+ }
#ifndef BE_TEST
// plan this load
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index 1ae254b062..e61c828fb4 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -51,6 +51,7 @@ static const std::string HTTP_COMPRESS_TYPE = "compress_type";
static const std::string HTTP_SEND_BATCH_PARALLELISM = "send_batch_parallelism";
static const std::string HTTP_LOAD_TO_SINGLE_TABLET = "load_to_single_tablet";
static const std::string HTTP_HIDDEN_COLUMNS = "hidden_columns";
+static const std::string HTTP_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit";
static const std::string HTTP_TXN_ID_KEY = "txn_id";
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index 48f8d84dc7..1eaef7ad87 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -124,6 +124,10 @@ Status CsvReader::init_reader(bool is_load) {
_line_delimiter = _params.file_attributes.text_params.line_delimiter;
_line_delimiter_length = _line_delimiter.size();
+ if (_params.file_attributes.__isset.trim_double_quotes) {
+ _trim_double_quotes = _params.file_attributes.trim_double_quotes;
+ }
+
// create decompressor.
// _decompressor may be nullptr if this is not a compressed file
RETURN_IF_ERROR(_create_decompressor());
@@ -412,6 +416,11 @@ void CsvReader::_split_line(const Slice& line) {
non_space--;
}
}
+ if (_trim_double_quotes && (non_space - 1) > start &&
+ *(value + start) == '\"' && *(value + non_space - 1) == '\"') {
+ start++;
+ non_space--;
+ }
_split_values.emplace_back(value + start, non_space - start);
start = curpos + _value_separator_length;
curpos = start;
@@ -428,6 +437,11 @@ void CsvReader::_split_line(const Slice& line) {
non_space--;
}
}
+ if (_trim_double_quotes && (non_space - 1) > start && *(value + start) == '\"' &&
+ *(value + non_space - 1) == '\"') {
+ start++;
+ non_space--;
+ }
_split_values.emplace_back(value + start, non_space - start);
}
}
diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h
index 5bb14523e3..972c87da3c 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -107,6 +107,7 @@ private:
std::string _line_delimiter;
int _value_separator_length;
int _line_delimiter_length;
+ bool _trim_double_quotes = false;
// save source text which have been splitted.
std::vector<Slice> _split_values;
diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
index 560cc80ea3..fcb2b92b31 100644
--- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
@@ -177,10 +177,12 @@ ERRORS:
where url is the url given by ErrorURL.
-23: compress_type
+24. compress_type
Specify compress type file. Only support compressed csv file now. Support gz, lzo, bz2, lz4, lzop, deflate.
+25. trim_double_quotes: Boolean type. The default value is false. When set to true, the outermost pair of double quotes around each field in the csv file is trimmed.
+
### Example
1. Import the data in the local file 'testData' into the table 'testTbl' in the database 'testDb', and use Label for deduplication. Specify a timeout of 100 seconds
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
index 97473b3d1d..5dadc7f0e7 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
@@ -174,10 +174,12 @@ ERRORS:
其中 url 为 ErrorURL 给出的 url。
-23: compress_type
+24. compress_type
指定文件的压缩格式。目前只支持 csv 文件的压缩。支持 gz, lzo, bz2, lz4, lzop, deflate 压缩格式。
+25. trim_double_quotes: 布尔类型,默认值为 false,为 true 时表示裁剪掉 csv 文件每个字段最外层的双引号。
+
### Example
1. 将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表,使用Label用于去重。指定超时时间为 100 秒
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index b67574e743..72b494a89d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -145,6 +145,7 @@ public class DataDescription {
private LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND;
private final Expr deleteCondition;
private final Map<String, String> properties;
+ private boolean trimDoubleQuotes = false;
public DataDescription(String tableName,
PartitionNames partitionNames,
@@ -250,6 +251,7 @@ public class DataDescription {
this.readJsonByLine = taskInfo.isReadJsonByLine();
this.numAsString = taskInfo.isNumAsString();
this.properties = Maps.newHashMap();
+ this.trimDoubleQuotes = taskInfo.getTrimDoubleQuotes();
}
private void getFileFormatAndCompressType(LoadTaskInfo taskInfo) {
@@ -641,6 +643,10 @@ public class DataDescription {
return readJsonByLine;
}
+ public boolean getTrimDoubleQuotes() {
+ return trimDoubleQuotes;
+ }
+
/*
* Analyze parsedExprMap and columnToHadoopFunction from columns, columns from path and columnMappingList
* Example:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index 497f0c2d23..887062bda1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -105,6 +105,7 @@ public class BrokerFileGroup implements Writable {
private boolean fuzzyParse = true;
private boolean readJsonByLine = false;
private boolean numAsString = false;
+ private boolean trimDoubleQuotes = false;
// for unit test and edit log persistence
private BrokerFileGroup() {
@@ -262,6 +263,7 @@ public class BrokerFileGroup implements Writable {
readJsonByLine = dataDescription.isReadJsonByLine();
numAsString = dataDescription.isNumAsString();
}
+ trimDoubleQuotes = dataDescription.getTrimDoubleQuotes();
}
public long getTableId() {
@@ -416,6 +418,10 @@ public class BrokerFileGroup implements Writable {
return fileFormat.equalsIgnoreCase("parquet") || fileFormat.equalsIgnoreCase("orc");
}
+ public boolean getTrimDoubleQuotes() {
+ return trimDoubleQuotes;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
index 96e8410aef..621c7a9306 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
@@ -179,6 +179,7 @@ public class StreamLoadScanNode extends LoadScanNode {
params.setLineDelimiter((byte) '\n');
params.setLineDelimiterLength(1);
}
+ params.setTrimDoubleQuotes(taskInfo.getTrimDoubleQuotes());
params.setDestTupleId(desc.getId().asInt());
brokerScanRange.setParams(params);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index 8f7ea2bb6e..ae2bf5d4ff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -131,6 +131,7 @@ public class LoadScanProvider implements FileScanProviderIf {
fileAttributes.setReadJsonByLine(fileGroup.isReadJsonByLine());
fileAttributes.setReadByColumnDef(true);
fileAttributes.setHeaderType(getHeaderType(fileGroup.getFileFormat()));
+ fileAttributes.setTrimDoubleQuotes(fileGroup.getTrimDoubleQuotes());
}
private String getHeaderType(String formatType) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index 642597246f..e384394653 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -97,6 +97,10 @@ public interface LoadTaskInfo {
List<String> getHiddenColumns();
+ default boolean getTrimDoubleQuotes() {
+ return false;
+ }
+
class ImportColumnDescs {
public List<ImportColumnDesc> descs = Lists.newArrayList();
public boolean isColumnDescsRewrited = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 59393b7d01..2a80ceef90 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -82,6 +82,7 @@ public class StreamLoadTask implements LoadTaskInfo {
private boolean loadToSingleTablet = false;
private String headerType = "";
private List<String> hiddenColumns;
+ private boolean trimDoubleQuotes = false;
public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType,
TFileCompressType compressType) {
@@ -251,6 +252,11 @@ public class StreamLoadTask implements LoadTaskInfo {
return hiddenColumns;
}
+ @Override
+ public boolean getTrimDoubleQuotes() {
+ return trimDoubleQuotes;
+ }
+
public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException {
StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(),
request.getFileType(), request.getFormatType(),
@@ -350,6 +356,9 @@ public class StreamLoadTask implements LoadTaskInfo {
if (request.isSetHiddenColumns()) {
hiddenColumns = Arrays.asList(request.getHiddenColumns().replaceAll("\\s+", "").split(","));
}
+ if (request.isSetTrimDoubleQuotes()) {
+ trimDoubleQuotes = request.isTrimDoubleQuotes();
+ }
}
// used for stream load
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 3036c1d53f..4c7d062727 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -546,6 +546,7 @@ struct TStreamLoadPutRequest {
39: optional string hidden_columns
40: optional PlanNodes.TFileCompressType compress_type
41: optional i64 file_size // only for stream load with parquet or orc
+ 42: optional bool trim_double_quotes // trim double quotes for csv
}
struct TStreamLoadPutResult {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index b14610587a..a7a21a0499 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -211,6 +211,8 @@ struct TBrokerScanRangeParams {
12: optional i32 line_delimiter_length = 1;
13: optional string column_separator_str;
14: optional string line_delimiter_str;
+ // trim double quotes for csv
+ 15: optional bool trim_double_quotes;
}
@@ -255,6 +257,8 @@ struct TFileAttributes {
8: optional bool read_by_column_def;
// csv with header type
9: optional string header_type;
+ // trim double quotes for csv
+ 10: optional bool trim_double_quotes;
}
struct TIcebergDeleteFileDesc {
diff --git a/regression-test/data/load_p0/stream_load/csv_with_double_quotes.csv b/regression-test/data/load_p0/stream_load/csv_with_double_quotes.csv
new file mode 100644
index 0000000000..44ae3f4550
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/csv_with_double_quotes.csv
@@ -0,0 +1,8 @@
+1,2,3,abc,2022-12-01,2022-12-01:09:30:31
+2,3,3,abc,2022-12-01,2022-12-01:09:30:31
+3,4,3,abc,2022-12-01,2022-12-01:09:30:31
+4,5,3,abc,2022-12-01,2022-12-01:09:30:31
+"5","6","3","abc","2022-12-01","2022-12-01:09:30:31"
+"6","7","3","abc","2022-12-01","2022-12-01:09:30:31"
+"7","8","3","abc","2022-12-01","2022-12-01:09:30:31"
+"8","9","3","abc","2022-12-01","2022-12-01:09:30:31"
diff --git a/regression-test/data/load_p0/stream_load/test_csv_with_double_quotes.out b/regression-test/data/load_p0/stream_load/test_csv_with_double_quotes.out
new file mode 100644
index 0000000000..0ae5ebe7f7
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_csv_with_double_quotes.out
@@ -0,0 +1,21 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+\N \N \N "abc" \N \N
+\N \N \N "abc" \N \N
+\N \N \N "abc" \N \N
+\N \N \N "abc" \N \N
+1 2 3 abc 2022-12-01 2022-12-01T09:30:31
+2 3 3 abc 2022-12-01 2022-12-01T09:30:31
+3 4 3 abc 2022-12-01 2022-12-01T09:30:31
+4 5 3 abc 2022-12-01 2022-12-01T09:30:31
+
+-- !sql --
+1 2 3 abc 2022-12-01 2022-12-01T09:30:31
+2 3 3 abc 2022-12-01 2022-12-01T09:30:31
+3 4 3 abc 2022-12-01 2022-12-01T09:30:31
+4 5 3 abc 2022-12-01 2022-12-01T09:30:31
+5 6 3 abc 2022-12-01 2022-12-01T09:30:31
+6 7 3 abc 2022-12-01 2022-12-01T09:30:31
+7 8 3 abc 2022-12-01 2022-12-01T09:30:31
+8 9 3 abc 2022-12-01 2022-12-01T09:30:31
+
diff --git a/regression-test/suites/load_p0/stream_load/test_csv_with_double_quotes.groovy b/regression-test/suites/load_p0/stream_load/test_csv_with_double_quotes.groovy
new file mode 100644
index 0000000000..429e8c88fd
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_csv_with_double_quotes.groovy
@@ -0,0 +1,64 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_csv_with_double_quotes", "p0") {
+ def tableName = "test_csv_with_double_quotes"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(20) NULL,
+ `k2` bigint(20) NULL,
+ `v1` tinyint(4) NULL,
+ `v2` string NULL,
+ `v3` date NULL,
+ `v4` datetime NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`, `k2`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+
+ file 'csv_with_double_quotes.csv'
+ time 10000 // limit inflight 10s
+ }
+
+ sql "sync"
+ qt_sql "select * from ${tableName} order by k1, k2"
+
+ sql """truncate table ${tableName}"""
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'trim_double_quotes', 'true'
+
+ file 'csv_with_double_quotes.csv'
+ time 10000 // limit inflight 10s
+ }
+
+ sql "sync"
+ qt_sql "select * from ${tableName} order by k1, k2"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org