You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/19 06:25:38 UTC
[doris] branch master updated: [refactor](new-scan) remove old file scan node (#13433)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5423de68dd [refactor](new-scan) remove old file scan node (#13433)
5423de68dd is described below
commit 5423de68dd9c7bf480823ea0a0faca84c8fd3f82
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Oct 19 14:25:32 2022 +0800
[refactor](new-scan) remove old file scan node (#13433)
None of these files are used anymore, so they can be removed.
---
be/src/common/config.h | 6 -
be/src/exec/exec_node.cpp | 7 +-
be/src/vec/CMakeLists.txt | 7 -
be/src/vec/exec/file_arrow_scanner.cpp | 259 -----------
be/src/vec/exec/file_arrow_scanner.h | 118 -----
be/src/vec/exec/file_scan_node.cpp | 508 ---------------------
be/src/vec/exec/file_scan_node.h | 148 ------
be/src/vec/exec/file_scanner.cpp | 199 --------
be/src/vec/exec/file_scanner.h | 112 -----
be/src/vec/exec/file_text_scanner.cpp | 294 ------------
be/src/vec/exec/file_text_scanner.h | 72 ---
be/src/vec/exec/scan/new_file_arrow_scanner.cpp | 265 -----------
be/src/vec/exec/scan/new_file_arrow_scanner.h | 89 ----
be/src/vec/exec/scan/new_file_scan_node.cpp | 27 +-
be/src/vec/exec/scan/new_file_scanner.cpp | 317 -------------
be/src/vec/exec/scan/new_file_scanner.h | 100 ----
be/src/vec/exec/scan/new_file_text_scanner.cpp | 263 -----------
be/src/vec/exec/scan/new_file_text_scanner.h | 66 ---
.../apache/doris/planner/StreamLoadPlanner.java | 2 +-
.../load_p0/broker_load/test_broker_load.groovy | 28 +-
20 files changed, 18 insertions(+), 2869 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 046ad3b234..ecbbd7b64d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -864,12 +864,6 @@ CONF_mInt64(nodechannel_pending_queue_max_bytes, "67108864");
// so as to avoid occupying the execution thread for a long time.
CONF_mInt32(max_fragment_start_wait_time_seconds, "30");
-// Temp config. True to use new file scan node to do load job. Will remove after fully test.
-CONF_mBool(enable_new_load_scan_node, "false");
-
-// Temp config. True to use new file scanner. Will remove after fully test.
-CONF_mBool(enable_new_file_scanner, "false");
-
// Hide webserver page for safety.
// Hide the be config page for webserver.
CONF_Bool(hide_webserver_config_page, "false");
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 39e917ec6b..f04f832328 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -59,7 +59,6 @@
#include "util/debug_util.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
-#include "vec/exec/file_scan_node.h"
#include "vec/exec/join/vhash_join_node.h"
#include "vec/exec/scan/new_es_scan_node.h"
#include "vec/exec/scan/new_file_scan_node.h"
@@ -613,13 +612,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
return Status::OK();
case TPlanNodeType::FILE_SCAN_NODE:
- // *node = pool->add(new vectorized::FileScanNode(pool, tnode, descs));
- if (config::enable_new_scan_node) {
+ if (state->enable_vectorized_exec()) {
*node = pool->add(new vectorized::NewFileScanNode(pool, tnode, descs));
} else {
- *node = pool->add(new vectorized::FileScanNode(pool, tnode, descs));
+ return Status::InternalError("Not support file scan node in non-vec engine");
}
-
return Status::OK();
case TPlanNodeType::REPEAT_NODE:
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 58a9701c90..d5739e25b4 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -230,10 +230,6 @@ set(VEC_FILES
runtime/vorc_writer.cpp
utils/arrow_column_to_doris_column.cpp
runtime/vsorted_run_merger.cpp
- exec/file_arrow_scanner.cpp
- exec/file_scanner.cpp
- exec/file_scan_node.cpp
- exec/file_text_scanner.cpp
exec/format/parquet/vparquet_column_chunk_reader.cpp
exec/format/parquet/vparquet_group_reader.cpp
exec/format/parquet/vparquet_page_index.cpp
@@ -250,10 +246,7 @@ set(VEC_FILES
exec/scan/scanner_scheduler.cpp
exec/scan/new_olap_scan_node.cpp
exec/scan/new_olap_scanner.cpp
- exec/scan/new_file_arrow_scanner.cpp
exec/scan/new_file_scan_node.cpp
- exec/scan/new_file_scanner.cpp
- exec/scan/new_file_text_scanner.cpp
exec/scan/vfile_scanner.cpp
exec/scan/new_odbc_scanner.cpp
exec/scan/new_odbc_scan_node.cpp
diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp
deleted file mode 100644
index 3a5d6c2f5b..0000000000
--- a/be/src/vec/exec/file_arrow_scanner.cpp
+++ /dev/null
@@ -1,259 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/exec/file_arrow_scanner.h"
-
-#include <memory>
-
-#include "exec/arrow/parquet_reader.h"
-#include "io/buffered_reader.h"
-#include "io/file_factory.h"
-#include "io/hdfs_reader_writer.h"
-#include "runtime/descriptors.h"
-#include "vec/utils/arrow_column_to_doris_column.h"
-
-namespace doris::vectorized {
-
-FileArrowScanner::FileArrowScanner(RuntimeState* state, RuntimeProfile* profile,
- const TFileScanRangeParams& params,
- const std::vector<TFileRangeDesc>& ranges,
- const std::vector<TExpr>& pre_filter_texprs,
- ScannerCounter* counter)
- : FileScanner(state, profile, params, ranges, pre_filter_texprs, counter),
- _cur_file_reader(nullptr),
- _cur_file_eof(false),
- _batch(nullptr),
- _arrow_batch_cur_idx(0) {
- _convert_arrow_block_timer = ADD_TIMER(_profile, "ConvertArrowBlockTimer");
-}
-
-FileArrowScanner::~FileArrowScanner() {
- FileArrowScanner::close();
-}
-
-Status FileArrowScanner::_open_next_reader() {
- // open_file_reader
- if (_cur_file_reader != nullptr) {
- delete _cur_file_reader;
- _cur_file_reader = nullptr;
- }
-
- while (true) {
- if (_next_range >= _ranges.size()) {
- _scanner_eof = true;
- return Status::OK();
- }
- const TFileRangeDesc& range = _ranges[_next_range++];
- std::unique_ptr<FileReader> file_reader;
- RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _params, range.path,
- range.start_offset, range.file_size, 0,
- file_reader));
- RETURN_IF_ERROR(file_reader->open());
- if (file_reader->size() == 0) {
- file_reader->close();
- continue;
- }
-
- int32_t num_of_columns_from_file = _file_slot_descs.size();
-
- _cur_file_reader =
- _new_arrow_reader(_file_slot_descs, file_reader.release(), num_of_columns_from_file,
- range.start_offset, range.size);
-
- auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
- Status status =
- _cur_file_reader->init_reader(tuple_desc, _conjunct_ctxs, _state->timezone());
- if (status.is_end_of_file()) {
- continue;
- } else {
- if (!status.ok()) {
- std::stringstream ss;
- ss << " file: " << range.path << " error:" << status.get_error_msg();
- return Status::InternalError(ss.str());
- } else {
- _update_profile(_cur_file_reader->statistics());
- return status;
- }
- }
- }
-}
-
-Status FileArrowScanner::open() {
- RETURN_IF_ERROR(FileScanner::open());
- if (_ranges.empty()) {
- return Status::OK();
- }
- return Status::OK();
-}
-
-// get next available arrow batch
-Status FileArrowScanner::_next_arrow_batch() {
- _arrow_batch_cur_idx = 0;
- // first, init file reader
- if (_cur_file_reader == nullptr || _cur_file_eof) {
- RETURN_IF_ERROR(_open_next_reader());
- _cur_file_eof = false;
- }
- // second, loop until find available arrow batch or EOF
- while (!_scanner_eof) {
- RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, &_cur_file_eof));
- if (_cur_file_eof) {
- RETURN_IF_ERROR(_open_next_reader());
- _cur_file_eof = false;
- continue;
- }
- if (_batch->num_rows() == 0) {
- continue;
- }
- return Status::OK();
- }
- return Status::EndOfFile("EOF");
-}
-
-Status FileArrowScanner::_init_arrow_batch_if_necessary() {
- // 1. init batch if first time
- // 2. reset reader if end of file
- Status status = Status::OK();
- if (_scanner_eof) {
- return Status::EndOfFile("EOF");
- }
- if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
- return _next_arrow_batch();
- }
- return status;
-}
-
-Status FileArrowScanner::get_next(vectorized::Block* block, bool* eof) {
- SCOPED_TIMER(_read_timer);
- // init arrow batch
- {
- Status st = _init_arrow_batch_if_necessary();
- if (!st.ok()) {
- if (!st.is_end_of_file()) {
- return st;
- }
- *eof = true;
- return Status::OK();
- }
- }
-
- RETURN_IF_ERROR(init_block(block));
- // convert arrow batch to block until reach the batch_size
- while (!_scanner_eof) {
- // cast arrow type to PT0 and append it to block
- // for example: arrow::Type::INT16 => TYPE_SMALLINT
- RETURN_IF_ERROR(_append_batch_to_block(block));
- // finalize the block if full
- if (_rows >= _state->batch_size()) {
- break;
- }
- auto status = _next_arrow_batch();
- // if ok, append the batch to the columns
- if (status.ok()) {
- continue;
- }
- // return error if not EOF
- if (!status.is_end_of_file()) {
- return status;
- }
- _cur_file_eof = true;
- break;
- }
-
- return finalize_block(block, eof);
-}
-
-Status FileArrowScanner::_append_batch_to_block(Block* block) {
- SCOPED_TIMER(_convert_arrow_block_timer);
- size_t num_elements = std::min<size_t>((_state->batch_size() - block->rows()),
- (_batch->num_rows() - _arrow_batch_cur_idx));
- for (auto i = 0; i < _file_slot_descs.size(); ++i) {
- SlotDescriptor* slot_desc = _file_slot_descs[i];
- if (slot_desc == nullptr) {
- continue;
- }
- std::string real_column_name = _cur_file_reader->is_case_sensitive()
- ? slot_desc->col_name()
- : slot_desc->col_name_lower_case();
- auto* array = _batch->GetColumnByName(real_column_name).get();
- auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name());
- RETURN_IF_ERROR(arrow_column_to_doris_column(
- array, _arrow_batch_cur_idx, column_with_type_and_name.column,
- column_with_type_and_name.type, num_elements, _state->timezone_obj()));
- }
- _rows += num_elements;
- _arrow_batch_cur_idx += num_elements;
- return _fill_columns_from_path(block, num_elements);
-}
-
-void VFileParquetScanner::_update_profile(std::shared_ptr<Statistics>& statistics) {
- COUNTER_UPDATE(_filtered_row_groups_counter, statistics->filtered_row_groups);
- COUNTER_UPDATE(_filtered_rows_counter, statistics->filtered_rows);
- COUNTER_UPDATE(_filtered_bytes_counter, statistics->filtered_total_bytes);
- COUNTER_UPDATE(_total_rows_counter, statistics->total_rows);
- COUNTER_UPDATE(_total_groups_counter, statistics->total_groups);
- COUNTER_UPDATE(_total_bytes_counter, statistics->total_bytes);
-}
-
-void FileArrowScanner::close() {
- FileScanner::close();
- if (_cur_file_reader != nullptr) {
- delete _cur_file_reader;
- _cur_file_reader = nullptr;
- }
-}
-
-VFileParquetScanner::VFileParquetScanner(RuntimeState* state, RuntimeProfile* profile,
- const TFileScanRangeParams& params,
- const std::vector<TFileRangeDesc>& ranges,
- const std::vector<TExpr>& pre_filter_texprs,
- ScannerCounter* counter)
- : FileArrowScanner(state, profile, params, ranges, pre_filter_texprs, counter) {
- _init_profiles(profile);
-}
-
-ArrowReaderWrap* VFileParquetScanner::_new_arrow_reader(
- const std::vector<SlotDescriptor*>& file_slot_descs, FileReader* file_reader,
- int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) {
- return new ParquetReaderWrap(_state, file_slot_descs, file_reader, num_of_columns_from_file,
- range_start_offset, range_size, false);
-}
-
-void VFileParquetScanner::_init_profiles(RuntimeProfile* profile) {
- _filtered_row_groups_counter = ADD_COUNTER(_profile, "ParquetRowGroupsFiltered", TUnit::UNIT);
- _filtered_rows_counter = ADD_COUNTER(_profile, "ParquetRowsFiltered", TUnit::UNIT);
- _filtered_bytes_counter = ADD_COUNTER(_profile, "ParquetBytesFiltered", TUnit::BYTES);
- _total_rows_counter = ADD_COUNTER(_profile, "ParquetRowsTotal", TUnit::UNIT);
- _total_groups_counter = ADD_COUNTER(_profile, "ParquetRowGroupsTotal", TUnit::UNIT);
- _total_bytes_counter = ADD_COUNTER(_profile, "ParquetBytesTotal", TUnit::BYTES);
-}
-
-VFileORCScanner::VFileORCScanner(RuntimeState* state, RuntimeProfile* profile,
- const TFileScanRangeParams& params,
- const std::vector<TFileRangeDesc>& ranges,
- const std::vector<TExpr>& pre_filter_texprs,
- ScannerCounter* counter)
- : FileArrowScanner(state, profile, params, ranges, pre_filter_texprs, counter) {}
-
-ArrowReaderWrap* VFileORCScanner::_new_arrow_reader(
- const std::vector<SlotDescriptor*>& file_slot_descs, FileReader* file_reader,
- int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) {
- return new ORCReaderWrap(_state, file_slot_descs, file_reader, num_of_columns_from_file,
- range_start_offset, range_size, false);
-}
-
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/file_arrow_scanner.h b/be/src/vec/exec/file_arrow_scanner.h
deleted file mode 100644
index 113bd54d6e..0000000000
--- a/be/src/vec/exec/file_arrow_scanner.h
+++ /dev/null
@@ -1,118 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <arrow/array.h>
-#include <exec/arrow/arrow_reader.h>
-#include <exec/arrow/orc_reader.h>
-
-#include <map>
-#include <memory>
-#include <sstream>
-#include <string>
-#include <unordered_map>
-#include <vector>
-
-#include "common/status.h"
-#include "exec/base_scanner.h"
-#include "util/runtime_profile.h"
-#include "vec/exec/file_scanner.h"
-
-namespace doris::vectorized {
-
-// VArrow scanner convert the data read from orc|parquet to doris's columns.
-class FileArrowScanner : public FileScanner {
-public:
- FileArrowScanner(RuntimeState* state, RuntimeProfile* profile,
- const TFileScanRangeParams& params, const std::vector<TFileRangeDesc>& ranges,
- const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
-
- ~FileArrowScanner() override;
-
- // Open this scanner, will initialize information need to
- Status open() override;
-
- Status get_next(Block* block, bool* eof) override;
-
- void close() override;
-
-protected:
- virtual ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& file_slot_descs,
- FileReader* file_reader,
- int32_t num_of_columns_from_file,
- int64_t range_start_offset, int64_t range_size) = 0;
- virtual void _update_profile(std::shared_ptr<Statistics>& statistics) {}
-
-private:
- // Read next buffer from reader
- Status _open_next_reader();
- Status _next_arrow_batch();
- Status _init_arrow_batch_if_necessary();
- Status _append_batch_to_block(Block* block);
-
-private:
- // Reader
- ArrowReaderWrap* _cur_file_reader;
- bool _cur_file_eof; // is read over?
- std::shared_ptr<arrow::RecordBatch> _batch;
- size_t _arrow_batch_cur_idx;
- RuntimeProfile::Counter* _convert_arrow_block_timer;
-};
-
-class VFileParquetScanner final : public FileArrowScanner {
-public:
- VFileParquetScanner(RuntimeState* state, RuntimeProfile* profile,
- const TFileScanRangeParams& params,
- const std::vector<TFileRangeDesc>& ranges,
- const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
-
- ~VFileParquetScanner() override = default;
-
-protected:
- ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& file_slot_descs,
- FileReader* file_reader, int32_t num_of_columns_from_file,
- int64_t range_start_offset, int64_t range_size) override;
-
- void _init_profiles(RuntimeProfile* profile) override;
- void _update_profile(std::shared_ptr<Statistics>& statistics) override;
-
-private:
- RuntimeProfile::Counter* _filtered_row_groups_counter;
- RuntimeProfile::Counter* _filtered_rows_counter;
- RuntimeProfile::Counter* _filtered_bytes_counter;
- RuntimeProfile::Counter* _total_rows_counter;
- RuntimeProfile::Counter* _total_groups_counter;
- RuntimeProfile::Counter* _total_bytes_counter;
-};
-
-class VFileORCScanner final : public FileArrowScanner {
-public:
- VFileORCScanner(RuntimeState* state, RuntimeProfile* profile,
- const TFileScanRangeParams& params, const std::vector<TFileRangeDesc>& ranges,
- const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
-
- ~VFileORCScanner() override = default;
-
-protected:
- ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& file_slot_descs,
- FileReader* file_reader, int32_t num_of_columns_from_file,
- int64_t range_start_offset, int64_t range_size) override;
- void _init_profiles(RuntimeProfile* profile) override {};
-};
-
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp
deleted file mode 100644
index 3a3f9634e9..0000000000
--- a/be/src/vec/exec/file_scan_node.cpp
+++ /dev/null
@@ -1,508 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/exec/file_scan_node.h"
-
-#include "common/config.h"
-#include "gen_cpp/PlanNodes_types.h"
-#include "runtime/memory/mem_tracker.h"
-#include "runtime/runtime_filter_mgr.h"
-#include "runtime/runtime_state.h"
-#include "runtime/string_value.h"
-#include "runtime/tuple.h"
-#include "runtime/tuple_row.h"
-#include "util/priority_thread_pool.hpp"
-#include "util/runtime_profile.h"
-#include "util/thread.h"
-#include "util/types.h"
-#include "vec/exec/file_arrow_scanner.h"
-#include "vec/exec/file_text_scanner.h"
-#include "vec/exprs/vcompound_pred.h"
-#include "vec/exprs/vexpr.h"
-#include "vec/exprs/vexpr_context.h"
-
-namespace doris::vectorized {
-
-FileScanNode::FileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
- : ScanNode(pool, tnode, descs),
- _tuple_id(tnode.file_scan_node.tuple_id),
- _runtime_state(nullptr),
- _tuple_desc(nullptr),
- _num_running_scanners(0),
- _scan_finished(false),
- _max_buffered_batches(32),
- _wait_scanner_timer(nullptr),
- _runtime_filter_descs(tnode.runtime_filters) {}
-
-Status FileScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
- RETURN_IF_ERROR(ScanNode::init(tnode, state));
-
- int filter_size = _runtime_filter_descs.size();
- _runtime_filter_ctxs.resize(filter_size);
- _runtime_filter_ready_flag.resize(filter_size);
- for (int i = 0; i < filter_size; ++i) {
- IRuntimeFilter* runtime_filter = nullptr;
- const auto& filter_desc = _runtime_filter_descs[i];
- RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(
- RuntimeFilterRole::CONSUMER, filter_desc, state->query_options(), id()));
- RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
- &runtime_filter));
-
- _runtime_filter_ctxs[i].runtimefilter = runtime_filter;
- _runtime_filter_ready_flag[i] = false;
- _rf_locks.push_back(std::make_unique<std::mutex>());
- }
-
- return Status::OK();
-}
-
-Status FileScanNode::prepare(RuntimeState* state) {
- VLOG_QUERY << "FileScanNode prepare";
- RETURN_IF_ERROR(ScanNode::prepare(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
- // get tuple desc
- _runtime_state = state;
- _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
- if (_tuple_desc == nullptr) {
- std::stringstream ss;
- ss << "Failed to get tuple descriptor, _tuple_id=" << _tuple_id;
- return Status::InternalError(ss.str());
- }
-
- // Initialize slots map
- for (auto slot : _tuple_desc->slots()) {
- auto pair = _slots_map.emplace(slot->col_name(), slot);
- if (!pair.second) {
- std::stringstream ss;
- ss << "Failed to insert slot, col_name=" << slot->col_name();
- return Status::InternalError(ss.str());
- }
- }
-
- // Profile
- _wait_scanner_timer = ADD_TIMER(runtime_profile(), "WaitScannerTime");
- _filter_timer = ADD_TIMER(runtime_profile(), "PredicateFilteredTime");
- _num_rows_filtered = ADD_COUNTER(runtime_profile(), "PredicateFilteredRows", TUnit::UNIT);
- _num_scanners = ADD_COUNTER(runtime_profile(), "NumScanners", TUnit::UNIT);
-
- return Status::OK();
-}
-
-Status FileScanNode::open(RuntimeState* state) {
- SCOPED_TIMER(_runtime_profile->total_time_counter());
- RETURN_IF_ERROR(ExecNode::open(state));
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
- RETURN_IF_CANCELLED(state);
-
- RETURN_IF_ERROR(_acquire_and_build_runtime_filter(state));
-
- RETURN_IF_ERROR(start_scanners());
-
- return Status::OK();
-}
-
-Status FileScanNode::_acquire_and_build_runtime_filter(RuntimeState* state) {
- // acquire runtime filter
- _runtime_filter_ctxs.resize(_runtime_filter_descs.size());
- for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
- auto& filter_desc = _runtime_filter_descs[i];
- IRuntimeFilter* runtime_filter = nullptr;
- state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter);
- DCHECK(runtime_filter != nullptr);
- if (runtime_filter == nullptr) {
- continue;
- }
- bool ready = runtime_filter->is_ready();
- if (!ready) {
- ready = runtime_filter->await();
- }
- if (ready) {
- _runtime_filter_ctxs[i].apply_mark = true;
- _runtime_filter_ctxs[i].runtimefilter = runtime_filter;
-
- // TODO: currently, after calling get_push_expr_ctxs(), the func ptr in runtime_filter
- // will be released, and it will not be used again for building vexpr.
- //
- // std::list<ExprContext*> expr_context;
- // RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&expr_context));
- // for (auto ctx : expr_context) {
- // ctx->prepare(state, row_desc());
- // ctx->open(state);
- // int index = _conjunct_ctxs.size();
- // _conjunct_ctxs.push_back(ctx);
- // // it's safe to store address from a fix-resized vector
- // _conjunctid_to_runtime_filter_ctxs[index] = &_runtime_filter_ctxs[i];
- // }
- }
- }
-
- // rebuild vexpr
- for (int i = 0; i < _runtime_filter_ctxs.size(); ++i) {
- if (!_runtime_filter_ctxs[i].apply_mark) {
- continue;
- }
- IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtimefilter;
- std::vector<VExpr*> vexprs;
- runtime_filter->get_prepared_vexprs(&vexprs, _row_descriptor);
- if (vexprs.empty()) {
- continue;
- }
- auto last_expr = _vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() : vexprs[0];
- for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < vexprs.size(); j++) {
- TExprNode texpr_node;
- texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
- texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED);
- texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND);
- VExpr* new_node = _pool->add(new VcompoundPred(texpr_node));
- new_node->add_child(last_expr);
- new_node->add_child(vexprs[j]);
- last_expr = new_node;
- }
- auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr));
- auto expr_status = new_vconjunct_ctx_ptr->prepare(state, _row_descriptor);
- if (UNLIKELY(!expr_status.OK())) {
- LOG(WARNING) << "Something wrong for runtime filters: " << expr_status;
- vexprs.clear();
- break;
- }
-
- expr_status = new_vconjunct_ctx_ptr->open(state);
- if (UNLIKELY(!expr_status.OK())) {
- LOG(WARNING) << "Something wrong for runtime filters: " << expr_status;
- vexprs.clear();
- break;
- }
- if (_vconjunct_ctx_ptr) {
- _stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
- }
- _vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
- *(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr;
- _runtime_filter_ready_flag[i] = true;
- }
- return Status::OK();
-}
-
-Status FileScanNode::start_scanners() {
- {
- std::unique_lock<std::mutex> l(_batch_queue_lock);
- _num_running_scanners = _scan_ranges.size();
- }
-
- _scanners_status.resize(_scan_ranges.size());
- COUNTER_UPDATE(_num_scanners, _scan_ranges.size());
- ThreadPoolToken* thread_token = _runtime_state->get_query_fragments_ctx()->get_token();
- PriorityThreadPool* thread_pool = _runtime_state->exec_env()->scan_thread_pool();
- for (int i = 0; i < _scan_ranges.size(); ++i) {
- Status submit_status = Status::OK();
- if (thread_token != nullptr) {
- submit_status = thread_token->submit_func(std::bind(&FileScanNode::scanner_worker, this,
- i, _scan_ranges.size(),
- std::ref(_scanners_status[i])));
- } else {
- PriorityThreadPool::WorkFunction task =
- std::bind(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
- std::ref(_scanners_status[i]));
- if (!thread_pool->offer(task)) {
- submit_status = Status::Cancelled("Failed to submit scan task");
- }
- }
- if (!submit_status.ok()) {
- LOG(WARNING) << "Failed to assign file scanner task to thread pool! "
- << submit_status.get_error_msg();
- _scanners_status[i].set_value(submit_status);
- for (int j = i + 1; j < _scan_ranges.size(); ++j) {
- _scanners_status[j].set_value(Status::Cancelled("Cancelled"));
- }
- {
- std::lock_guard<std::mutex> l(_batch_queue_lock);
- update_status(submit_status);
- _num_running_scanners -= _scan_ranges.size() - i;
- }
- _queue_writer_cond.notify_all();
- break;
- }
- }
- return Status::OK();
-}
-
-Status FileScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
- SCOPED_TIMER(_runtime_profile->total_time_counter());
- // check if CANCELLED.
- if (state->is_cancelled()) {
- std::unique_lock<std::mutex> l(_batch_queue_lock);
- if (update_status(Status::Cancelled("Cancelled"))) {
- // Notify all scanners
- _queue_writer_cond.notify_all();
- }
- }
-
- if (_scan_finished.load()) {
- *eos = true;
- return Status::OK();
- }
-
- const int batch_size = _runtime_state->batch_size();
- while (true) {
- std::shared_ptr<vectorized::Block> scanner_block;
- {
- std::unique_lock<std::mutex> l(_batch_queue_lock);
- while (_process_status.ok() && !_runtime_state->is_cancelled() &&
- _num_running_scanners > 0 && _block_queue.empty()) {
- SCOPED_TIMER(_wait_scanner_timer);
- _queue_reader_cond.wait_for(l, std::chrono::seconds(1));
- }
- if (!_process_status.ok()) {
- // Some scanner process failed.
- return _process_status;
- }
- if (_runtime_state->is_cancelled()) {
- if (update_status(Status::Cancelled("Cancelled"))) {
- _queue_writer_cond.notify_all();
- }
- return _process_status;
- }
- if (!_block_queue.empty()) {
- scanner_block = _block_queue.front();
- _block_queue.pop_front();
- }
- }
-
- // All scanner has been finished, and all cached batch has been read
- if (!scanner_block) {
- if (_mutable_block && !_mutable_block->empty()) {
- *block = _mutable_block->to_block();
- reached_limit(block, eos);
- LOG_IF(INFO, *eos) << "FileScanNode ReachedLimit.";
- }
- _scan_finished.store(true);
- *eos = true;
- return Status::OK();
- }
- // notify one scanner
- _queue_writer_cond.notify_one();
-
- if (UNLIKELY(!_mutable_block)) {
- _mutable_block.reset(new MutableBlock(scanner_block->clone_empty()));
- }
-
- if (_mutable_block->rows() + scanner_block->rows() < batch_size) {
- // merge scanner_block into _mutable_block
- _mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows());
- continue;
- } else {
- if (_mutable_block->empty()) {
- // directly use scanner_block
- *block = std::move(*scanner_block);
- } else {
- // copy _mutable_block firstly, then merge scanner_block into _mutable_block for next.
- *block = _mutable_block->to_block();
- _mutable_block->set_muatable_columns(scanner_block->clone_empty_columns());
- _mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows());
- }
- break;
- }
- }
-
- reached_limit(block, eos);
- if (*eos) {
- _scan_finished.store(true);
- _queue_writer_cond.notify_all();
- LOG(INFO) << "FileScanNode ReachedLimit.";
- } else {
- *eos = false;
- }
-
- return Status::OK();
-}
-
-Status FileScanNode::close(RuntimeState* state) {
- if (is_closed()) {
- return Status::OK();
- }
- SCOPED_TIMER(_runtime_profile->total_time_counter());
- _scan_finished.store(true);
- _queue_writer_cond.notify_all();
- _queue_reader_cond.notify_all();
- {
- std::unique_lock<std::mutex> l(_batch_queue_lock);
- _queue_reader_cond.wait(l, [this] { return _num_running_scanners == 0; });
- }
- for (int i = 0; i < _scanners_status.size(); i++) {
- std::future<Status> f = _scanners_status[i].get_future();
- RETURN_IF_ERROR(f.get());
- }
- // Close
- _batch_queue.clear();
-
- for (auto& filter_desc : _runtime_filter_descs) {
- IRuntimeFilter* runtime_filter = nullptr;
- state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter);
- DCHECK(runtime_filter != nullptr);
- runtime_filter->consumer_close();
- }
-
- for (auto& ctx : _stale_vexpr_ctxs) {
- (*ctx)->close(state);
- }
-
- return ExecNode::close(state);
-}
-
-Status FileScanNode::scanner_scan(const TFileScanRange& scan_range, ScannerCounter* counter) {
- //create scanner object and open
- std::unique_ptr<FileScanner> scanner = create_scanner(scan_range, counter);
- RETURN_IF_ERROR(scanner->open());
- bool scanner_eof = false;
- while (!scanner_eof) {
- RETURN_IF_CANCELLED(_runtime_state);
- // If we have finished all works
- if (_scan_finished.load() || !_process_status.ok()) {
- return Status::OK();
- }
-
- std::shared_ptr<vectorized::Block> block(new vectorized::Block());
- RETURN_IF_ERROR(scanner->get_next(block.get(), &scanner_eof));
- if (block->rows() == 0) {
- continue;
- }
- auto old_rows = block->rows();
- {
- SCOPED_TIMER(_filter_timer);
- RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(),
- _tuple_desc->slots().size()));
- }
- counter->num_rows_unselected += old_rows - block->rows();
- if (block->rows() == 0) {
- continue;
- }
-
- std::unique_lock<std::mutex> l(_batch_queue_lock);
- while (_process_status.ok() && !_scan_finished.load() && !_runtime_state->is_cancelled() &&
- // stop pushing more batch if
- // 1. too many batches in queue, or
- // 2. at least one batch in queue and memory exceed limit.
- (_block_queue.size() >= _max_buffered_batches || !_block_queue.empty())) {
- _queue_writer_cond.wait_for(l, std::chrono::seconds(1));
- }
- // Process already set failed, so we just return OK
- if (!_process_status.ok()) {
- return Status::OK();
- }
- // Scan already finished, just return
- if (_scan_finished.load()) {
- return Status::OK();
- }
- // Runtime state is canceled, just return cancel
- if (_runtime_state->is_cancelled()) {
- return Status::Cancelled("Cancelled");
- }
- // Queue size Must be smaller than _max_buffered_batches
- _block_queue.push_back(block);
-
- // Notify reader to process
- _queue_reader_cond.notify_one();
- }
- return Status::OK();
-}
-
-void FileScanNode::scanner_worker(int start_idx, int length, std::promise<Status>& p_status) {
- Thread::set_self_name("file_scanner");
- Status status = Status::OK();
- ScannerCounter counter;
- const TFileScanRange& scan_range =
- _scan_ranges[start_idx].scan_range.ext_scan_range.file_scan_range;
- status = scanner_scan(scan_range, &counter);
- if (!status.ok()) {
- LOG(WARNING) << "Scanner[" << start_idx
- << "] process failed. status=" << status.get_error_msg();
- }
-
- // Update stats
- _runtime_state->update_num_rows_load_filtered(counter.num_rows_filtered);
- _runtime_state->update_num_rows_load_unselected(counter.num_rows_unselected);
- COUNTER_UPDATE(_num_rows_filtered, counter.num_rows_unselected);
-
- // scanner is going to finish
- {
- std::lock_guard<std::mutex> l(_batch_queue_lock);
- if (!status.ok()) {
- update_status(status);
- }
- // This scanner will finish
- _num_running_scanners--;
- }
- _queue_reader_cond.notify_all();
- // If one scanner failed, others don't need scan any more
- if (!status.ok()) {
- _queue_writer_cond.notify_all();
- }
- p_status.set_value(status);
-}
-
-std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange& scan_range,
- ScannerCounter* counter) {
- FileScanner* scan = nullptr;
- switch (scan_range.params.format_type) {
- case TFileFormatType::FORMAT_PARQUET:
- scan = new VFileParquetScanner(_runtime_state, runtime_profile(), scan_range.params,
- scan_range.ranges, _pre_filter_texprs, counter);
- break;
- case TFileFormatType::FORMAT_ORC:
- scan = new VFileORCScanner(_runtime_state, runtime_profile(), scan_range.params,
- scan_range.ranges, _pre_filter_texprs, counter);
- break;
-
- default:
- scan = new FileTextScanner(_runtime_state, runtime_profile(), scan_range.params,
- scan_range.ranges, _pre_filter_texprs, counter);
- }
- scan->reg_conjunct_ctxs(_tuple_id, _conjunct_ctxs);
- std::unique_ptr<FileScanner> scanner(scan);
- return scanner;
-}
-
-// This function is called after plan node has been prepared.
-Status FileScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
- int max_scanners = config::doris_scanner_thread_pool_thread_num;
- if (scan_ranges.size() <= max_scanners) {
- _scan_ranges = scan_ranges;
- } else {
- // There is no need for the number of scanners to exceed the number of threads in thread pool.
- _scan_ranges.clear();
- auto range_iter = scan_ranges.begin();
- for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) {
- _scan_ranges.push_back(*range_iter);
- }
- for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
- if (i == max_scanners) {
- i = 0;
- }
- auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
- auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
- ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
- }
- _scan_ranges.shrink_to_fit();
- LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size();
- }
- return Status::OK();
-}
-
-void FileScanNode::debug_string(int ident_level, std::stringstream* out) const {
- (*out) << "FileScanNode";
-}
-
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/file_scan_node.h b/be/src/vec/exec/file_scan_node.h
deleted file mode 100644
index 850c74db0c..0000000000
--- a/be/src/vec/exec/file_scan_node.h
+++ /dev/null
@@ -1,148 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <atomic>
-#include <condition_variable>
-#include <future>
-#include <map>
-#include <mutex>
-#include <string>
-#include <thread>
-#include <vector>
-
-#include "common/status.h"
-#include "exec/base_scanner.h"
-#include "exec/scan_node.h"
-#include "exprs/runtime_filter.h"
-#include "gen_cpp/PaloInternalService_types.h"
-#include "runtime/descriptors.h"
-#include "vec/exec/file_scanner.h"
-namespace doris {
-
-class RuntimeState;
-class Status;
-
-namespace vectorized {
-class FileScanNode final : public ScanNode {
-public:
- FileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
- ~FileScanNode() override = default;
-
- // Called after create this scan node
- Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
-
- // Prepare partition infos & set up timer
- Status prepare(RuntimeState* state) override;
-
- // Start file scan using ParquetScanner or OrcScanner.
- Status open(RuntimeState* state) override;
-
- // Fill the next row batch by calling next() on the scanner,
- virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override {
- return Status::NotSupported("Not Implemented FileScanNode::get_next.");
- }
-
- Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;
-
- // Close the scanner, and report errors.
- Status close(RuntimeState* state) override;
-
- // No use
- Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
-
-private:
- // Write debug string of this into out.
- void debug_string(int indentation_level, std::stringstream* out) const override;
-
- // Update process status to one failed status,
- // NOTE: Must hold the mutex of this scan node
- bool update_status(const Status& new_status) {
- if (_process_status.ok()) {
- _process_status = new_status;
- return true;
- }
- return false;
- }
-
- std::unique_ptr<FileScanner> create_scanner(const TFileScanRange& scan_range,
- ScannerCounter* counter);
-
- Status start_scanners();
-
- void scanner_worker(int start_idx, int length, std::promise<Status>& p_status);
- // Scan one range
- Status scanner_scan(const TFileScanRange& scan_range, ScannerCounter* counter);
-
- Status _acquire_and_build_runtime_filter(RuntimeState* state);
-
- TupleId _tuple_id;
- RuntimeState* _runtime_state;
- TupleDescriptor* _tuple_desc;
- std::map<std::string, SlotDescriptor*> _slots_map;
- std::vector<TScanRangeParams> _scan_ranges;
-
- std::mutex _batch_queue_lock;
- std::condition_variable _queue_reader_cond;
- std::condition_variable _queue_writer_cond;
- std::deque<std::shared_ptr<RowBatch>> _batch_queue;
-
- int _num_running_scanners;
-
- std::atomic<bool> _scan_finished;
-
- Status _process_status;
-
- std::vector<std::promise<Status>> _scanners_status;
-
- int _max_buffered_batches;
-
- // The origin preceding filter exprs.
- // These exprs will be converted to expr context
- // in XXXScanner.
- // Because the row descriptor used for these exprs is `src_row_desc`,
- // which is initialized in XXXScanner.
- std::vector<TExpr> _pre_filter_texprs;
-
- RuntimeProfile::Counter* _wait_scanner_timer;
- RuntimeProfile::Counter* _num_rows_filtered;
- RuntimeProfile::Counter* _filter_timer;
- RuntimeProfile::Counter* _num_scanners;
-
- std::deque<std::shared_ptr<vectorized::Block>> _block_queue;
- std::unique_ptr<MutableBlock> _mutable_block;
-
-protected:
- struct RuntimeFilterContext {
- RuntimeFilterContext() : apply_mark(false), runtimefilter(nullptr) {}
- bool apply_mark;
- IRuntimeFilter* runtimefilter;
- };
-
- const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() const {
- return _runtime_filter_descs;
- }
- std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
- std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
- std::vector<bool> _runtime_filter_ready_flag;
- std::vector<std::unique_ptr<std::mutex>> _rf_locks;
- std::map<int, RuntimeFilterContext*> _conjunctid_to_runtime_filter_ctxs;
- std::vector<std::unique_ptr<VExprContext*>> _stale_vexpr_ctxs;
-};
-} // namespace vectorized
-} // namespace doris
diff --git a/be/src/vec/exec/file_scanner.cpp b/be/src/vec/exec/file_scanner.cpp
deleted file mode 100644
index 04eb03db83..0000000000
--- a/be/src/vec/exec/file_scanner.cpp
+++ /dev/null
@@ -1,199 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "file_scanner.h"
-
-#include <fmt/format.h>
-
-#include <vec/data_types/data_type_factory.hpp>
-
-#include "common/logging.h"
-#include "common/utils.h"
-#include "exec/exec_node.h"
-#include "exec/text_converter.hpp"
-#include "exprs/expr_context.h"
-#include "runtime/descriptors.h"
-#include "runtime/raw_value.h"
-#include "runtime/runtime_state.h"
-#include "runtime/tuple.h"
-
-namespace doris::vectorized {
-
-FileScanner::FileScanner(RuntimeState* state, RuntimeProfile* profile,
- const TFileScanRangeParams& params,
- const std::vector<TFileRangeDesc>& ranges,
- const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
- : _state(state),
- _params(params),
- _ranges(ranges),
- _next_range(0),
- _counter(counter),
- _mem_pool(std::make_unique<MemPool>()),
- _pre_filter_texprs(pre_filter_texprs),
- _profile(profile),
- _rows_read_counter(nullptr),
- _read_timer(nullptr),
- _scanner_eof(false) {
- _text_converter.reset(new (std::nothrow) TextConverter('\\'));
-}
-
-Status FileScanner::open() {
- _rows_read_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT);
- _read_timer = ADD_TIMER(_profile, "TotalRawReadTime(*)");
- return _init_expr_ctxes();
-}
-
-void FileScanner::reg_conjunct_ctxs(const TupleId& tupleId,
- const std::vector<ExprContext*>& conjunct_ctxs) {
- _conjunct_ctxs = conjunct_ctxs;
- _tupleId = tupleId;
-}
-
-Status FileScanner::_init_expr_ctxes() {
- const TupleDescriptor* src_tuple_desc =
- _state->desc_tbl().get_tuple_descriptor(_params.src_tuple_id);
- if (src_tuple_desc == nullptr) {
- std::stringstream ss;
- ss << "Unknown source tuple descriptor, tuple_id=" << _params.src_tuple_id;
- return Status::InternalError(ss.str());
- }
- DCHECK(!_ranges.empty());
-
- std::map<SlotId, int> _full_src_index_map;
- std::map<SlotId, SlotDescriptor*> _full_src_slot_map;
- int index = 0;
- for (const auto& slot_desc : src_tuple_desc->slots()) {
- _full_src_slot_map.emplace(slot_desc->id(), slot_desc);
- _full_src_index_map.emplace(slot_desc->id(), index++);
- }
-
- _num_of_columns_from_file = _params.num_of_columns_from_file;
- for (const auto& slot_info : _params.required_slots) {
- auto slot_id = slot_info.slot_id;
- auto it = _full_src_slot_map.find(slot_id);
- if (it == std::end(_full_src_slot_map)) {
- std::stringstream ss;
- ss << "Unknown source slot descriptor, slot_id=" << slot_id;
- return Status::InternalError(ss.str());
- }
- _required_slot_descs.emplace_back(it->second);
- if (slot_info.is_file_slot) {
- _file_slot_descs.emplace_back(it->second);
- auto iti = _full_src_index_map.find(slot_id);
- _file_slot_index_map.emplace(slot_id, iti->second);
- } else {
- _partition_slot_descs.emplace_back(it->second);
- auto iti = _full_src_index_map.find(slot_id);
- _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
- }
- }
-
- _row_desc.reset(new RowDescriptor(_state->desc_tbl(),
- std::vector<TupleId>({_params.src_tuple_id}),
- std::vector<bool>({false})));
-
- // preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor
- if (!_pre_filter_texprs.empty()) {
- // for vectorized, preceding filter exprs should be compounded to one passed from fe.
- DCHECK(_pre_filter_texprs.size() == 1);
- _vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*);
- RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(
- _state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get()));
- RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc));
- RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state));
- }
-
- return Status::OK();
-}
-
-void FileScanner::close() {
- if (_vpre_filter_ctx_ptr) {
- (*_vpre_filter_ctx_ptr)->close(_state);
- }
- if (_rows_read_counter) {
- COUNTER_UPDATE(_rows_read_counter, _read_row_counter);
- }
-}
-
-Status FileScanner::init_block(vectorized::Block* block) {
- (*block).clear();
- _rows = 0;
- for (const auto& slot_desc : _required_slot_descs) {
- if (slot_desc == nullptr) {
- continue;
- }
- auto is_nullable = slot_desc->is_nullable();
- auto data_type = vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(),
- is_nullable);
- if (data_type == nullptr) {
- return Status::NotSupported(
- fmt::format("Not support type for column:{}", slot_desc->col_name()));
- }
- MutableColumnPtr data_column = data_type->create_column();
- (*block).insert(
- ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
- }
- return Status::OK();
-}
-
-Status FileScanner::_filter_block(vectorized::Block* _block) {
- auto origin_column_num = (*_block).columns();
- // filter block
- auto old_rows = (*_block).rows();
- RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx_ptr, _block,
- origin_column_num));
- _counter->num_rows_unselected += old_rows - (*_block).rows();
- return Status::OK();
-}
-
-Status FileScanner::finalize_block(vectorized::Block* _block, bool* eof) {
- *eof = _scanner_eof;
- _read_row_counter += _block->rows();
- if (LIKELY(_rows > 0)) {
- RETURN_IF_ERROR(_filter_block(_block));
- }
-
- return Status::OK();
-}
-
-Status FileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t rows) {
- const TFileRangeDesc& range = _ranges.at(_next_range - 1);
- if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
- for (const auto& slot_desc : _partition_slot_descs) {
- if (slot_desc == nullptr) continue;
- auto it = _partition_slot_index_map.find(slot_desc->id());
- if (it == std::end(_partition_slot_index_map)) {
- std::stringstream ss;
- ss << "Unknown source slot descriptor, slot_id=" << slot_desc->id();
- return Status::InternalError(ss.str());
- }
- const std::string& column_from_path = range.columns_from_path[it->second];
-
- auto doris_column = _block->get_by_name(slot_desc->col_name()).column;
- IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
-
- for (size_t j = 0; j < rows; ++j) {
- _text_converter->write_vec_column(slot_desc, col_ptr,
- const_cast<char*>(column_from_path.c_str()),
- column_from_path.size(), true, false);
- }
- }
- }
- return Status::OK();
-}
-
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/file_scanner.h b/be/src/vec/exec/file_scanner.h
deleted file mode 100644
index df4c1d4ef6..0000000000
--- a/be/src/vec/exec/file_scanner.h
+++ /dev/null
@@ -1,112 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "common/status.h"
-#include "exec/base_scanner.h"
-#include "exec/text_converter.h"
-#include "exprs/expr.h"
-#include "util/runtime_profile.h"
-#include "vec/exprs/vexpr.h"
-#include "vec/exprs/vexpr_context.h"
-
-namespace doris::vectorized {
-class FileScanNode;
-class FileScanner {
-public:
- FileScanner(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams& params,
- const std::vector<TFileRangeDesc>& ranges,
- const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
-
- virtual ~FileScanner() = default;
-
- virtual void reg_conjunct_ctxs(const TupleId& tupleId,
- const std::vector<ExprContext*>& conjunct_ctxs);
-
- // Open this scanner, will initialize information need to
- virtual Status open();
-
- // Get next block
- virtual Status get_next(vectorized::Block* block, bool* eof) {
- return Status::NotSupported("Not Implemented get block");
- }
-
- // Close this scanner
- virtual void close() = 0;
-
- std::vector<bool>* mutable_runtime_filter_marks() { return &_runtime_filter_marks; }
-
-protected:
- virtual void _init_profiles(RuntimeProfile* profile) = 0;
-
- Status finalize_block(vectorized::Block* dest_block, bool* eof);
- Status _fill_columns_from_path(vectorized::Block* output_block, size_t rows);
- Status init_block(vectorized::Block* block);
-
- std::unique_ptr<TextConverter> _text_converter;
-
- RuntimeState* _state;
- const TFileScanRangeParams& _params;
-
- const std::vector<TFileRangeDesc>& _ranges;
- int _next_range;
- // used for process stat
- ScannerCounter* _counter;
-
- // Used for constructing tuple
- std::vector<SlotDescriptor*> _required_slot_descs;
- std::vector<SlotDescriptor*> _file_slot_descs;
- std::map<SlotId, int> _file_slot_index_map;
- std::vector<SlotDescriptor*> _partition_slot_descs;
- std::map<SlotId, int> _partition_slot_index_map;
-
- std::unique_ptr<RowDescriptor> _row_desc;
-
- // Mem pool used to allocate _src_tuple and _src_tuple_row
- std::unique_ptr<MemPool> _mem_pool;
-
- const std::vector<TExpr> _pre_filter_texprs;
-
- // Profile
- RuntimeProfile* _profile;
- RuntimeProfile::Counter* _rows_read_counter;
- RuntimeProfile::Counter* _read_timer;
-
- bool _scanner_eof = false;
- int _rows = 0;
- long _read_row_counter = 0;
-
- std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr;
- int _num_of_columns_from_file;
-
- // File formats based push down predicate
- std::vector<ExprContext*> _conjunct_ctxs;
- // slot_ids for parquet predicate push down are in tuple desc
- TupleId _tupleId;
-
- // to record which runtime filters have been used
- std::vector<bool> _runtime_filter_marks;
-
- FileScanNode* _parent;
-
-private:
- Status _init_expr_ctxes();
- Status _filter_block(vectorized::Block* output_block);
-};
-
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/file_text_scanner.cpp b/be/src/vec/exec/file_text_scanner.cpp
deleted file mode 100644
index aee756868e..0000000000
--- a/be/src/vec/exec/file_text_scanner.cpp
+++ /dev/null
@@ -1,294 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/exec/file_text_scanner.h"
-
-#include <fmt/format.h>
-#include <gen_cpp/internal_service.pb.h>
-
-#include <iostream>
-
-#include "exec/exec_node.h"
-#include "exec/plain_text_line_reader.h"
-#include "exec/text_converter.h"
-#include "exec/text_converter.hpp"
-#include "exprs/expr_context.h"
-#include "io/buffered_reader.h"
-#include "io/file_factory.h"
-#include "io/hdfs_reader_writer.h"
-#include "util/types.h"
-#include "util/utf8_check.h"
-
-namespace doris::vectorized {
-
-FileTextScanner::FileTextScanner(RuntimeState* state, RuntimeProfile* profile,
- const TFileScanRangeParams& params,
- const std::vector<TFileRangeDesc>& ranges,
- const std::vector<TExpr>& pre_filter_texprs,
- ScannerCounter* counter)
- : FileScanner(state, profile, params, ranges, pre_filter_texprs, counter),
- _cur_file_reader(nullptr),
- _cur_line_reader(nullptr),
- _cur_line_reader_eof(false),
- _skip_lines(0),
- _success(false)
-
-{
- _init_profiles(profile);
- if (params.file_attributes.__isset.text_params) {
- auto text_params = params.file_attributes.text_params;
- if (text_params.__isset.column_separator) {
- _value_separator = text_params.column_separator;
- _value_separator_length = _value_separator.length();
- }
- if (text_params.__isset.line_delimiter) {
- _line_delimiter = text_params.line_delimiter;
- _line_delimiter_length = _line_delimiter.length();
- }
- }
-}
-
-FileTextScanner::~FileTextScanner() {
- close();
-}
-
-Status FileTextScanner::open() {
- RETURN_IF_ERROR(FileScanner::open());
-
- if (_ranges.empty()) {
- return Status::OK();
- }
- _split_values.reserve(sizeof(Slice) * _file_slot_descs.size());
- return Status::OK();
-}
-
-void FileTextScanner::close() {
- FileScanner::close();
-
- if (_cur_line_reader != nullptr) {
- delete _cur_line_reader;
- _cur_line_reader = nullptr;
- }
-}
-
-Status FileTextScanner::get_next(Block* block, bool* eof) {
- SCOPED_TIMER(_read_timer);
- RETURN_IF_ERROR(init_block(block));
-
- const int batch_size = _state->batch_size();
-
- int current_rows = _rows;
- while (_rows < batch_size && !_scanner_eof) {
- if (_cur_line_reader == nullptr || _cur_line_reader_eof) {
- RETURN_IF_ERROR(_open_next_reader());
- // If there isn't any more reader, break this
- if (_scanner_eof) {
- continue;
- }
- }
- const uint8_t* ptr = nullptr;
- size_t size = 0;
- RETURN_IF_ERROR(_cur_line_reader->read_line(&ptr, &size, &_cur_line_reader_eof));
- if (_skip_lines > 0) {
- _skip_lines--;
- continue;
- }
- if (size == 0) {
- // Read empty row, just continue
- continue;
- }
- {
- COUNTER_UPDATE(_rows_read_counter, 1);
- RETURN_IF_ERROR(_fill_file_columns(Slice(ptr, size), block));
- }
- if (_cur_line_reader_eof) {
- RETURN_IF_ERROR(_fill_columns_from_path(block, _rows - current_rows));
- current_rows = _rows;
- }
- }
-
- return finalize_block(block, eof);
-}
-
-Status FileTextScanner::_fill_file_columns(const Slice& line, vectorized::Block* _block) {
- RETURN_IF_ERROR(_line_split_to_values(line));
- if (!_success) {
- // If not success, which means we met an invalid row, return.
- return Status::OK();
- }
-
- for (int i = 0; i < _split_values.size(); ++i) {
- auto slot_desc = _file_slot_descs[i];
- const Slice& value = _split_values[i];
-
- auto doris_column = _block->get_by_name(slot_desc->col_name()).column;
- IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
- _text_converter->write_vec_column(slot_desc, col_ptr, value.data, value.size, true, false);
- }
- _rows++;
- return Status::OK();
-}
-
-Status FileTextScanner::_open_next_reader() {
- if (_next_range >= _ranges.size()) {
- _scanner_eof = true;
- return Status::OK();
- }
-
- RETURN_IF_ERROR(_open_file_reader());
- RETURN_IF_ERROR(_open_line_reader());
- _next_range++;
-
- return Status::OK();
-}
-
-Status FileTextScanner::_open_file_reader() {
- const TFileRangeDesc& range = _ranges[_next_range];
- RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _params, range.path,
- range.start_offset, range.file_size, 0,
- _cur_file_reader));
- return _cur_file_reader->open();
-}
-
-Status FileTextScanner::_open_line_reader() {
- if (_cur_line_reader != nullptr) {
- delete _cur_line_reader;
- _cur_line_reader = nullptr;
- }
-
- const TFileRangeDesc& range = _ranges[_next_range];
- int64_t size = range.size;
- if (range.start_offset != 0) {
- if (_params.format_type != TFileFormatType::FORMAT_CSV_PLAIN) {
- std::stringstream ss;
- ss << "For now we do not support split compressed file";
- return Status::InternalError(ss.str());
- }
- size += 1;
- // not first range will always skip one line
- _skip_lines = 1;
- }
-
- // open line reader
- switch (_params.format_type) {
- case TFileFormatType::FORMAT_CSV_PLAIN:
- _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader.get(), nullptr, size,
- _line_delimiter, _line_delimiter_length);
- break;
- default: {
- std::stringstream ss;
- ss << "Unknown format type, cannot init line reader, type=" << _params.format_type;
- return Status::InternalError(ss.str());
- }
- }
-
- _cur_line_reader_eof = false;
-
- return Status::OK();
-}
-
-Status FileTextScanner::_line_split_to_values(const Slice& line) {
- if (!validate_utf8(line.data, line.size)) {
- RETURN_IF_ERROR(_state->append_error_msg_to_file(
- []() -> std::string { return "Unable to display"; },
- []() -> std::string {
- fmt::memory_buffer error_msg;
- fmt::format_to(error_msg, "{}", "Unable to display");
- return fmt::to_string(error_msg);
- },
- &_scanner_eof));
- _counter->num_rows_filtered++;
- _success = false;
- return Status::OK();
- }
-
- RETURN_IF_ERROR(_split_line(line));
-
- _success = true;
- return Status::OK();
-}
-
-Status FileTextScanner::_split_line(const Slice& line) {
- _split_values.clear();
- std::vector<Slice> tmp_split_values;
- tmp_split_values.reserve(_num_of_columns_from_file);
-
- const char* value = line.data;
- size_t start = 0; // point to the start pos of next col value.
- size_t curpos = 0; // point to the start pos of separator matching sequence.
- size_t p1 = 0; // point to the current pos of separator matching sequence.
- size_t non_space = 0; // point to the last pos of non_space charactor.
-
- // Separator: AAAA
- //
- // p1
- // ▼
- // AAAA
- // 1000AAAA2000AAAA
- // ▲ ▲
- // Start │
- // curpos
-
- while (curpos < line.size) {
- if (*(value + curpos + p1) != _value_separator[p1]) {
- // Not match, move forward:
- curpos += (p1 == 0 ? 1 : p1);
- p1 = 0;
- } else {
- p1++;
- if (p1 == _value_separator_length) {
- // Match a separator
- non_space = curpos;
- // Trim tailing spaces. Be consistent with hive and trino's behavior.
- if (_state->trim_tailing_spaces_for_external_table_query()) {
- while (non_space > start && *(value + non_space - 1) == ' ') {
- non_space--;
- }
- }
- tmp_split_values.emplace_back(value + start, non_space - start);
- start = curpos + _value_separator_length;
- curpos = start;
- p1 = 0;
- non_space = 0;
- }
- }
- }
-
- CHECK(curpos == line.size) << curpos << " vs " << line.size;
- non_space = curpos;
- if (_state->trim_tailing_spaces_for_external_table_query()) {
- while (non_space > start && *(value + non_space - 1) == ' ') {
- non_space--;
- }
- }
-
- tmp_split_values.emplace_back(value + start, non_space - start);
- for (const auto& slot : _file_slot_descs) {
- auto it = _file_slot_index_map.find(slot->id());
- if (it == std::end(_file_slot_index_map)) {
- std::stringstream ss;
- ss << "Unknown _file_slot_index_map, slot_id=" << slot->id();
- return Status::InternalError(ss.str());
- }
- int index = it->second;
- CHECK(index < _num_of_columns_from_file) << index << " vs " << _num_of_columns_from_file;
- _split_values.emplace_back(tmp_split_values[index]);
- }
- return Status::OK();
-}
-
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/file_text_scanner.h b/be/src/vec/exec/file_text_scanner.h
deleted file mode 100644
index c632a842cb..0000000000
--- a/be/src/vec/exec/file_text_scanner.h
+++ /dev/null
@@ -1,72 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "common/consts.h"
-#include "exec/base_scanner.h"
-#include "exec/decompressor.h"
-#include "exec/line_reader.h"
-#include "exec/plain_binary_line_reader.h"
-#include "exec/plain_text_line_reader.h"
-#include "gen_cpp/PlanNodes_types.h"
-#include "io/file_factory.h"
-#include "io/file_reader.h"
-#include "vec/exec/file_scanner.h"
-
-namespace doris::vectorized {
-class FileTextScanner final : public FileScanner {
-public:
- FileTextScanner(RuntimeState* state, RuntimeProfile* profile,
- const TFileScanRangeParams& params, const std::vector<TFileRangeDesc>& ranges,
- const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
-
- ~FileTextScanner() override;
-
- Status open() override;
-
- Status get_next(Block* block, bool* eof) override;
- void close() override;
-
-protected:
- void _init_profiles(RuntimeProfile* profile) override {}
-
-private:
- Status _fill_file_columns(const Slice& line, vectorized::Block* _block);
- Status _open_next_reader();
- Status _open_file_reader();
- Status _open_line_reader();
- Status _line_split_to_values(const Slice& line);
- Status _split_line(const Slice& line);
- // Reader
- std::unique_ptr<FileReader> _cur_file_reader;
- LineReader* _cur_line_reader;
- bool _cur_line_reader_eof;
-
- // When we fetch range start from 0, header_type="csv_with_names" skip first line
- // When we fetch range start from 0, header_type="csv_with_names_and_types" skip first two line
- // When we fetch range doesn't start
- int _skip_lines;
- std::vector<Slice> _split_values;
- std::string _value_separator;
- std::string _line_delimiter;
- int _value_separator_length;
- int _line_delimiter_length;
-
- bool _success;
-};
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_file_arrow_scanner.cpp b/be/src/vec/exec/scan/new_file_arrow_scanner.cpp
deleted file mode 100644
index 5fa6c88764..0000000000
--- a/be/src/vec/exec/scan/new_file_arrow_scanner.cpp
+++ /dev/null
@@ -1,265 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/exec/scan/new_file_arrow_scanner.h"
-
-#include "exec/arrow/orc_reader.h"
-#include "exec/arrow/parquet_reader.h"
-#include "io/file_factory.h"
-#include "vec/exec/scan/vscan_node.h"
-#include "vec/functions/simple_function_factory.h"
-#include "vec/utils/arrow_column_to_doris_column.h"
-
-namespace doris::vectorized {
-
-NewFileArrowScanner::NewFileArrowScanner(RuntimeState* state, NewFileScanNode* parent,
- int64_t limit, const TFileScanRange& scan_range,
- RuntimeProfile* profile,
- const std::vector<TExpr>& pre_filter_texprs)
- : NewFileScanner(state, parent, limit, scan_range, profile, pre_filter_texprs),
- _cur_file_reader(nullptr),
- _cur_file_eof(false),
- _batch(nullptr),
- _arrow_batch_cur_idx(0) {}
-
-Status NewFileArrowScanner::open(RuntimeState* state) {
- RETURN_IF_ERROR(NewFileScanner::open(state));
- // SCOPED_TIMER(_parent->_reader_init_timer);
-
- // _runtime_filter_marks.resize(_parent->runtime_filter_descs().size(), false);
- return Status::OK();
-}
-
-Status NewFileArrowScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
- // init arrow batch
- {
- Status st = _init_arrow_batch_if_necessary();
- if (!st.ok()) {
- if (!st.is_end_of_file()) {
- return st;
- }
- *eof = true;
- return Status::OK();
- }
- }
-
- *eof = false;
- if (!_is_load) {
- RETURN_IF_ERROR(init_block(block));
- }
- // convert arrow batch to block until reach the batch_size
- while (!_scanner_eof) {
- // cast arrow type to PT0 and append it to block
- // for example: arrow::Type::INT16 => TYPE_SMALLINT
- RETURN_IF_ERROR(_append_batch_to_block(block));
- // finalize the block if full
- if (_rows >= _state->batch_size()) {
- break;
- }
- auto status = _next_arrow_batch();
- // if ok, append the batch to the columns
- if (status.ok()) {
- continue;
- }
- // return error if not EOF
- if (!status.is_end_of_file()) {
- return status;
- }
- _cur_file_eof = true;
- break;
- }
-
- if (_scanner_eof && block->rows() == 0) {
- *eof = true;
- }
- // return finalize_block(block, eof);
- return Status::OK();
-}
-
-Status NewFileArrowScanner::_init_arrow_batch_if_necessary() {
- // 1. init batch if first time
- // 2. reset reader if end of file
- Status status = Status::OK();
- if (_scanner_eof) {
- return Status::EndOfFile("EOF");
- }
- if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
- return _next_arrow_batch();
- }
- return status;
-}
-
-Status NewFileArrowScanner::_append_batch_to_block(Block* block) {
- size_t num_elements = std::min<size_t>((_state->batch_size() - block->rows()),
- (_batch->num_rows() - _arrow_batch_cur_idx));
- for (auto i = 0; i < _file_slot_descs.size(); ++i) {
- SlotDescriptor* slot_desc = _file_slot_descs[i];
- if (slot_desc == nullptr) {
- continue;
- }
- std::string real_column_name = _cur_file_reader->is_case_sensitive()
- ? slot_desc->col_name()
- : slot_desc->col_name_lower_case();
- auto* array = _batch->GetColumnByName(real_column_name).get();
- auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name());
- RETURN_IF_ERROR(arrow_column_to_doris_column(
- array, _arrow_batch_cur_idx, column_with_type_and_name.column,
- column_with_type_and_name.type, num_elements, _state->timezone_obj()));
- }
- _rows += num_elements;
- _arrow_batch_cur_idx += num_elements;
- return _fill_columns_from_path(block, num_elements);
-}
-
-Status NewFileArrowScanner::_convert_to_output_block(Block* output_block) {
- if (!config::enable_new_load_scan_node) {
- return Status::OK();
- }
- if (_input_block_ptr == output_block) {
- return Status::OK();
- }
- RETURN_IF_ERROR(_cast_src_block(_input_block_ptr));
- if (LIKELY(_input_block_ptr->rows() > 0)) {
- RETURN_IF_ERROR(_materialize_dest_block(output_block));
- }
-
- return Status::OK();
-}
-
-// arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==>
-// primitive type(PT1) ==materialize_block==> dest primitive type
-Status NewFileArrowScanner::_cast_src_block(Block* block) {
- // cast primitive type(PT0) to primitive type(PT1)
- for (size_t i = 0; i < _num_of_columns_from_file; ++i) {
- SlotDescriptor* slot_desc = _required_slot_descs[i];
- if (slot_desc == nullptr) {
- continue;
- }
- auto& arg = block->get_by_name(slot_desc->col_name());
- // remove nullable here, let the get_function decide whether nullable
- auto return_type = slot_desc->get_data_type_ptr();
- ColumnsWithTypeAndName arguments {
- arg,
- {DataTypeString().create_column_const(
- arg.column->size(), remove_nullable(return_type)->get_family_name()),
- std::make_shared<DataTypeString>(), ""}};
- auto func_cast =
- SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type);
- RETURN_IF_ERROR(func_cast->execute(nullptr, *block, {i}, i, arg.column->size()));
- block->get_by_position(i).type = std::move(return_type);
- }
- return Status::OK();
-}
-
-Status NewFileArrowScanner::_next_arrow_batch() {
- _arrow_batch_cur_idx = 0;
- // first, init file reader
- if (_cur_file_reader == nullptr || _cur_file_eof) {
- RETURN_IF_ERROR(_open_next_reader());
- _cur_file_eof = false;
- }
- // second, loop until find available arrow batch or EOF
- while (!_scanner_eof) {
- RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, &_cur_file_eof));
- if (_cur_file_eof) {
- RETURN_IF_ERROR(_open_next_reader());
- _cur_file_eof = false;
- continue;
- }
- if (_batch->num_rows() == 0) {
- continue;
- }
- return Status::OK();
- }
- return Status::EndOfFile("EOF");
-}
-
-Status NewFileArrowScanner::_open_next_reader() {
- // open_file_reader
- if (_cur_file_reader != nullptr) {
- delete _cur_file_reader;
- _cur_file_reader = nullptr;
- }
-
- while (true) {
- if (_next_range >= _ranges.size()) {
- _scanner_eof = true;
- return Status::OK();
- }
- const TFileRangeDesc& range = _ranges[_next_range++];
- std::unique_ptr<FileReader> file_reader;
- RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _params, range.path,
- range.start_offset, range.file_size, 0,
- file_reader));
- RETURN_IF_ERROR(file_reader->open());
- if (file_reader->size() == 0) {
- file_reader->close();
- continue;
- }
-
- int32_t num_of_columns_from_file = _file_slot_descs.size();
-
- _cur_file_reader =
- _new_arrow_reader(_file_slot_descs, file_reader.release(), num_of_columns_from_file,
- range.start_offset, range.size);
-
- auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_parent->output_tuple_id());
- // TODO _conjunct_ctxs is empty for now. _conjunct_ctxs is not empty.
- Status status =
- _cur_file_reader->init_reader(tuple_desc, _conjunct_ctxs, _state->timezone());
- if (status.is_end_of_file()) {
- continue;
- } else {
- if (!status.ok()) {
- std::stringstream ss;
- ss << " file: " << range.path << " error:" << status.get_error_msg();
- return Status::InternalError(ss.str());
- } else {
- // _update_profile(_cur_file_reader->statistics());
- return status;
- }
- }
- }
-}
-
-NewFileParquetScanner::NewFileParquetScanner(RuntimeState* state, NewFileScanNode* parent,
- int64_t limit, const TFileScanRange& scan_range,
- RuntimeProfile* profile,
- const std::vector<TExpr>& pre_filter_texprs)
- : NewFileArrowScanner(state, parent, limit, scan_range, profile, pre_filter_texprs) {
- // _init_profiles(profile);
-}
-
-ArrowReaderWrap* NewFileParquetScanner::_new_arrow_reader(
- const std::vector<SlotDescriptor*>& file_slot_descs, FileReader* file_reader,
- int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) {
- return new ParquetReaderWrap(_state, file_slot_descs, file_reader, num_of_columns_from_file,
- range_start_offset, range_size, false);
-}
-
-NewFileORCScanner::NewFileORCScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, RuntimeProfile* profile,
- const std::vector<TExpr>& pre_filter_texprs)
- : NewFileArrowScanner(state, parent, limit, scan_range, profile, pre_filter_texprs) {}
-
-ArrowReaderWrap* NewFileORCScanner::_new_arrow_reader(
- const std::vector<SlotDescriptor*>& file_slot_descs, FileReader* file_reader,
- int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) {
- return new ORCReaderWrap(_state, file_slot_descs, file_reader, num_of_columns_from_file,
- range_start_offset, range_size, false);
-}
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_file_arrow_scanner.h b/be/src/vec/exec/scan/new_file_arrow_scanner.h
deleted file mode 100644
index 281373a70d..0000000000
--- a/be/src/vec/exec/scan/new_file_arrow_scanner.h
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <exec/arrow/arrow_reader.h>
-
-#include "exprs/bloomfilter_predicate.h"
-#include "exprs/function_filter.h"
-#include "vec/exec/scan/new_file_scanner.h"
-#include "vec/exec/scan/vscanner.h"
-
-namespace doris::vectorized {
-class NewFileArrowScanner : public NewFileScanner {
-public:
- NewFileArrowScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, RuntimeProfile* profile,
- const std::vector<TExpr>& pre_filter_texprs);
- Status open(RuntimeState* state) override;
-
-protected:
- Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
- virtual ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& file_slot_descs,
- FileReader* file_reader,
- int32_t num_of_columns_from_file,
- int64_t range_start_offset, int64_t range_size) = 0;
- // Convert input block to output block, if needed.
- Status _convert_to_output_block(Block* output_block);
-
-private:
- Status _open_next_reader();
- Status _next_arrow_batch();
- Status _init_arrow_batch_if_necessary();
- Status _append_batch_to_block(Block* block);
- Status _cast_src_block(Block* block);
-
-private:
- // Reader
- ArrowReaderWrap* _cur_file_reader;
- bool _cur_file_eof; // is read over?
- std::shared_ptr<arrow::RecordBatch> _batch;
- size_t _arrow_batch_cur_idx;
-};
-
-class NewFileParquetScanner final : public NewFileArrowScanner {
-public:
- NewFileParquetScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, RuntimeProfile* profile,
- const std::vector<TExpr>& pre_filter_texprs);
-
- ~NewFileParquetScanner() override = default;
-
-protected:
- ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& file_slot_descs,
- FileReader* file_reader, int32_t num_of_columns_from_file,
- int64_t range_start_offset, int64_t range_size) override;
-
- void _init_profiles(RuntimeProfile* profile) override {};
-};
-
-class NewFileORCScanner final : public NewFileArrowScanner {
-public:
- NewFileORCScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, RuntimeProfile* profile,
- const std::vector<TExpr>& pre_filter_texprs);
-
- ~NewFileORCScanner() override = default;
-
-protected:
- ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& file_slot_descs,
- FileReader* file_reader, int32_t num_of_columns_from_file,
- int64_t range_start_offset, int64_t range_size) override;
- void _init_profiles(RuntimeProfile* profile) override {};
-};
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 039082ab1c..49319f1e6b 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -18,8 +18,6 @@
#include "vec/exec/scan/new_file_scan_node.h"
#include "vec/columns/column_const.h"
-#include "vec/exec/scan/new_file_arrow_scanner.h"
-#include "vec/exec/scan/new_file_text_scanner.h"
#include "vec/exec/scan/new_olap_scanner.h"
#include "vec/exec/scan/vfile_scanner.h"
#include "vec/functions/in.h"
@@ -102,28 +100,9 @@ Status NewFileScanNode::_init_scanners(std::list<VScanner*>* scanners) {
}
VScanner* NewFileScanNode::_create_scanner(const TFileScanRange& scan_range) {
- VScanner* scanner = nullptr;
- if (config::enable_new_file_scanner) {
- scanner = new VFileScanner(_state, this, _limit_per_scanner, scan_range, runtime_profile());
- ((VFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range);
- } else {
- switch (scan_range.params.format_type) {
- case TFileFormatType::FORMAT_PARQUET:
- scanner = new NewFileParquetScanner(_state, this, _limit_per_scanner, scan_range,
- runtime_profile(), std::vector<TExpr>());
- break;
- case TFileFormatType::FORMAT_ORC:
- scanner = new NewFileORCScanner(_state, this, _limit_per_scanner, scan_range,
- runtime_profile(), std::vector<TExpr>());
- break;
-
- default:
- scanner = new NewFileTextScanner(_state, this, _limit_per_scanner, scan_range,
- runtime_profile(), std::vector<TExpr>());
- break;
- }
- ((NewFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get());
- }
+ VScanner* scanner =
+ new VFileScanner(_state, this, _limit_per_scanner, scan_range, runtime_profile());
+ ((VFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range);
_scanner_pool.add(scanner);
// TODO: Can we remove _conjunct_ctxs and use _vconjunct_ctx_ptr instead?
scanner->reg_conjunct_ctxs(_conjunct_ctxs);
diff --git a/be/src/vec/exec/scan/new_file_scanner.cpp b/be/src/vec/exec/scan/new_file_scanner.cpp
deleted file mode 100644
index 6e511fe10f..0000000000
--- a/be/src/vec/exec/scan/new_file_scanner.cpp
+++ /dev/null
@@ -1,317 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/exec/scan/new_file_scanner.h"
-
-#include <fmt/format.h>
-
-#include <vec/data_types/data_type_factory.hpp>
-
-#include "common/logging.h"
-#include "common/utils.h"
-#include "exec/exec_node.h"
-#include "exec/text_converter.hpp"
-#include "exprs/expr_context.h"
-#include "runtime/descriptors.h"
-#include "runtime/raw_value.h"
-#include "runtime/runtime_state.h"
-#include "runtime/tuple.h"
-#include "vec/exec/scan/new_file_scan_node.h"
-
-namespace doris::vectorized {
-
-NewFileScanner::NewFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, RuntimeProfile* profile,
- const std::vector<TExpr>& pre_filter_texprs)
- : VScanner(state, static_cast<VScanNode*>(parent), limit),
- _params(scan_range.params),
- _ranges(scan_range.ranges),
- _next_range(0),
- _mem_pool(std::make_unique<MemPool>()),
- _profile(profile),
- _pre_filter_texprs(pre_filter_texprs),
- _strict_mode(false) {}
-
-Status NewFileScanner::open(RuntimeState* state) {
- RETURN_IF_ERROR(VScanner::open(state));
- RETURN_IF_ERROR(_init_expr_ctxes());
- return Status::OK();
-}
-
-Status NewFileScanner::prepare(VExprContext** vconjunct_ctx_ptr) {
- if (vconjunct_ctx_ptr != nullptr) {
- // Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx.
- RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx));
- }
-
- return Status::OK();
-}
-
-Status NewFileScanner::_init_expr_ctxes() {
- if (_input_tuple_desc == nullptr) {
- std::stringstream ss;
- ss << "Unknown source tuple descriptor, tuple_id=" << _params.src_tuple_id;
- return Status::InternalError(ss.str());
- }
- DCHECK(!_ranges.empty());
-
- std::map<SlotId, int> _full_src_index_map;
- std::map<SlotId, SlotDescriptor*> _full_src_slot_map;
- int index = 0;
- for (const auto& slot_desc : _input_tuple_desc->slots()) {
- _full_src_slot_map.emplace(slot_desc->id(), slot_desc);
- _full_src_index_map.emplace(slot_desc->id(), index++);
- }
-
- _num_of_columns_from_file = _params.num_of_columns_from_file;
- for (const auto& slot_info : _params.required_slots) {
- auto slot_id = slot_info.slot_id;
- auto it = _full_src_slot_map.find(slot_id);
- if (it == std::end(_full_src_slot_map)) {
- std::stringstream ss;
- ss << "Unknown source slot descriptor, slot_id=" << slot_id;
- return Status::InternalError(ss.str());
- }
- _required_slot_descs.emplace_back(it->second);
- if (slot_info.is_file_slot) {
- _file_slot_descs.emplace_back(it->second);
- auto iti = _full_src_index_map.find(slot_id);
- _file_slot_index_map.emplace(slot_id, iti->second);
- } else {
- _partition_slot_descs.emplace_back(it->second);
- auto iti = _full_src_index_map.find(slot_id);
- _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
- }
- }
-
- _src_tuple = (doris::Tuple*)_mem_pool->allocate(_input_tuple_desc->byte_size());
- _src_tuple_row = (TupleRow*)_mem_pool->allocate(sizeof(Tuple*));
- _src_tuple_row->set_tuple(0, _src_tuple);
- _row_desc.reset(new RowDescriptor(_state->desc_tbl(),
- std::vector<TupleId>({_params.src_tuple_id}),
- std::vector<bool>({false})));
-
- // preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor
- if (!_pre_filter_texprs.empty()) {
- // for vectorized, preceding filter exprs should be compounded to one passed from fe.
- DCHECK(_pre_filter_texprs.size() == 1);
- _vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*);
- RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(
- _state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get()));
- RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc));
- RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state));
- }
-
- // Construct dest slots information
- if (config::enable_new_load_scan_node) {
- if (_output_tuple_desc == nullptr) {
- return Status::InternalError("Unknown dest tuple descriptor, tuple_id={}",
- _params.dest_tuple_id);
- }
-
- bool has_slot_id_map = _params.__isset.dest_sid_to_src_sid_without_trans;
- for (auto slot_desc : _output_tuple_desc->slots()) {
- if (!slot_desc->is_materialized()) {
- continue;
- }
- auto it = _params.expr_of_dest_slot.find(slot_desc->id());
- if (it == std::end(_params.expr_of_dest_slot)) {
- return Status::InternalError("No expr for dest slot, id={}, name={}",
- slot_desc->id(), slot_desc->col_name());
- }
-
- vectorized::VExprContext* ctx = nullptr;
- RETURN_IF_ERROR(
- vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
- RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get()));
- RETURN_IF_ERROR(ctx->open(_state));
- _dest_vexpr_ctx.emplace_back(ctx);
- if (has_slot_id_map) {
- auto it1 = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id());
- if (it1 == std::end(_params.dest_sid_to_src_sid_without_trans)) {
- _src_slot_descs_order_by_dest.emplace_back(nullptr);
- } else {
- auto _src_slot_it = _full_src_slot_map.find(it1->second);
- if (_src_slot_it == std::end(_full_src_slot_map)) {
- return Status::InternalError("No src slot {} in src slot descs",
- it1->second);
- }
- _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second);
- }
- }
- }
- }
-
- return Status::OK();
-}
-
-Status NewFileScanner::init_block(vectorized::Block* block) {
- (*block).clear();
- _rows = 0;
- for (const auto& slot_desc : _required_slot_descs) {
- if (slot_desc == nullptr) {
- continue;
- }
- auto is_nullable = slot_desc->is_nullable();
- auto data_type = vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(),
- is_nullable);
- if (data_type == nullptr) {
- return Status::NotSupported(
- fmt::format("Not support type for column:{}", slot_desc->col_name()));
- }
- MutableColumnPtr data_column = data_type->create_column();
- (*block).insert(
- ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
- }
- return Status::OK();
-}
-
-Status NewFileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t rows) {
- const TFileRangeDesc& range = _ranges.at(_next_range - 1);
- if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
- for (const auto& slot_desc : _partition_slot_descs) {
- if (slot_desc == nullptr) continue;
- auto it = _partition_slot_index_map.find(slot_desc->id());
- if (it == std::end(_partition_slot_index_map)) {
- std::stringstream ss;
- ss << "Unknown source slot descriptor, slot_id=" << slot_desc->id();
- return Status::InternalError(ss.str());
- }
- const std::string& column_from_path = range.columns_from_path[it->second];
-
- auto doris_column = _block->get_by_name(slot_desc->col_name()).column;
- IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
-
- for (size_t j = 0; j < rows; ++j) {
- _text_converter->write_vec_column(slot_desc, col_ptr,
- const_cast<char*>(column_from_path.c_str()),
- column_from_path.size(), true, false);
- }
- }
- }
- return Status::OK();
-}
-
-Status NewFileScanner::_filter_input_block(Block* block) {
- if (!config::enable_new_load_scan_node) {
- return Status::OK();
- }
- if (_is_load) {
- auto origin_column_num = block->columns();
- RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx_ptr, block,
- origin_column_num));
- }
- return Status::OK();
-}
-
-Status NewFileScanner::_materialize_dest_block(vectorized::Block* dest_block) {
- // Do vectorized expr here
- int ctx_idx = 0;
- size_t rows = _input_block.rows();
- auto filter_column = vectorized::ColumnUInt8::create(rows, 1);
- auto& filter_map = filter_column->get_data();
- auto origin_column_num = _input_block.columns();
-
- for (auto slot_desc : _output_tuple_desc->slots()) {
- if (!slot_desc->is_materialized()) {
- continue;
- }
- int dest_index = ctx_idx++;
-
- auto* ctx = _dest_vexpr_ctx[dest_index];
- int result_column_id = -1;
- // PT1 => dest primitive type
- RETURN_IF_ERROR(ctx->execute(_input_block_ptr, &result_column_id));
- bool is_origin_column = result_column_id < origin_column_num;
- auto column_ptr =
- is_origin_column && _src_block_mem_reuse
- ? _input_block.get_by_position(result_column_id).column->clone_resized(rows)
- : _input_block.get_by_position(result_column_id).column;
-
- DCHECK(column_ptr != nullptr);
-
- // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
- // is likely to be nullable
- if (LIKELY(column_ptr->is_nullable())) {
- auto nullable_column =
- reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get());
- for (int i = 0; i < rows; ++i) {
- if (filter_map[i] && nullable_column->is_null_at(i)) {
- if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
- !_input_block.get_by_position(dest_index).column->is_null_at(i)) {
- RETURN_IF_ERROR(_state->append_error_msg_to_file(
- [&]() -> std::string {
- return _input_block.dump_one_line(i, _num_of_columns_from_file);
- },
- [&]() -> std::string {
- auto raw_value = _input_block.get_by_position(ctx_idx)
- .column->get_data_at(i);
- std::string raw_string = raw_value.to_string();
- fmt::memory_buffer error_msg;
- fmt::format_to(error_msg,
- "column({}) value is incorrect while strict "
- "mode is {}, "
- "src value is {}",
- slot_desc->col_name(), _strict_mode, raw_string);
- return fmt::to_string(error_msg);
- },
- &_scanner_eof));
- filter_map[i] = false;
- } else if (!slot_desc->is_nullable()) {
- RETURN_IF_ERROR(_state->append_error_msg_to_file(
- [&]() -> std::string {
- return _input_block.dump_one_line(i, _num_of_columns_from_file);
- },
- [&]() -> std::string {
- fmt::memory_buffer error_msg;
- fmt::format_to(error_msg,
- "column({}) values is null while columns is not "
- "nullable",
- slot_desc->col_name());
- return fmt::to_string(error_msg);
- },
- &_scanner_eof));
- filter_map[i] = false;
- }
- }
- }
- if (!slot_desc->is_nullable()) column_ptr = nullable_column->get_nested_column_ptr();
- } else if (slot_desc->is_nullable()) {
- column_ptr = vectorized::make_nullable(column_ptr);
- }
- dest_block->insert(vectorized::ColumnWithTypeAndName(
- std::move(column_ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name()));
- }
-
- // after do the dest block insert operation, clear _src_block to remove the reference of origin column
- if (_src_block_mem_reuse) {
- _input_block.clear_column_data(origin_column_num);
- } else {
- _input_block.clear();
- }
-
- size_t dest_size = dest_block->columns();
- // do filter
- dest_block->insert(vectorized::ColumnWithTypeAndName(
- std::move(filter_column), std::make_shared<vectorized::DataTypeUInt8>(),
- "filter column"));
- RETURN_IF_ERROR(vectorized::Block::filter_block(dest_block, dest_size, dest_size));
-
- return Status::OK();
-}
-
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_file_scanner.h b/be/src/vec/exec/scan/new_file_scanner.h
deleted file mode 100644
index 50423bd3e6..0000000000
--- a/be/src/vec/exec/scan/new_file_scanner.h
+++ /dev/null
@@ -1,100 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "exec/text_converter.h"
-#include "exprs/bloomfilter_predicate.h"
-#include "exprs/function_filter.h"
-#include "runtime/tuple.h"
-#include "vec/exec/scan/vscanner.h"
-
-namespace doris::vectorized {
-
-class NewFileScanNode;
-
-class NewFileScanner : public VScanner {
-public:
- NewFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, RuntimeProfile* profile,
- const std::vector<TExpr>& pre_filter_texprs);
-
- Status open(RuntimeState* state) override;
-
- Status prepare(VExprContext** vconjunct_ctx_ptr);
-
-protected:
- // Use prefilters to filter input block
- Status _filter_input_block(Block* block);
- Status _materialize_dest_block(vectorized::Block* output_block);
-
-protected:
- virtual void _init_profiles(RuntimeProfile* profile) = 0;
-
- Status _fill_columns_from_path(vectorized::Block* output_block, size_t rows);
- Status init_block(vectorized::Block* block);
-
- std::unique_ptr<TextConverter> _text_converter;
-
- const TFileScanRangeParams& _params;
-
- const std::vector<TFileRangeDesc>& _ranges;
- int _next_range;
-
- // Used for constructing tuple
- std::vector<SlotDescriptor*> _required_slot_descs;
- // File source slot descriptors
- std::vector<SlotDescriptor*> _file_slot_descs;
- // File slot id to index map.
- std::map<SlotId, int> _file_slot_index_map;
- // Partition source slot descriptors
- std::vector<SlotDescriptor*> _partition_slot_descs;
- // Partition slot id to index map
- std::map<SlotId, int> _partition_slot_index_map;
- std::unique_ptr<RowDescriptor> _row_desc;
- doris::Tuple* _src_tuple;
- TupleRow* _src_tuple_row;
-
- // Mem pool used to allocate _src_tuple and _src_tuple_row
- std::unique_ptr<MemPool> _mem_pool;
-
- // Profile
- RuntimeProfile* _profile;
- RuntimeProfile::Counter* _rows_read_counter;
- RuntimeProfile::Counter* _read_timer;
-
- bool _scanner_eof = false;
- int _rows = 0;
- int _num_of_columns_from_file;
-
- const std::vector<TExpr> _pre_filter_texprs;
-
- std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
- // to filter src tuple directly.
- std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr;
-
- // the map values of dest slot id to src slot desc
- // if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr
- std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
-
- bool _src_block_mem_reuse = false;
- bool _strict_mode;
-
-private:
- Status _init_expr_ctxes();
-};
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_file_text_scanner.cpp b/be/src/vec/exec/scan/new_file_text_scanner.cpp
deleted file mode 100644
index 2202dac76b..0000000000
--- a/be/src/vec/exec/scan/new_file_text_scanner.cpp
+++ /dev/null
@@ -1,263 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/exec/scan/new_file_text_scanner.h"
-
-#include "exec/plain_text_line_reader.h"
-#include "io/file_factory.h"
-#include "util/utf8_check.h"
-#include "vec/exec/scan/vscan_node.h"
-
-namespace doris::vectorized {
-
-NewFileTextScanner::NewFileTextScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, RuntimeProfile* profile,
- const std::vector<TExpr>& pre_filter_texprs)
- : NewFileScanner(state, parent, limit, scan_range, profile, pre_filter_texprs),
- _cur_file_reader(nullptr),
- _cur_line_reader(nullptr),
- _cur_line_reader_eof(false),
- _skip_lines(0),
- _success(false) {}
-
-Status NewFileTextScanner::open(RuntimeState* state) {
- RETURN_IF_ERROR(NewFileScanner::open(state));
- if (_ranges.empty()) {
- return Status::OK();
- }
- _split_values.reserve(sizeof(Slice) * _file_slot_descs.size());
- return Status::OK();
-}
-
-Status NewFileTextScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
- SCOPED_TIMER(_read_timer);
- if (!_is_load) {
- RETURN_IF_ERROR(init_block(block));
- }
- const int batch_size = state->batch_size();
- *eof = false;
- int current_rows = _rows;
- while (_rows < batch_size && !_scanner_eof) {
- if (_cur_line_reader == nullptr || _cur_line_reader_eof) {
- RETURN_IF_ERROR(_open_next_reader());
- // If there isn't any more reader, break this
- if (_scanner_eof) {
- continue;
- }
- }
- const uint8_t* ptr = nullptr;
- size_t size = 0;
- RETURN_IF_ERROR(_cur_line_reader->read_line(&ptr, &size, &_cur_line_reader_eof));
- if (_skip_lines > 0) {
- _skip_lines--;
- continue;
- }
- if (size == 0) {
- // Read empty row, just continue
- continue;
- }
- {
- COUNTER_UPDATE(_rows_read_counter, 1);
- RETURN_IF_ERROR(_fill_file_columns(Slice(ptr, size), block));
- }
- if (_cur_line_reader_eof) {
- RETURN_IF_ERROR(_fill_columns_from_path(block, _rows - current_rows));
- current_rows = _rows;
- }
- }
- if (_scanner_eof && block->rows() == 0) {
- *eof = true;
- }
- return Status::OK();
-}
-
-Status NewFileTextScanner::_fill_file_columns(const Slice& line, vectorized::Block* _block) {
- RETURN_IF_ERROR(_line_split_to_values(line));
- if (!_success) {
- // If not success, which means we met an invalid row, return.
- return Status::OK();
- }
-
- for (int i = 0; i < _split_values.size(); ++i) {
- auto slot_desc = _file_slot_descs[i];
- const Slice& value = _split_values[i];
-
- auto doris_column = _block->get_by_name(slot_desc->col_name()).column;
- IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
- _text_converter->write_vec_column(slot_desc, col_ptr, value.data, value.size, true, false);
- }
- _rows++;
- return Status::OK();
-}
-
-Status NewFileTextScanner::_line_split_to_values(const Slice& line) {
- if (!validate_utf8(line.data, line.size)) {
- RETURN_IF_ERROR(_state->append_error_msg_to_file(
- []() -> std::string { return "Unable to display"; },
- []() -> std::string {
- fmt::memory_buffer error_msg;
- fmt::format_to(error_msg, "{}", "Unable to display");
- return fmt::to_string(error_msg);
- },
- &_scanner_eof));
- _success = false;
- return Status::OK();
- }
-
- RETURN_IF_ERROR(_split_line(line));
-
- _success = true;
- return Status::OK();
-}
-
-Status NewFileTextScanner::_open_next_reader() {
- if (_next_range >= _ranges.size()) {
- _scanner_eof = true;
- return Status::OK();
- }
-
- RETURN_IF_ERROR(_open_file_reader());
- RETURN_IF_ERROR(_open_line_reader());
- _next_range++;
-
- return Status::OK();
-}
-
-Status NewFileTextScanner::_open_file_reader() {
- const TFileRangeDesc& range = _ranges[_next_range];
- RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _params, range.path,
- range.start_offset, range.file_size, 0,
- _cur_file_reader));
- return _cur_file_reader->open();
-}
-
-Status NewFileTextScanner::_open_line_reader() {
- if (_cur_line_reader != nullptr) {
- delete _cur_line_reader;
- _cur_line_reader = nullptr;
- }
-
- const TFileRangeDesc& range = _ranges[_next_range];
- int64_t size = range.size;
- if (range.start_offset != 0) {
- if (_params.format_type != TFileFormatType::FORMAT_CSV_PLAIN) {
- std::stringstream ss;
- ss << "For now we do not support split compressed file";
- return Status::InternalError(ss.str());
- }
- size += 1;
- // not first range will always skip one line
- _skip_lines = 1;
- }
-
- // open line reader
- switch (_params.format_type) {
- case TFileFormatType::FORMAT_CSV_PLAIN:
- _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader.get(), nullptr, size,
- _line_delimiter, _line_delimiter_length);
- break;
- default: {
- std::stringstream ss;
- ss << "Unknown format type, cannot init line reader, type=" << _params.format_type;
- return Status::InternalError(ss.str());
- }
- }
-
- _cur_line_reader_eof = false;
-
- return Status::OK();
-}
-
-Status NewFileTextScanner::_split_line(const Slice& line) {
- _split_values.clear();
- std::vector<Slice> tmp_split_values;
- tmp_split_values.reserve(_num_of_columns_from_file);
-
- const char* value = line.data;
- size_t start = 0; // point to the start pos of next col value.
- size_t curpos = 0; // point to the start pos of separator matching sequence.
- size_t p1 = 0; // point to the current pos of separator matching sequence.
- size_t non_space = 0; // point to the last pos of non_space charactor.
-
- // Separator: AAAA
- //
- // p1
- // ▼
- // AAAA
- // 1000AAAA2000AAAA
- // ▲ ▲
- // Start │
- // curpos
-
- while (curpos < line.size) {
- if (*(value + curpos + p1) != _value_separator[p1]) {
- // Not match, move forward:
- curpos += (p1 == 0 ? 1 : p1);
- p1 = 0;
- } else {
- p1++;
- if (p1 == _value_separator_length) {
- // Match a separator
- non_space = curpos;
- // Trim tailing spaces. Be consistent with hive and trino's behavior.
- if (_state->trim_tailing_spaces_for_external_table_query()) {
- while (non_space > start && *(value + non_space - 1) == ' ') {
- non_space--;
- }
- }
- tmp_split_values.emplace_back(value + start, non_space - start);
- start = curpos + _value_separator_length;
- curpos = start;
- p1 = 0;
- non_space = 0;
- }
- }
- }
-
- CHECK(curpos == line.size) << curpos << " vs " << line.size;
- non_space = curpos;
- if (_state->trim_tailing_spaces_for_external_table_query()) {
- while (non_space > start && *(value + non_space - 1) == ' ') {
- non_space--;
- }
- }
-
- tmp_split_values.emplace_back(value + start, non_space - start);
- for (const auto& slot : _file_slot_descs) {
- auto it = _file_slot_index_map.find(slot->id());
- if (it == std::end(_file_slot_index_map)) {
- std::stringstream ss;
- ss << "Unknown _file_slot_index_map, slot_id=" << slot->id();
- return Status::InternalError(ss.str());
- }
- int index = it->second;
- CHECK(index < _num_of_columns_from_file) << index << " vs " << _num_of_columns_from_file;
- _split_values.emplace_back(tmp_split_values[index]);
- }
- return Status::OK();
-}
-
-Status NewFileTextScanner::_convert_to_output_block(Block* output_block) {
- if (_input_block_ptr == output_block) {
- return Status::OK();
- }
- if (LIKELY(_input_block_ptr->rows() > 0)) {
- RETURN_IF_ERROR(_materialize_dest_block(output_block));
- }
- return Status::OK();
-}
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_file_text_scanner.h b/be/src/vec/exec/scan/new_file_text_scanner.h
deleted file mode 100644
index 19cf6094f4..0000000000
--- a/be/src/vec/exec/scan/new_file_text_scanner.h
+++ /dev/null
@@ -1,66 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <exec/arrow/arrow_reader.h>
-
-#include "exec/line_reader.h"
-#include "exprs/bloomfilter_predicate.h"
-#include "exprs/function_filter.h"
-#include "vec/exec/scan/new_file_scanner.h"
-#include "vec/exec/scan/vscanner.h"
-
-namespace doris::vectorized {
-class NewFileTextScanner : public NewFileScanner {
-public:
- NewFileTextScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, RuntimeProfile* profile,
- const std::vector<TExpr>& pre_filter_texprs);
-
- Status open(RuntimeState* state) override;
-
-protected:
- void _init_profiles(RuntimeProfile* profile) override {}
- Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
- Status _convert_to_output_block(Block* output_block);
-
-private:
- Status _fill_file_columns(const Slice& line, vectorized::Block* _block);
- Status _open_next_reader();
- Status _open_file_reader();
- Status _open_line_reader();
- Status _line_split_to_values(const Slice& line);
- Status _split_line(const Slice& line);
- // Reader
- std::unique_ptr<FileReader> _cur_file_reader;
- LineReader* _cur_line_reader;
- bool _cur_line_reader_eof;
-
- // When we fetch range start from 0, header_type="csv_with_names" skip first line
- // When we fetch range start from 0, header_type="csv_with_names_and_types" skip first two line
- // When we fetch range doesn't start
- int _skip_lines;
- std::vector<Slice> _split_values;
- std::string _value_separator;
- std::string _line_delimiter;
- int _value_separator_length;
- int _line_delimiter_length;
-
- bool _success;
-};
-} // namespace doris::vectorized
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 26447413d1..5d68f6edef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -172,7 +172,7 @@ public class StreamLoadPlanner {
}
// create scan node
- if (Config.enable_new_load_scan_node) {
+ if (Config.enable_new_load_scan_node && Config.enable_vectorized_load) {
ExternalFileScanNode fileScanNode = new ExternalFileScanNode(new PlanNodeId(0), scanTupleDesc);
if (!Util.isCsvFormat(taskInfo.getFormatType())) {
throw new AnalysisException(
diff --git a/regression-test/suites/load_p0/broker_load/test_broker_load.groovy b/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
index fa4d2e0c07..52451a5207 100644
--- a/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
@@ -185,22 +185,18 @@ suite("test_broker_load", "p0") {
String[][] backends = sql """ show backends; """
assertTrue(backends.size() > 0)
for (String[] backend in backends) {
- StringBuilder setConfigCommand = new StringBuilder();
- setConfigCommand.append("curl -X POST http://")
- setConfigCommand.append(backend[2])
- setConfigCommand.append(":")
- setConfigCommand.append(backend[5])
- setConfigCommand.append("/api/update_config?")
- String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=$flag"
- logger.info(command1)
- String command2 = setConfigCommand.toString() + "enable_new_file_scanner=$flag"
- logger.info(command2)
- def process1 = command1.execute()
- int code = process1.waitFor()
- assertEquals(code, 0)
- def process2 = command2.execute()
- code = process1.waitFor()
- assertEquals(code, 0)
+ // No need to set this config anymore, but leave this code sample here
+ // StringBuilder setConfigCommand = new StringBuilder();
+ // setConfigCommand.append("curl -X POST http://")
+ // setConfigCommand.append(backend[2])
+ // setConfigCommand.append(":")
+ // setConfigCommand.append(backend[5])
+ // setConfigCommand.append("/api/update_config?")
+ // String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=$flag"
+ // logger.info(command1)
+ // def process1 = command1.execute()
+ // int code = process1.waitFor()
+ // assertEquals(code, 0)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org