You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/13 04:17:47 UTC

[doris] 02/03: [refactor](scan-pool) move scan pool from env to scanner scheduler (#15604)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 247d6fa62d9cce801bfedf157d13f9207dd0d15e
Author: morningman <mo...@163.com>
AuthorDate: Fri Jan 13 12:01:42 2023 +0800

    [refactor](scan-pool) move scan pool from env to scanner scheduler (#15604)
    
    The original scan pools were in exec_env.
    But after enabling new_load_scan_node by default, the scan pools in exec_env are no longer used.
    All scan tasks will be submitted to the scan pools in scanner_scheduler.
    
    BTW, reorganize the scan pools into 3 kinds:
    
    local scan pool
    For olap scan node
    
    remote scan pool
    For file scan node
    
    limited scan pool
    For queries that set a cpu resource limit or have a small limit clause
    
    TODO:
    Use bthread to unify all IO tasks.
    
    Some trivial issues:
    
    Fix the bug where the memtable flush size printed in the log is incorrect
    Add RuntimeProfile param in VScanner
---
 be/src/exec/olap_scan_node.cpp              |  4 +++-
 be/src/olap/memtable.cpp                    |  2 +-
 be/src/olap/memtable_flush_executor.cpp     |  4 ++--
 be/src/olap/rowset/beta_rowset_writer.cpp   | 16 ++++++++--------
 be/src/olap/rowset/beta_rowset_writer.h     |  2 +-
 be/src/olap/rowset/rowset_writer.h          |  2 +-
 be/src/runtime/fragment_mgr.cpp             |  9 ++++++++-
 be/src/runtime/query_fragments_ctx.h        |  3 ++-
 be/src/vec/exec/scan/new_es_scan_node.cpp   |  5 +++--
 be/src/vec/exec/scan/new_es_scanner.cpp     |  4 ++--
 be/src/vec/exec/scan/new_es_scanner.h       |  3 ++-
 be/src/vec/exec/scan/new_jdbc_scan_node.cpp |  5 +++--
 be/src/vec/exec/scan/new_jdbc_scanner.cpp   |  4 ++--
 be/src/vec/exec/scan/new_jdbc_scanner.h     |  2 +-
 be/src/vec/exec/scan/new_odbc_scan_node.cpp |  3 ++-
 be/src/vec/exec/scan/new_odbc_scanner.cpp   |  4 ++--
 be/src/vec/exec/scan/new_odbc_scanner.h     |  2 +-
 be/src/vec/exec/scan/new_olap_scanner.cpp   |  5 ++---
 be/src/vec/exec/scan/new_olap_scanner.h     |  1 -
 be/src/vec/exec/scan/scanner_scheduler.cpp  | 11 +++++++++++
 be/src/vec/exec/scan/scanner_scheduler.h    |  6 ++++++
 be/src/vec/exec/scan/vfile_scanner.cpp      |  3 +--
 be/src/vec/exec/scan/vfile_scanner.h        |  7 ++++---
 be/src/vec/exec/scan/vscanner.cpp           |  3 ++-
 be/src/vec/exec/scan/vscanner.h             |  6 ++++--
 be/test/runtime/test_env.cc                 |  2 --
 26 files changed, 74 insertions(+), 44 deletions(-)

diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 5bf924d1c1..35f3ef5023 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -1508,7 +1508,9 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
      *    The larger the nice value, the more preferentially obtained query resources
      * 4. Regularly increase the priority of the remaining tasks in the queue to avoid starvation for large queries
      *********************************/
-    ThreadPoolToken* thread_token = state->get_query_fragments_ctx()->get_token();
+    // after merging #15604, we no longer support thread token for non-vec olap scan node,
+    // so keep thread_token as null
+    ThreadPoolToken* thread_token = nullptr;
     PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool();
     PriorityThreadPool* remote_thread_pool = state->exec_env()->remote_scan_thread_pool();
     _total_assign_num = 0;
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 46cc15275e..1f8465c0f3 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -485,7 +485,7 @@ Status MemTable::_do_flush(int64_t& duration_ns) {
     } else {
         _collect_vskiplist_results<true>();
         vectorized::Block block = _output_mutable_block.to_block();
-        RETURN_NOT_OK(_rowset_writer->flush_single_memtable(&block));
+        RETURN_NOT_OK(_rowset_writer->flush_single_memtable(&block, &_flush_size));
         _flush_size = block.allocated_bytes();
     }
     return Status::OK();
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index c2fe380b1b..5dafa36cc5 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -87,6 +87,7 @@ void FlushToken::_flush_memtable(MemTable* memtable, int64_t submit_task_time) {
 
     MonotonicStopWatch timer;
     timer.start();
+    size_t memory_usage = memtable->memory_usage();
     Status s = memtable->flush();
     if (!s) {
         LOG(WARNING) << "Flush memtable failed with res = " << s;
@@ -101,8 +102,7 @@ void FlushToken::_flush_memtable(MemTable* memtable, int64_t submit_task_time) {
     VLOG_CRITICAL << "flush memtable cost: " << timer.elapsed_time()
                   << ", running count: " << _stats.flush_running_count
                   << ", finish count: " << _stats.flush_finish_count
-                  << ", mem size: " << memtable->memory_usage()
-                  << ", disk size: " << memtable->flush_size();
+                  << ", mem size: " << memory_usage << ", disk size: " << memtable->flush_size();
     _stats.flush_time_ns += timer.elapsed_time();
     _stats.flush_finish_count++;
     _stats.flush_running_count--;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 42f57812dc..e64d4f97ee 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -689,7 +689,7 @@ Status BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flus
     return Status::OK();
 }
 
-Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block) {
+Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block, int64* flush_size) {
     if (block->rows() == 0) {
         return Status::OK();
     }
@@ -697,7 +697,7 @@ Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block) {
     std::unique_ptr<segment_v2::SegmentWriter> writer;
     RETURN_NOT_OK(_create_segment_writer(&writer));
     RETURN_NOT_OK(_add_block(block, &writer));
-    RETURN_NOT_OK(_flush_segment_writer(&writer));
+    RETURN_NOT_OK(_flush_segment_writer(&writer, flush_size));
     return Status::OK();
 }
 
@@ -944,12 +944,12 @@ Status BetaRowsetWriter::_create_segment_writer(
         std::unique_ptr<segment_v2::SegmentWriter>* writer) {
     size_t total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
     if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
-        LOG(ERROR) << "too many segments in rowset."
-                   << " tablet_id:" << _context.tablet_id << " rowset_id:" << _context.rowset_id
-                   << " max:" << config::max_segment_num_per_rowset
-                   << " _num_segment:" << _num_segment
-                   << " _segcompacted_point:" << _segcompacted_point
-                   << " _num_segcompacted:" << _num_segcompacted;
+        LOG(WARNING) << "too many segments in rowset."
+                     << " tablet_id:" << _context.tablet_id << " rowset_id:" << _context.rowset_id
+                     << " max:" << config::max_segment_num_per_rowset
+                     << " _num_segment:" << _num_segment
+                     << " _segcompacted_point:" << _segcompacted_point
+                     << " _num_segcompacted:" << _num_segcompacted;
         return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_SEGMENTS);
     } else {
         return _do_create_segment_writer(writer, false, -1, -1);
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index 2d37788930..754491bc82 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -56,7 +56,7 @@ public:
     // Return the file size flushed to disk in "flush_size"
     // This method is thread-safe.
     Status flush_single_memtable(MemTable* memtable, int64_t* flush_size) override;
-    Status flush_single_memtable(const vectorized::Block* block) override;
+    Status flush_single_memtable(const vectorized::Block* block, int64_t* flush_size) override;
 
     RowsetSharedPtr build() override;
 
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index 26d933c8d0..dbf7776c8d 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -69,7 +69,7 @@ public:
     virtual Status flush_single_memtable(MemTable* memtable, int64_t* flush_size) {
         return Status::OLAPInternalError(OLAP_ERR_FUNC_NOT_IMPLEMENTED);
     }
-    virtual Status flush_single_memtable(const vectorized::Block* block) {
+    virtual Status flush_single_memtable(const vectorized::Block* block, int64_t* flush_size) {
         return Status::OLAPInternalError(OLAP_ERR_FUNC_NOT_IMPLEMENTED);
     }
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 5e0d4f4645..d7992aa9e2 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -737,11 +737,15 @@ void FragmentMgr::_set_scan_concurrency(const TExecPlanFragmentParams& params,
     // the thread token will be set if
     // 1. the cpu_limit is set, or
     // 2. the limit is very small ( < 1024)
+    // If the token is set, the scan task will use limited_scan_pool in scanner scheduler.
+    // Otherwise, the scan task will use local/remote scan pool in scanner scheduler
     int concurrency = 1;
     bool is_serial = false;
+    bool need_token = false;
     if (params.query_options.__isset.resource_limit &&
         params.query_options.resource_limit.__isset.cpu_limit) {
         concurrency = params.query_options.resource_limit.cpu_limit;
+        need_token = true;
     } else {
         concurrency = config::doris_scanner_thread_pool_thread_num;
     }
@@ -759,11 +763,14 @@ void FragmentMgr::_set_scan_concurrency(const TExecPlanFragmentParams& params,
             if (node.limit > 0 && node.limit < 1024) {
                 concurrency = 1;
                 is_serial = true;
+                need_token = true;
                 break;
             }
         }
     }
-    fragments_ctx->set_thread_token(concurrency, is_serial);
+    if (need_token) {
+        fragments_ctx->set_thread_token(concurrency, is_serial);
+    }
 #endif
 }
 
diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h
index f3565b9f30..543b9868b5 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_fragments_ctx.h
@@ -30,6 +30,7 @@
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "util/pretty_printer.h"
 #include "util/threadpool.h"
+#include "vec/exec/scan/scanner_scheduler.h"
 #include "vec/runtime/shared_hash_table_controller.h"
 
 namespace doris {
@@ -76,7 +77,7 @@ public:
     }
 
     void set_thread_token(int concurrency, bool is_serial) {
-        _thread_token = _exec_env->limited_scan_thread_pool()->new_token(
+        _thread_token = _exec_env->scanner_scheduler()->new_limited_scan_pool_token(
                 is_serial ? ThreadPool::ExecutionMode::SERIAL
                           : ThreadPool::ExecutionMode::CONCURRENT,
                 concurrency);
diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp b/be/src/vec/exec/scan/new_es_scan_node.cpp
index 6a57e917b4..bd6f676fe5 100644
--- a/be/src/vec/exec/scan/new_es_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_es_scan_node.cpp
@@ -201,8 +201,9 @@ Status NewEsScanNode::_init_scanners(std::list<VScanner*>* scanners) {
         properties[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build(
                 properties, _column_names, _predicates, _docvalue_context, &doc_value_mode);
 
-        NewEsScanner* scanner = new NewEsScanner(_state, this, _limit_per_scanner, _tuple_id,
-                                                 properties, _docvalue_context, doc_value_mode);
+        NewEsScanner* scanner =
+                new NewEsScanner(_state, this, _limit_per_scanner, _tuple_id, properties,
+                                 _docvalue_context, doc_value_mode, _state->runtime_profile());
 
         _scanner_pool.add(scanner);
         RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get()));
diff --git a/be/src/vec/exec/scan/new_es_scanner.cpp b/be/src/vec/exec/scan/new_es_scanner.cpp
index 358a226d1e..bae49801a0 100644
--- a/be/src/vec/exec/scan/new_es_scanner.cpp
+++ b/be/src/vec/exec/scan/new_es_scanner.cpp
@@ -26,8 +26,8 @@ namespace doris::vectorized {
 NewEsScanner::NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t limit,
                            TupleId tuple_id, const std::map<std::string, std::string>& properties,
                            const std::map<std::string, std::string>& docvalue_context,
-                           bool doc_value_mode)
-        : VScanner(state, static_cast<VScanNode*>(parent), limit),
+                           bool doc_value_mode, RuntimeProfile* profile)
+        : VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
           _is_init(false),
           _es_eof(false),
           _properties(properties),
diff --git a/be/src/vec/exec/scan/new_es_scanner.h b/be/src/vec/exec/scan/new_es_scanner.h
index 4e82d72af9..4b97237340 100644
--- a/be/src/vec/exec/scan/new_es_scanner.h
+++ b/be/src/vec/exec/scan/new_es_scanner.h
@@ -30,7 +30,8 @@ class NewEsScanner : public VScanner {
 public:
     NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t limit, TupleId tuple_id,
                  const std::map<std::string, std::string>& properties,
-                 const std::map<std::string, std::string>& docvalue_context, bool doc_value_mode);
+                 const std::map<std::string, std::string>& docvalue_context, bool doc_value_mode,
+                 RuntimeProfile* profile);
 
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
index 955a33970d..eaa511f40d 100644
--- a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
@@ -50,8 +50,9 @@ Status NewJdbcScanNode::_init_scanners(std::list<VScanner*>* scanners) {
     if (_eos == true) {
         return Status::OK();
     }
-    NewJdbcScanner* scanner = new NewJdbcScanner(_state, this, _limit_per_scanner, _tuple_id,
-                                                 _query_string, _table_type);
+    NewJdbcScanner* scanner =
+            new NewJdbcScanner(_state, this, _limit_per_scanner, _tuple_id, _query_string,
+                               _table_type, _state->runtime_profile());
     _scanner_pool.add(scanner);
     RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get()));
     scanners->push_back(static_cast<VScanner*>(scanner));
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
index edfb843733..377ae6c800 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
@@ -20,8 +20,8 @@
 namespace doris::vectorized {
 NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit,
                                const TupleId& tuple_id, const std::string& query_string,
-                               TOdbcTableType::type table_type)
-        : VScanner(state, static_cast<VScanNode*>(parent), limit),
+                               TOdbcTableType::type table_type, RuntimeProfile* profile)
+        : VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
           _is_init(false),
           _jdbc_eos(false),
           _tuple_id(tuple_id),
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.h b/be/src/vec/exec/scan/new_jdbc_scanner.h
index 9fa17c4116..e88b33d252 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.h
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.h
@@ -27,7 +27,7 @@ class NewJdbcScanner : public VScanner {
 public:
     NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit,
                    const TupleId& tuple_id, const std::string& query_string,
-                   TOdbcTableType::type table_type);
+                   TOdbcTableType::type table_type, RuntimeProfile* profile);
 
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.cpp b/be/src/vec/exec/scan/new_odbc_scan_node.cpp
index db8f8bfc68..1a7296a866 100644
--- a/be/src/vec/exec/scan/new_odbc_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_odbc_scan_node.cpp
@@ -51,7 +51,8 @@ Status NewOdbcScanNode::_init_scanners(std::list<VScanner*>* scanners) {
     if (_eos == true) {
         return Status::OK();
     }
-    NewOdbcScanner* scanner = new NewOdbcScanner(_state, this, _limit_per_scanner, _odbc_scan_node);
+    NewOdbcScanner* scanner = new NewOdbcScanner(_state, this, _limit_per_scanner, _odbc_scan_node,
+                                                 _state->runtime_profile());
     _scanner_pool.add(scanner);
     RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get()));
     scanners->push_back(static_cast<VScanner*>(scanner));
