You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/01/04 10:25:16 UTC
[doris] branch master updated: [fix](csv-reader) fix new csv reader's performance issue (#15581)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4075e3aec6 [fix](csv-reader) fix new csv reader's performance issue (#15581)
4075e3aec6 is described below
commit 4075e3aec6e1041bc855cd30cc85b7ff584c4a28
Author: Mingyu Chen <mo...@163.com>
AuthorDate: Wed Jan 4 18:25:08 2023 +0800
[fix](csv-reader) fix new csv reader's performance issue (#15581)
---
be/src/vec/exec/format/csv/csv_reader.cpp | 60 +++++++++++++++++++++++--------
be/src/vec/exec/format/csv/csv_reader.h | 8 ++++-
be/src/vec/exec/scan/vfile_scanner.h | 1 +
3 files changed, 53 insertions(+), 16 deletions(-)
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index 5e3a975341..38fd30ba74 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -167,9 +167,23 @@ Status CsvReader::init_reader(bool is_load) {
_is_load = is_load;
if (!_is_load) {
- // For query task, we need to save the mapping from table schema to file column
+ // For query task, there are 2 slot mappings.
+ // One is from file slot to values in line.
+ // eg, the file_slot_descs is k1, k3, k5, and values in line are k1, k2, k3, k4, k5
+ // the _col_idxs will save: 0, 2, 4
+ // The other is from file slot to columns in output block
+ // eg, the file_slot_descs is k1, k3, k5, and columns in block are p1, k1, k3, k5
+ // where "p1" is the partition col which does not exist in file
+ // the _file_slot_idx_map will save: 1, 2, 3
DCHECK(_params.__isset.column_idxs);
_col_idxs = _params.column_idxs;
+ int idx = 0;
+ for (const auto& slot_info : _params.required_slots) {
+ if (slot_info.is_file_slot) {
+ _file_slot_idx_map.push_back(idx);
+ }
+ idx++;
+ }
} else {
// For load task, the column order is same as file column order
int i = 0;
@@ -190,6 +204,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
const int batch_size = _state->batch_size();
size_t rows = 0;
+ auto columns = block->mutate_columns();
while (rows < batch_size && !_line_reader_eof) {
const uint8_t* ptr = nullptr;
size_t size = 0;
@@ -203,7 +218,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
continue;
}
- RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, &rows));
+ RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, columns, &rows));
}
*eof = (rows == 0);
@@ -303,7 +318,8 @@ Status CsvReader::_create_decompressor() {
return Status::OK();
}
-Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, size_t* rows) {
+Status CsvReader::_fill_dest_columns(const Slice& line, Block* block,
+ std::vector<MutableColumnPtr>& columns, size_t* rows) {
bool is_success = false;
RETURN_IF_ERROR(_line_split_to_values(line, &is_success));
@@ -312,18 +328,32 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, size_t* ro
return Status::OK();
}
- // if _split_values.size > _file_slot_descs.size()
- // we only take the first few columns
- for (int i = 0; i < _file_slot_descs.size(); ++i) {
- auto src_slot_desc = _file_slot_descs[i];
- int col_idx = _col_idxs[i];
- // col idx is out of range, fill with null.
- const Slice& value =
- col_idx < _split_values.size() ? _split_values[col_idx] : _s_null_slice;
- IColumn* col_ptr =
- const_cast<IColumn*>(block->get_by_name(src_slot_desc->col_name()).column.get());
- _text_converter->write_vec_column(src_slot_desc, col_ptr, value.data, value.size, true,
- false);
+ if (_is_load) {
+ for (int i = 0; i < _file_slot_descs.size(); ++i) {
+ auto src_slot_desc = _file_slot_descs[i];
+ int col_idx = _col_idxs[i];
+ // col idx is out of range, fill with null.
+ const Slice& value =
+ col_idx < _split_values.size() ? _split_values[col_idx] : _s_null_slice;
+ // For load task, we always read "string" from file, so use "write_string_column"
+ _text_converter->write_string_column(src_slot_desc, &columns[i], value.data,
+ value.size);
+ }
+ } else {
+ // if _split_values.size > _file_slot_descs.size()
+ // we only take the first few columns
+ for (int i = 0; i < _file_slot_descs.size(); ++i) {
+ auto src_slot_desc = _file_slot_descs[i];
+ int col_idx = _col_idxs[i];
+ // col idx is out of range, fill with null.
+ const Slice& value =
+ col_idx < _split_values.size() ? _split_values[col_idx] : _s_null_slice;
+ IColumn* col_ptr = const_cast<IColumn*>(
+ block->get_by_position(_file_slot_idx_map[i]).column.get());
+ // For query task, we will convert values to final column type, so use "write_vec_column"
+ _text_converter->write_vec_column(src_slot_desc, col_ptr, value.data, value.size, true,
+ false);
+ }
}
++(*rows);
diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h
index 2b1c35193a..c237958d07 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -56,7 +56,8 @@ public:
private:
// used for stream/broker load of csv file.
Status _create_decompressor();
- Status _fill_dest_columns(const Slice& line, Block* block, size_t* rows);
+ Status _fill_dest_columns(const Slice& line, Block* block,
+ std::vector<MutableColumnPtr>& columns, size_t* rows);
Status _line_split_to_values(const Slice& line, bool* success);
void _split_line(const Slice& line);
Status _check_array_format(std::vector<Slice>& split_values, bool* is_success);
@@ -77,6 +78,11 @@ private:
const TFileScanRangeParams& _params;
const TFileRangeDesc& _range;
const std::vector<SlotDescriptor*>& _file_slot_descs;
+ // Only for query task, saves the mapping from each file slot to its column position in the block.
+ // e.g., there are 3 cols in "_file_slot_descs" named: k1, k2, k3
+ // and these 3 columns are ordered in the block as k2, k3, k1,
+ // so the _file_slot_idx_map will save: 2, 0, 1
+ std::vector<int> _file_slot_idx_map;
// Only for query task, save the columns' index which need to be read.
// eg, there are 3 cols in "_file_slot_descs" named: k1, k2, k3
// and the corresponding position in file is 0, 3, 5.
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index cfe26d9753..6ac802f60c 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -68,6 +68,7 @@ protected:
std::map<std::string, int> _file_slot_name_map;
// col names from _file_slot_descs
std::vector<std::string> _file_col_names;
+
// Partition source slot descriptors
std::vector<SlotDescriptor*> _partition_slot_descs;
// Partition slot id to index in _partition_slot_descs
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org