You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/04/23 01:45:24 UTC

[incubator-doris] branch master updated: [Optimize] make tablet meta checkpoint to be threadpool model (#5654)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fa25b6  [Optimize] make tablet meta checkpoint to be threadpool model (#5654)
4fa25b6 is described below

commit 4fa25b6eb9c30ad9434aa551bba13aeb51134c8a
Author: xinghuayu007 <14...@qq.com>
AuthorDate: Fri Apr 23 09:45:15 2021 +0800

    [Optimize] make tablet meta checkpoint to be threadpool model (#5654)
    
    Currently, tablet meta checkpoint is a memory-intensive operation.
    If a host has 12 disks, it will start 12 threads to do tablet meta checkpoint.
    In our experience, the data size of one tablet can be as high as 2G.
    If 12 threads do the checkpoint at the same time, it may cause OOM.
    
    Therefore, this PR tries to solve this problem.
    Firstly, it starts only one thread to produce tablet meta checkpoint tasks.
    Secondly, it creates a thread pool to handle these tasks.
    You can configure the size of the thread pool to limit the parallelism and avoid OOM.
    It is a producer-consumer model.
---
 be/src/common/config.h         |  4 ++++
 be/src/olap/olap_server.cpp    | 43 +++++++++++++++++++++++-------------------
 be/src/olap/storage_engine.cpp |  5 ++++-
 be/src/olap/storage_engine.h   |  8 +++++---
 4 files changed, 37 insertions(+), 23 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8b4aab5..0c9fd35 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -286,6 +286,9 @@ CONF_mInt64(min_compaction_failure_interval_sec, "600"); // 10 min
 CONF_mInt32(min_compaction_threads, "10");
 CONF_mInt32(max_compaction_threads, "10");
 
+// Thread count to do tablet meta checkpoint, -1 means use the data directories count.
+CONF_Int32(max_meta_checkpoint_threads, "-1");
+
 // The upper limit of "permits" held by all compaction tasks. This config can be set to limit memory consumption for compaction.
 CONF_mInt64(total_permits_for_compaction_score, "10000");
 
@@ -522,6 +525,7 @@ CONF_Int32(flush_thread_num_per_store, "2");
 // config for tablet meta checkpoint
 CONF_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10");
 CONF_mInt32(tablet_meta_checkpoint_min_interval_secs, "600");
+CONF_Int32(generate_tablet_meta_checkpoint_tasks_interval_secs, "600");
 
 // config for default rowset type
 // Valid configs: ALPHA, BETA
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 97bbf84..ed72dc1 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -81,16 +81,19 @@ Status StorageEngine::start_bg_threads() {
             &_compaction_tasks_producer_thread));
     LOG(INFO) << "compaction tasks producer thread started";
 
-    // tablet checkpoint thread
-    for (auto data_dir : data_dirs) {
-        scoped_refptr<Thread> tablet_checkpoint_thread;
-        RETURN_IF_ERROR(Thread::create(
-                "StorageEngine", "tablet_checkpoint_thread",
-                [this, data_dir]() { this->_tablet_checkpoint_callback(data_dir); },
-                &tablet_checkpoint_thread));
-        _tablet_checkpoint_threads.emplace_back(tablet_checkpoint_thread);
+    int32_t max_checkpoint_thread_num = config::max_meta_checkpoint_threads;
+    if (max_checkpoint_thread_num < 0) {
+        max_checkpoint_thread_num = data_dirs.size();
     }
-    LOG(INFO) << "tablet checkpoint thread started";
+    ThreadPoolBuilder("TabletMetaCheckpointTaskThreadPool")
+            .set_max_threads(max_checkpoint_thread_num)
+            .build(&_tablet_meta_checkpoint_thread_pool);
+
+    RETURN_IF_ERROR(Thread::create(
+            "StorageEngine", "tablet_checkpoint_tasks_producer_thread",
+            [this, data_dirs]() { this->_tablet_checkpoint_callback(data_dirs); },
+            &_tablet_checkpoint_tasks_producer_thread));
+    LOG(INFO) << "tablet checkpoint tasks producer thread started";
 
     // fd cache clean thread
     RETURN_IF_ERROR(Thread::create(
@@ -290,22 +293,24 @@ void StorageEngine::_path_scan_thread_callback(DataDir* data_dir) {
     } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval)));
 }
 
