You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2023/01/17 15:04:24 UTC

[GitHub] [doris] github-actions[bot] commented on a diff in pull request #16031: [feature] Supports bthread scanner

github-actions[bot] commented on code in PR #16031:
URL: https://github.com/apache/doris/pull/16031#discussion_r1072323733


##########
be/src/util/async_io.h:
##########
@@ -0,0 +1,89 @@
+#pragma once
+
+#include <bthread/bthread.h>
+
+#include "io/fs/file_system.h"
+#include "olap/olap_define.h"
+#include "priority_thread_pool.hpp"
+#include "runtime/threadlocal.h"
+
+namespace doris {
+
+struct AsyncIOCtx {
+    int nice;
+};
+
+/**
+ * Separate task from bthread to pthread, specific for IO task.
+ */
+class AsyncIO {
+public:
+    AsyncIO() {
+        _io_thread_pool = new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
+                                                 config::doris_scanner_thread_pool_queue_size,
+                                                 "async_io_thread_pool");
+        _remote_thread_pool = new PriorityThreadPool(
+                config::doris_remote_scanner_thread_pool_thread_num,
+                config::doris_remote_scanner_thread_pool_queue_size, "async_remote_thread_pool");
+    }
+
+    ~AsyncIO() {
+        SAFE_DELETE(_io_thread_pool);
+        SAFE_DELETE(_remote_thread_pool);
+    }
+
+    AsyncIO& operator=(const AsyncIO&) = delete;
+    AsyncIO(const AsyncIO&) = delete;
+
+    static AsyncIO& instance() {
+        static AsyncIO instance;
+        return instance;
+    }
+
+    // This function should run on the bthread, and it will put the task into
+    // thread_pool and release the bthread_worker at cv.wait. When the task is completed,
+    // the bthread will continue to execute.
+    static void run_task(std::function<void()> fn, io::FileSystemType file_type) {
+        DCHECK(bthread_self() != 0);
+        doris::Mutex mutex;
+        doris::ConditionVariable cv;
+        std::unique_lock l(mutex);
+
+        AsyncIOCtx* ctx = static_cast<AsyncIOCtx*>(bthread_getspecific(btls_io_ctx_key));
+        int nice = -1;
+        if (ctx == nullptr) {
+            nice = 18;
+        } else {
+            nice = ctx->nice;
+        }
+
+        PriorityThreadPool::Task task;
+        task.priority = nice;
+        task.work_function = [&] {
+            fn();
+            std::unique_lock l(mutex);
+            cv.notify_one();
+        };
+
+        if (file_type == io::FileSystemType::S3) {
+            AsyncIO::instance().remote_thread_pool()->offer(task);
+        } else {
+            AsyncIO::instance().io_thread_pool()->offer(task);
+        }
+        cv.wait(l);
+    }
+
+    inline static bthread_key_t btls_io_ctx_key;
+
+    static void io_ctx_key_deleter(void* d) { delete static_cast<AsyncIOCtx*>(d); }
+
+private:
+    PriorityThreadPool* _io_thread_pool = nullptr;
+    PriorityThreadPool* _remote_thread_pool = nullptr;
+
+private:

Review Comment:
   warning: redundant access specifier has the same accessibility as the previous access specifier [readability-redundant-access-specifiers]
   
   ```suggestion
   
   ```
   **be/src/util/async_io.h:79:** previously declared here
   ```cpp
   private:
   ^
   ```
   



##########
be/src/util/priority_work_stealing_thread_pool.hpp:
##########
@@ -97,7 +98,7 @@
     // Any work Offer()'ed during DrainAndshutdown may or may not be processed.
     void drain_and_shutdown() override {
         {
-            std::unique_lock<std::mutex> l(_lock);
+            std::unique_lock l(_lock);

Review Comment:
   warning: use of undeclared identifier '_lock'; did you mean 'clock'? [clang-diagnostic-error]
   
   ```suggestion
               std::unique_lock l(clock);
   ```
   **/usr/include/time.h:71:** 'clock' declared here
   ```cpp
   extern clock_t clock (void) __THROW;
                  ^
   ```
   



##########
be/src/util/priority_work_stealing_thread_pool.hpp:
##########
@@ -97,7 +98,7 @@
     // Any work Offer()'ed during DrainAndshutdown may or may not be processed.
     void drain_and_shutdown() override {
         {
-            std::unique_lock<std::mutex> l(_lock);
+            std::unique_lock l(_lock);
             while (get_queue_size() != 0) {
                 _empty_cv.wait(l);

Review Comment:
   warning: use of undeclared identifier '_empty_cv' [clang-diagnostic-error]
   ```cpp
                   _empty_cv.wait(l);
                   ^
   ```
   



##########
be/src/util/priority_work_stealing_thread_pool.hpp:
##########
@@ -97,7 +98,7 @@ class PriorityWorkStealingThreadPool : public PriorityThreadPool {
     // Any work Offer()'ed during DrainAndshutdown may or may not be processed.
     void drain_and_shutdown() override {

Review Comment:
   warning: only virtual member functions can be marked 'override' [clang-diagnostic-error]
   
   ```suggestion
       void drain_and_shutdown() {
   ```
   



##########
be/src/util/once.h:
##########
@@ -54,10 +55,16 @@ class DorisCallOnce {
     // lambda and stores its return value. Otherwise, returns the stored Status.
     template <typename Fn>
     ReturnType call(Fn fn) {
-        std::call_once(_once_flag, [this, fn] {
-            _status = fn();
-            _has_called.store(true, std::memory_order_release);
-        });
+        if (!_has_called.load(std::memory_order_acquire)) {
+            do {
+                std::lock_guard l(_mutex);
+                if (_has_called.load(std::memory_order_acquire)) break;

Review Comment:
   warning: statement should be inside braces [readability-braces-around-statements]
   
   ```suggestion
                   if (_has_called.load(std::memory_order_acquire)) {
                       break;
                   }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org