You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/06/24 08:51:25 UTC

[GitHub] [doris] SaintBacchus opened a new pull request, #10402: Be file scan node

SaintBacchus opened a new pull request, #10402:
URL: https://github.com/apache/doris/pull/10402

   # Proposed changes
   
   Issue Number: close #xxx
   
   ## Problem Summary:
   
   Describe the overview of changes.
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: (Yes/No/I Don't know)
   2. Has unit tests been added: (Yes/No/No Need)
   3. Has document been added or modified: (Yes/No/No Need)
   4. Does it need to update dependencies: (Yes/No)
   5. Are there any changes that cannot be rolled back: (Yes/No)
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] dujl commented on a diff in pull request #10402: [feature-wip](multi-catalog) Impl FileScanNode in be

Posted by GitBox <gi...@apache.org>.
dujl commented on code in PR #10402:
URL: https://github.com/apache/doris/pull/10402#discussion_r908121059


##########
be/src/vec/exec/file_arrow_scanner.cpp:
##########
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/file_arrow_scanner.h"
+
+#include "exec/arrow/parquet_reader.h"
+#include "io/buffered_reader.h"
+#include "io/hdfs_reader_writer.h"
+#include "runtime/descriptors.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/utils/arrow_column_to_doris_column.h"
+
+namespace doris::vectorized {
+
+FileArrowScanner::FileArrowScanner(RuntimeState* state, RuntimeProfile* profile,
+                                   const TFileScanRangeParams& params,
+                                   const std::vector<TFileRangeDesc>& ranges,
+                                   const std::vector<TExpr>& pre_filter_texprs,
+                                   ScannerCounter* counter)
+        : FileScanner(state, profile, params, ranges, pre_filter_texprs, counter),
+          _cur_file_reader(nullptr),
+          _cur_file_eof(false),
+          _batch(nullptr),
+          _arrow_batch_cur_idx(0) {}
+
+FileArrowScanner::~FileArrowScanner() {
+    close();
+}
+
+Status FileArrowScanner::_open_next_reader() {
+    // open_file_reader
+    if (_cur_file_reader != nullptr) {
+        delete _cur_file_reader;
+        _cur_file_reader = nullptr;
+    }
+
+    while (true) {
+        if (_next_range >= _ranges.size()) {
+            _scanner_eof = true;
+            return Status::OK();
+        }
+        const TFileRangeDesc& range = _ranges[_next_range++];
+        std::unique_ptr<FileReader> file_reader;
+
+        FileReader* hdfs_reader = nullptr;
+        RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path,
+                                                        range.start_offset, &hdfs_reader));
+        file_reader.reset(new BufferedReader(_profile, hdfs_reader));
+        RETURN_IF_ERROR(file_reader->open());
+        if (file_reader->size() == 0) {
+            file_reader->close();
+            continue;
+        }
+
+        int32_t num_of_columns_from_file = _file_slot_descs.size();
+
+        _cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(),
+                                             num_of_columns_from_file);
+
+        Status status = _cur_file_reader->init_reader(_file_slot_descs, _state->timezone());
+
+        if (status.is_end_of_file()) {
+            continue;
+        } else {
+            if (!status.ok()) {
+                std::stringstream ss;
+                ss << " file: " << range.path << " error:" << status.get_error_msg();
+                return Status::InternalError(ss.str());
+            } else {
+                return status;
+            }
+        }
+    }
+}
+
+Status FileArrowScanner::open() {
+    RETURN_IF_ERROR(FileScanner::open());
+    if (_ranges.empty()) {
+        return Status::OK();
+    }
+    return Status::OK();
+}
+
+// get next available arrow batch
+Status FileArrowScanner::_next_arrow_batch() {
+    _arrow_batch_cur_idx = 0;
+    // first, init file reader
+    if (_cur_file_reader == nullptr || _cur_file_eof) {
+        RETURN_IF_ERROR(_open_next_reader());
+        _cur_file_eof = false;
+    }
+    // second, loop until find available arrow batch or EOF
+    while (!_scanner_eof) {
+        RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, &_cur_file_eof));
+        if (_cur_file_eof) {
+            RETURN_IF_ERROR(_open_next_reader());
+            _cur_file_eof = false;
+            continue;
+        }
+        if (_batch->num_rows() == 0) {
+            continue;
+        }
+        return Status::OK();
+    }
+    return Status::EndOfFile("EOF");
+}
+
+Status FileArrowScanner::_init_arrow_batch_if_necessary() {
+    // 1. init batch if first time
+    // 2. reset reader if end of file
+    Status status;
+    if (_scanner_eof) {
+        return Status::EndOfFile("EOF");
+    }
+    if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
+        return _next_arrow_batch();
+    }
+    return status;
+}
+
+Status FileArrowScanner::get_next(vectorized::Block* block, bool* eof) {
+    SCOPED_TIMER(_read_timer);
+    // init arrow batch
+    {
+        Status st = _init_arrow_batch_if_necessary();
+        if (!st.ok()) {
+            if (!st.is_end_of_file()) {
+                return st;
+            }
+            *eof = true;
+            return Status::OK();
+        }
+    }
+
+    RETURN_IF_ERROR(init_block(block));
+    // convert arrow batch to block until reach the batch_size
+    while (!_scanner_eof) {
+        // cast arrow type to PT0 and append it to block
+        // for example: arrow::Type::INT16 => TYPE_SMALLINT
+        RETURN_IF_ERROR(_append_batch_to_block(block));
+        // finalize the block if full
+        if (_rows >= _state->batch_size()) {
+            break;
+        }
+        auto status = _next_arrow_batch();
+        // if ok, append the batch to the columns
+        if (status.ok()) {
+            continue;
+        }
+        // return error if not EOF
+        if (!status.is_end_of_file()) {
+            return status;
+        }
+        _cur_file_eof = true;
+        break;
+    }
+
+    return fill_block(block, eof);

Review Comment:
   fill_block do two works.
   1. fill the data block
   2. filter the block with pre filter expr
   How about Separate filte_block from fill_block?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] dujl commented on a diff in pull request #10402: [feature-wip](multi-catalog) Impl FileScanNode in be

Posted by GitBox <gi...@apache.org>.
dujl commented on code in PR #10402:
URL: https://github.com/apache/doris/pull/10402#discussion_r908106823


