You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/04/29 03:21:26 UTC

[incubator-doris] branch master updated: [Bug] Fix bug that storage engine bg threads should start after env is ready

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 74b987f  [Bug] Fix bug that storage engine bg threads should start after env is ready
74b987f is described below

commit 74b987f0535b0d34ed5c3d61cd9af0fce7be483e
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Apr 29 11:21:19 2020 +0800

    [Bug] Fix bug that storage engine bg threads should start after env is ready
---
 be/src/olap/olap_server.cpp    | 16 +++++++++++++---
 be/src/olap/storage_engine.cpp |  5 -----
 be/src/olap/storage_engine.h   |  5 +++--
 be/src/service/doris_main.cpp  | 15 +++++++++++----
 4 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 6015f5e..e5b5056 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -26,6 +26,7 @@
 
 #include <gperftools/profiler.h>
 
+#include "common/status.h"
 #include "olap/cumulative_compaction.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
@@ -51,12 +52,13 @@ namespace doris {
 // number of running SCHEMA-CHANGE threads
 volatile uint32_t g_schema_change_active_threads = 0;
 
-OLAPStatus StorageEngine::_start_bg_worker() {
+Status StorageEngine::start_bg_threads() {
     _unused_rowset_monitor_thread =  std::thread(
         [this] {
             _unused_rowset_monitor_thread_callback(nullptr);
         });
     _unused_rowset_monitor_thread.detach();
+    LOG(INFO) << "unused rowset monitor thread started";
 
     // start thread for monitoring the snapshot and trash folder
     _garbage_sweeper_thread = std::thread(
@@ -64,12 +66,15 @@ OLAPStatus StorageEngine::_start_bg_worker() {
             _garbage_sweeper_thread_callback(nullptr);
         });
     _garbage_sweeper_thread.detach();
+    LOG(INFO) << "garbage sweeper thread started";
+
     // start thread for monitoring the tablet with io error
     _disk_stat_monitor_thread = std::thread(
         [this] {
             _disk_stat_monitor_thread_callback(nullptr);
         });
     _disk_stat_monitor_thread.detach();
+    LOG(INFO) << "disk stat monitor thread started";
 
     // convert store map to vector
     std::vector<DataDir*> data_dirs;
@@ -101,6 +106,7 @@ OLAPStatus StorageEngine::_start_bg_worker() {
     for (auto& thread : _base_compaction_threads) {
         thread.detach();
     }
+    LOG(INFO) << "base compaction threads started. number: " << base_compaction_num_threads;
 
     _cumulative_compaction_threads.reserve(cumulative_compaction_num_threads);
     for (uint32_t i = 0; i < cumulative_compaction_num_threads; ++i) {
@@ -112,6 +118,7 @@ OLAPStatus StorageEngine::_start_bg_worker() {
     for (auto& thread : _cumulative_compaction_threads) {
         thread.detach();
     }
+    LOG(INFO) << "cumulative compaction threads started. number: " << cumulative_compaction_num_threads;
 
     // tablet checkpoint thread
     for (auto data_dir : data_dirs) {
@@ -123,6 +130,7 @@ OLAPStatus StorageEngine::_start_bg_worker() {
     for (auto& thread : _tablet_checkpoint_threads) {
         thread.detach();
     }
+    LOG(INFO) << "tablet checkpint thread started";
 
     // fd cache clean thread
     _fd_cache_clean_thread = std::thread(
@@ -130,6 +138,7 @@ OLAPStatus StorageEngine::_start_bg_worker() {
             _fd_cache_clean_callback(nullptr);
         });
     _fd_cache_clean_thread.detach();
+    LOG(INFO) << "fd cache clean thread started";
 
     // path scan and gc thread
     if (config::path_gc_check) {
@@ -150,10 +159,11 @@ OLAPStatus StorageEngine::_start_bg_worker() {
         for (auto& thread : _path_gc_threads) {
             thread.detach();
         }
+        LOG(INFO) << "path scan/gc threads started. number:" << get_stores().size();
     }
 
-    VLOG(10) << "all bg worker started.";
-    return OLAP_SUCCESS;
+    LOG(INFO) << "all storage engine's backgroud threads are started.";
+    return Status::OK();
 }
 
 void* StorageEngine::_fd_cache_clean_callback(void* arg) {
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 2c53317..3f120b8 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -97,11 +97,6 @@ Status StorageEngine::open(const EngineOptions& options, StorageEngine** engine_
         LOG(WARNING) << "engine open failed, res=" << st;
         return Status::InternalError("open engine failed");
     }
-    st = engine->_start_bg_worker();
-    if (st != OLAP_SUCCESS) {
-        LOG(WARNING) << "engine start background failed, res=" << st;
-        return Status::InternalError("open engine failed");
-    }
     *engine_ptr = engine.release();
     LOG(INFO) << "success to init storage engine.";
     return Status::OK();
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 7177968..a29f1c1 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -190,13 +190,14 @@ public:
         _heartbeat_flags = heartbeat_flags;
     }
 
+    // start all background threads. This should be called after env is ready.
+    Status start_bg_threads();
+
 private:
     // Instance should be inited from `static open()`
     // MUST NOT be called in other circumstances.
     OLAPStatus _open();
 
-    OLAPStatus _start_bg_worker();
-
     // Clear status(tables, ...)
     void _clear();
 
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index ee4a5ea..42ddc14 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -173,7 +173,7 @@ int main(int argc, char** argv) {
         exit(-1);
     }
 
-    // options
+    // init and open storage engine
     doris::EngineOptions options;
     options.store_paths = paths;
     options.backend_uid = doris::UniqueId::gen_uid();
@@ -184,15 +184,20 @@ int main(int argc, char** argv) {
         exit(-1);
     }
 
-    // start backend service for the coordinator on be_port
+    // init exec env
     auto exec_env = doris::ExecEnv::GetInstance();
     doris::ExecEnv::init(exec_env, paths);
     exec_env->set_storage_engine(engine);
     engine->set_heartbeat_flags(exec_env->heartbeat_flags());
 
+    // start all background threads of storage engine.
+    // SHOULD be called after exec env is initialized.
+    EXIT_IF_ERROR(engine->start_bg_threads());
+
+    // begin to start services
     doris::ThriftRpcHelper::setup(exec_env);
+    // 1. thrift server with be_port
     doris::ThriftServer* be_server = nullptr;
-
     EXIT_IF_ERROR(
             doris::BackendService::create_service(exec_env, doris::config::be_port, &be_server));
     Status status = be_server->start();
@@ -202,6 +207,7 @@ int main(int argc, char** argv) {
         exit(1);
     }
 
+    // 2. brpc service
     doris::BRpcService brpc_service(exec_env);
     status = brpc_service.start(doris::config::brpc_port);
     if (!status.ok()) {
@@ -210,6 +216,7 @@ int main(int argc, char** argv) {
         exit(1);
     }
 
+    // 3. http service
     doris::HttpService http_service(exec_env, doris::config::webserver_port,
                                     doris::config::webserver_num_workers);
     status = http_service.start();
@@ -219,8 +226,8 @@ int main(int argc, char** argv) {
         exit(1);
     }
 
+    // 4. heart beat server
     doris::TMasterInfo* master_info = exec_env->master_info();
-    // start heart beat server
     doris::ThriftServer* heartbeat_thrift_server;
     doris::AgentStatus heartbeat_status = doris::create_heartbeat_server(
             exec_env, doris::config::heartbeat_service_port, &heartbeat_thrift_server,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org