Posted to commits@doris.apache.org by mo...@apache.org on 2022/07/18 12:50:39 UTC

[doris] branch master updated: [feature-wip](multi-catalog) Optimize threads and thrift interface of FileScanNode (#10942)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 60dd322aba [feature-wip](multi-catalog) Optimize threads and thrift interface of FileScanNode (#10942)
60dd322aba is described below

commit 60dd322abade619e0adaaf487dd01295ac64198b
Author: Ashin Gau <As...@users.noreply.github.com>
AuthorDate: Mon Jul 18 20:50:34 2022 +0800

    [feature-wip](multi-catalog) Optimize threads and thrift interface of FileScanNode (#10942)
    
    FileScanNode in BE launches as many threads as there are splits, so a
    scan over many files can flood the backend with threads. This patch
    submits scanner tasks to the shared scan thread pool instead, and
    merges surplus scan ranges so the number of scanners never exceeds the
    pool size.
    
    The thrift interface of FileScanNode is also excessively redundant:
    file type, format type, and HDFS params were repeated in every
    TFileRangeDesc, so they move into the shared TFileScanRangeParams.
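    
    To make the new submission flow concrete, here is a self-contained
    sketch of the pattern the BE side adopts (ToyPool and the string
    statuses are hypothetical stand-ins for Doris's PriorityThreadPool and
    Status; the real code prefers a per-query ThreadPoolToken when one
    exists and falls back to the shared pool otherwise):
    
        #include <functional>
        #include <future>
        #include <iostream>
        #include <string>
        #include <thread>
        #include <vector>
        
        // ToyPool is a hypothetical stand-in for Doris's PriorityThreadPool:
        // offer() may reject a task, e.g. when the pool's queue is full.
        struct ToyPool {
            size_t capacity;
            std::vector<std::thread> workers;
            bool offer(std::function<void()> task) {
                if (workers.size() >= capacity) return false;
                workers.emplace_back(std::move(task));
                return true;
            }
            ~ToyPool() {
                for (auto& t : workers) t.join();
            }
        };
        
        int main() {
            const int num_ranges = 5;
            std::vector<std::promise<std::string>> statuses(num_ranges);
            ToyPool pool{3};  // smaller than num_ranges, to force a rejection
        
            for (int i = 0; i < num_ranges; ++i) {
                if (!pool.offer([i, &statuses] { statuses[i].set_value("OK"); })) {
                    // As in the patch: record the failure for this scanner
                    // and cancel every scanner that was never submitted.
                    statuses[i].set_value("Failed to submit scan task");
                    for (int j = i + 1; j < num_ranges; ++j) {
                        statuses[j].set_value("Cancelled");
                    }
                    break;
                }
            }
            for (int i = 0; i < num_ranges; ++i) {
                std::cout << "scanner " << i << ": "
                          << statuses[i].get_future().get() << "\n";
            }
            return 0;
        }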
---
 be/src/vec/exec/file_arrow_scanner.cpp             |  2 +-
 be/src/vec/exec/file_scan_node.cpp                 | 65 +++++++++++++++++++---
 be/src/vec/exec/file_scan_node.h                   |  1 -
 be/src/vec/exec/file_text_scanner.cpp              |  8 +--
 .../planner/external/ExternalFileScanNode.java     | 16 +++---
 gensrc/thrift/PlanNodes.thrift                     | 23 ++++----
 6 files changed, 82 insertions(+), 33 deletions(-)

diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp
index 59a94399db..79c6037c36 100644
--- a/be/src/vec/exec/file_arrow_scanner.cpp
+++ b/be/src/vec/exec/file_arrow_scanner.cpp
@@ -55,7 +55,7 @@ Status FileArrowScanner::_open_next_reader() {
         const TFileRangeDesc& range = _ranges[_next_range++];
         std::unique_ptr<FileReader> file_reader;
         FileReader* hdfs_reader = nullptr;
-        RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path,
+        RETURN_IF_ERROR(HdfsReaderWriter::create_reader(_params.hdfs_params, range.path,
                                                         range.start_offset, &hdfs_reader));
         file_reader.reset(new BufferedReader(_profile, hdfs_reader));
         RETURN_IF_ERROR(file_reader->open());
diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp
index ff4989a033..ad79cc0536 100644
--- a/be/src/vec/exec/file_scan_node.cpp
+++ b/be/src/vec/exec/file_scan_node.cpp
@@ -17,12 +17,14 @@
 
 #include "vec/exec/file_scan_node.h"
 
+#include "common/config.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "runtime/mem_tracker.h"
 #include "runtime/runtime_state.h"
 #include "runtime/string_value.h"
 #include "runtime/tuple.h"
 #include "runtime/tuple_row.h"
+#include "util/priority_thread_pool.hpp"
 #include "util/runtime_profile.h"
 #include "util/thread.h"
 #include "util/types.h"
@@ -100,9 +102,37 @@ Status FileScanNode::start_scanners() {
     }
 
     _scanners_status.resize(_scan_ranges.size());
-    for (int i = 0; i < _scan_ranges.size(); i++) {
-        _scanner_threads.emplace_back(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
-                                      std::ref(_scanners_status[i]));
+    ThreadPoolToken* thread_token = _runtime_state->get_query_fragments_ctx()->get_token();
+    PriorityThreadPool* thread_pool = _runtime_state->exec_env()->scan_thread_pool();
+    for (int i = 0; i < _scan_ranges.size(); ++i) {
+        Status submit_status = Status::OK();
+        if (thread_token != nullptr) {
+            submit_status = thread_token->submit_func(std::bind(&FileScanNode::scanner_worker, this,
+                                                                i, _scan_ranges.size(),
+                                                                std::ref(_scanners_status[i])));
+        } else {
+            PriorityThreadPool::WorkFunction task =
+                    std::bind(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
+                              std::ref(_scanners_status[i]));
+            if (!thread_pool->offer(task)) {
+                submit_status = Status::Cancelled("Failed to submit scan task");
+            }
+        }
+        if (!submit_status.ok()) {
+            LOG(WARNING) << "Failed to assign file scanner task to thread pool! "
+                         << submit_status.get_error_msg();
+            _scanners_status[i].set_value(submit_status);
+            for (int j = i + 1; j < _scan_ranges.size(); ++j) {
+                _scanners_status[j].set_value(Status::Cancelled("Cancelled"));
+            }
+            {
+                std::lock_guard<std::mutex> l(_batch_queue_lock);
+                update_status(submit_status);
+                _num_running_scanners -= _scan_ranges.size() - i;
+            }
+            _queue_writer_cond.notify_all();
+            break;
+        }
     }
     return Status::OK();
 }
@@ -205,8 +235,9 @@ Status FileScanNode::close(RuntimeState* state) {
     _scan_finished.store(true);
     _queue_writer_cond.notify_all();
     _queue_reader_cond.notify_all();
-    for (int i = 0; i < _scanner_threads.size(); ++i) {
-        _scanner_threads[i].join();
+    {
+        std::unique_lock<std::mutex> l(_batch_queue_lock);
+        _queue_reader_cond.wait(l, [this] { return _num_running_scanners == 0; });
     }
     for (int i = 0; i < _scanners_status.size(); i++) {
         std::future<Status> f = _scanners_status[i].get_future();
@@ -308,7 +339,7 @@ void FileScanNode::scanner_worker(int start_idx, int length, std::promise<Status
 std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange& scan_range,
                                                           ScannerCounter* counter) {
     FileScanner* scan = nullptr;
-    switch (scan_range.ranges[0].format_type) {
+    switch (scan_range.params.format_type) {
     case TFileFormatType::FORMAT_PARQUET:
         scan = new VFileParquetScanner(_runtime_state, runtime_profile(), scan_range.params,
                                        scan_range.ranges, _pre_filter_texprs, counter);
@@ -329,7 +360,27 @@ std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange&
 
 // This function is called after plan node has been prepared.
 Status FileScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
-    _scan_ranges = scan_ranges;
+    int max_scanners = config::doris_scanner_thread_pool_thread_num;
+    if (scan_ranges.size() <= max_scanners) {
+        _scan_ranges = scan_ranges;
+    } else {
+        // There is no need for the number of scanners to exceed the number of threads in thread pool.
+        _scan_ranges.clear();
+        auto range_iter = scan_ranges.begin();
+        for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) {
+            _scan_ranges.push_back(*range_iter);
+        }
+        for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
+            if (i == max_scanners) {
+                i = 0;
+            }
+            auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
+            auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
+            ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
+        }
+        _scan_ranges.shrink_to_fit();
+        LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size();
+    }
     return Status::OK();
 }
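
The merging in set_scan_ranges() above is a round-robin redistribution. A
minimal sketch of the same idea (merge_ranges and the integer ids are
illustrative stand-ins; the real code splices the TFileRangeDesc lists of
the surplus TScanRangeParams onto the first
config::doris_scanner_thread_pool_thread_num entries):

    #include <algorithm>
    #include <iostream>
    #include <vector>

    // Integers stand in for the TFileRangeDesc entries that
    // FileScanNode::set_scan_ranges() redistributes; each inner vector
    // is the range list owned by one scanner.
    std::vector<std::vector<int>> merge_ranges(int num_ranges, int max_scanners) {
        std::vector<std::vector<int>> scanners(std::min(num_ranges, max_scanners));
        for (int i = 0; i < num_ranges; ++i) {
            scanners[i % max_scanners].push_back(i);  // round-robin wrap-around
        }
        return scanners;
    }

    int main() {
        // 10 ranges and a 4-thread pool -> 4 scanners, not 10 threads.
        int scanner_id = 0;
        for (const auto& ranges : merge_ranges(10, 4)) {
            std::cout << "scanner " << scanner_id++ << ":";
            for (int id : ranges) std::cout << ' ' << id;
            std::cout << '\n';
        }
        return 0;
    }

For example, with doris_scanner_thread_pool_thread_num = 48, a query over
100 splits now runs at most 48 scanners instead of spawning 100 threads.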
 
diff --git a/be/src/vec/exec/file_scan_node.h b/be/src/vec/exec/file_scan_node.h
index 2d7deb9626..93a8916c0e 100644
--- a/be/src/vec/exec/file_scan_node.h
+++ b/be/src/vec/exec/file_scan_node.h
@@ -105,7 +105,6 @@ private:
 
     Status _process_status;
 
-    std::vector<std::thread> _scanner_threads;
     std::vector<std::promise<Status>> _scanners_status;
 
     int _max_buffered_batches;
diff --git a/be/src/vec/exec/file_text_scanner.cpp b/be/src/vec/exec/file_text_scanner.cpp
index d541768a2f..883cbd5040 100644
--- a/be/src/vec/exec/file_text_scanner.cpp
+++ b/be/src/vec/exec/file_text_scanner.cpp
@@ -156,7 +156,7 @@ Status FileTextScanner::_open_file_reader() {
     const TFileRangeDesc& range = _ranges[_next_range];
 
     FileReader* hdfs_reader = nullptr;
-    RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path,
+    RETURN_IF_ERROR(HdfsReaderWriter::create_reader(_params.hdfs_params, range.path,
                                                     range.start_offset, &hdfs_reader));
     _cur_file_reader.reset(new BufferedReader(_profile, hdfs_reader));
     return _cur_file_reader->open();
@@ -171,7 +171,7 @@ Status FileTextScanner::_open_line_reader() {
     const TFileRangeDesc& range = _ranges[_next_range];
     int64_t size = range.size;
     if (range.start_offset != 0) {
-        if (range.format_type != TFileFormatType::FORMAT_CSV_PLAIN) {
+        if (_params.format_type != TFileFormatType::FORMAT_CSV_PLAIN) {
             std::stringstream ss;
             ss << "For now we do not support split compressed file";
             return Status::InternalError(ss.str());
@@ -182,14 +182,14 @@ Status FileTextScanner::_open_line_reader() {
     }
 
     // open line reader
-    switch (range.format_type) {
+    switch (_params.format_type) {
     case TFileFormatType::FORMAT_CSV_PLAIN:
         _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader.get(), nullptr, size,
                                                    _line_delimiter, _line_delimiter_length);
         break;
     default: {
         std::stringstream ss;
-        ss << "Unknown format type, cannot init line reader, type=" << range.format_type;
+        ss << "Unknown format type, cannot init line reader, type=" << _params.format_type;
         return Status::InternalError(ss.str());
     }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 34c224de82..00051837ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -285,6 +285,14 @@ public class ExternalFileScanNode extends ExternalScanNode {
         String fullPath = ((FileSplit) inputSplits[0]).getPath().toUri().toString();
         String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath();
         String fsName = fullPath.replace(filePath, "");
+        context.params.setFileType(scanProvider.getTableFileType());
+        context.params.setFormatType(scanProvider.getTableFormatType());
+        // set hdfs params for hdfs file type.
+        if (scanProvider.getTableFileType() == TFileType.FILE_HDFS) {
+            THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(scanProvider.getTableProperties());
+            tHdfsParams.setFsName(fsName);
+            context.params.setHdfsParams(tHdfsParams);
+        }
 
         TScanRangeLocations curLocations = newLocations(context.params);
 
@@ -298,7 +306,6 @@ public class ExternalFileScanNode extends ExternalScanNode {
                     partitionKeys);
 
             TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath);
-            rangeDesc.getHdfsParams().setFsName(fsName);
 
             curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
             Log.debug("Assign to backend " + curLocations.getLocations().get(0).getBackendId()
@@ -346,17 +353,10 @@ public class ExternalFileScanNode extends ExternalScanNode {
             FileSplit fileSplit,
             List<String> columnsFromPath) throws DdlException, MetaNotFoundException {
         TFileRangeDesc rangeDesc = new TFileRangeDesc();
-        rangeDesc.setFileType(scanProvider.getTableFileType());
-        rangeDesc.setFormatType(scanProvider.getTableFormatType());
         rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
         rangeDesc.setStartOffset(fileSplit.getStart());
         rangeDesc.setSize(fileSplit.getLength());
         rangeDesc.setColumnsFromPath(columnsFromPath);
-        // set hdfs params for hdfs file type.
-        if (scanProvider.getTableFileType() == TFileType.FILE_HDFS) {
-            THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(scanProvider.getTableProperties());
-            rangeDesc.setHdfsParams(tHdfsParams);
-        }
         return rangeDesc;
     }
 
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 2469ebd4e2..b6d0a3b19a 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -226,29 +226,28 @@ struct TFileScanSlotInfo {
 }
 
 struct TFileScanRangeParams {
+  1: optional Types.TFileType file_type;
+  2: optional TFileFormatType format_type;
  // use src_tuple_id to get all slots from the src table, including both file slots and partition slots.
-  1: optional Types.TTupleId src_tuple_id;
+  3: optional Types.TTupleId src_tuple_id;
  // num_of_columns_from_file can split the all_file_slot and all_partition_slot
-  2: optional i32 num_of_columns_from_file;
+  4: optional i32 num_of_columns_from_file;
  // all selected slots, which may be composed from file and partition values.
-  3: optional list<TFileScanSlotInfo> required_slots;
+  5: optional list<TFileScanSlotInfo> required_slots;
 
-  4: optional TFileTextScanRangeParams text_params;
+  6: optional THdfsParams hdfs_params;
+  7: optional TFileTextScanRangeParams text_params;
 }
 
 struct TFileRangeDesc {
-    1: optional Types.TFileType file_type;
-    2: optional TFileFormatType format_type;
     // Path of this range
-    3: optional string path;
+    1: optional string path;
     // Offset of this file start
-    4: optional i64 start_offset;
+    2: optional i64 start_offset;
    // Size of this range; if size = -1, read to the end of the file
-    5: optional i64 size;
+    3: optional i64 size;
     // columns parsed from file path should be after the columns read from file
-    6: optional list<string> columns_from_path;
-
-    7: optional THdfsParams hdfs_params;
+    4: optional list<string> columns_from_path;
 }
 
 // HDFS file scan range
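
Since every TFileRangeDesc in a scan carried the same file_type,
format_type, and hdfs_params, hoisting them into TFileScanRangeParams
means a scan over many files serializes that metadata once rather than
once per range.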


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org