You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/04/29 03:21:26 UTC
[incubator-doris] branch master updated: [Bug] Fix bug that storage
engine bg threads should start after env is ready
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 74b987f [Bug] Fix bug that storage engine bg threads should start after env is ready
74b987f is described below
commit 74b987f0535b0d34ed5c3d61cd9af0fce7be483e
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Apr 29 11:21:19 2020 +0800
[Bug] Fix bug that storage engine bg threads should start after env is ready
---
be/src/olap/olap_server.cpp | 16 +++++++++++++---
be/src/olap/storage_engine.cpp | 5 -----
be/src/olap/storage_engine.h | 5 +++--
be/src/service/doris_main.cpp | 15 +++++++++++----
4 files changed, 27 insertions(+), 14 deletions(-)
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 6015f5e..e5b5056 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -26,6 +26,7 @@
#include <gperftools/profiler.h>
+#include "common/status.h"
#include "olap/cumulative_compaction.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
@@ -51,12 +52,13 @@ namespace doris {
// number of running SCHEMA-CHANGE threads
volatile uint32_t g_schema_change_active_threads = 0;
-OLAPStatus StorageEngine::_start_bg_worker() {
+Status StorageEngine::start_bg_threads() {
_unused_rowset_monitor_thread = std::thread(
[this] {
_unused_rowset_monitor_thread_callback(nullptr);
});
_unused_rowset_monitor_thread.detach();
+ LOG(INFO) << "unused rowset monitor thread started";
// start thread for monitoring the snapshot and trash folder
_garbage_sweeper_thread = std::thread(
@@ -64,12 +66,15 @@ OLAPStatus StorageEngine::_start_bg_worker() {
_garbage_sweeper_thread_callback(nullptr);
});
_garbage_sweeper_thread.detach();
+ LOG(INFO) << "garbage sweeper thread started";
+
// start thread for monitoring the tablet with io error
_disk_stat_monitor_thread = std::thread(
[this] {
_disk_stat_monitor_thread_callback(nullptr);
});
_disk_stat_monitor_thread.detach();
+ LOG(INFO) << "disk stat monitor thread started";
// convert store map to vector
std::vector<DataDir*> data_dirs;
@@ -101,6 +106,7 @@ OLAPStatus StorageEngine::_start_bg_worker() {
for (auto& thread : _base_compaction_threads) {
thread.detach();
}
+ LOG(INFO) << "base compaction threads started. number: " << base_compaction_num_threads;
_cumulative_compaction_threads.reserve(cumulative_compaction_num_threads);
for (uint32_t i = 0; i < cumulative_compaction_num_threads; ++i) {
@@ -112,6 +118,7 @@ OLAPStatus StorageEngine::_start_bg_worker() {
for (auto& thread : _cumulative_compaction_threads) {
thread.detach();
}
+ LOG(INFO) << "cumulative compaction threads started. number: " << cumulative_compaction_num_threads;
// tablet checkpoint thread
for (auto data_dir : data_dirs) {
@@ -123,6 +130,7 @@ OLAPStatus StorageEngine::_start_bg_worker() {
for (auto& thread : _tablet_checkpoint_threads) {
thread.detach();
}
+ LOG(INFO) << "tablet checkpint thread started";
// fd cache clean thread
_fd_cache_clean_thread = std::thread(
@@ -130,6 +138,7 @@ OLAPStatus StorageEngine::_start_bg_worker() {
_fd_cache_clean_callback(nullptr);
});
_fd_cache_clean_thread.detach();
+ LOG(INFO) << "fd cache clean thread started";
// path scan and gc thread
if (config::path_gc_check) {
@@ -150,10 +159,11 @@ OLAPStatus StorageEngine::_start_bg_worker() {
for (auto& thread : _path_gc_threads) {
thread.detach();
}
+ LOG(INFO) << "path scan/gc threads started. number:" << get_stores().size();
}
- VLOG(10) << "all bg worker started.";
- return OLAP_SUCCESS;
+ LOG(INFO) << "all storage engine's backgroud threads are started.";
+ return Status::OK();
}
void* StorageEngine::_fd_cache_clean_callback(void* arg) {
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 2c53317..3f120b8 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -97,11 +97,6 @@ Status StorageEngine::open(const EngineOptions& options, StorageEngine** engine_
LOG(WARNING) << "engine open failed, res=" << st;
return Status::InternalError("open engine failed");
}
- st = engine->_start_bg_worker();
- if (st != OLAP_SUCCESS) {
- LOG(WARNING) << "engine start background failed, res=" << st;
- return Status::InternalError("open engine failed");
- }
*engine_ptr = engine.release();
LOG(INFO) << "success to init storage engine.";
return Status::OK();
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 7177968..a29f1c1 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -190,13 +190,14 @@ public:
_heartbeat_flags = heartbeat_flags;
}
+ // start all backgroud threads. This should be call after env is ready.
+ Status start_bg_threads();
+
private:
// Instance should be inited from `static open()`
// MUST NOT be called in other circumstances.
OLAPStatus _open();
- OLAPStatus _start_bg_worker();
-
// Clear status(tables, ...)
void _clear();
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index ee4a5ea..42ddc14 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -173,7 +173,7 @@ int main(int argc, char** argv) {
exit(-1);
}
- // options
+ // init and open storage engine
doris::EngineOptions options;
options.store_paths = paths;
options.backend_uid = doris::UniqueId::gen_uid();
@@ -184,15 +184,20 @@ int main(int argc, char** argv) {
exit(-1);
}
- // start backend service for the coordinator on be_port
+ // init exec env
auto exec_env = doris::ExecEnv::GetInstance();
doris::ExecEnv::init(exec_env, paths);
exec_env->set_storage_engine(engine);
engine->set_heartbeat_flags(exec_env->heartbeat_flags());
+ // start all backgroud threads of storage engine.
+ // SHOULD be called after exec env is initialized.
+ EXIT_IF_ERROR(engine->start_bg_threads());
+
+ // begin to start services
doris::ThriftRpcHelper::setup(exec_env);
+ // 1. thrift server with be_port
doris::ThriftServer* be_server = nullptr;
-
EXIT_IF_ERROR(
doris::BackendService::create_service(exec_env, doris::config::be_port, &be_server));
Status status = be_server->start();
@@ -202,6 +207,7 @@ int main(int argc, char** argv) {
exit(1);
}
+ // 2. bprc service
doris::BRpcService brpc_service(exec_env);
status = brpc_service.start(doris::config::brpc_port);
if (!status.ok()) {
@@ -210,6 +216,7 @@ int main(int argc, char** argv) {
exit(1);
}
+ // 3. http service
doris::HttpService http_service(exec_env, doris::config::webserver_port,
doris::config::webserver_num_workers);
status = http_service.start();
@@ -219,8 +226,8 @@ int main(int argc, char** argv) {
exit(1);
}
+ // 4. heart beat server
doris::TMasterInfo* master_info = exec_env->master_info();
- // start heart beat server
doris::ThriftServer* heartbeat_thrift_server;
doris::AgentStatus heartbeat_status = doris::create_heartbeat_server(
exec_env, doris::config::heartbeat_service_port, &heartbeat_thrift_server,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org