You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/07/18 05:59:26 UTC

[GitHub] [doris] morningman commented on a diff in pull request #10942: [feature-wip](multi-catalog) Optimize threads and thrift interface of FileScanNode

morningman commented on code in PR #10942:
URL: https://github.com/apache/doris/pull/10942#discussion_r922991743


##########
be/src/vec/exec/file_scan_node.cpp:
##########
@@ -205,8 +231,9 @@ Status FileScanNode::close(RuntimeState* state) {
     _scan_finished.store(true);
     _queue_writer_cond.notify_all();
     _queue_reader_cond.notify_all();
-    for (int i = 0; i < _scanner_threads.size(); ++i) {
-        _scanner_threads[i].join();
+    {
+        std::unique_lock<std::mutex> l(_batch_queue_lock);
+        _queue_reader_cond.wait(l, [this] { return _num_running_scanners == 0; });

Review Comment:
   Why not just use `_scanner_threads[i].join();`?



##########
be/src/vec/exec/file_scan_node.cpp:
##########
@@ -100,9 +102,33 @@ Status FileScanNode::start_scanners() {
     }
 
     _scanners_status.resize(_scan_ranges.size());
+    ThreadPoolToken* thread_token = _runtime_state->get_query_fragments_ctx()->get_token();
+    PriorityThreadPool* thread_pool = _runtime_state->exec_env()->scan_thread_pool();

Review Comment:
   It's not a good idea to use the scan thread pool for external table scans.
   But we can leave it here for now, and refactor this part later.



##########
be/src/vec/exec/file_scan_node.cpp:
##########
@@ -100,9 +102,33 @@ Status FileScanNode::start_scanners() {
     }
 
     _scanners_status.resize(_scan_ranges.size());
+    ThreadPoolToken* thread_token = _runtime_state->get_query_fragments_ctx()->get_token();
+    PriorityThreadPool* thread_pool = _runtime_state->exec_env()->scan_thread_pool();
     for (int i = 0; i < _scan_ranges.size(); i++) {
-        _scanner_threads.emplace_back(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
-                                      std::ref(_scanners_status[i]));
+        Status submit_status = Status::OK();
+        if (thread_token != nullptr) {
+            submit_status = thread_token->submit_func(std::bind(&FileScanNode::scanner_worker, this,
+                                                                i, _scan_ranges.size(),
+                                                                std::ref(_scanners_status[i])));
+        } else {
+            PriorityThreadPool::WorkFunction task =
+                    std::bind(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
+                              std::ref(_scanners_status[i]));
+            if (!thread_pool->offer(task)) {
+                submit_status = Status::Cancelled("Failed to submit scan task");
+            }
+        }
+        if (!submit_status.ok()) {
+            LOG(FATAL) << "Failed to assign file scanner task to thread pool! "

Review Comment:
   Also, if we encounter an error, there is no need to continue this loop.



##########
be/src/vec/exec/file_scan_node.cpp:
##########
@@ -100,9 +102,33 @@ Status FileScanNode::start_scanners() {
     }
 
     _scanners_status.resize(_scan_ranges.size());
+    ThreadPoolToken* thread_token = _runtime_state->get_query_fragments_ctx()->get_token();
+    PriorityThreadPool* thread_pool = _runtime_state->exec_env()->scan_thread_pool();
     for (int i = 0; i < _scan_ranges.size(); i++) {
-        _scanner_threads.emplace_back(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
-                                      std::ref(_scanners_status[i]));
+        Status submit_status = Status::OK();
+        if (thread_token != nullptr) {
+            submit_status = thread_token->submit_func(std::bind(&FileScanNode::scanner_worker, this,
+                                                                i, _scan_ranges.size(),
+                                                                std::ref(_scanners_status[i])));
+        } else {
+            PriorityThreadPool::WorkFunction task =
+                    std::bind(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
+                              std::ref(_scanners_status[i]));
+            if (!thread_pool->offer(task)) {
+                submit_status = Status::Cancelled("Failed to submit scan task");
+            }
+        }
+        if (!submit_status.ok()) {
+            LOG(FATAL) << "Failed to assign file scanner task to thread pool! "

Review Comment:
   A FATAL log will crash the BE process. Use a WARNING-level log instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org