##########
be/src/vec/exec/file_scan_node.cpp:
##########
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/file_scan_node.h"
+
+#include "gen_cpp/PlanNodes_types.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/runtime_state.h"
+#include "runtime/string_value.h"
+#include "runtime/tuple.h"
+#include "runtime/tuple_row.h"
+#include "util/runtime_profile.h"
+#include "util/thread.h"
+#include "util/types.h"
+#include "vec/exec/file_arrow_scanner.h"
+#include "vec/exec/file_text_scanner.h"
+#include "vec/exprs/vexpr_context.h"
+
+namespace doris::vectorized {
+
+FileScanNode::FileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
+        : ScanNode(pool, tnode, descs),
+          _tuple_id(tnode.file_scan_node.tuple_id),
+          _runtime_state(nullptr),
+          _tuple_desc(nullptr),
+          _num_running_scanners(0),
+          _scan_finished(false),
+          _max_buffered_batches(32),
+          _wait_scanner_timer(nullptr) {}
+
+Status FileScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(ScanNode::init(tnode, state));
+    auto& file_scan_node = tnode.file_scan_node;
+
+    if (file_scan_node.__isset.pre_filter_exprs) {
+        _pre_filter_texprs = file_scan_node.pre_filter_exprs;
+    }
+
+    return Status::OK();
+}
+
+Status FileScanNode::prepare(RuntimeState* state) {
+    VLOG_QUERY << "FileScanNode prepare";
+    RETURN_IF_ERROR(ScanNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
+    // get tuple desc
+    _runtime_state = state;
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+    if (_tuple_desc == nullptr) {
+        std::stringstream ss;
+        ss << "Failed to get tuple descriptor, _tuple_id=" << _tuple_id;
+        return Status::InternalError(ss.str());
+    }
+
+    // Initialize slots map
+    for (auto slot : _tuple_desc->slots()) {
+        auto pair = _slots_map.emplace(slot->col_name(), slot);
+        if (!pair.second) {
+            std::stringstream ss;
+            ss << "Failed to insert slot, col_name=" << slot->col_name();
+            return Status::InternalError(ss.str());
+        }
+    }
+
+    // Profile
+    _wait_scanner_timer = ADD_TIMER(runtime_profile(), "WaitScannerTime");
+
+    return Status::OK();
+}
+
+Status FileScanNode::open(RuntimeState* state) {
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    RETURN_IF_CANCELLED(state);
+
+    RETURN_IF_ERROR(start_scanners());
+
+    return Status::OK();
+}
+
+Status FileScanNode::start_scanners() {
+    {
+        std::unique_lock<std::mutex> l(_batch_queue_lock);
+        _num_running_scanners = 1;
+    }
+    _scanner_threads.emplace_back(&FileScanNode::scanner_worker, this, 0, _scan_ranges.size());
+    return Status::OK();
+}
+
+Status FileScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    // check if CANCELLED.
+    if (state->is_cancelled()) {
+        std::unique_lock<std::mutex> l(_batch_queue_lock);
+        if (update_status(Status::Cancelled("Cancelled"))) {
+            // Notify all scanners
+            _queue_writer_cond.notify_all();
+        }
+    }
+
+    if (_scan_finished.load()) {
+        *eos = true;
+        return Status::OK();
+    }
+
+    const int batch_size = _runtime_state->batch_size();
+    while (true) {
+        std::shared_ptr<vectorized::Block> scanner_block;
+        {
+            std::unique_lock<std::mutex> l(_batch_queue_lock);
+            while (_process_status.ok() && !_runtime_state->is_cancelled() &&
+                   _num_running_scanners > 0 && _block_queue.empty()) {
+                SCOPED_TIMER(_wait_scanner_timer);
+                _queue_reader_cond.wait_for(l, std::chrono::seconds(1));
+            }
+            if (!_process_status.ok()) {
+                // Some scanner process failed.
+                return _process_status;
+            }
+            if (_runtime_state->is_cancelled()) {
+                if (update_status(Status::Cancelled("Cancelled"))) {
+                    _queue_writer_cond.notify_all();
+                }
+                return _process_status;
+            }
+            if (!_block_queue.empty()) {
+                scanner_block = _block_queue.front();
+                _block_queue.pop_front();
+            }
+        }
+
+        // All scanner has been finished, and all cached batch has been read
+        if (!scanner_block) {
+            if (_mutable_block && !_mutable_block->empty()) {
+                *block = _mutable_block->to_block();
+                reached_limit(block, eos);
+                LOG_IF(INFO, *eos) << "FileScanNode ReachedLimit.";
+            }
+            _scan_finished.store(true);
+            *eos = true;
+            return Status::OK();
+        }
+        // notify one scanner
+        _queue_writer_cond.notify_one();
+
+        if (UNLIKELY(!_mutable_block)) {
+            _mutable_block.reset(new MutableBlock(scanner_block->clone_empty()));
+        }
+
+        if (_mutable_block->rows() + scanner_block->rows() < batch_size) {
+            // merge scanner_block into _mutable_block
+            _mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows());
+            continue;
+        } else {
+            if (_mutable_block->empty()) {
+                // directly use scanner_block
+                *block = *scanner_block;
+            } else {
+                // copy _mutable_block firstly, then merge scanner_block into _mutable_block for next.
+                *block = _mutable_block->to_block();
+                _mutable_block->set_muatable_columns(scanner_block->clone_empty_columns());
+                _mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows());
+            }
+            break;
+        }
+    }
+
+    reached_limit(block, eos);
+    if (*eos) {
+        _scan_finished.store(true);
+        _queue_writer_cond.notify_all();
+        LOG(INFO) << "FileScanNode ReachedLimit.";
+    } else {
+        *eos = false;
+    }
+
+    return Status::OK();
+}
+
+Status FileScanNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    _scan_finished.store(true);
+    _queue_writer_cond.notify_all();
+    _queue_reader_cond.notify_all();
+    for (int i = 0; i < _scanner_threads.size(); ++i) {
+        _scanner_threads[i].join();
+    }
+
+    // Close
+    _batch_queue.clear();
+    return ExecNode::close(state);
+}
+
+Status FileScanNode::scanner_scan(const TFileScanRange& scan_range, ScannerCounter* counter) {
+    //create scanner object and open
+    std::unique_ptr<FileScanner> scanner = create_scanner(scan_range, counter);
+    RETURN_IF_ERROR(scanner->open());
+    bool scanner_eof = false;
+    while (!scanner_eof) {
+        RETURN_IF_CANCELLED(_runtime_state);
+        // If we have finished all works
+        if (_scan_finished.load() || !_process_status.ok()) {
+            return Status::OK();
+        }
+
+        std::shared_ptr<vectorized::Block> block(new vectorized::Block());
+        RETURN_IF_ERROR(scanner->get_next(block.get(), &scanner_eof));
+        if (block->rows() == 0) {
+            continue;
+        }
+        auto old_rows = block->rows();
+        RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(),
+                                                   _tuple_desc->slots().size()));
+        counter->num_rows_unselected += old_rows - block->rows();
+        if (block->rows() == 0) {
+            continue;
+        }
+
+        std::unique_lock<std::mutex> l(_batch_queue_lock);
+        while (_process_status.ok() && !_scan_finished.load() && !_runtime_state->is_cancelled() &&
+               // stop pushing more batch if
+               // 1. too many batches in queue, or
+               // 2. at least one batch in queue and memory exceed limit.
+               (_block_queue.size() >= _max_buffered_batches ||
+                (mem_tracker()->any_limit_exceeded() && !_block_queue.empty()))) {
+            _queue_writer_cond.wait_for(l, std::chrono::seconds(1));
+        }
+        // Process already set failed, so we just return OK
+        if (!_process_status.ok()) {
+            return Status::OK();
+        }
+        // Scan already finished, just return
+        if (_scan_finished.load()) {
+            return Status::OK();
+        }
+        // Runtime state is canceled, just return cancel
+        if (_runtime_state->is_cancelled()) {
+            return Status::Cancelled("Cancelled");
+        }
+        // Queue size Must be smaller than _max_buffered_batches
+        _block_queue.push_back(block);
+
+        // Notify reader to
+        _queue_reader_cond.notify_one();
+    }
+    return Status::OK();
+}
+
+void FileScanNode::scanner_worker(int start_idx, int length) {
+    Thread::set_self_name("file_scanner");
+    Status status = Status::OK();
+    ScannerCounter counter;
+    for (int i = 0; i < length && status.ok(); ++i) {
+        const TFileScanRange& scan_range =
+                _scan_ranges[start_idx + i].scan_range.ext_scan_range.file_scan_range;
+        status = scanner_scan(scan_range, &counter);
+        if (!status.ok()) {
+            LOG(WARNING) << "Scanner[" << start_idx + i
+                         << "] process failed. status=" << status.get_error_msg();
+        }
+    }
+
+    // Update stats
+    _runtime_state->update_num_rows_load_filtered(counter.num_rows_filtered);
+    _runtime_state->update_num_rows_load_unselected(counter.num_rows_unselected);
+
+    // scanner is going to finish
+    {
+        std::lock_guard<std::mutex> l(_batch_queue_lock);
+        if (!status.ok()) {
+            update_status(status);
+        }
+        // This scanner will finish
+        _num_running_scanners--;
+    }
+    _queue_reader_cond.notify_all();
+    // If one scanner failed, others don't need scan any more
+    if (!status.ok()) {
+        _queue_writer_cond.notify_all();
+    }
+}
+
+std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange& scan_range,
+                                                          ScannerCounter* counter) {
+    FileScanner* scan = nullptr;
+    switch (scan_range.ranges[0].format_type) {
+    case TFileFormatType::FORMAT_PARQUET:
+        scan = new VFileParquetScanner(_runtime_state, runtime_profile(), scan_range.params,
+                                       scan_range.ranges, _pre_filter_texprs, counter);
+        break;
+    case TFileFormatType::FORMAT_ORC:
+        scan = new VFileORCScanner(_runtime_state, runtime_profile(), scan_range.params,
+                                   scan_range.ranges, _pre_filter_texprs, counter);
+        break;
+
+    default:
+        scan = new FileTextScanner(_runtime_state, runtime_profile(), scan_range.params,

Review Comment:
   is json format handled by FileTextScanner?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] dujl commented on a diff in pull request #10402: [feature-wip](multi-catalog) Impl FileScanNode in be

Posted by GitBox <gi...@apache.org>.
dujl commented on code in PR #10402:
URL: https://github.com/apache/doris/pull/10402#discussion_r907164294


##########
be/src/vec/exec/file_arrow_scanner.cpp:
##########
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/file_arrow_scanner.h"
+
+#include "exec/arrow/parquet_reader.h"
+#include "io/buffered_reader.h"
+#include "io/hdfs_reader_writer.h"
+#include "runtime/descriptors.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/utils/arrow_column_to_doris_column.h"
+
+namespace doris::vectorized {
+
+FileArrowScanner::FileArrowScanner(RuntimeState* state, RuntimeProfile* profile,
+                                   const TFileScanRangeParams& params,
+                                   const std::vector<TFileRangeDesc>& ranges,
+                                   const std::vector<TExpr>& pre_filter_texprs,
+                                   ScannerCounter* counter)
+        : FileScanner(state, profile, params, ranges, pre_filter_texprs, counter),
+          _cur_file_reader(nullptr),
+          _cur_file_eof(false),
+          _batch(nullptr),
+          _arrow_batch_cur_idx(0) {}
+
+FileArrowScanner::~FileArrowScanner() {
+    close();
+}
+
+Status FileArrowScanner::_open_next_reader() {
+    // open_file_reader
+    if (_cur_file_reader != nullptr) {
+        delete _cur_file_reader;
+        _cur_file_reader = nullptr;
+    }
+
+    while (true) {
+        if (_next_range >= _ranges.size()) {
+            _scanner_eof = true;
+            return Status::OK();
+        }
+        const TFileRangeDesc& range = _ranges[_next_range++];
+        std::unique_ptr<FileReader> file_reader;
+
+        FileReader* hdfs_reader = nullptr;
+        RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path,
+                                                        range.start_offset, &hdfs_reader));
+        file_reader.reset(new BufferedReader(_profile, hdfs_reader));
+        RETURN_IF_ERROR(file_reader->open());
+        if (file_reader->size() == 0) {
+            file_reader->close();
+            continue;
+        }
+
+        int32_t num_of_columns_from_file = _file_slot_descs.size();
+
+        _cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(),
+                                             num_of_columns_from_file);
+
+        Status status = _cur_file_reader->init_reader(_file_slot_descs, _state->timezone());
+
+        if (status.is_end_of_file()) {
+            continue;
+        } else {
+            if (!status.ok()) {
+                std::stringstream ss;
+                ss << " file: " << range.path << " error:" << status.get_error_msg();
+                return Status::InternalError(ss.str());
+            } else {
+                return status;
+            }
+        }
+    }
+}
+
+Status FileArrowScanner::open() {
+    RETURN_IF_ERROR(FileScanner::open());
+    if (_ranges.empty()) {
+        return Status::OK();
+    }
+    return Status::OK();
+}
+
+// get next available arrow batch
+Status FileArrowScanner::_next_arrow_batch() {
+    _arrow_batch_cur_idx = 0;
+    // first, init file reader
+    if (_cur_file_reader == nullptr || _cur_file_eof) {
+        RETURN_IF_ERROR(_open_next_reader());
+        _cur_file_eof = false;
+    }
+    // second, loop until find available arrow batch or EOF
+    while (!_scanner_eof) {
+        RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, &_cur_file_eof));
+        if (_cur_file_eof) {
+            RETURN_IF_ERROR(_open_next_reader());
+            _cur_file_eof = false;
+            continue;
+        }
+        if (_batch->num_rows() == 0) {
+            continue;
+        }
+        return Status::OK();
+    }
+    return Status::EndOfFile("EOF");
+}
+
+Status FileArrowScanner::_init_arrow_batch_if_necessary() {
+    // 1. init batch if first time
+    // 2. reset reader if end of file
+    Status status;
+    if (_scanner_eof) {
+        return Status::EndOfFile("EOF");
+    }
+    if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
+        return _next_arrow_batch();
+    }
+    return status;
+}
+
+Status FileArrowScanner::get_next(vectorized::Block* block, bool* eof) {
+    std::cout << "HZW" << std::endl;

Review Comment:
   remove the unused cout



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] dujl commented on a diff in pull request #10402: [feature-wip](multi-catalog) Impl FileScanNode in be

