Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/07/17 11:46:20 UTC

[GitHub] [doris] AshinGau opened a new pull request, #10942: [feature-wip](multi-catalog) Optimize threads and thrift interface of FileScanNode

AshinGau opened a new pull request, #10942:
URL: https://github.com/apache/doris/pull/10942

   # Proposed changes
   
   Optimize threads and thrift interface of FileScanNode
   
   ## Problem Summary:
   
   FileScanNode in BE launches one thread per split, so the thread count grows with the number of splits.
   The thrift interface of FileScanNode is excessively redundant.
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: (No)
   2. Has unit tests been added: (No)
   3. Has document been added or modified: (No)
   4. Does it need to update dependencies: (No)
   5. Are there any changes that cannot be rolled back: (No)
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] AshinGau commented on a diff in pull request #10942: [feature-wip](multi-catalog) Optimize threads and thrift interface of FileScanNode

Posted by GitBox <gi...@apache.org>.
AshinGau commented on code in PR #10942:
URL: https://github.com/apache/doris/pull/10942#discussion_r923007998


##########
be/src/vec/exec/file_scan_node.cpp:
##########
@@ -205,8 +231,9 @@ Status FileScanNode::close(RuntimeState* state) {
     _scan_finished.store(true);
     _queue_writer_cond.notify_all();
     _queue_reader_cond.notify_all();
-    for (int i = 0; i < _scanner_threads.size(); ++i) {
-        _scanner_threads[i].join();
+    {
+        std::unique_lock<std::mutex> l(_batch_queue_lock);
+        _queue_reader_cond.wait(l, [this] { return _num_running_scanners == 0; });

Review Comment:
   Threads in the thread pool are reused and are never terminated or released, so there is nothing to join. `close()` waits for `_num_running_scanners` to reach zero instead.
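
The waiting pattern in the hunk above can be sketched in isolation. This is a minimal illustration, not the Doris implementation: `ScannerTracker` and `run_and_wait` are hypothetical names, and plain `std::thread` objects stand in for pooled threads that the caller cannot join.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: counts in-flight workers so that a caller can wait
// for them without owning (or joining) the threads that run them.
class ScannerTracker {
public:
    // Call before handing a task to the pool.
    void on_submit() {
        std::lock_guard<std::mutex> l(_lock);
        ++_num_running;
    }

    // Call as the last step of each worker.
    void on_finish() {
        std::lock_guard<std::mutex> l(_lock);
        --_num_running;
        _cond.notify_all();
    }

    // Blocks until every submitted worker has called on_finish().
    void wait_all() {
        std::unique_lock<std::mutex> l(_lock);
        _cond.wait(l, [this] { return _num_running == 0; });
    }

    int running() {
        std::lock_guard<std::mutex> l(_lock);
        return _num_running;
    }

private:
    std::mutex _lock;
    std::condition_variable _cond;
    int _num_running = 0;
};

// Starts n workers and returns how many had finished when wait_all() returned.
int run_and_wait(int n) {
    ScannerTracker tracker;
    std::atomic<int> done{0};
    std::vector<std::thread> workers; // stand-in for pool threads
    for (int i = 0; i < n; ++i) {
        tracker.on_submit();
        workers.emplace_back([&tracker, &done] {
            done.fetch_add(1);
            tracker.on_finish();
        });
    }
    tracker.wait_all();
    assert(tracker.running() == 0);
    int finished = done.load();
    // Joining is needed only because this sketch owns its threads;
    // with a real pool the threads outlive the tasks.
    for (auto& t : workers) t.join();
    return finished;
}
```

The counter is incremented before submission rather than inside the worker, so `wait_all()` cannot observe zero while an accepted task has not yet started.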





[GitHub] [doris] morningman commented on a diff in pull request #10942: [feature-wip](multi-catalog) Optimize threads and thrift interface of FileScanNode

Posted by GitBox <gi...@apache.org>.
morningman commented on code in PR #10942:
URL: https://github.com/apache/doris/pull/10942#discussion_r922991743


##########
be/src/vec/exec/file_scan_node.cpp:
##########
@@ -205,8 +231,9 @@ Status FileScanNode::close(RuntimeState* state) {
     _scan_finished.store(true);
     _queue_writer_cond.notify_all();
     _queue_reader_cond.notify_all();
-    for (int i = 0; i < _scanner_threads.size(); ++i) {
-        _scanner_threads[i].join();
+    {
+        std::unique_lock<std::mutex> l(_batch_queue_lock);
+        _queue_reader_cond.wait(l, [this] { return _num_running_scanners == 0; });

Review Comment:
   Why not just use `_scanner_threads[i].join();`?



##########
be/src/vec/exec/file_scan_node.cpp:
##########
@@ -100,9 +102,33 @@ Status FileScanNode::start_scanners() {
     }
 
     _scanners_status.resize(_scan_ranges.size());
+    ThreadPoolToken* thread_token = _runtime_state->get_query_fragments_ctx()->get_token();
+    PriorityThreadPool* thread_pool = _runtime_state->exec_env()->scan_thread_pool();

Review Comment:
   It is not a good idea to use the scan thread pool for external table scans.
   We can leave it as is for now and refactor this part later.



##########
be/src/vec/exec/file_scan_node.cpp:
##########
@@ -100,9 +102,33 @@ Status FileScanNode::start_scanners() {
     }
 
     _scanners_status.resize(_scan_ranges.size());
+    ThreadPoolToken* thread_token = _runtime_state->get_query_fragments_ctx()->get_token();
+    PriorityThreadPool* thread_pool = _runtime_state->exec_env()->scan_thread_pool();
     for (int i = 0; i < _scan_ranges.size(); i++) {
-        _scanner_threads.emplace_back(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
-                                      std::ref(_scanners_status[i]));
+        Status submit_status = Status::OK();
+        if (thread_token != nullptr) {
+            submit_status = thread_token->submit_func(std::bind(&FileScanNode::scanner_worker, this,
+                                                                i, _scan_ranges.size(),
+                                                                std::ref(_scanners_status[i])));
+        } else {
+            PriorityThreadPool::WorkFunction task =
+                    std::bind(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
+                              std::ref(_scanners_status[i]));
+            if (!thread_pool->offer(task)) {
+                submit_status = Status::Cancelled("Failed to submit scan task");
+            }
+        }
+        if (!submit_status.ok()) {
+            LOG(FATAL) << "Failed to assign file scanner task to thread pool! "

Review Comment:
   Also, if we hit an error, there is no need to continue this loop.



##########
be/src/vec/exec/file_scan_node.cpp:
##########
@@ -100,9 +102,33 @@ Status FileScanNode::start_scanners() {
     }
 
     _scanners_status.resize(_scan_ranges.size());
+    ThreadPoolToken* thread_token = _runtime_state->get_query_fragments_ctx()->get_token();
+    PriorityThreadPool* thread_pool = _runtime_state->exec_env()->scan_thread_pool();
     for (int i = 0; i < _scan_ranges.size(); i++) {
-        _scanner_threads.emplace_back(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
-                                      std::ref(_scanners_status[i]));
+        Status submit_status = Status::OK();
+        if (thread_token != nullptr) {
+            submit_status = thread_token->submit_func(std::bind(&FileScanNode::scanner_worker, this,
+                                                                i, _scan_ranges.size(),
+                                                                std::ref(_scanners_status[i])));
+        } else {
+            PriorityThreadPool::WorkFunction task =
+                    std::bind(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
+                              std::ref(_scanners_status[i]));
+            if (!thread_pool->offer(task)) {
+                submit_status = Status::Cancelled("Failed to submit scan task");
+            }
+        }
+        if (!submit_status.ok()) {
+            LOG(FATAL) << "Failed to assign file scanner task to thread pool! "

Review Comment:
   A FATAL log will crash the BE process. Use WARNING instead.
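
Both review comments on this hunk (stop the loop on the first error, and do not crash the process) can be combined in a small sketch. It is an illustration under stated assumptions, not the merged code: `SubmitStatus` and `submit_all` are hypothetical names, the `submit` callback stands in for the thread pool, and `std::cerr` stands in for a warning-level log.

```cpp
#include <cstddef>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical minimal status type; Doris has its own Status class.
struct SubmitStatus {
    bool ok;
    std::string msg;
};

using Task = std::function<void()>;

// Submits tasks in order through `submit`, which returns false when the
// pool rejects a task. Stops at the first rejection, logs a warning, and
// returns an error to the caller instead of aborting the process.
// `submitted` receives the number of tasks that were accepted.
SubmitStatus submit_all(const std::vector<Task>& tasks,
                        const std::function<bool(const Task&)>& submit,
                        std::size_t* submitted) {
    *submitted = 0;
    for (std::size_t i = 0; i < tasks.size(); ++i) {
        if (!submit(tasks[i])) {
            std::cerr << "WARNING: failed to submit scan task " << i << "\n";
            return SubmitStatus{false, "Failed to submit scan task " + std::to_string(i)};
        }
        ++*submitted;
    }
    return SubmitStatus{true, ""};
}
```

Tasks accepted before the failure may still be running, so a caller that bails out early still has to wait for them before tearing down any state they reference.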





[GitHub] [doris] AshinGau commented on a diff in pull request #10942: [feature-wip](multi-catalog) Optimize threads and thrift interface of FileScanNode

Posted by GitBox <gi...@apache.org>.
AshinGau commented on code in PR #10942:
URL: https://github.com/apache/doris/pull/10942#discussion_r923045171


##########
be/src/vec/exec/file_scan_node.cpp:
##########
@@ -100,9 +102,33 @@ Status FileScanNode::start_scanners() {
     }
 
     _scanners_status.resize(_scan_ranges.size());
+    ThreadPoolToken* thread_token = _runtime_state->get_query_fragments_ctx()->get_token();
+    PriorityThreadPool* thread_pool = _runtime_state->exec_env()->scan_thread_pool();
     for (int i = 0; i < _scan_ranges.size(); i++) {
-        _scanner_threads.emplace_back(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
-                                      std::ref(_scanners_status[i]));
+        Status submit_status = Status::OK();
+        if (thread_token != nullptr) {
+            submit_status = thread_token->submit_func(std::bind(&FileScanNode::scanner_worker, this,
+                                                                i, _scan_ranges.size(),
+                                                                std::ref(_scanners_status[i])));
+        } else {
+            PriorityThreadPool::WorkFunction task =
+                    std::bind(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
+                              std::ref(_scanners_status[i]));
+            if (!thread_pool->offer(task)) {
+                submit_status = Status::Cancelled("Failed to submit scan task");
+            }
+        }
+        if (!submit_status.ok()) {
+            LOG(FATAL) << "Failed to assign file scanner task to thread pool! "

Review Comment:
   done





[GitHub] [doris] morningman merged pull request #10942: [feature-wip](multi-catalog) Optimize threads and thrift interface of FileScanNode

Posted by GitBox <gi...@apache.org>.
morningman merged PR #10942:
URL: https://github.com/apache/doris/pull/10942

