You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/07/29 10:49:05 UTC
[doris] branch master updated: [feature-wip](multi-catalog)(fix) partition value error when a block contains multiple splits (#11260)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 84ce2a1e98 [feature-wip](multi-catalog)(fix) partition value error when a block contains multiple splits (#11260)
84ce2a1e98 is described below
commit 84ce2a1e983a118feae614166d1d1d905deeb4bb
Author: Ashin Gau <As...@users.noreply.github.com>
AuthorDate: Fri Jul 29 18:48:59 2022 +0800
[feature-wip](multi-catalog)(fix) partition value error when a block contains multiple splits (#11260)
`FileArrowScanner::get_next` returns a block when it is full, so a block may contain multiple
splits from small files or span two splits in large files.
However, a block can only be filled with the partition values from one file. Different splits may
come from different files, which causes errors in the embedded partition values.
---
be/src/vec/exec/file_arrow_scanner.cpp | 2 +-
be/src/vec/exec/file_scanner.cpp | 5 +----
be/src/vec/exec/file_scanner.h | 2 +-
be/src/vec/exec/file_text_scanner.cpp | 5 +++++
4 files changed, 8 insertions(+), 6 deletions(-)
diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp
index 30511b2392..e6c4fa7597 100644
--- a/be/src/vec/exec/file_arrow_scanner.cpp
+++ b/be/src/vec/exec/file_arrow_scanner.cpp
@@ -194,7 +194,7 @@ Status FileArrowScanner::_append_batch_to_block(Block* block) {
}
_rows += num_elements;
_arrow_batch_cur_idx += num_elements;
- return Status::OK();
+ return _fill_columns_from_path(block, num_elements);
}
void VFileParquetScanner::_update_profile(std::shared_ptr<Statistics>& statistics) {
diff --git a/be/src/vec/exec/file_scanner.cpp b/be/src/vec/exec/file_scanner.cpp
index a0f473ffc9..bb1ba21924 100644
--- a/be/src/vec/exec/file_scanner.cpp
+++ b/be/src/vec/exec/file_scanner.cpp
@@ -164,7 +164,6 @@ Status FileScanner::_filter_block(vectorized::Block* _block) {
Status FileScanner::finalize_block(vectorized::Block* _block, bool* eof) {
*eof = _scanner_eof;
_read_row_counter += _block->rows();
- RETURN_IF_ERROR(_fill_columns_from_path(_block));
if (LIKELY(_rows > 0)) {
RETURN_IF_ERROR(_filter_block(_block));
}
@@ -172,11 +171,9 @@ Status FileScanner::finalize_block(vectorized::Block* _block, bool* eof) {
return Status::OK();
}
-Status FileScanner::_fill_columns_from_path(vectorized::Block* _block) {
+Status FileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t rows) {
const TFileRangeDesc& range = _ranges.at(_next_range - 1);
if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
- size_t rows = _rows;
-
for (const auto& slot_desc : _partition_slot_descs) {
if (slot_desc == nullptr) continue;
auto it = _partition_slot_index_map.find(slot_desc->id());
diff --git a/be/src/vec/exec/file_scanner.h b/be/src/vec/exec/file_scanner.h
index 16e75aefc0..df4c1d4ef6 100644
--- a/be/src/vec/exec/file_scanner.h
+++ b/be/src/vec/exec/file_scanner.h
@@ -55,6 +55,7 @@ protected:
virtual void _init_profiles(RuntimeProfile* profile) = 0;
Status finalize_block(vectorized::Block* dest_block, bool* eof);
+ Status _fill_columns_from_path(vectorized::Block* output_block, size_t rows);
Status init_block(vectorized::Block* block);
std::unique_ptr<TextConverter> _text_converter;
@@ -106,7 +107,6 @@ protected:
private:
Status _init_expr_ctxes();
Status _filter_block(vectorized::Block* output_block);
- Status _fill_columns_from_path(vectorized::Block* output_block);
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/file_text_scanner.cpp b/be/src/vec/exec/file_text_scanner.cpp
index 593b78867f..02da0bca2c 100644
--- a/be/src/vec/exec/file_text_scanner.cpp
+++ b/be/src/vec/exec/file_text_scanner.cpp
@@ -91,6 +91,7 @@ Status FileTextScanner::get_next(Block* block, bool* eof) {
const int batch_size = _state->batch_size();
+ int current_rows = _rows;
while (_rows < batch_size && !_scanner_eof) {
if (_cur_line_reader == nullptr || _cur_line_reader_eof) {
RETURN_IF_ERROR(_open_next_reader());
@@ -114,6 +115,10 @@ Status FileTextScanner::get_next(Block* block, bool* eof) {
COUNTER_UPDATE(_rows_read_counter, 1);
RETURN_IF_ERROR(_fill_file_columns(Slice(ptr, size), block));
}
+ if (_cur_line_reader_eof) {
+ RETURN_IF_ERROR(_fill_columns_from_path(block, _rows - current_rows));
+ current_rows = _rows;
+ }
}
return finalize_block(block, eof);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org