Posted by GitBox <gi...@apache.org>.
dujl commented on code in PR #10402:
URL: https://github.com/apache/doris/pull/10402#discussion_r908136372


##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -213,8 +215,54 @@ struct TEsScanRange {
   4: required i32 shard_id
 }
 
-struct TFileScanRange {
+struct TFileTextScanRangeParams {
+    3: optional i8 column_separator;
+    4: optional i8 line_delimiter;
+    // for multibytes separators
+    5: optional i32 column_separator_length = 1;
+    6: optional i32 line_delimiter_length = 1;
+    7: optional string column_separator_str;
+    8: optional string line_delimiter_str;
+}
+
+struct TFileScanSlotInfo {
+    1: optional Types.TSlotId slot_id;
+    2: optional bool is_file_slot;
+}
+
+struct TFileScanRangeParams {
+  // use src_tuple_id to get all slots from src table include both file slot and partition slot.
+  1: optional Types.TTupleId src_tuple_id;
+  // num_of_columns_from_file can spilt the all_file_slot and all_partition_slot
+  2: optional i32 num_of_columns_from_file;
+  // all selected slots which may compose from file and partiton value.
+  3: optional list<TFileScanSlotInfo> required_slots;
 
+  4: optional TFileTextScanRangeParams text_params;
+}
+
+struct TFileRangeDesc {
+    1: optional Types.TFileType file_type;
+    2: optional TFileFormatType format_type;
+    // Path of this range
+    3: optional string path;
+    // Offset of this file start
+    4: optional i64 start_offset;
+    // Size of this range, if size = -1, this means that will read to then end of file

Review Comment:
   then->the : read to the end of file



##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -213,8 +215,54 @@ struct TEsScanRange {
   4: required i32 shard_id
 }
 
-struct TFileScanRange {
+struct TFileTextScanRangeParams {
+    3: optional i8 column_separator;
+    4: optional i8 line_delimiter;
+    // for multibytes separators
+    5: optional i32 column_separator_length = 1;
+    6: optional i32 line_delimiter_length = 1;
+    7: optional string column_separator_str;
+    8: optional string line_delimiter_str;
+}

Review Comment:
   does we support more options such as quote, escape, include header or not and so on?
   suggest TFileTextScanRangeParams use map<k, v> struct, as that we could support more options without change the TFileTextScanRangeParams



##########
be/src/vec/exec/file_text_scanner.cpp:
##########
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/file_text_scanner.h"
+
+#include <fmt/format.h>
+#include <gen_cpp/internal_service.pb.h>
+
+#include <iostream>
+
+#include "exec/exec_node.h"
+#include "exec/plain_text_line_reader.h"
+#include "exec/text_converter.h"
+#include "exec/text_converter.hpp"
+#include "exprs/expr_context.h"
+#include "io/buffered_reader.h"
+#include "io/hdfs_reader_writer.h"
+#include "util/types.h"
+#include "util/utf8_check.h"
+
+namespace doris::vectorized {
+
+FileTextScanner::FileTextScanner(RuntimeState* state, RuntimeProfile* profile,
+                                 const TFileScanRangeParams& params,
+                                 const std::vector<TFileRangeDesc>& ranges,
+                                 const std::vector<TExpr>& pre_filter_texprs,
+                                 ScannerCounter* counter)
+        : FileScanner(state, profile, params, ranges, pre_filter_texprs, counter),
+          _cur_file_reader(nullptr),
+          _cur_line_reader(nullptr),
+          _cur_line_reader_eof(false),
+          _skip_lines(0),
+          _success(false)
+
+{
+    if (params.__isset.text_params) {
+        auto text_params = params.text_params;
+        if (text_params.__isset.column_separator_length &&
+            text_params.column_separator_length > 1) {
+            _value_separator = text_params.column_separator_str;
+            _value_separator_length = text_params.column_separator_length;
+        } else {
+            _value_separator.push_back(static_cast<char>(text_params.column_separator));
+            _value_separator_length = 1;
+        }
+        if (text_params.__isset.line_delimiter_length && text_params.line_delimiter_length > 1) {
+            _line_delimiter = text_params.line_delimiter_str;
+            _line_delimiter_length = text_params.line_delimiter_length;
+        } else {
+            _line_delimiter.push_back(static_cast<char>(text_params.line_delimiter));
+            _line_delimiter_length = 1;
+        }
+    }
+}
+
+FileTextScanner::~FileTextScanner() {
+    close();
+}
+
+Status FileTextScanner::open() {
+    RETURN_IF_ERROR(FileScanner::open());
+
+    if (_ranges.empty()) {
+        return Status::OK();
+    }
+    _split_values.reserve(sizeof(Slice) * _file_slot_descs.size());
+    return Status::OK();
+}
+
+void FileTextScanner::close() {
+    FileScanner::close();
+
+    if (_cur_line_reader != nullptr) {
+        delete _cur_line_reader;
+        _cur_line_reader = nullptr;
+    }
+}
+
+Status FileTextScanner::get_next(Block* block, bool* eof) {
+    SCOPED_TIMER(_read_timer);
+    RETURN_IF_ERROR(init_block(block));
+
+    const int batch_size = _state->batch_size();
+
+    while (_rows < batch_size && !_scanner_eof) {
+        if (_cur_line_reader == nullptr || _cur_line_reader_eof) {
+            RETURN_IF_ERROR(_open_next_reader());
+            // If there isn't any more reader, break this
+            if (_scanner_eof) {
+                continue;
+            }
+        }
+        const uint8_t* ptr = nullptr;
+        size_t size = 0;
+        RETURN_IF_ERROR(_cur_line_reader->read_line(&ptr, &size, &_cur_line_reader_eof));
+        std::unique_ptr<const uint8_t> u_ptr;
+        u_ptr.reset(ptr);
+        if (_skip_lines > 0) {
+            _skip_lines--;
+            continue;
+        }
+        if (size == 0) {
+            // Read empty row, just continue
+            continue;
+        }
+        {
+            COUNTER_UPDATE(_rows_read_counter, 1);
+            RETURN_IF_ERROR(_fill_file_columns(Slice(ptr, size), block));
+        }
+    }
+
+    return fill_block(block, eof);
+}
+
+Status FileTextScanner::_fill_file_columns(const Slice& line, vectorized::Block* _block) {
+    RETURN_IF_ERROR(_line_split_to_values(line));
+    if (!_success) {
+        // If not success, which means we met an invalid row, return.
+        return Status::OK();
+    }
+
+    for (int i = 0; i < _split_values.size(); ++i) {
+        auto slot_desc = _file_slot_descs[i];
+        const Slice& value = _split_values[i];
+
+        auto doris_column = _block->get_by_name(slot_desc->col_name()).column;
+        IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
+        if (slot_desc->is_nullable()) {
+            auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(col_ptr);
+            nullable_column->get_null_map_data().push_back(0);
+            col_ptr = &nullable_column->get_nested_column();
+        }
+
+        if (value.size == 2 && value.data[0] == '\\' && value[1] == 'N') {
+            col_ptr->insert_default();
+            continue;
+        }
+        _text_converter->write_vec_column(slot_desc, col_ptr, value.data, value.size, true, false);
+    }
+    _rows++;
+    return Status::OK();
+}
+
+Status FileTextScanner::_open_next_reader() {
+    if (_next_range >= _ranges.size()) {
+        _scanner_eof = true;
+        return Status::OK();
+    }
+
+    RETURN_IF_ERROR(_open_file_reader());
+    RETURN_IF_ERROR(_open_line_reader());
+    _next_range++;
+
+    return Status::OK();
+}
+
+Status FileTextScanner::_open_file_reader() {
+    const TFileRangeDesc& range = _ranges[_next_range];
+
+    FileReader* hdfs_reader = nullptr;
+    RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path,

Review Comment:
   do we support other file system such as s3?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] github-actions[bot] commented on pull request #10402: [feature-wip](multi-catalog) Impl FileScanNode in be

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #10402:
URL: https://github.com/apache/doris/pull/10402#issuecomment-1169459746

   PR approved by at least one committer and no changes requested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] morningman merged pull request #10402: [feature-wip](multi-catalog) Impl FileScanNode in be

Posted by GitBox <gi...@apache.org>.
morningman merged PR #10402:
URL: https://github.com/apache/doris/pull/10402


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] dujl commented on a diff in pull request #10402: [feature-wip](multi-catalog) Impl FileScanNode in be

Posted by GitBox <gi...@apache.org>.
dujl commented on code in PR #10402:
URL: https://github.com/apache/doris/pull/10402#discussion_r908080209


##########
be/src/vec/exec/file_arrow_scanner.cpp:
##########
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/file_arrow_scanner.h"
+
+#include "exec/arrow/parquet_reader.h"
+#include "io/buffered_reader.h"
+#include "io/hdfs_reader_writer.h"
+#include "runtime/descriptors.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/utils/arrow_column_to_doris_column.h"
+
+namespace doris::vectorized {
+
+FileArrowScanner::FileArrowScanner(RuntimeState* state, RuntimeProfile* profile,
+                                   const TFileScanRangeParams& params,
+                                   const std::vector<TFileRangeDesc>& ranges,
+                                   const std::vector<TExpr>& pre_filter_texprs,
+                                   ScannerCounter* counter)
+        : FileScanner(state, profile, params, ranges, pre_filter_texprs, counter),
+          _cur_file_reader(nullptr),
+          _cur_file_eof(false),
+          _batch(nullptr),
+          _arrow_batch_cur_idx(0) {}
+
+FileArrowScanner::~FileArrowScanner() {
+    close();
+}
+
+Status FileArrowScanner::_open_next_reader() {
+    // open_file_reader
+    if (_cur_file_reader != nullptr) {
+        delete _cur_file_reader;
+        _cur_file_reader = nullptr;
+    }
+
+    while (true) {
+        if (_next_range >= _ranges.size()) {
+            _scanner_eof = true;
+            return Status::OK();
+        }
+        const TFileRangeDesc& range = _ranges[_next_range++];
+        std::unique_ptr<FileReader> file_reader;
+
+        FileReader* hdfs_reader = nullptr;

Review Comment:
   reader could be hdfs_reader or s3_reader or other object store system, so use FileFactory::create_file_reader to create a file reader?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] dujl commented on a diff in pull request #10402: [feature-wip](multi-catalog) Impl FileScanNode in be

Posted by GitBox <gi...@apache.org>.
dujl commented on code in PR #10402:
URL: https://github.com/apache/doris/pull/10402#discussion_r908124893


##########
be/src/vec/exec/file_text_scanner.cpp:
##########
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/file_text_scanner.h"
+
+#include <fmt/format.h>
+#include <gen_cpp/internal_service.pb.h>
+
+#include <iostream>
+
+#include "exec/exec_node.h"
+#include "exec/plain_text_line_reader.h"
+#include "exec/text_converter.h"
+#include "exec/text_converter.hpp"
+#include "exprs/expr_context.h"
+#include "io/buffered_reader.h"
+#include "io/hdfs_reader_writer.h"
+#include "util/types.h"
+#include "util/utf8_check.h"
+
+namespace doris::vectorized {
+
+FileTextScanner::FileTextScanner(RuntimeState* state, RuntimeProfile* profile,
+                                 const TFileScanRangeParams& params,
+                                 const std::vector<TFileRangeDesc>& ranges,
+                                 const std::vector<TExpr>& pre_filter_texprs,
+                                 ScannerCounter* counter)
+        : FileScanner(state, profile, params, ranges, pre_filter_texprs, counter),
+          _cur_file_reader(nullptr),
+          _cur_line_reader(nullptr),
+          _cur_line_reader_eof(false),
+          _skip_lines(0),
+          _success(false)
+
+{
+    if (params.__isset.text_params) {
+        auto text_params = params.text_params;
+        if (text_params.__isset.column_separator_length &&
+            text_params.column_separator_length > 1) {
+            _value_separator = text_params.column_separator_str;
+            _value_separator_length = text_params.column_separator_length;
+        } else {
+            _value_separator.push_back(static_cast<char>(text_params.column_separator));
+            _value_separator_length = 1;
+        }
+        if (text_params.__isset.line_delimiter_length && text_params.line_delimiter_length > 1) {
+            _line_delimiter = text_params.line_delimiter_str;
+            _line_delimiter_length = text_params.line_delimiter_length;
+        } else {
+            _line_delimiter.push_back(static_cast<char>(text_params.line_delimiter));
+            _line_delimiter_length = 1;
+        }
+    }
+}
+
+FileTextScanner::~FileTextScanner() {
+    close();
+}
+
+Status FileTextScanner::open() {
+    RETURN_IF_ERROR(FileScanner::open());
+
+    if (_ranges.empty()) {
+        return Status::OK();
+    }
+    _split_values.reserve(sizeof(Slice) * _file_slot_descs.size());
+    return Status::OK();
+}
+
+void FileTextScanner::close() {
+    FileScanner::close();
+
+    if (_cur_line_reader != nullptr) {
+        delete _cur_line_reader;
+        _cur_line_reader = nullptr;
+    }
+}
+
+Status FileTextScanner::get_next(Block* block, bool* eof) {
+    SCOPED_TIMER(_read_timer);
+    RETURN_IF_ERROR(init_block(block));
+
+    const int batch_size = _state->batch_size();
+
+    while (_rows < batch_size && !_scanner_eof) {
+        if (_cur_line_reader == nullptr || _cur_line_reader_eof) {
+            RETURN_IF_ERROR(_open_next_reader());
+            // If there isn't any more reader, break this
+            if (_scanner_eof) {
+                continue;
+            }
+        }
+        const uint8_t* ptr = nullptr;
+        size_t size = 0;
+        RETURN_IF_ERROR(_cur_line_reader->read_line(&ptr, &size, &_cur_line_reader_eof));
+        std::unique_ptr<const uint8_t> u_ptr;
+        u_ptr.reset(ptr);
+        if (_skip_lines > 0) {
+            _skip_lines--;
+            continue;
+        }
+        if (size == 0) {
+            // Read empty row, just continue
+            continue;
+        }
+        {
+            COUNTER_UPDATE(_rows_read_counter, 1);
+            RETURN_IF_ERROR(_fill_file_columns(Slice(ptr, size), block));
+        }
+    }
+
+    return fill_block(block, eof);
+}
+
+Status FileTextScanner::_fill_file_columns(const Slice& line, vectorized::Block* _block) {
+    RETURN_IF_ERROR(_line_split_to_values(line));
+    if (!_success) {
+        // If not success, which means we met an invalid row, return.
+        return Status::OK();
+    }
+
+    for (int i = 0; i < _split_values.size(); ++i) {
+        auto slot_desc = _file_slot_descs[i];
+        const Slice& value = _split_values[i];
+
+        auto doris_column = _block->get_by_name(slot_desc->col_name()).column;
+        IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
+        if (slot_desc->is_nullable()) {
+            auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(col_ptr);
+            nullable_column->get_null_map_data().push_back(0);
+            col_ptr = &nullable_column->get_nested_column();
+        }
+
+        if (value.size == 2 && value.data[0] == '\\' && value[1] == 'N') {
+            col_ptr->insert_default();
+            continue;
+        }
+        _text_converter->write_vec_column(slot_desc, col_ptr, value.data, value.size, true, false);
+    }
+    _rows++;
+    return Status::OK();
+}
+
+Status FileTextScanner::_open_next_reader() {
+    if (_next_range >= _ranges.size()) {
+        _scanner_eof = true;
+        return Status::OK();
+    }
+
+    RETURN_IF_ERROR(_open_file_reader());
+    RETURN_IF_ERROR(_open_line_reader());
+    _next_range++;
+
+    return Status::OK();
+}
+
+Status FileTextScanner::_open_file_reader() {
+    const TFileRangeDesc& range = _ranges[_next_range];
+
+    FileReader* hdfs_reader = nullptr;
+    RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path,
+                                                    range.start_offset, &hdfs_reader));
+    _cur_file_reader.reset(new BufferedReader(_profile, hdfs_reader));
+    return _cur_file_reader->open();
+}
+
+Status FileTextScanner::_open_line_reader() {
+    if (_cur_line_reader != nullptr) {
+        delete _cur_line_reader;
+        _cur_line_reader = nullptr;
+    }
+
+    const TFileRangeDesc& range = _ranges[_next_range];
+    int64_t size = range.size;
+    if (range.start_offset != 0) {
+        if (range.format_type != TFileFormatType::FORMAT_CSV_PLAIN) {
+            std::stringstream ss;
+            ss << "For now we do not support split compressed file";
+            return Status::InternalError(ss.str());
+        }
+        size += 1;
+        // not first range will always skip one line
+        _skip_lines = 1;
+    }
+
+    // open line reader
+    switch (range.format_type) {
+    case TFileFormatType::FORMAT_CSV_PLAIN:
+        _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader.get(), nullptr, size,

Review Comment:
   do we support compressed csv file?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] morningman commented on a diff in pull request #10402: [feature-wip](multi-catalog) Impl FileScanNode in be

Posted by GitBox <gi...@apache.org>.
morningman commented on code in PR #10402:
URL: https://github.com/apache/doris/pull/10402#discussion_r906693567


##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java:
##########
@@ -158,9 +156,10 @@ public ExternalFileScanNode(
             PlanNodeId id,
             TupleDescriptor desc,
             String planNodeName) throws MetaNotFoundException {
-        super(id, desc, planNodeName, StatisticalType.BROKER_SCAN_NODE);
 
-        this.hmsTable = (HMSExternalTable) desc.getTable();
+        super(id, desc, planNodeName, StatisticalType.FILE_SCAN_NODE);
+
+        this.hmsTable = (HMSExternalTable) this.desc.getTable();
 
         DLAType type = getDLAType();

Review Comment:
   Looks like the type can be save in hmsTable, no need to check it every time.



##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -212,8 +214,55 @@ struct TEsScanRange {
   4: required i32 shard_id
 }
 
-struct TFileScanRange {
+struct TFileTextScanRangeParams {
+    3: required i8 column_separator;
+    4: required i8 line_delimiter;
+    // for multibytes separators
+    5: optional i32 column_separator_length = 1;
+    6: optional i32 line_delimiter_length = 1;
+    7: optional string column_separator_str;
+    8: optional string line_delimiter_str;
+}
+
+struct TFileScanSlotInfo {
+    1: required Types.TSlotId slot_id;
+    2: required bool is_file_slot;
+}
+
+struct TFileScanRangeParams {
+  // use src_tuple_id to get all slots from src table include both file slot and partition slot.
+  1: required Types.TTupleId src_tuple_id;
+  // num_of_columns_from_file can spilt the all_file_slot and all_partition_slot
+  2: required i32 num_of_columns_from_file;
+  // all selected slots which may compose from file and partiton value.
+  3: required list<TFileScanSlotInfo> required_slots;
+
+  4: optional TFileTextScanRangeParams text_params;
+}
+
+struct TFileRangeDesc {
+    1: required Types.TFileType file_type;
+    2: required TFileFormatType format_type;
+    // Path of this range
+    3: required string path;
+    // Offset of this file start
+    4: required i64 start_offset;
+    // Size of this range, if size = -1, this means that will read to then end of file
+    5: required i64 size;
+    // total size of the file
+    6: optional i64 file_size;
+
+    // columns parsed from file path should be after the columns read from file
+    7: optional list<string> columns_from_path;
 
+    8: optional THdfsParams hdfs_params;
+}
+
+// HDFS file scan range
+struct TFileScanRange {
+    1: required list<TFileRangeDesc> ranges
+    2: required TFileScanRangeParams params
+    3: required list<Types.TNetworkAddress> broker_addresses

Review Comment:
   Do we need to support reading via broker?



##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -265,6 +314,15 @@ struct TBrokerScanNode {
     4: optional list<Exprs.TExpr> pre_filter_exprs
 }
 
+struct TFileScanNode {
+    1: required Types.TTupleId tuple_id
+
+    // Partition info used to process partition select in broker load
+    2: optional list<Exprs.TExpr> partition_exprs
+    3: optional list<Partitions.TRangePartition> partition_infos
+    4: optional list<Exprs.TExpr> pre_filter_exprs

Review Comment:
   Do we still need fields 2,3,4?



##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -212,8 +214,55 @@ struct TEsScanRange {
   4: required i32 shard_id
 }
 
-struct TFileScanRange {
+struct TFileTextScanRangeParams {
+    3: required i8 column_separator;

Review Comment:
   use `optional` for all fields.
   Same suggestion for all other structs.



##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java:
##########
@@ -198,41 +197,58 @@ public void init(Analyzer analyzer) throws UserException {
 
     private void initContext(ParamCreateContext context) throws DdlException, MetaNotFoundException {

Review Comment:
   No need to pass the param `context`, because `context` is already a field of this class.



##########
be/src/vec/exec/file_arrow_scanner.h:
##########
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <arrow/array.h>
+#include <exec/arrow/arrow_reader.h>
+#include <exec/arrow/orc_reader.h>
+
+#include <map>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/base_scanner.h"
+#include "vec/exec/file_scanner.h"
+#include "util/runtime_profile.h"
+
+namespace doris::vectorized {
+
+// VArrow scanner convert the data read from orc|parquet to doris's columns.
+class FileArrowScanner : public FileScanner {

Review Comment:
   What is different between `FileArrowScanner` and `VArrowScanner` ?
   Looks like the `VArrowScanner` first read data into src block, and then convert to dest block?
   And `FileArrowScanner` read data directly into dest block?



##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java:
##########
@@ -198,41 +197,58 @@ public void init(Analyzer analyzer) throws UserException {
 
     private void initContext(ParamCreateContext context) throws DdlException, MetaNotFoundException {
         context.srcTupleDescriptor = analyzer.getDescTbl().createTupleDescriptor();
-        context.slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
-        context.params = new TBrokerScanRangeParams();
+        context.params = new TFileScanRangeParams();
         if (scanProvider.getTableFormatType().equals(TFileFormatType.FORMAT_CSV_PLAIN)) {
             Map<String, String> serDeInfoParams = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters();
             String columnSeparator = Strings.isNullOrEmpty(serDeInfoParams.get("field.delim"))
                     ? HIVE_DEFAULT_COLUMN_SEPARATOR : serDeInfoParams.get("field.delim");
             String lineDelimiter = Strings.isNullOrEmpty(serDeInfoParams.get("line.delim"))
                     ? HIVE_DEFAULT_LINE_DELIMITER : serDeInfoParams.get("line.delim");
-            context.params.setColumnSeparator(columnSeparator.getBytes(StandardCharsets.UTF_8)[0]);
-            context.params.setLineDelimiter(lineDelimiter.getBytes(StandardCharsets.UTF_8)[0]);
-            context.params.setColumnSeparatorStr(columnSeparator);
-            context.params.setLineDelimiterStr(lineDelimiter);
-            context.params.setColumnSeparatorLength(columnSeparator.getBytes(StandardCharsets.UTF_8).length);
-            context.params.setLineDelimiterLength(lineDelimiter.getBytes(StandardCharsets.UTF_8).length);
+            TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
+            textParams.setColumnSeparator(columnSeparator.getBytes(StandardCharsets.UTF_8)[0]);
+            textParams.setLineDelimiter(lineDelimiter.getBytes(StandardCharsets.UTF_8)[0]);
+            textParams.setColumnSeparatorStr(columnSeparator);
+            textParams.setLineDelimiterStr(lineDelimiter);
+            textParams.setColumnSeparatorLength(columnSeparator.getBytes(StandardCharsets.UTF_8).length);
+            textParams.setLineDelimiterLength(lineDelimiter.getBytes(StandardCharsets.UTF_8).length);
+            context.params.setTextParams(textParams);
         }
 
-        Map<String, SlotDescriptor> slotDescByName = Maps.newHashMap();
+        context.params.setSrcTupleId(context.srcTupleDescriptor.getId().asInt());
+        // Need re compute memory layout after set some slot descriptor to nullable
+        context.srcTupleDescriptor.computeStatAndMemLayout();
+
+        Map<String, SlotDescriptor> slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
 
         List<Column> columns = hmsTable.getBaseSchema(false);
         for (Column column : columns) {
             SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(context.srcTupleDescriptor);
-            slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+            slotDesc.setType(column.getType());
             slotDesc.setIsMaterialized(true);
             slotDesc.setIsNullable(true);
-            slotDesc.setColumn(new Column(column.getName(), PrimitiveType.VARCHAR));
-            context.params.addToSrcSlotIds(slotDesc.getId().asInt());
+            slotDesc.setColumn(new Column(column));
             slotDescByName.put(column.getName(), slotDesc);
         }
-        context.slotDescByName = slotDescByName;
+
+        for (FieldSchema fieldSchema : hmsTable.getRemoteTable().getPartitionKeys()) {

Review Comment:
   Are you sure that all partition key columns are in file path, not in file?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] morningman commented on a diff in pull request #10402: [feature-wip](multi-catalog) Impl FileScanNode in be

Posted by GitBox <gi...@apache.org>.
morningman commented on code in PR #10402:
URL: https://github.com/apache/doris/pull/10402#discussion_r909133881


##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -213,8 +215,54 @@ struct TEsScanRange {
   4: required i32 shard_id
 }
 
-struct TFileScanRange {
+struct TFileTextScanRangeParams {
+    3: optional i8 column_separator;
+    4: optional i8 line_delimiter;
+    // for multibytes separators
+    5: optional i32 column_separator_length = 1;
+    6: optional i32 line_delimiter_length = 1;
+    7: optional string column_separator_str;
+    8: optional string line_delimiter_str;
+}

Review Comment:
   `map` is not an efficient way for serde.
   And if we add more options in future, we still need to modify BE or FE code.
   So I think it is ok for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] morningman commented on a diff in pull request #10402: [feature-wip](multi-catalog) Impl FileScanNode in be

Posted by GitBox <gi...@apache.org>.
morningman commented on code in PR #10402:
URL: https://github.com/apache/doris/pull/10402#discussion_r906899654


##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -212,8 +214,54 @@ struct TEsScanRange {
   4: required i32 shard_id
 }
 
-struct TFileScanRange {
+struct TFileTextScanRangeParams {
+    3: optional i8 column_separator;
+    4: optional i8 line_delimiter;
+    // for multibytes separators
+    5: optional i32 column_separator_length = 1;
+    6: optional i32 line_delimiter_length = 1;
+    7: optional string column_separator_str;
+    8: optional string line_delimiter_str;
+}
+
+struct TFileScanSlotInfo {
+    1: required Types.TSlotId slot_id;

Review Comment:
   use optional



##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -212,8 +214,54 @@ struct TEsScanRange {
   4: required i32 shard_id
 }
 
-struct TFileScanRange {
+struct TFileTextScanRangeParams {
+    3: optional i8 column_separator;
+    4: optional i8 line_delimiter;
+    // for multibytes separators
+    5: optional i32 column_separator_length = 1;
+    6: optional i32 line_delimiter_length = 1;
+    7: optional string column_separator_str;
+    8: optional string line_delimiter_str;
+}
+
+struct TFileScanSlotInfo {
+    1: required Types.TSlotId slot_id;
+    2: required bool is_file_slot;
+}
+
+struct TFileScanRangeParams {
+  // use src_tuple_id to get all slots from src table include both file slot and partition slot.
+  1: required Types.TTupleId src_tuple_id;

Review Comment:
   use optional



##########
fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java:
##########
@@ -1686,49 +1688,55 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s
             throws UserException {
         ScanNode scanNode = null;
 
-        switch (tblRef.getTable().getType()) {
-            case OLAP:
-                OlapScanNode olapNode = new OlapScanNode(ctx.getNextNodeId(), tblRef.getDesc(),
-                        "OlapScanNode");
-                olapNode.setForceOpenPreAgg(tblRef.isForcePreAggOpened());
-                scanNode = olapNode;
-                break;
-            case ODBC:
-                scanNode = new OdbcScanNode(ctx.getNextNodeId(), tblRef.getDesc(), (OdbcTable) tblRef.getTable());
-                break;
-            case MYSQL:
-                scanNode = new MysqlScanNode(ctx.getNextNodeId(), tblRef.getDesc(), (MysqlTable) tblRef.getTable());
-                break;
-            case SCHEMA:
-                scanNode = new SchemaScanNode(ctx.getNextNodeId(), tblRef.getDesc());
-                break;
-            case BROKER:
-                scanNode = new BrokerScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "BrokerScanNode",
-                        null, -1);
-                break;
-            case ELASTICSEARCH:
-                scanNode = new EsScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "EsScanNode");
-                break;
-            case HIVE:
-                scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "HiveScanNode",
-                        null, -1);
-                break;
-            case ICEBERG:
-                scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "IcebergScanNode",
-                        null, -1);
-                break;
-            case HUDI:
-                scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "HudiScanNode",
-                        null, -1);
-                break;
-            default:
-                break;
-        }
-        if (scanNode instanceof OlapScanNode || scanNode instanceof EsScanNode || scanNode instanceof HiveScanNode) {
-            if (analyzer.enableInferPredicate()) {
-                PredicatePushDown.visitScanNode(scanNode, tblRef.getJoinOp(), analyzer);
+        if (Config.enable_multi_catalog) {

Review Comment:
   Add `HMS_EXTERNAL_TABLE` in `TableType`



##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java:
##########
@@ -253,45 +264,40 @@ private void buildScanRange() throws UserException, IOException {
         String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath();
         String fsName = fullPath.replace(filePath, "");
         hdfsParams.setFsName(fsName);
-        List<String> partitionKeys = new ArrayList<>();
-        for (FieldSchema fieldSchema : hmsTable.getRemoteTable().getPartitionKeys()) {
-            partitionKeys.add(fieldSchema.getName());
-        }
+
 
         for (InputSplit split : inputSplits) {
             FileSplit fileSplit = (FileSplit) split;
 
             TScanRangeLocations curLocations = newLocations(context.params);
             List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
                     partitionKeys);
-            int numberOfColumnsFromFile = context.slotDescByName.size() - partitionValuesFromPath.size();
 
-            TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit, partitionValuesFromPath,
-                    numberOfColumnsFromFile);
+            TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath);
             rangeDesc.setHdfsParams(hdfsParams);
-            rangeDesc.setReadByColumnDef(true);
 
-            curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc);
+            curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
             Log.debug("Assign to backend " + curLocations.getLocations().get(0).getBackendId()
                     + " with table split: " +  fileSplit.getPath()
                     + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")");
 
             // Put the last file
-            if (curLocations.getScanRange().getBrokerScanRange().isSetRanges()) {
+            if (curLocations.getScanRange().getExtScanRange().getFileScanRange().isSetRanges()) {

Review Comment:
   This `if` always return `true`?



##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -212,8 +214,54 @@ struct TEsScanRange {
   4: required i32 shard_id
 }
 
-struct TFileScanRange {
+struct TFileTextScanRangeParams {
+    3: optional i8 column_separator;
+    4: optional i8 line_delimiter;
+    // for multibytes separators
+    5: optional i32 column_separator_length = 1;
+    6: optional i32 line_delimiter_length = 1;
+    7: optional string column_separator_str;
+    8: optional string line_delimiter_str;
+}
+
+struct TFileScanSlotInfo {
+    1: required Types.TSlotId slot_id;
+    2: required bool is_file_slot;
+}
+
+struct TFileScanRangeParams {
+  // use src_tuple_id to get all slots from src table include both file slot and partition slot.
+  1: required Types.TTupleId src_tuple_id;
+  // num_of_columns_from_file can spilt the all_file_slot and all_partition_slot
+  2: required i32 num_of_columns_from_file;
+  // all selected slots which may compose from file and partiton value.
+  3: required list<TFileScanSlotInfo> required_slots;
+
+  4: optional TFileTextScanRangeParams text_params;
+}
+
+struct TFileRangeDesc {
+    1: required Types.TFileType file_type;

Review Comment:
   use optional



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java:
##########
@@ -117,6 +117,15 @@ public List<String> getDbNames() {
         return dbNames;
     }
 
+    public DataSourceIf getExternalDatasource(String name) {
+        readLock();

Review Comment:
   nameToCatalogs is a concurrentMap, no need to lock



##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -212,8 +214,54 @@ struct TEsScanRange {
   4: required i32 shard_id
 }
 
-struct TFileScanRange {
+struct TFileTextScanRangeParams {
+    3: optional i8 column_separator;
+    4: optional i8 line_delimiter;
+    // for multibytes separators
+    5: optional i32 column_separator_length = 1;
+    6: optional i32 line_delimiter_length = 1;
+    7: optional string column_separator_str;
+    8: optional string line_delimiter_str;
+}
+
+struct TFileScanSlotInfo {
+    1: required Types.TSlotId slot_id;
+    2: required bool is_file_slot;
+}
+
+struct TFileScanRangeParams {
+  // use src_tuple_id to get all slots from src table include both file slot and partition slot.
+  1: required Types.TTupleId src_tuple_id;
+  // num_of_columns_from_file can spilt the all_file_slot and all_partition_slot
+  2: required i32 num_of_columns_from_file;
+  // all selected slots which may compose from file and partiton value.
+  3: required list<TFileScanSlotInfo> required_slots;
+
+  4: optional TFileTextScanRangeParams text_params;
+}
+
+struct TFileRangeDesc {
+    1: required Types.TFileType file_type;
+    2: required TFileFormatType format_type;
+    // Path of this range
+    3: required string path;
+    // Offset of this file start
+    4: required i64 start_offset;
+    // Size of this range, if size = -1, this means that will read to then end of file
+    5: required i64 size;
+    // total size of the file
+    6: optional i64 file_size;
+
+    // columns parsed from file path should be after the columns read from file
+    7: optional list<string> columns_from_path;
 
+    8: optional THdfsParams hdfs_params;
+}
+
+// HDFS file scan range
+struct TFileScanRange {
+    1: required list<TFileRangeDesc> ranges

Review Comment:
   use optional



##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java:
##########
@@ -253,45 +264,40 @@ private void buildScanRange() throws UserException, IOException {
         String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath();
         String fsName = fullPath.replace(filePath, "");
         hdfsParams.setFsName(fsName);
-        List<String> partitionKeys = new ArrayList<>();
-        for (FieldSchema fieldSchema : hmsTable.getRemoteTable().getPartitionKeys()) {
-            partitionKeys.add(fieldSchema.getName());
-        }
+
 
         for (InputSplit split : inputSplits) {
             FileSplit fileSplit = (FileSplit) split;
 
             TScanRangeLocations curLocations = newLocations(context.params);
             List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
                     partitionKeys);
-            int numberOfColumnsFromFile = context.slotDescByName.size() - partitionValuesFromPath.size();
 
-            TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit, partitionValuesFromPath,
-                    numberOfColumnsFromFile);
+            TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath);
             rangeDesc.setHdfsParams(hdfsParams);

Review Comment:
   Already call `setHdfsParams` in `createBrokerRangeDesc()`



##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -265,6 +313,11 @@ struct TBrokerScanNode {
     4: optional list<Exprs.TExpr> pre_filter_exprs
 }
 
+struct TFileScanNode {
+    1: required Types.TTupleId tuple_id

Review Comment:
   use optional



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java:
##########
@@ -57,34 +57,22 @@ public class HMSExternalDataSource extends ExternalDataSource {
      * Default constructor for HMSExternalDataSource.
      */
     public HMSExternalDataSource(String name, Map<String, String> props) {
-        setName(name);
-        getDsProperty().setProperties(props);
-        setType("hms");
-    }
-
-    /**
-     * Hive metastore data source implementation.
-     *
-     * @param hiveMetastoreUris e.g. thrift://127.0.0.1:9083
-     */
-    public HMSExternalDataSource(long id, String name, String type, DataSourceProperty dsProperty,
-            String hiveMetastoreUris) throws DdlException {
-        this.id = id;
+        this.id = nextId.incrementAndGet();

Review Comment:
   Why generate this id each time?
   You'd better pass this id from outside.



##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java:
##########
@@ -253,45 +264,40 @@ private void buildScanRange() throws UserException, IOException {
         String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath();
         String fsName = fullPath.replace(filePath, "");
         hdfsParams.setFsName(fsName);
-        List<String> partitionKeys = new ArrayList<>();
-        for (FieldSchema fieldSchema : hmsTable.getRemoteTable().getPartitionKeys()) {
-            partitionKeys.add(fieldSchema.getName());
-        }
+
 
         for (InputSplit split : inputSplits) {
             FileSplit fileSplit = (FileSplit) split;
 
             TScanRangeLocations curLocations = newLocations(context.params);

Review Comment:
   Here you create `TScanRangeLocations` for every file?
   That would be too many `TScanRangeLocations`.



##########
fe/fe-core/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -1652,7 +1652,7 @@ public class Config extends ConfigBase {
      * Should be removed when this feature is ready.
      */
     @ConfField(mutable = false, masterOnly = true)
-    public static boolean enable_multi_catalog = false; // 1 min
+    public static boolean enable_multi_catalog = true; // 1 min

Review Comment:
   set back to false.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] github-actions[bot] commented on pull request #10402: [feature-wip](multi-catalog) Impl FileScanNode in be

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #10402:
URL: https://github.com/apache/doris/pull/10402#issuecomment-1169459776

   PR approved by anyone and no changes requested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] dujl commented on a diff in pull request #10402: [feature-wip](multi-catalog) Impl FileScanNode in be

Posted by GitBox <gi...@apache.org>.
dujl commented on code in PR #10402:
URL: https://github.com/apache/doris/pull/10402#discussion_r908099929


##########
be/src/vec/exec/file_scan_node.cpp:
##########
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/file_scan_node.h"
+
+#include "gen_cpp/PlanNodes_types.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/runtime_state.h"
+#include "runtime/string_value.h"
+#include "runtime/tuple.h"
+#include "runtime/tuple_row.h"
+#include "util/runtime_profile.h"
+#include "util/thread.h"
+#include "util/types.h"
+#include "vec/exec/file_arrow_scanner.h"
+#include "vec/exec/file_text_scanner.h"
+#include "vec/exprs/vexpr_context.h"
+
+namespace doris::vectorized {
+
+FileScanNode::FileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
+        : ScanNode(pool, tnode, descs),
+          _tuple_id(tnode.file_scan_node.tuple_id),
+          _runtime_state(nullptr),
+          _tuple_desc(nullptr),
+          _num_running_scanners(0),
+          _scan_finished(false),
+          _max_buffered_batches(32),
+          _wait_scanner_timer(nullptr) {}
+
+Status FileScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(ScanNode::init(tnode, state));
+    auto& file_scan_node = tnode.file_scan_node;
+
+    if (file_scan_node.__isset.pre_filter_exprs) {
+        _pre_filter_texprs = file_scan_node.pre_filter_exprs;
+    }
+
+    return Status::OK();
+}
+
+Status FileScanNode::prepare(RuntimeState* state) {
+    VLOG_QUERY << "FileScanNode prepare";
+    RETURN_IF_ERROR(ScanNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
+    // get tuple desc
+    _runtime_state = state;
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+    if (_tuple_desc == nullptr) {
+        std::stringstream ss;
+        ss << "Failed to get tuple descriptor, _tuple_id=" << _tuple_id;
+        return Status::InternalError(ss.str());
+    }
+
+    // Initialize slots map
+    for (auto slot : _tuple_desc->slots()) {
+        auto pair = _slots_map.emplace(slot->col_name(), slot);
+        if (!pair.second) {
+            std::stringstream ss;
+            ss << "Failed to insert slot, col_name=" << slot->col_name();
+            return Status::InternalError(ss.str());
+        }
+    }
+
+    // Profile
+    _wait_scanner_timer = ADD_TIMER(runtime_profile(), "WaitScannerTime");
+
+    return Status::OK();
+}
+
+Status FileScanNode::open(RuntimeState* state) {
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    RETURN_IF_CANCELLED(state);
+
+    RETURN_IF_ERROR(start_scanners());
+
+    return Status::OK();
+}
+
+Status FileScanNode::start_scanners() {
+    {
+        std::unique_lock<std::mutex> l(_batch_queue_lock);
+        _num_running_scanners = 1;
+    }
+    _scanner_threads.emplace_back(&FileScanNode::scanner_worker, this, 0, _scan_ranges.size());

Review Comment:
   always start only one thread to scan the ranges?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org