You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/01/04 10:25:16 UTC

[doris] branch master updated: [fix](csv-reader) fix new csv reader's performance issue (#15581)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4075e3aec6 [fix](csv-reader) fix new csv reader's performance issue (#15581)
4075e3aec6 is described below

commit 4075e3aec6e1041bc855cd30cc85b7ff584c4a28
Author: Mingyu Chen <mo...@163.com>
AuthorDate: Wed Jan 4 18:25:08 2023 +0800

    [fix](csv-reader) fix new csv reader's performance issue (#15581)
---
 be/src/vec/exec/format/csv/csv_reader.cpp | 60 +++++++++++++++++++++++--------
 be/src/vec/exec/format/csv/csv_reader.h   |  8 ++++-
 be/src/vec/exec/scan/vfile_scanner.h      |  1 +
 3 files changed, 53 insertions(+), 16 deletions(-)

diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index 5e3a975341..38fd30ba74 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -167,9 +167,23 @@ Status CsvReader::init_reader(bool is_load) {
 
     _is_load = is_load;
     if (!_is_load) {
-        // For query task, we need to save the mapping from table schema to file column
+        // For query task, there are 2 slot mappings.
+        // One is from file slot to values in line.
+        //      eg, the file_slot_descs is k1, k3, k5, and values in line are k1, k2, k3, k4, k5
+        //      the _col_idxs will save: 0, 2, 4
+        // The other is from file slot to columns in output block
+        //      eg, the file_slot_descs is k1, k3, k5, and columns in block are p1, k1, k3, k5
+        //      where "p1" is the partition col which does not exist in file
+        //      the _file_slot_idx_map will save: 1, 2, 3
         DCHECK(_params.__isset.column_idxs);
         _col_idxs = _params.column_idxs;
+        int idx = 0;
+        for (const auto& slot_info : _params.required_slots) {
+            if (slot_info.is_file_slot) {
+                _file_slot_idx_map.push_back(idx);
+            }
+            idx++;
+        }
     } else {
         // For load task, the column order is same as file column order
         int i = 0;
@@ -190,6 +204,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
 
     const int batch_size = _state->batch_size();
     size_t rows = 0;
+    auto columns = block->mutate_columns();
     while (rows < batch_size && !_line_reader_eof) {
         const uint8_t* ptr = nullptr;
         size_t size = 0;
@@ -203,7 +218,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
             continue;
         }
 
-        RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, &rows));
+        RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, columns, &rows));
     }
 
     *eof = (rows == 0);
@@ -303,7 +318,8 @@ Status CsvReader::_create_decompressor() {
     return Status::OK();
 }
 
-Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, size_t* rows) {
+Status CsvReader::_fill_dest_columns(const Slice& line, Block* block,
+                                     std::vector<MutableColumnPtr>& columns, size_t* rows) {
     bool is_success = false;
 
     RETURN_IF_ERROR(_line_split_to_values(line, &is_success));
@@ -312,18 +328,32 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, size_t* ro
         return Status::OK();
     }
 
-    // if _split_values.size > _file_slot_descs.size()
-    // we only take the first few columns
-    for (int i = 0; i < _file_slot_descs.size(); ++i) {
-        auto src_slot_desc = _file_slot_descs[i];
-        int col_idx = _col_idxs[i];
-        // col idx is out of range, fill with null.
-        const Slice& value =
-                col_idx < _split_values.size() ? _split_values[col_idx] : _s_null_slice;
-        IColumn* col_ptr =
-                const_cast<IColumn*>(block->get_by_name(src_slot_desc->col_name()).column.get());
-        _text_converter->write_vec_column(src_slot_desc, col_ptr, value.data, value.size, true,
-                                          false);
+    if (_is_load) {
+        for (int i = 0; i < _file_slot_descs.size(); ++i) {
+            auto src_slot_desc = _file_slot_descs[i];
+            int col_idx = _col_idxs[i];
+            // col idx is out of range, fill with null.
+            const Slice& value =
+                    col_idx < _split_values.size() ? _split_values[col_idx] : _s_null_slice;
+            // For load task, we always read "string" from file, so use "write_string_column"
+            _text_converter->write_string_column(src_slot_desc, &columns[i], value.data,
+                                                 value.size);
+        }
+    } else {
+        // if _split_values.size > _file_slot_descs.size()
+        // we only take the first few columns
+        for (int i = 0; i < _file_slot_descs.size(); ++i) {
+            auto src_slot_desc = _file_slot_descs[i];
+            int col_idx = _col_idxs[i];
+            // col idx is out of range, fill with null.
+            const Slice& value =
+                    col_idx < _split_values.size() ? _split_values[col_idx] : _s_null_slice;
+            IColumn* col_ptr = const_cast<IColumn*>(
+                    block->get_by_position(_file_slot_idx_map[i]).column.get());
+            // For query task, we will convert values to final column type, so use "write_vec_column"
+            _text_converter->write_vec_column(src_slot_desc, col_ptr, value.data, value.size, true,
+                                              false);
+        }
     }
     ++(*rows);
 
diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h
index 2b1c35193a..c237958d07 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -56,7 +56,8 @@ public:
 private:
     // used for stream/broker load of csv file.
     Status _create_decompressor();
-    Status _fill_dest_columns(const Slice& line, Block* block, size_t* rows);
+    Status _fill_dest_columns(const Slice& line, Block* block,
+                              std::vector<MutableColumnPtr>& columns, size_t* rows);
     Status _line_split_to_values(const Slice& line, bool* success);
     void _split_line(const Slice& line);
     Status _check_array_format(std::vector<Slice>& split_values, bool* is_success);
@@ -77,6 +78,11 @@ private:
     const TFileScanRangeParams& _params;
     const TFileRangeDesc& _range;
     const std::vector<SlotDescriptor*>& _file_slot_descs;
+    // Only for query task, save the mapping from each file slot to its column position in the block.
+    // eg, there are 3 cols in "_file_slot_descs" named: k1, k2, k3
+    // and these 3 columns appear in the block in the order k2, k3, k1,
+    // the _file_slot_idx_map will save: 2, 0, 1
+    std::vector<int> _file_slot_idx_map;
     // Only for query task, save the columns' index which need to be read.
     // eg, there are 3 cols in "_file_slot_descs" named: k1, k2, k3
     // and the corresponding position in file is 0, 3, 5.
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index cfe26d9753..6ac802f60c 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -68,6 +68,7 @@ protected:
     std::map<std::string, int> _file_slot_name_map;
     // col names from _file_slot_descs
     std::vector<std::string> _file_col_names;
+
     // Partition source slot descriptors
     std::vector<SlotDescriptor*> _partition_slot_descs;
     // Partition slot id to index in _partition_slot_descs


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org