diff --git a/be/src/vec/exec/scan/new_odbc_scanner.cpp b/be/src/vec/exec/scan/new_odbc_scanner.cpp
index 71b1a8d525..f19b44c294 100644
--- a/be/src/vec/exec/scan/new_odbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_odbc_scanner.cpp
@@ -26,8 +26,8 @@ static const std::string NEW_SCANNER_TYPE = "NewOdbcScanner";
 
 namespace doris::vectorized {
 NewOdbcScanner::NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent, int64_t limit,
-                               const TOdbcScanNode& odbc_scan_node)
-        : VScanner(state, static_cast<VScanNode*>(parent), limit),
+                               const TOdbcScanNode& odbc_scan_node, RuntimeProfile* profile)
+        : VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
           _is_init(false),
           _odbc_eof(false),
           _table_name(odbc_scan_node.table_name),
diff --git a/be/src/vec/exec/scan/new_odbc_scanner.h b/be/src/vec/exec/scan/new_odbc_scanner.h
index 012fc8b3c1..e238a544fe 100644
--- a/be/src/vec/exec/scan/new_odbc_scanner.h
+++ b/be/src/vec/exec/scan/new_odbc_scanner.h
@@ -26,7 +26,7 @@ namespace doris::vectorized {
 class NewOdbcScanner : public VScanner {
 public:
     NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent, int64_t limit,
-                   const TOdbcScanNode& odbc_scan_node);
+                   const TOdbcScanNode& odbc_scan_node, RuntimeProfile* profile);
 
     Status open(RuntimeState* state) override;
 
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index fe1a010df4..cc1f030c6a 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -26,11 +26,10 @@ namespace doris::vectorized {
 NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit,
                                bool aggregation, bool need_agg_finalize,
                                const TPaloScanRange& scan_range, RuntimeProfile* profile)
