Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/02/18 08:02:05 UTC

[GitHub] [incubator-doris] chaoyli opened a new pull request #2931: Use ThreadPool to refactor MemTableFlushExecutor

URL: https://github.com/apache/incubator-doris/pull/2931
 
 
   1. MemTableFlushExecutor maintains a ThreadPool that receives FlushTasks.
   2. FlushToken is used to separate tasks coming from different tablets.
      Every tablet's DeltaWriter constructs its own FlushToken; tasks within
      one FlushToken are handled serially, while tasks from different
      FlushTokens are handled concurrently.
   3. I have removed the per-data_dir thread limit, because I/O is not the
      main time consumer of a flush thread. Most of the time is spent on
      CPU-bound encoding and compression.
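Point 2 describes a "serial token" scheduling pattern: the tasks of one token run one at a time and in submission order, while tokens of different tablets share the pool's workers and run concurrently. The sketch below illustrates that behaviour under stated assumptions; it is not Doris's `ThreadPool`/`ThreadPoolToken` code, and `SimplePool` and `SerialToken` are hypothetical names. Each token keeps its own FIFO and has at most one "drain" job queued or running in the shared pool.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical fixed-size pool: N workers pull jobs from one shared FIFO.
class SimplePool {
public:
    explicit SimplePool(int num_threads) {
        for (int i = 0; i < num_threads; ++i) {
            _workers.emplace_back([this] { _run(); });
        }
    }
    ~SimplePool() {
        {
            std::lock_guard<std::mutex> l(_mu);
            _stop = true;
        }
        _cv.notify_all();
        for (auto& t : _workers) t.join();  // workers finish queued jobs first
    }
    void submit(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> l(_mu);
            _jobs.push_back(std::move(job));
        }
        _cv.notify_one();
    }

private:
    void _run() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> l(_mu);
                _cv.wait(l, [this] { return _stop || !_jobs.empty(); });
                if (_jobs.empty()) return;  // stopping and nothing left to do
                job = std::move(_jobs.front());
                _jobs.pop_front();
            }
            job();
        }
    }
    std::mutex _mu;
    std::condition_variable _cv;
    std::deque<std::function<void()>> _jobs;
    bool _stop = false;
    std::vector<std::thread> _workers;
};

// Hypothetical serial token: tasks of one token run one at a time in FIFO
// order; different tokens share the pool and can run concurrently.
class SerialToken {
public:
    explicit SerialToken(SimplePool* pool) : _pool(pool) {}
    void submit(std::function<void()> task) {
        std::lock_guard<std::mutex> l(_mu);
        _tasks.push_back(std::move(task));
        if (!_scheduled) {  // at most one drain job per token in the pool
            _scheduled = true;
            _pool->submit([this] { _drain(); });
        }
    }
    // Block until every task submitted so far has finished.
    void wait() {
        std::unique_lock<std::mutex> l(_mu);
        _idle.wait(l, [this] { return !_scheduled; });
    }

private:
    void _drain() {
        for (;;) {
            std::function<void()> task;
            {
                std::lock_guard<std::mutex> l(_mu);
                if (_tasks.empty()) {
                    _scheduled = false;
                    _idle.notify_all();
                    return;
                }
                task = std::move(_tasks.front());
                _tasks.pop_front();
            }
            task();  // run outside the lock so submit() never blocks on a flush
        }
    }
    SimplePool* _pool;
    std::mutex _mu;
    std::condition_variable _idle;
    std::deque<std::function<void()>> _tasks;
    bool _scheduled = false;
};
```

In this design the pool size bounds total flush concurrency, and a single tablet occupies at most one worker at a time because its token never has more than one drain job in flight.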

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] lingbin commented on a change in pull request #2931: Use ThreadPool to refactor MemTableFlushExecutor

Posted by GitBox <gi...@apache.org>.
URL: https://github.com/apache/incubator-doris/pull/2931#discussion_r380576738
 
 

 ##########
 File path: be/src/olap/memtable_flush_executor.h
 ##########
 @@ -38,132 +32,65 @@ class DeltaWriter;
 class ExecEnv;
 class MemTable;
 
-// The context for a memtable to be flushed.
-class FlushHandler;
-struct MemTableFlushContext {
-    // memtable to be flushed
-    std::shared_ptr<MemTable> memtable;
-    // flush handler from a delta writer.
-    // use shared ptr because flush_handler may be deleted before this
-    // memtable being flushed. so we need to make sure the flush_handler
-    // is alive until this memtable being flushed.
-    std::shared_ptr<FlushHandler> flush_handler;
-};
-
-// the flush result of a single memtable flush
-struct FlushResult {
-    OLAPStatus flush_status;
-    int64_t flush_time_ns = 0;
-    int64_t flush_size_bytes = 0;
-};
-
 // the statistic of a certain flush handler.
 // use atomic because it may be updated by multi threads
 struct FlushStatistic {
-    std::atomic<std::int64_t> flush_time_ns = {0};
-    std::atomic<std::int64_t> flush_count= {0};
+    int64_t flush_time_ns = 0;
+    int64_t flush_count= 0;
+    int64_t flush_size_bytes = 0;
 };
 
 std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat);
 
-class MemTableFlushExecutor;
-
-// flush handler is for flushing memtables in a delta writer
-// This class must be wrapped by std::shared_ptr, or you will get bad_weak_ptr exception
-// when calling submit();
-class FlushHandler : public std::enable_shared_from_this<FlushHandler> {
+// A thin wrapper of ThreadPoolToken to submit task.
+class FlushToken {
 public:
-    FlushHandler(int32_t flush_queue_idx, MemTableFlushExecutor* flush_executor) :
-            _flush_queue_idx(flush_queue_idx),
-            _last_flush_status(OLAP_SUCCESS),
-            _counter_cond(0),
-            _flush_executor(flush_executor),
-            _is_cancelled(false) {
-    }
-
-    // submit a memtable to flush. return error if some previous submitted MemTable has failed
-    OLAPStatus submit(std::shared_ptr<MemTable> memtable);
-    // wait for all memtables submitted by itself to be finished.
+    explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token)
+        : _flush_status(OLAP_SUCCESS),
+          _flush_token(std::move(flush_pool_token)) {}
+
+    OLAPStatus submit(std::shared_ptr<MemTable> mem_table);
+
+    // An error has happened, so we cancel this token
+    // and remove all tasks in the queue.
+    void cancel();
+
+    // wait for all tasks in the token to complete.
     OLAPStatus wait();
+
     // get flush operations' statistics
     const FlushStatistic& get_stats() const { return _stats; }
 
-    bool is_cancelled() {
-        return _last_flush_status.load() != OLAP_SUCCESS || _is_cancelled.load();
-    }
-    void cancel() { _is_cancelled.store(true); }
-
-    // These on_xxx() methods are callback when flush finishes or cancels, user should
-    // not call them directly.
-    // called when a memtable is finished by executor.
-    void on_flush_finished(const FlushResult& res);
-    // called when a flush memtable execution is cancelled
-    void on_flush_cancelled() {
-        _counter_cond.dec();
-    }
-
 private:
-    // flush queue idx in memtable flush executor
-    int32_t _flush_queue_idx;
-    // the flush status of last memtable
-    std::atomic<OLAPStatus> _last_flush_status;
-    // used to wait/notify the memtable flush execution
-    CounterCondVariable _counter_cond;
+    void _flush_memtable(std::shared_ptr<MemTable> mem_table);
 
+    OLAPStatus _flush_status;
+    std::unique_ptr<ThreadPoolToken> _flush_token;
     FlushStatistic _stats;
-    MemTableFlushExecutor* _flush_executor;
-
-    // the caller of the flush handler can set this variable to notify that the
-    // uppper application is already cancelled.
-    std::atomic<bool> _is_cancelled;
 };
 
 // MemTableFlushExecutor is responsible for flushing memtables to disk.
-// Each data directory has a specified number of worker threads and each thread will correspond
-// to a queue. The only job of each worker thread is to take memtable from its corresponding
-// flush queue and writes the data to disk.
-//
-// NOTE: User SHOULD NOT call method of this class directly, use pattern should be:
+// It encapsulates a ThreadPool to handle all tasks.
+// Usage Example:
 //      ...
 //      std::shared_ptr<FlushHandler> flush_handler;
-//      memTableFlushExecutor.create_flush_handler(path_hash, &flush_handler);
+//      memTableFlushExecutor.create_flush_token(path_hash, &flush_handler);
 //      ...
-//      flush_handler->submit(memtable)
+//      flush_token->submit(memtable)
 //      ...
 class MemTableFlushExecutor {
 
 Review comment:
   The only class users need to know about is `FlushToken`.
   So maybe we should hide the implementation of `MemTableFlushExecutor` in the `.cpp` file, since users do not need to know about it directly. But this can be refactored later.
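The header in this diff gives `FlushToken` three operations: `submit()`, `cancel()` (remove queued tasks after an error) and `wait()`, and the removed `FlushHandler::submit()` was documented to return an error once a previously submitted memtable had failed. A minimal sketch of those status semantics follows. `FlushTokenSketch`, `FakeMemTable` and `FlushStatus` are hypothetical stand-ins (Doris uses `OLAPStatus` and `MemTable`), and for brevity each token owns one worker thread instead of sharing a ThreadPool as the PR does.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical stand-ins for Doris types.
enum class FlushStatus { OK, FLUSH_ERROR, CANCELLED };
struct FakeMemTable {
    bool fail_on_flush = false;  // simulates an encode/compress/write failure
};

class FlushTokenSketch {
public:
    FlushTokenSketch() { _worker = std::thread([this] { _loop(); }); }
    ~FlushTokenSketch() {
        {
            std::lock_guard<std::mutex> l(_mu);
            _shutdown = true;
        }
        _cv.notify_all();
        _worker.join();
    }
    // Refuse new memtables once a flush has failed or the token was cancelled.
    FlushStatus submit(FakeMemTable mt) {
        std::lock_guard<std::mutex> l(_mu);
        if (_status != FlushStatus::OK) return _status;
        _queue.push_back(mt);
        _cv.notify_all();
        return FlushStatus::OK;
    }
    // Drop every memtable that has not started flushing yet.
    void cancel() {
        std::lock_guard<std::mutex> l(_mu);
        _queue.clear();
        if (_status == FlushStatus::OK) _status = FlushStatus::CANCELLED;
        _cv.notify_all();
    }
    // Block until nothing is queued or in flight, then report the final status.
    FlushStatus wait() {
        std::unique_lock<std::mutex> l(_mu);
        _cv.wait(l, [this] { return _queue.empty() && !_busy; });
        return _status;
    }
    int flush_count() const {
        std::lock_guard<std::mutex> l(_mu);
        return _flush_count;
    }

private:
    void _loop() {
        std::unique_lock<std::mutex> l(_mu);
        for (;;) {
            _cv.wait(l, [this] { return _shutdown || !_queue.empty(); });
            if (_queue.empty()) return;  // shutting down with no work left
            FakeMemTable mt = _queue.front();
            _queue.pop_front();
            _busy = true;
            l.unlock();
            bool ok = !mt.fail_on_flush;  // the real flush work would happen here
            l.lock();
            _busy = false;
            if (ok) {
                ++_flush_count;
            } else {
                _status = FlushStatus::FLUSH_ERROR;
                _queue.clear();  // later memtables of this tablet are now useless
            }
            _cv.notify_all();
        }
    }
    mutable std::mutex _mu;
    std::condition_variable _cv;
    std::deque<FakeMemTable> _queue;
    FlushStatus _status = FlushStatus::OK;
    int _flush_count = 0;
    bool _busy = false;
    bool _shutdown = false;
    std::thread _worker;
};
```

Because `submit()` checks the stored status under the same lock the worker uses to record a failure, a caller such as a DeltaWriter learns about a failed flush on its next `submit()` or on `wait()`, without polling a separate flag.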
   


[GitHub] [incubator-doris] lingbin commented on a change in pull request #2931: Use ThreadPool to refactor MemTableFlushExecutor

URL: https://github.com/apache/incubator-doris/pull/2931#discussion_r380573580
 
 

 ##########
 File path: be/src/olap/memtable_flush_executor.cpp
 ##########
 @@ -24,146 +24,62 @@
 #include "olap/memtable.h"
 #include "runtime/exec_env.h"
 #include "runtime/mem_tracker.h"
+#include "util/scoped_cleanup.h"
 
 namespace doris {
 
-OLAPStatus FlushHandler::submit(std::shared_ptr<MemTable> memtable) {
-    RETURN_NOT_OK(_last_flush_status.load());
-    MemTableFlushContext ctx;
-    ctx.memtable = std::move(memtable);
-    ctx.flush_handler = this->shared_from_this();
-    _counter_cond.inc();
-    VLOG(5) << "submitting " << *(ctx.memtable) << " to flush queue " << _flush_queue_idx;
-    RETURN_NOT_OK(_flush_executor->_push_memtable(_flush_queue_idx, ctx));
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus FlushHandler::wait() {
-    // wait all submitted tasks to be finished or cancelled
-    _counter_cond.block_wait();
-    return _last_flush_status.load();
-}
-
-void FlushHandler::on_flush_finished(const FlushResult& res) {
-    if (res.flush_status != OLAP_SUCCESS) {
-        _last_flush_status.store(res.flush_status);
-    } else {
-        _stats.flush_time_ns.fetch_add(res.flush_time_ns);
-        _stats.flush_count.fetch_add(1);
-    }
-    _counter_cond.dec();
+std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
+    os << "(flush time(ms)=" << stat.flush_time_ns / 1000 / 1000
+       << ", flush count=" << stat.flush_count << ")";
+    return os;
 }
 
-OLAPStatus MemTableFlushExecutor::create_flush_handler(
-        size_t path_hash, std::shared_ptr<FlushHandler>* flush_handler) {
-    size_t flush_queue_idx = _get_queue_idx(path_hash);
-    flush_handler->reset(new FlushHandler(flush_queue_idx, this));
+OLAPStatus FlushToken::submit(std::shared_ptr<MemTable> memtable) {
+    _flush_token->submit_func(boost::bind(boost::mem_fn(&FlushToken::_flush_memtable), this, memtable));
 
 Review comment:
   Could you please use `std::bind` instead?
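The suggested change is mechanical: `boost::bind(boost::mem_fn(&FlushToken::_flush_memtable), this, memtable)` can be written as `std::bind(&FlushToken::_flush_memtable, this, memtable)`, because `std::bind` already knows how to invoke a pointer to member function, so no `mem_fn` wrapper is needed. A lambda is an equivalent alternative. The sketch below assumes `submit_func` accepts a `std::function<void()>`; `InlineToken` (which simply runs the task on the calling thread), `BindDemoFlushToken` and the trimmed-down `MemTable` are hypothetical stand-ins.

```cpp
#include <functional>
#include <memory>
#include <vector>

struct MemTable {  // hypothetical, trimmed-down stand-in
    int id;
};

// Hypothetical stand-in for ThreadPoolToken that runs each task immediately.
class InlineToken {
public:
    void submit_func(std::function<void()> f) { f(); }
};

class BindDemoFlushToken {
public:
    void submit_with_bind(std::shared_ptr<MemTable> memtable) {
        // std::bind invokes a pointer to member function directly,
        // so no std::mem_fn wrapper is needed.
        _token.submit_func(std::bind(&BindDemoFlushToken::_flush_memtable, this, memtable));
    }
    void submit_with_lambda(std::shared_ptr<MemTable> memtable) {
        // Equivalent lambda form; memtable is captured by value (a shared_ptr copy).
        _token.submit_func([this, memtable] { _flush_memtable(memtable); });
    }
    const std::vector<int>& flushed_ids() const { return _flushed_ids; }

private:
    void _flush_memtable(std::shared_ptr<MemTable> mem_table) {
        _flushed_ids.push_back(mem_table->id);  // stand-in for the real flush
    }
    InlineToken _token;
    std::vector<int> _flushed_ids;
};
```

In both forms the bound `shared_ptr` copy keeps the memtable alive until the task runs, while `this` is a raw pointer, so the token must not be destroyed while its tasks are still queued or running (for example, call `wait()` first).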


[GitHub] [incubator-doris] chaoyli merged pull request #2931: Use ThreadPool to refactor MemTableFlushExecutor

URL: https://github.com/apache/incubator-doris/pull/2931
 
 
   


[GitHub] [incubator-doris] chaoyli commented on issue #2931: Use ThreadPool to refactor MemTableFlushExecutor

URL: https://github.com/apache/incubator-doris/pull/2931#issuecomment-587332412
 
 
   Related ThreadPool issue: #2896