-void StorageEngine::_tablet_checkpoint_callback(DataDir* data_dir) {
+void StorageEngine::_tablet_checkpoint_callback(const std::vector<DataDir*>& data_dirs) {
 #ifdef GOOGLE_PROFILER
     ProfilerRegisterThread();
 #endif
 
-    int64_t interval = config::tablet_meta_checkpoint_min_interval_secs;
+    int64_t interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs;
     do {
-        LOG(INFO) << "begin to do tablet meta checkpoint:" << data_dir->path();
-        int64_t start_time = UnixMillis();
-        _tablet_manager->do_tablet_meta_checkpoint(data_dir);
-        int64_t used_time = (UnixMillis() - start_time) / 1000;
-        if (used_time < config::tablet_meta_checkpoint_min_interval_secs) {
-            interval = config::tablet_meta_checkpoint_min_interval_secs - used_time;
-        } else {
-            interval = 1;
+        LOG(INFO) << "begin to produce tablet meta checkpoint tasks.";
+        for (auto data_dir : data_dirs) {
+            auto st =_tablet_meta_checkpoint_thread_pool->submit_func([=]() {
+                CgroupsMgr::apply_system_cgroup();
+                _tablet_manager->do_tablet_meta_checkpoint(data_dir);
+            });
+            if (!st.ok()) {
+                LOG(WARNING) << "submit tablet checkpoint tasks failed.";
+            }
         }
+        interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs;
     } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval)));
 }
 
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 3747d91..cd01226 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -145,6 +145,9 @@ StorageEngine::~StorageEngine() {
     if (_compaction_thread_pool) {
         _compaction_thread_pool->shutdown();
     }
+    if (_tablet_meta_checkpoint_thread_pool) {
+        _tablet_meta_checkpoint_thread_pool->shutdown();
+    }
 }
 
 void StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) {
@@ -543,6 +546,7 @@ void StorageEngine::stop() {
     THREAD_JOIN(_garbage_sweeper_thread);
     THREAD_JOIN(_disk_stat_monitor_thread);
     THREAD_JOIN(_fd_cache_clean_thread);
+    THREAD_JOIN(_tablet_checkpoint_tasks_producer_thread);
 #undef THREAD_JOIN
 
 #define THREADS_JOIN(threads)           \
@@ -554,7 +558,6 @@ void StorageEngine::stop() {
 
     THREADS_JOIN(_path_gc_threads);
     THREADS_JOIN(_path_scan_threads);
-    THREADS_JOIN(_tablet_checkpoint_threads);
 #undef THREADS_JOIN
 }
 
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index dbae15b..bc06db2 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -224,7 +224,7 @@ private:
 
     void _path_scan_thread_callback(DataDir* data_dir);
 
-    void _tablet_checkpoint_callback(DataDir* data_dir);
+    void _tablet_checkpoint_callback(const std::vector<DataDir*>& data_dirs);
 
     // parse the default rowset type config to RowsetTypePB
     void _parse_default_rowset_type();
@@ -318,8 +318,8 @@ private:
     std::vector<scoped_refptr<Thread>> _path_gc_threads;
     // threads to scan disk paths
     std::vector<scoped_refptr<Thread>> _path_scan_threads;
-    // threads to run tablet checkpoint
-    std::vector<scoped_refptr<Thread>> _tablet_checkpoint_threads;
+    // thread to produce tablet checkpoint tasks
+    scoped_refptr<Thread> _tablet_checkpoint_tasks_producer_thread;
 
     // For tablet and disk-stat report
     std::mutex _report_mtx;
@@ -342,6 +342,8 @@ private:
 
     std::unique_ptr<ThreadPool> _compaction_thread_pool;
 
+    std::unique_ptr<ThreadPool> _tablet_meta_checkpoint_thread_pool;
+
     CompactionPermitLimiter _permit_limiter;
 
     std::mutex _tablet_submitted_compaction_mutex;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org