-        : VScanner(state, static_cast<VScanNode*>(parent), limit),
+        : VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
           _aggregation(aggregation),
           _need_agg_finalize(need_agg_finalize),
-          _version(-1),
-          _profile(profile) {
+          _version(-1) {
     _tablet_schema = std::make_shared<TabletSchema>();
 }
 
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h
index 5f253ea93f..97ba78aa2b 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -79,7 +79,6 @@ private:
     // ========= profiles ==========
     int64_t _compressed_bytes_read = 0;
     int64_t _raw_rows_read = 0;
-    RuntimeProfile* _profile;
     bool _profile_updated = false;
 };
 } // namespace vectorized
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index e67ff364be..63c8fa1355 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -80,6 +80,12 @@ Status ScannerScheduler::init(ExecEnv* env) {
             new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
                                    config::doris_scanner_thread_pool_queue_size, "remote_scan"));
 
+    ThreadPoolBuilder("LimitedScanThreadPool")
+            .set_min_threads(config::doris_scanner_thread_pool_thread_num)
+            .set_max_threads(config::doris_scanner_thread_pool_thread_num)
+            .set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
+            .build(&_limited_scan_thread_pool);
+
     _is_init = true;
     return Status::OK();
 }
@@ -94,6 +100,11 @@ Status ScannerScheduler::submit(ScannerContext* ctx) {
     return Status::OK();
 }
 
