You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/07/18 12:50:39 UTC
[doris] branch master updated: [feature-wip](multi-catalog) Optimize threads and thrift interface of FileScanNode (#10942)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 60dd322aba [feature-wip](multi-catalog) Optimize threads and thrift interface of FileScanNode (#10942)
60dd322aba is described below
commit 60dd322abade619e0adaaf487dd01295ac64198b
Author: Ashin Gau <As...@users.noreply.github.com>
AuthorDate: Mon Jul 18 20:50:34 2022 +0800
[feature-wip](multi-catalog) Optimize threads and thrift interface of FileScanNode (#10942)
FileScanNode in BE will launch as many threads as the number of splits.
The thrift interface of FileScanNode is excessively redundant.
---
be/src/vec/exec/file_arrow_scanner.cpp | 2 +-
be/src/vec/exec/file_scan_node.cpp | 65 +++++++++++++++++++---
be/src/vec/exec/file_scan_node.h | 1 -
be/src/vec/exec/file_text_scanner.cpp | 8 +--
.../planner/external/ExternalFileScanNode.java | 16 +++---
gensrc/thrift/PlanNodes.thrift | 23 ++++----
6 files changed, 82 insertions(+), 33 deletions(-)
diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp
index 59a94399db..79c6037c36 100644
--- a/be/src/vec/exec/file_arrow_scanner.cpp
+++ b/be/src/vec/exec/file_arrow_scanner.cpp
@@ -55,7 +55,7 @@ Status FileArrowScanner::_open_next_reader() {
const TFileRangeDesc& range = _ranges[_next_range++];
std::unique_ptr<FileReader> file_reader;
FileReader* hdfs_reader = nullptr;
- RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path,
+ RETURN_IF_ERROR(HdfsReaderWriter::create_reader(_params.hdfs_params, range.path,
range.start_offset, &hdfs_reader));
file_reader.reset(new BufferedReader(_profile, hdfs_reader));
RETURN_IF_ERROR(file_reader->open());
diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp
index ff4989a033..ad79cc0536 100644
--- a/be/src/vec/exec/file_scan_node.cpp
+++ b/be/src/vec/exec/file_scan_node.cpp
@@ -17,12 +17,14 @@
#include "vec/exec/file_scan_node.h"
+#include "common/config.h"
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/mem_tracker.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.h"
#include "runtime/tuple.h"
#include "runtime/tuple_row.h"
+#include "util/priority_thread_pool.hpp"
#include "util/runtime_profile.h"
#include "util/thread.h"
#include "util/types.h"
@@ -100,9 +102,37 @@ Status FileScanNode::start_scanners() {
}
_scanners_status.resize(_scan_ranges.size());
- for (int i = 0; i < _scan_ranges.size(); i++) {
- _scanner_threads.emplace_back(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
- std::ref(_scanners_status[i]));
+ ThreadPoolToken* thread_token = _runtime_state->get_query_fragments_ctx()->get_token();
+ PriorityThreadPool* thread_pool = _runtime_state->exec_env()->scan_thread_pool();
+ for (int i = 0; i < _scan_ranges.size(); ++i) {
+ Status submit_status = Status::OK();
+ if (thread_token != nullptr) {
+ submit_status = thread_token->submit_func(std::bind(&FileScanNode::scanner_worker, this,
+ i, _scan_ranges.size(),
+ std::ref(_scanners_status[i])));
+ } else {
+ PriorityThreadPool::WorkFunction task =
+ std::bind(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
+ std::ref(_scanners_status[i]));
+ if (!thread_pool->offer(task)) {
+ submit_status = Status::Cancelled("Failed to submit scan task");
+ }
+ }
+ if (!submit_status.ok()) {
+ LOG(WARNING) << "Failed to assign file scanner task to thread pool! "
+ << submit_status.get_error_msg();
+ _scanners_status[i].set_value(submit_status);
+ for (int j = i + 1; j < _scan_ranges.size(); ++j) {
+ _scanners_status[j].set_value(Status::Cancelled("Cancelled"));
+ }
+ {
+ std::lock_guard<std::mutex> l(_batch_queue_lock);
+ update_status(submit_status);
+ _num_running_scanners -= _scan_ranges.size() - i;
+ }
+ _queue_writer_cond.notify_all();
+ break;
+ }
}
return Status::OK();
}
@@ -205,8 +235,9 @@ Status FileScanNode::close(RuntimeState* state) {
_scan_finished.store(true);
_queue_writer_cond.notify_all();
_queue_reader_cond.notify_all();
- for (int i = 0; i < _scanner_threads.size(); ++i) {
- _scanner_threads[i].join();
+ {
+ std::unique_lock<std::mutex> l(_batch_queue_lock);
+ _queue_reader_cond.wait(l, [this] { return _num_running_scanners == 0; });
}
for (int i = 0; i < _scanners_status.size(); i++) {
std::future<Status> f = _scanners_status[i].get_future();
@@ -308,7 +339,7 @@ void FileScanNode::scanner_worker(int start_idx, int length, std::promise<Status
std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange& scan_range,
ScannerCounter* counter) {
FileScanner* scan = nullptr;
- switch (scan_range.ranges[0].format_type) {
+ switch (scan_range.params.format_type) {
case TFileFormatType::FORMAT_PARQUET:
scan = new VFileParquetScanner(_runtime_state, runtime_profile(), scan_range.params,
scan_range.ranges, _pre_filter_texprs, counter);
@@ -329,7 +360,27 @@ std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange&
// This function is called after plan node has been prepared.
Status FileScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
- _scan_ranges = scan_ranges;
+ int max_scanners = config::doris_scanner_thread_pool_thread_num;
+ if (scan_ranges.size() <= max_scanners) {
+ _scan_ranges = scan_ranges;
+ } else {
+ // There is no need for the number of scanners to exceed the number of threads in thread pool.
+ _scan_ranges.clear();
+ auto range_iter = scan_ranges.begin();
+ for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) {
+ _scan_ranges.push_back(*range_iter);
+ }
+ for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
+ if (i == max_scanners) {
+ i = 0;
+ }
+ auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
+ auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
+ ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
+ }
+ _scan_ranges.shrink_to_fit();
+ LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size();
+ }
return Status::OK();
}
diff --git a/be/src/vec/exec/file_scan_node.h b/be/src/vec/exec/file_scan_node.h
index 2d7deb9626..93a8916c0e 100644
--- a/be/src/vec/exec/file_scan_node.h
+++ b/be/src/vec/exec/file_scan_node.h
@@ -105,7 +105,6 @@ private:
Status _process_status;
- std::vector<std::thread> _scanner_threads;
std::vector<std::promise<Status>> _scanners_status;
int _max_buffered_batches;
diff --git a/be/src/vec/exec/file_text_scanner.cpp b/be/src/vec/exec/file_text_scanner.cpp
index d541768a2f..883cbd5040 100644
--- a/be/src/vec/exec/file_text_scanner.cpp
+++ b/be/src/vec/exec/file_text_scanner.cpp
@@ -156,7 +156,7 @@ Status FileTextScanner::_open_file_reader() {
const TFileRangeDesc& range = _ranges[_next_range];
FileReader* hdfs_reader = nullptr;
- RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path,
+ RETURN_IF_ERROR(HdfsReaderWriter::create_reader(_params.hdfs_params, range.path,
range.start_offset, &hdfs_reader));
_cur_file_reader.reset(new BufferedReader(_profile, hdfs_reader));
return _cur_file_reader->open();
@@ -171,7 +171,7 @@ Status FileTextScanner::_open_line_reader() {
const TFileRangeDesc& range = _ranges[_next_range];
int64_t size = range.size;
if (range.start_offset != 0) {
- if (range.format_type != TFileFormatType::FORMAT_CSV_PLAIN) {
+ if (_params.format_type != TFileFormatType::FORMAT_CSV_PLAIN) {
std::stringstream ss;
ss << "For now we do not support split compressed file";
return Status::InternalError(ss.str());
@@ -182,14 +182,14 @@ Status FileTextScanner::_open_line_reader() {
}
// open line reader
- switch (range.format_type) {
+ switch (_params.format_type) {
case TFileFormatType::FORMAT_CSV_PLAIN:
_cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader.get(), nullptr, size,
_line_delimiter, _line_delimiter_length);
break;
default: {
std::stringstream ss;
- ss << "Unknown format type, cannot init line reader, type=" << range.format_type;
+ ss << "Unknown format type, cannot init line reader, type=" << _params.format_type;
return Status::InternalError(ss.str());
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 34c224de82..00051837ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -285,6 +285,14 @@ public class ExternalFileScanNode extends ExternalScanNode {
String fullPath = ((FileSplit) inputSplits[0]).getPath().toUri().toString();
String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath();
String fsName = fullPath.replace(filePath, "");
+ context.params.setFileType(scanProvider.getTableFileType());
+ context.params.setFormatType(scanProvider.getTableFormatType());
+ // set hdfs params for hdfs file type.
+ if (scanProvider.getTableFileType() == TFileType.FILE_HDFS) {
+ THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(scanProvider.getTableProperties());
+ tHdfsParams.setFsName(fsName);
+ context.params.setHdfsParams(tHdfsParams);
+ }
TScanRangeLocations curLocations = newLocations(context.params);
@@ -298,7 +306,6 @@ public class ExternalFileScanNode extends ExternalScanNode {
partitionKeys);
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath);
- rangeDesc.getHdfsParams().setFsName(fsName);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
Log.debug("Assign to backend " + curLocations.getLocations().get(0).getBackendId()
@@ -346,17 +353,10 @@ public class ExternalFileScanNode extends ExternalScanNode {
FileSplit fileSplit,
List<String> columnsFromPath) throws DdlException, MetaNotFoundException {
TFileRangeDesc rangeDesc = new TFileRangeDesc();
- rangeDesc.setFileType(scanProvider.getTableFileType());
- rangeDesc.setFormatType(scanProvider.getTableFormatType());
rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
rangeDesc.setStartOffset(fileSplit.getStart());
rangeDesc.setSize(fileSplit.getLength());
rangeDesc.setColumnsFromPath(columnsFromPath);
- // set hdfs params for hdfs file type.
- if (scanProvider.getTableFileType() == TFileType.FILE_HDFS) {
- THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(scanProvider.getTableProperties());
- rangeDesc.setHdfsParams(tHdfsParams);
- }
return rangeDesc;
}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 2469ebd4e2..b6d0a3b19a 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -226,29 +226,28 @@ struct TFileScanSlotInfo {
}
struct TFileScanRangeParams {
+ 1: optional Types.TFileType file_type;
+ 2: optional TFileFormatType format_type;
// use src_tuple_id to get all slots from src table include both file slot and partition slot.
- 1: optional Types.TTupleId src_tuple_id;
+ 3: optional Types.TTupleId src_tuple_id;
// num_of_columns_from_file can split the all_file_slot and all_partition_slot
- 2: optional i32 num_of_columns_from_file;
+ 4: optional i32 num_of_columns_from_file;
// all selected slots which may be composed from file and partition value.
- 3: optional list<TFileScanSlotInfo> required_slots;
+ 5: optional list<TFileScanSlotInfo> required_slots;
- 4: optional TFileTextScanRangeParams text_params;
+ 6: optional THdfsParams hdfs_params;
+ 7: optional TFileTextScanRangeParams text_params;
}
struct TFileRangeDesc {
- 1: optional Types.TFileType file_type;
- 2: optional TFileFormatType format_type;
// Path of this range
- 3: optional string path;
+ 1: optional string path;
// Offset of this file start
- 4: optional i64 start_offset;
+ 2: optional i64 start_offset;
// Size of this range, if size = -1, this means that will read to the end of file
- 5: optional i64 size;
+ 3: optional i64 size;
// columns parsed from file path should be after the columns read from file
- 6: optional list<string> columns_from_path;
-
- 7: optional THdfsParams hdfs_params;
+ 4: optional list<string> columns_from_path;
}
// HDFS file scan range
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org