+std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token(
+        ThreadPool::ExecutionMode mode, int max_concurrency) {
+    return _limited_scan_thread_pool->new_token(mode, max_concurrency);
+}
+
 void ScannerScheduler::_schedule_thread(int queue_id) {
     BlockingQueue<ScannerContext*>* queue = _pending_queues[queue_id];
     while (!_is_closed) {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index f8c1a8f3df..a0062500d9 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -19,6 +19,7 @@
 
 #include "common/status.h"
 #include "util/blocking_queue.hpp"
+#include "util/threadpool.h"
 #include "vec/exec/scan/scanner_context.h"
 
 namespace doris::vectorized {
@@ -50,6 +51,9 @@ public:
 
     Status submit(ScannerContext* ctx);
 
+    std::unique_ptr<ThreadPoolToken> new_limited_scan_pool_token(ThreadPool::ExecutionMode mode,
+                                                                 int max_concurrency);
+
 private:
     // scheduling thread function
     void _schedule_thread(int queue_id);
@@ -75,8 +79,10 @@ private:
     // execution thread pool
     // _local_scan_thread_pool is for local scan task(typically, olap scanner)
     // _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, etc.)
+    // _limited_scan_thread_pool is a special pool for queries with resource limit
     std::unique_ptr<PriorityThreadPool> _local_scan_thread_pool;
     std::unique_ptr<PriorityThreadPool> _remote_scan_thread_pool;
+    std::unique_ptr<ThreadPool> _limited_scan_thread_pool;
 
     // true is the scheduler is closed.
     std::atomic_bool _is_closed = {false};
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index d0492ea9c7..c1f85cf01b 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -43,14 +43,13 @@ namespace doris::vectorized {
 VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
                            const TFileScanRange& scan_range, RuntimeProfile* profile,
                            KVCache<std::string>& kv_cache)
-        : VScanner(state, static_cast<VScanNode*>(parent), limit),
+        : VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
           _params(scan_range.params),
           _ranges(scan_range.ranges),
           _next_range(0),
           _cur_reader(nullptr),
           _cur_reader_eof(false),
           _mem_pool(std::make_unique<MemPool>()),
-          _profile(profile),
           _kv_cache(kv_cache),
           _strict_mode(false) {
     if (scan_range.params.__isset.strict_mode) {
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 28c0e3d347..48bbfbf3ad 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -45,6 +45,10 @@ public:
     Status prepare(VExprContext** vconjunct_ctx_ptr,
                    std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
 
+    doris::TabletStorageType get_storage_type() override {
+        return doris::TabletStorageType::STORAGE_TYPE_REMOTE;
+    }
+
 protected:
     Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;
 
@@ -106,9 +110,6 @@ protected:
     // Mem pool used to allocate _src_tuple and _src_tuple_row
     std::unique_ptr<MemPool> _mem_pool;
 
-    // Profile
-    RuntimeProfile* _profile;
-
     KVCache<std::string>& _kv_cache;
 
     bool _scanner_eof = false;
diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp
index e9518fbe91..071509f6ef 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -21,10 +21,11 @@
 
 namespace doris::vectorized {
 
-VScanner::VScanner(RuntimeState* state, VScanNode* parent, int64_t limit)
+VScanner::VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, RuntimeProfile* profile)
         : _state(state),
           _parent(parent),
           _limit(limit),
+          _profile(profile),
           _input_tuple_desc(parent->input_tuple_desc()),
           _output_tuple_desc(parent->output_tuple_desc()) {
     _real_tuple_desc = _input_tuple_desc != nullptr ? _input_tuple_desc : _output_tuple_desc;
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 07ddf65298..e8817670e7 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -38,7 +38,7 @@ struct ScannerCounter {
 
 class VScanner {
 public:
-    VScanner(RuntimeState* state, VScanNode* parent, int64_t limit);
+    VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, RuntimeProfile* profile);
 
     virtual ~VScanner() {}
 
@@ -89,7 +89,7 @@ public:
 
     int queue_id() { return _state->exec_env()->store_path_to_index("xxx"); }
 
-    doris::TabletStorageType get_storage_type() {
+    virtual doris::TabletStorageType get_storage_type() {
         return doris::TabletStorageType::STORAGE_TYPE_LOCAL;
     }
 
@@ -133,6 +133,8 @@ protected:
     // Set if scan node has sort limit info
     int64_t _limit = -1;
 
+    RuntimeProfile* _profile;
+
     const TupleDescriptor* _input_tuple_desc = nullptr;
     const TupleDescriptor* _output_tuple_desc = nullptr;
     const TupleDescriptor* _real_tuple_desc = nullptr;
diff --git a/be/test/runtime/test_env.cc b/be/test/runtime/test_env.cc
index dc2b53c9f6..17dbc22b48 100644
--- a/be/test/runtime/test_env.cc
+++ b/be/test/runtime/test_env.cc
@@ -36,7 +36,6 @@ TestEnv::TestEnv() {
     _exec_env->_thread_mgr = new ThreadResourceMgr(2);
     _exec_env->_disk_io_mgr = new DiskIoMgr(1, 1, 1, 10);
     _exec_env->disk_io_mgr()->init(-1);
-    _exec_env->_scan_thread_pool = new PriorityThreadPool(1, 16, "ut_scan");
     _exec_env->_result_queue_mgr = new ResultQueueMgr();
     // TODO may need rpc support, etc.
 }
@@ -58,7 +57,6 @@ void TestEnv::init_buffer_pool(int64_t min_page_len, int64_t capacity, int64_t c
 TestEnv::~TestEnv() {
     SAFE_DELETE(_exec_env->_result_queue_mgr);
     SAFE_DELETE(_exec_env->_buffer_pool);
-    SAFE_DELETE(_exec_env->_scan_thread_pool);
     SAFE_DELETE(_exec_env->_disk_io_mgr);
     SAFE_DELETE(_exec_env->_thread_mgr);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org