You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/04/13 01:55:35 UTC
[incubator-doris] branch master updated: [refactor] refactor code, replace some file with stl libs (#8759)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 290366787c [refactor] refactor code, replace some file with stl libs (#8759)
290366787c is described below
commit 290366787c60aa3cb79c0bcba928b8393dc7b659
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Wed Apr 13 09:55:29 2022 +0800
[refactor] refactor code, replace some file with stl libs (#8759)
1. replace ConditionVariables with std::condition_variable
2. repalace Mutex with std::mutex
3. repalce MonoTime with std::chrono
---
be/src/agent/task_worker_pool.cpp | 115 +++---
be/src/agent/task_worker_pool.h | 18 +-
be/src/agent/topic_subscriber.cpp | 6 +-
be/src/common/daemon.cpp | 10 +-
be/src/exec/broker_reader.cpp | 7 +-
be/src/exec/olap_scanner.cpp | 16 +-
be/src/exec/tablet_sink.cpp | 9 +-
be/src/exec/tablet_sink.h | 2 +-
be/src/exprs/utility_functions.cpp | 5 +-
be/src/olap/compaction.cpp | 7 +-
be/src/olap/data_dir.cpp | 19 +-
be/src/olap/data_dir.h | 3 +-
be/src/olap/delta_writer.cpp | 20 +-
be/src/olap/lru_cache.h | 1 -
be/src/olap/olap_server.cpp | 90 +++--
be/src/olap/out_stream.cpp | 19 +-
be/src/olap/push_handler.cpp | 10 +-
be/src/olap/reader.cpp | 206 +++++------
be/src/olap/schema_change.cpp | 91 +++--
be/src/olap/snapshot_manager.cpp | 36 +-
be/src/olap/snapshot_manager.h | 2 +-
be/src/olap/storage_engine.cpp | 8 +-
be/src/olap/storage_engine.h | 2 +-
be/src/olap/tablet.cpp | 83 +++--
be/src/olap/tablet.h | 8 +-
be/src/olap/tablet_manager.cpp | 117 +++---
be/src/olap/tablet_meta.cpp | 16 +-
be/src/olap/tablet_meta.h | 1 -
be/src/olap/task/engine_checksum_task.cpp | 9 +-
be/src/olap/task/engine_clone_task.cpp | 39 +-
be/src/olap/task/engine_storage_migration_task.cpp | 68 ++--
be/src/olap/task/engine_storage_migration_task.h | 16 +-
be/src/olap/txn_manager.cpp | 41 ++-
be/src/olap/txn_manager.h | 6 +-
be/src/olap/utils.cpp | 1 -
be/src/runtime/broker_mgr.cpp | 2 +-
be/src/runtime/bufferpool/buffer_pool_internal.h | 1 -
be/src/runtime/external_scan_context_mgr.cpp | 2 +-
be/src/runtime/fragment_mgr.cpp | 2 +-
be/src/runtime/load_channel_mgr.cpp | 6 +-
be/src/runtime/load_path_mgr.cpp | 2 +-
be/src/runtime/result_buffer_mgr.cpp | 2 +-
be/src/runtime/routine_load/data_consumer_pool.cpp | 6 +-
be/src/util/CMakeLists.txt | 3 -
be/src/util/barrier.h | 24 +-
be/src/util/blocking_priority_queue.hpp | 4 +-
be/src/util/condition_variable.cpp | 86 -----
be/src/util/condition_variable.h | 114 ------
be/src/util/countdown_latch.h | 80 ++--
be/src/util/monotime.cpp | 305 ----------------
be/src/util/monotime.h | 406 ---------------------
be/src/util/mutex.cpp | 85 -----
be/src/util/mutex.h | 96 -----
be/src/util/os_util.h | 1 -
be/src/util/runtime_profile.cpp | 1 -
be/src/util/runtime_profile.h | 21 --
be/src/util/thread.cpp | 17 +-
be/src/util/thread_group.h | 16 +-
be/src/util/threadpool.cpp | 113 ++----
be/src/util/threadpool.h | 85 ++---
be/src/util/thrift_client.cpp | 3 +-
be/src/util/thrift_rpc_helper.cpp | 7 +-
be/src/util/thrift_util.cpp | 3 +-
be/src/vec/functions/function_utility.cpp | 6 +-
be/test/CMakeLists.txt | 1 -
be/test/exprs/runtime_filter_test.cpp | 2 +-
be/test/olap/lru_cache_test.cpp | 2 +-
be/test/olap/skiplist_test.cpp | 22 +-
be/test/runtime/fragment_mgr_test.cpp | 3 +-
be/test/runtime/stream_load_pipe_test.cpp | 6 +-
be/test/udf/uda_test.cpp | 2 +-
be/test/util/countdown_latch_test.cpp | 6 +-
be/test/util/monotime_test.cpp | 404 --------------------
be/test/util/thread_test.cpp | 14 +-
be/test/util/threadpool_test.cpp | 44 ++-
be/test/util/trace_test.cpp | 3 +-
build.sh | 8 +-
77 files changed, 784 insertions(+), 2339 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index f68196112a..5ae03100ce 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -50,23 +50,12 @@
#include "service/backend_options.h"
#include "util/doris_metrics.h"
#include "util/file_utils.h"
-#include "util/monotime.h"
#include "util/random.h"
#include "util/scoped_cleanup.h"
#include "util/stopwatch.hpp"
#include "util/threadpool.h"
#include "util/trace.h"
-using std::deque;
-using std::list;
-using std::lock_guard;
-using std::map;
-using std::set;
-using std::string;
-using std::stringstream;
-using std::to_string;
-using std::vector;
-
namespace doris {
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(agent_task_queue_size, MetricUnit::NOUNIT);
@@ -75,8 +64,8 @@ const uint32_t TASK_FINISH_MAX_RETRY = 3;
const uint32_t PUBLISH_VERSION_MAX_RETRY = 3;
std::atomic_ulong TaskWorkerPool::_s_report_version(time(nullptr) * 10000);
-Mutex TaskWorkerPool::_s_task_signatures_lock;
-map<TTaskType::type, set<int64_t>> TaskWorkerPool::_s_task_signatures;
+std::mutex TaskWorkerPool::_s_task_signatures_lock;
+std::map<TTaskType::type, std::set<int64_t>> TaskWorkerPool::_s_task_signatures;
FrontendServiceClientCache TaskWorkerPool::_master_service_client_cache;
TaskWorkerPool::TaskWorkerPool(const TaskWorkerType task_worker_type, ExecEnv* env,
@@ -85,7 +74,6 @@ TaskWorkerPool::TaskWorkerPool(const TaskWorkerType task_worker_type, ExecEnv* e
_agent_utils(new AgentUtils()),
_master_client(new MasterServerClient(_master_info, &_master_service_client_cache)),
_env(env),
- _worker_thread_condition_variable(&_worker_thread_lock),
_stop_background_threads_latch(1),
_is_work(false),
_thread_model(thread_model),
@@ -104,7 +92,7 @@ TaskWorkerPool::TaskWorkerPool(const TaskWorkerType task_worker_type, ExecEnv* e
if (_thread_model == ThreadModel::SINGLE_THREAD) {
return _is_doing_work.load();
} else {
- lock_guard<Mutex> lock(_worker_thread_lock);
+ std::lock_guard<std::mutex> lock(_worker_thread_lock);
return _tasks.size();
}
});
@@ -227,7 +215,7 @@ void TaskWorkerPool::start() {
void TaskWorkerPool::stop() {
{
- lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
+ std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_is_work = false;
_worker_thread_condition_variable.notify_all();
}
@@ -247,7 +235,7 @@ void TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
(const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr));
size_t task_count_in_queue = 0;
{
- lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
+ std::lock_guard<std::mutex> worker_thread_lock(_worker_thread_lock);
_tasks.push_back(task);
task_count_in_queue = _tasks.size();
_worker_thread_condition_variable.notify_one();
@@ -265,7 +253,7 @@ void TaskWorkerPool::notify_thread() {
}
bool TaskWorkerPool::_register_task_info(const TTaskType::type task_type, int64_t signature) {
- lock_guard<Mutex> task_signatures_lock(_s_task_signatures_lock);
+ lock_guard<std::mutex> task_signatures_lock(_s_task_signatures_lock);
set<int64_t>& signature_set = _s_task_signatures[task_type];
return signature_set.insert(signature).second;
}
@@ -273,7 +261,7 @@ bool TaskWorkerPool::_register_task_info(const TTaskType::type task_type, int64_
void TaskWorkerPool::_remove_task_info(const TTaskType::type task_type, int64_t signature) {
size_t queue_size;
{
- lock_guard<Mutex> task_signatures_lock(_s_task_signatures_lock);
+ lock_guard<std::mutex> task_signatures_lock(_s_task_signatures_lock);
set<int64_t>& signature_set = _s_task_signatures[task_type];
signature_set.erase(signature);
queue_size = signature_set.size();
@@ -340,9 +328,9 @@ void TaskWorkerPool::_create_tablet_worker_thread_callback() {
TAgentTaskRequest agent_task_req;
TCreateTabletReq create_tablet_req;
{
- lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
+ std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait();
+ _worker_thread_condition_variable.wait(worker_thread_lock);
}
if (!_is_work) {
return;
@@ -414,9 +402,9 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback() {
TAgentTaskRequest agent_task_req;
TDropTabletReq drop_tablet_req;
{
- lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
+ std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait();
+ _worker_thread_condition_variable.wait(worker_thread_lock);
}
if (!_is_work) {
return;
@@ -467,9 +455,9 @@ void TaskWorkerPool::_alter_tablet_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
{
- lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
+ std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait();
+ _worker_thread_condition_variable.wait(worker_thread_lock);
}
if (!_is_work) {
return;
@@ -592,7 +580,7 @@ void TaskWorkerPool::_push_worker_thread_callback() {
int32_t push_worker_count_high_priority = config::push_worker_count_high_priority;
static uint32_t s_worker_count = 0;
{
- lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
+ std::lock_guard<std::mutex> worker_thread_lock(_worker_thread_lock);
if (s_worker_count < push_worker_count_high_priority) {
++s_worker_count;
priority = TPriority::HIGH;
@@ -605,9 +593,9 @@ void TaskWorkerPool::_push_worker_thread_callback() {
TPushReq push_req;
int32_t index = 0;
do {
- lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
+ std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait();
+ _worker_thread_condition_variable.wait(worker_thread_lock);
}
if (!_is_work) {
return;
@@ -681,9 +669,9 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
TAgentTaskRequest agent_task_req;
TPublishVersionRequest publish_version_req;
{
- lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
+ std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait();
+ _worker_thread_condition_variable.wait(worker_thread_lock);
}
if (!_is_work) {
return;
@@ -712,7 +700,7 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
<< publish_version_req.transaction_id
<< ", error_tablets_size=" << error_tablet_ids.size() << "]";
++retry_time;
- SleepFor(MonoDelta::FromSeconds(1));
+ std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
@@ -745,9 +733,9 @@ void TaskWorkerPool::_clear_transaction_task_worker_thread_callback() {
TAgentTaskRequest agent_task_req;
TClearTransactionTaskRequest clear_transaction_task_req;
{
- lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
+ std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait();
+ _worker_thread_condition_variable.wait(worker_thread_lock);
}
if (!_is_work) {
return;
@@ -803,9 +791,9 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
TAgentTaskRequest agent_task_req;
TUpdateTabletMetaInfoReq update_tablet_meta_req;
{
- lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
+ std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait();
+ _worker_thread_condition_variable.wait(worker_thread_lock);
}
if (!_is_work) {
return;
@@ -830,7 +818,7 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
<< " schema_hash=" << tablet_meta_info.schema_hash;
continue;
}
- WriteLock wrlock(tablet->get_header_lock());
+ std::lock_guard<std::shared_mutex> wrlock(tablet->get_header_lock());
// update tablet meta
if (!tablet_meta_info.__isset.meta_type) {
tablet->set_partition_id(tablet_meta_info.partition_id);
@@ -871,9 +859,9 @@ void TaskWorkerPool::_clone_worker_thread_callback() {
TCloneReq clone_req;
{
- lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
+ std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait();
+ _worker_thread_condition_variable.wait(worker_thread_lock);
}
if (!_is_work) {
return;
@@ -924,9 +912,9 @@ void TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
TAgentTaskRequest agent_task_req;
TStorageMediumMigrateReq storage_medium_migrate_req;
{
- lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
+ std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait();
+ _worker_thread_condition_variable.wait(worker_thread_lock);
}
if (!_is_work) {
return;
@@ -1018,9 +1006,9 @@ OLAPStatus TaskWorkerPool::_check_migrate_request(const TStorageMediumMigrateReq
*dest_store = stores[0];
}
if (tablet->data_dir()->path() == (*dest_store)->path()) {
- LOG(INFO) << "tablet is already on specified path. "
- << "path=" << tablet->data_dir()->path();
- return OLAP_REQUEST_FAILED;
+ LOG(INFO) << "tablet is already on specified path. "
+ << "path=" << tablet->data_dir()->path();
+ return OLAP_REQUEST_FAILED;
}
// check disk capacity
@@ -1039,9 +1027,9 @@ void TaskWorkerPool::_check_consistency_worker_thread_callback() {
TAgentTaskRequest agent_task_req;
TCheckConsistencyReq check_consistency_req;
{
- lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
+ std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait();
+ _worker_thread_condition_variable.wait(worker_thread_lock);
}
if (!_is_work) {
return;
@@ -1094,8 +1082,9 @@ void TaskWorkerPool::_report_task_worker_thread_callback() {
while (_is_work) {
_is_doing_work = false;
// wait at most report_task_interval_seconds, or being notified
+ std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait_for(
- MonoDelta::FromSeconds(config::report_task_interval_seconds));
+ worker_thread_lock, std::chrono::seconds(config::report_task_interval_seconds));
if (!_is_work) {
break;
}
@@ -1112,7 +1101,7 @@ void TaskWorkerPool::_report_task_worker_thread_callback() {
// See _random_sleep() comment in _report_disk_state_worker_thread_callback
_random_sleep(5);
{
- lock_guard<Mutex> task_signatures_lock(_s_task_signatures_lock);
+ lock_guard<std::mutex> task_signatures_lock(_s_task_signatures_lock);
request.__set_tasks(_s_task_signatures);
}
_handle_report(request, ReportType::TASK);
@@ -1130,8 +1119,10 @@ void TaskWorkerPool::_report_disk_state_worker_thread_callback() {
while (_is_work) {
_is_doing_work = false;
// wait at most report_disk_state_interval_seconds, or being notified
+ std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait_for(
- MonoDelta::FromSeconds(config::report_disk_state_interval_seconds));
+ worker_thread_lock,
+ std::chrono::seconds(config::report_disk_state_interval_seconds));
if (!_is_work) {
break;
}
@@ -1176,13 +1167,13 @@ void TaskWorkerPool::_report_tablet_worker_thread_callback() {
TReportRequest request;
request.__set_backend(_backend);
request.__isset.tablets = true;
-
while (_is_work) {
_is_doing_work = false;
// wait at most report_tablet_interval_seconds, or being notified
+ std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait_for(
- MonoDelta::FromSeconds(config::report_tablet_interval_seconds));
+ worker_thread_lock, std::chrono::seconds(config::report_tablet_interval_seconds));
if (!_is_work) {
break;
}
@@ -1232,9 +1223,9 @@ void TaskWorkerPool::_upload_worker_thread_callback() {
TAgentTaskRequest agent_task_req;
TUploadReq upload_request;
{
- lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
+ std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait();
+ _worker_thread_condition_variable.wait(worker_thread_lock);
}
if (!_is_work) {
return;
@@ -1294,9 +1285,9 @@ void TaskWorkerPool::_download_worker_thread_callback() {
TAgentTaskRequest agent_task_req;
TDownloadReq download_request;
{
- lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
+ std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait();
+ _worker_thread_condition_variable.wait(worker_thread_lock);
}
if (!_is_work) {
return;
@@ -1358,9 +1349,9 @@ void TaskWorkerPool::_make_snapshot_thread_callback() {
TAgentTaskRequest agent_task_req;
TSnapshotRequest snapshot_request;
{
- lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
+ std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait();
+ _worker_thread_condition_variable.wait(worker_thread_lock);
}
if (!_is_work) {
return;
@@ -1437,9 +1428,9 @@ void TaskWorkerPool::_release_snapshot_thread_callback() {
TAgentTaskRequest agent_task_req;
TReleaseSnapshotRequest release_snapshot_request;
{
- lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
+ std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait();
+ _worker_thread_condition_variable.wait(worker_thread_lock);
}
if (!_is_work) {
return;
@@ -1504,9 +1495,9 @@ void TaskWorkerPool::_move_dir_thread_callback() {
TAgentTaskRequest agent_task_req;
TMoveDirReq move_dir_req;
{
- MutexLock worker_thread_lock(&(_worker_thread_lock));
+ std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait();
+ _worker_thread_condition_variable.wait(worker_thread_lock);
}
if (!_is_work) {
return;
@@ -1624,9 +1615,9 @@ void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() {
TCompactionReq compaction_req;
{
- lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
+ std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait();
+ _worker_thread_condition_variable.wait(worker_thread_lock);
}
if (!_is_work) {
return;
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index eec4ad2c65..1325091753 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -31,9 +31,7 @@
#include "gutil/ref_counted.h"
#include "olap/olap_define.h"
#include "olap/storage_engine.h"
-#include "util/condition_variable.h"
#include "util/countdown_latch.h"
-#include "util/mutex.h"
#include "util/thread.h"
namespace doris {
@@ -73,15 +71,11 @@ public:
SUBMIT_TABLE_COMPACTION
};
- enum ReportType {
- TASK,
- DISK,
- TABLET
- };
+ enum ReportType { TASK, DISK, TABLET };
enum class ThreadModel {
- SINGLE_THREAD, // Only 1 thread allowed in the pool
- MULTI_THREADS // 1 or more threads allowed in the pool
+ SINGLE_THREAD, // Only 1 thread allowed in the pool
+ MULTI_THREADS // 1 or more threads allowed in the pool
};
const std::string TYPE_STRING(TaskWorkerType type) {
@@ -224,8 +218,8 @@ private:
ExecEnv* _env;
// Protect task queue
- Mutex _worker_thread_lock;
- ConditionVariable _worker_thread_condition_variable;
+ std::mutex _worker_thread_lock;
+ std::condition_variable _worker_thread_condition_variable;
CountDownLatch _stop_background_threads_latch;
bool _is_work;
ThreadModel _thread_model;
@@ -245,7 +239,7 @@ private:
static FrontendServiceClientCache _master_service_client_cache;
static std::atomic_ulong _s_report_version;
- static Mutex _s_task_signatures_lock;
+ static std::mutex _s_task_signatures_lock;
static std::map<TTaskType::type, std::set<int64_t>> _s_task_signatures;
DISALLOW_COPY_AND_ASSIGN(TaskWorkerPool);
diff --git a/be/src/agent/topic_subscriber.cpp b/be/src/agent/topic_subscriber.cpp
index b6ccfe289f..cdc99d54c8 100644
--- a/be/src/agent/topic_subscriber.cpp
+++ b/be/src/agent/topic_subscriber.cpp
@@ -16,8 +16,8 @@
// under the License.
#include "agent/topic_subscriber.h"
+
#include "common/logging.h"
-#include "util/mutex.h"
namespace doris {
@@ -38,13 +38,13 @@ TopicSubscriber::~TopicSubscriber() {
void TopicSubscriber::register_listener(TTopicType::type topic_type, TopicListener* listener) {
// Unique lock here to prevent access to listeners
- WriteLock lock(_listener_mtx);
+ std::lock_guard<std::shared_mutex> lock(_listener_mtx);
this->_registered_listeners[topic_type].push_back(listener);
}
void TopicSubscriber::handle_updates(const TAgentPublishRequest& agent_publish_request) {
// Shared lock here in order to avoid updates in listeners' map
- ReadLock lock(_listener_mtx);
+ std::shared_lock lock(_listener_mtx);
// Currently, not deal with protocol version, the listener should deal with protocol version
const std::vector<TTopicUpdate>& topic_updates = agent_publish_request.updates;
std::vector<TTopicUpdate>::const_iterator topic_update_it = topic_updates.begin();
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index ccfb4999e8..5d8af62815 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -69,7 +69,7 @@ namespace doris {
bool k_doris_exit = false;
void Daemon::tcmalloc_gc_thread() {
- while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(10))) {
+ while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(10))) {
size_t used_size = 0;
size_t free_size = 0;
@@ -89,13 +89,15 @@ void Daemon::tcmalloc_gc_thread() {
void Daemon::memory_maintenance_thread() {
while (!_stop_background_threads_latch.wait_for(
- MonoDelta::FromSeconds(config::memory_maintenance_sleep_time_s))) {
+ std::chrono::seconds(config::memory_maintenance_sleep_time_s))) {
ExecEnv* env = ExecEnv::GetInstance();
// ExecEnv may not have been created yet or this may be the catalogd or statestored,
// which don't have ExecEnvs.
if (env != nullptr) {
BufferPool* buffer_pool = env->buffer_pool();
- if (buffer_pool != nullptr) buffer_pool->Maintenance();
+ if (buffer_pool != nullptr) {
+ buffer_pool->Maintenance();
+ }
}
}
}
@@ -164,7 +166,7 @@ void Daemon::calculate_metrics_thread() {
DorisMetrics::instance()->system_metrics()->get_network_traffic(&lst_net_send_bytes,
&lst_net_receive_bytes);
}
- } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(15)));
+ } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(15)));
}
static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
diff --git a/be/src/exec/broker_reader.cpp b/be/src/exec/broker_reader.cpp
index ab78b5c6c3..0d65bfc6f5 100644
--- a/be/src/exec/broker_reader.cpp
+++ b/be/src/exec/broker_reader.cpp
@@ -25,7 +25,6 @@
#include "runtime/broker_mgr.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
-#include "util/monotime.h"
#include "util/thrift_util.h"
namespace doris {
@@ -92,7 +91,7 @@ Status BrokerReader::open() {
try {
client->openReader(response, request);
} catch (apache::thrift::transport::TTransportException& e) {
- SleepFor(MonoDelta::FromSeconds(1));
+ std::this_thread::sleep_for(std::chrono::seconds(1));
RETURN_IF_ERROR(client.reopen());
client->openReader(response, request);
}
@@ -163,7 +162,7 @@ Status BrokerReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_rea
try {
client->pread(response, request);
} catch (apache::thrift::transport::TTransportException& e) {
- SleepFor(MonoDelta::FromSeconds(1));
+ std::this_thread::sleep_for(std::chrono::seconds(1));
RETURN_IF_ERROR(client.reopen());
LOG(INFO) << "retry reading from broker: " << broker_addr << ". reason: " << e.what();
client->pread(response, request);
@@ -235,7 +234,7 @@ void BrokerReader::close() {
try {
client->closeReader(response, request);
} catch (apache::thrift::transport::TTransportException& e) {
- SleepFor(MonoDelta::FromSeconds(1));
+ std::this_thread::sleep_for(std::chrono::seconds(1));
status = client.reopen();
if (!status.ok()) {
LOG(WARNING) << "Close broker reader failed. broker=" << broker_addr
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index 6619c67aa6..e2ad167f59 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -84,7 +84,7 @@ Status OlapScanner::prepare(
return Status::InternalError(ss.str());
}
{
- ReadLock rdlock(_tablet->get_header_lock());
+ std::shared_lock rdlock(_tablet->get_header_lock());
const RowsetSharedPtr rowset = _tablet->rowset_with_max_version();
if (rowset == nullptr) {
std::stringstream ss;
@@ -379,13 +379,13 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
const TypeDescriptor& item_type = desc->type().children.at(0);
auto pool = batch->tuple_data_pool();
CollectionValue::deep_copy_collection(
- slot, item_type, [pool](int size) -> MemFootprint {
- int64_t offset = pool->total_allocated_bytes();
- uint8_t* data = pool->allocate(size);
- return { offset, data };
- },
- false
- );
+ slot, item_type,
+ [pool](int size) -> MemFootprint {
+ int64_t offset = pool->total_allocated_bytes();
+ uint8_t* data = pool->allocate(size);
+ return {offset, data};
+ },
+ false);
}
// the memory allocate by mem pool has been copied,
// so we should release these memory immediately
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 27893cb2a0..77eb6f1dae 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -36,7 +36,6 @@
#include "util/brpc_client_cache.h"
#include "util/debug/sanitizer_scopes.h"
#include "util/defer_op.h"
-#include "util/monotime.h"
#include "util/proto_util.h"
#include "util/threadpool.h"
#include "util/time.h"
@@ -273,7 +272,7 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
(_pending_batches_bytes > _max_pending_batches_bytes ||
_parent->_mem_tracker->any_limit_exceeded())) {
SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
- SleepFor(MonoDelta::FromMilliseconds(10));
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
auto row_no = _cur_batch->add_row();
@@ -325,7 +324,7 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
(_pending_batches_bytes > _max_pending_batches_bytes ||
_parent->_mem_tracker->any_limit_exceeded())) {
SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
- SleepFor(MonoDelta::FromMilliseconds(10));
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
auto row_no = _cur_batch->add_row();
@@ -399,7 +398,7 @@ Status NodeChannel::close_wait(RuntimeState* state) {
// waiting for finished, it may take a long time, so we couldn't set a timeout
while (!_add_batches_finished && !_cancelled) {
- SleepFor(MonoDelta::FromMilliseconds(1));
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
_close_time_ms = UnixMillis() - _close_time_ms;
@@ -1316,7 +1315,7 @@ void OlapTableSink::_send_batch_process(RuntimeState* state) {
return;
}
} while (!_stop_background_threads_latch.wait_for(
- MonoDelta::FromMilliseconds(config::olap_table_sink_send_interval_ms)));
+ std::chrono::milliseconds(config::olap_table_sink_send_interval_ms)));
}
} // namespace stream_load
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index cf5ad40c35..cf2caf9e3a 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -111,7 +111,7 @@ public:
brpc::Join(cid);
}
if (_packet_in_flight) {
- SleepFor(MonoDelta::FromMilliseconds(10));
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
}
diff --git a/be/src/exprs/utility_functions.cpp b/be/src/exprs/utility_functions.cpp
index 03bcc68b83..2848b69099 100644
--- a/be/src/exprs/utility_functions.cpp
+++ b/be/src/exprs/utility_functions.cpp
@@ -17,11 +17,12 @@
#include "exprs/utility_functions.h"
+#include <thread>
+
#include "exprs/anyval_util.h"
#include "exprs/expr.h"
#include "runtime/tuple_row.h"
#include "util/debug_util.h"
-#include "util/monotime.h"
namespace doris {
@@ -35,7 +36,7 @@ BooleanVal UtilityFunctions::sleep(FunctionContext* ctx, const IntVal& seconds)
if (seconds.is_null) {
return BooleanVal::null();
}
- SleepFor(MonoDelta::FromSeconds(seconds.val));
+ std::this_thread::sleep_for(std::chrono::seconds(seconds.val));
return BooleanVal(true);
}
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index c9e999c022..08a49b54b8 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -27,8 +27,7 @@ using std::vector;
namespace doris {
Compaction::Compaction(TabletSharedPtr tablet, const std::string& label)
- : _mem_tracker(
- MemTracker::create_tracker(-1, label, nullptr, MemTrackerLevel::INSTANCE)),
+ : _mem_tracker(MemTracker::create_tracker(-1, label, nullptr, MemTrackerLevel::INSTANCE)),
_tablet(tablet),
_input_rowsets_size(0),
_input_row_num(0),
@@ -129,7 +128,7 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) {
int64_t current_max_version;
{
- ReadLock rdlock(_tablet->get_header_lock());
+ std::shared_lock rdlock(_tablet->get_header_lock());
current_max_version = _tablet->rowset_with_max_version()->end_version();
}
@@ -179,7 +178,7 @@ void Compaction::modify_rowsets() {
std::vector<RowsetSharedPtr> output_rowsets;
output_rowsets.push_back(_output_rowset);
- WriteLock wrlock(_tablet->get_header_lock());
+ std::lock_guard<std::shared_mutex> wrlock(_tablet->get_header_lock());
_tablet->modify_rowsets(output_rowsets, _input_rowsets);
_tablet->save_meta();
}
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index 9283a33ca0..d882391bb7 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -45,9 +45,9 @@
#include "service/backend_options.h"
#include "util/errno.h"
#include "util/file_utils.h"
-#include "util/monotime.h"
#include "util/storage_backend.h"
#include "util/storage_backend_mgr.h"
+
#include "util/string_util.h"
using strings::Substitute;
@@ -520,19 +520,20 @@ OLAPStatus DataDir::load() {
// At startup, we only count these invalid rowset, but do not actually delete it.
// The actual delete operation is in StorageEngine::_clean_unused_rowset_metas,
// which is cleaned up uniformly by the background cleanup thread.
- LOG(INFO) << "finish to load tablets from " << _path_desc.filepath << ", total rowset meta: "
- << dir_rowset_metas.size() << ", invalid rowset num: " << invalid_rowset_counter;
+ LOG(INFO) << "finish to load tablets from " << _path_desc.filepath
+ << ", total rowset meta: " << dir_rowset_metas.size()
+ << ", invalid rowset num: " << invalid_rowset_counter;
return OLAP_SUCCESS;
}
void DataDir::add_pending_ids(const std::string& id) {
- WriteLock wr_lock(_pending_path_mutex);
+ std::lock_guard<std::shared_mutex> wr_lock(_pending_path_mutex);
_pending_path_ids.insert(id);
}
void DataDir::remove_pending_ids(const std::string& id) {
- WriteLock wr_lock(_pending_path_mutex);
+ std::lock_guard<std::shared_mutex> wr_lock(_pending_path_mutex);
_pending_path_ids.erase(id);
}
@@ -549,7 +550,8 @@ void DataDir::perform_path_gc_by_tablet() {
for (const auto& path : _all_tablet_schemahash_paths) {
++counter;
if (config::path_gc_check_step > 0 && counter % config::path_gc_check_step == 0) {
- SleepFor(MonoDelta::FromMilliseconds(config::path_gc_check_step_interval_ms));
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(config::path_gc_check_step_interval_ms));
}
TTabletId tablet_id = -1;
TSchemaHash schema_hash = -1;
@@ -599,7 +601,8 @@ void DataDir::perform_path_gc_by_rowsetid() {
for (const auto& path : _all_check_paths) {
++counter;
if (config::path_gc_check_step > 0 && counter % config::path_gc_check_step == 0) {
- SleepFor(MonoDelta::FromMilliseconds(config::path_gc_check_step_interval_ms));
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(config::path_gc_check_step_interval_ms));
}
TTabletId tablet_id = -1;
TSchemaHash schema_hash = -1;
@@ -702,7 +705,7 @@ void DataDir::_process_garbage_path(const std::string& path) {
}
bool DataDir::_check_pending_ids(const std::string& id) {
- ReadLock rd_lock(_pending_path_mutex);
+ std::shared_lock rd_lock(_pending_path_mutex);
return _pending_path_ids.find(id) != _pending_path_ids.end();
}
diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h
index f60eab7a59..5af18d6991 100644
--- a/be/src/olap/data_dir.h
+++ b/be/src/olap/data_dir.h
@@ -31,7 +31,6 @@
#include "olap/olap_common.h"
#include "olap/rowset/rowset_id_generator.h"
#include "util/metrics.h"
-#include "util/mutex.h"
namespace doris {
@@ -53,7 +52,7 @@ public:
void stop_bg_worker();
const std::string& path() const { return _path_desc.filepath; }
- const FilePathDesc& path_desc() const { return _path_desc;}
+ const FilePathDesc& path_desc() const { return _path_desc; }
size_t path_hash() const { return _path_hash; }
bool is_used() const { return _is_used; }
void set_is_used(bool is_used) { _is_used = is_used; }
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 5d22d0f158..60ea864282 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -105,7 +105,7 @@ OLAPStatus DeltaWriter::init() {
}
{
- ReadLock base_migration_rlock(_tablet->get_migration_lock(), std::try_to_lock);
+ std::shared_lock base_migration_rlock(_tablet->get_migration_lock(), std::try_to_lock);
if (!base_migration_rlock.owns_lock()) {
return OLAP_ERR_RWLOCK_ERROR;
}
@@ -139,8 +139,8 @@ OLAPStatus DeltaWriter::init() {
_reset_mem_table();
// create flush handler
- RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_token(&_flush_token,
- writer_context.rowset_type, _req.is_high_priority));
+ RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_token(
+ &_flush_token, writer_context.rowset_type, _req.is_high_priority));
_is_init = true;
return OLAP_SUCCESS;
@@ -211,7 +211,7 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait(bool need_wait) {
// and at that time, the writer may not be initialized yet and that is a normal case.
return OLAP_SUCCESS;
}
-
+
if (_is_cancelled) {
return OLAP_ERR_ALREADY_CANCELLED;
}
@@ -219,8 +219,8 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait(bool need_wait) {
if (mem_consumption() == _mem_table->memory_usage()) {
// equal means there is no memtable in flush queue, just flush this memtable
VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
- << _mem_table->memory_usage() << ", tablet: " << _req.tablet_id
- << ", load id: " << print_id(_req.load_id);
+ << _mem_table->memory_usage() << ", tablet: " << _req.tablet_id
+ << ", load id: " << print_id(_req.load_id);
RETURN_NOT_OK(_flush_memtable_async());
_reset_mem_table();
} else {
@@ -275,7 +275,8 @@ OLAPStatus DeltaWriter::close() {
return OLAP_SUCCESS;
}
-OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec, bool is_broken) {
+OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
+ bool is_broken) {
std::lock_guard<std::mutex> l(_lock);
DCHECK(_is_init)
<< "delta writer is supposed be to initialized before close_wait() being called";
@@ -312,9 +313,8 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
_delta_written_success = true;
const FlushStatistic& stat = _flush_token->get_stats();
- VLOG_CRITICAL << "close delta writer for tablet: " << _tablet->tablet_id()
- << ", load id: " << print_id(_req.load_id)
- << ", stats: " << stat;
+ VLOG_CRITICAL << "close delta writer for tablet: " << _tablet->tablet_id()
+ << ", load id: " << print_id(_req.load_id) << ", stats: " << stat;
return OLAP_SUCCESS;
}
diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h
index 51e1d0ad1b..d0d656a882 100644
--- a/be/src/olap/lru_cache.h
+++ b/be/src/olap/lru_cache.h
@@ -17,7 +17,6 @@
#include "olap/olap_common.h"
#include "runtime/mem_tracker.h"
#include "util/metrics.h"
-#include "util/mutex.h"
#include "util/slice.h"
namespace doris {
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 196580ed72..d27f12be20 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -22,8 +22,8 @@
#include <boost/algorithm/string.hpp>
#include <cmath>
#include <ctime>
-#include <string>
#include <random>
+#include <string>
#include "agent/cgroups_mgr.h"
#include "common/status.h"
@@ -131,7 +131,7 @@ void StorageEngine::_fd_cache_clean_callback() {
ProfilerRegisterThread();
#endif
int32_t interval = 600;
- while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval))) {
+ while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) {
interval = config::cache_clean_interval;
if (interval <= 0) {
OLAP_LOG_WARNING(
@@ -165,7 +165,7 @@ void StorageEngine::_garbage_sweeper_thread_callback() {
double usage = 1.0;
// After the program starts, the first round of cleaning starts after min_interval.
uint32_t curr_interval = min_interval;
- while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(curr_interval))) {
+ while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(curr_interval))) {
// Function properties:
// when usage < 0.6, ratio close to 1.(interval close to max_interval)
// when usage at [0.6, 0.75], ratio is rapidly decreasing from 0.87 to 0.27.
@@ -205,7 +205,7 @@ void StorageEngine::_disk_stat_monitor_thread_callback() {
<< ", force set to 1";
interval = 1;
}
- } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval)));
+ } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
}
void StorageEngine::check_cumulative_compaction_config() {
@@ -242,7 +242,7 @@ void StorageEngine::_unused_rowset_monitor_thread_callback() {
<< ", force set to 1";
interval = 1;
}
- } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval)));
+ } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
}
void StorageEngine::_path_gc_thread_callback(DataDir* data_dir) {
@@ -265,7 +265,7 @@ void StorageEngine::_path_gc_thread_callback(DataDir* data_dir) {
<< "will be forced set to half hour";
interval = 1800; // 0.5 hour
}
- } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval)));
+ } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
}
void StorageEngine::_path_scan_thread_callback(DataDir* data_dir) {
@@ -284,7 +284,7 @@ void StorageEngine::_path_scan_thread_callback(DataDir* data_dir) {
<< "will be forced set to one day";
interval = 24 * 3600; // one day
}
- } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval)));
+ } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
}
void StorageEngine::_tablet_checkpoint_callback(const std::vector<DataDir*>& data_dirs) {
@@ -305,7 +305,7 @@ void StorageEngine::_tablet_checkpoint_callback(const std::vector<DataDir*>& dat
}
}
interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs;
- } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval)));
+ } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
}
void StorageEngine::_compaction_tasks_producer_callback() {
@@ -338,24 +338,30 @@ void StorageEngine::_compaction_tasks_producer_callback() {
int64_t interval = config::generate_compaction_tasks_min_interval_ms;
do {
if (!config::disable_auto_compaction) {
- VLOG_CRITICAL << "compaction thread pool. num_threads: " << _compaction_thread_pool->num_threads()
- << ", num_threads_pending_start: " << _compaction_thread_pool->num_threads_pending_start()
- << ", num_active_threads: " << _compaction_thread_pool->num_active_threads()
- << ", max_threads: " << _compaction_thread_pool->max_threads()
- << ", min_threads: " << _compaction_thread_pool->min_threads()
- << ", num_total_queued_tasks: " << _compaction_thread_pool->get_queue_size();
-
- if(_compaction_thread_pool->max_threads() != config::max_compaction_threads) {
+ VLOG_CRITICAL << "compaction thread pool. num_threads: "
+ << _compaction_thread_pool->num_threads()
+ << ", num_threads_pending_start: "
+ << _compaction_thread_pool->num_threads_pending_start()
+ << ", num_active_threads: "
+ << _compaction_thread_pool->num_active_threads()
+ << ", max_threads: " << _compaction_thread_pool->max_threads()
+ << ", min_threads: " << _compaction_thread_pool->min_threads()
+ << ", num_total_queued_tasks: "
+ << _compaction_thread_pool->get_queue_size();
+
+ if (_compaction_thread_pool->max_threads() != config::max_compaction_threads) {
int old_max_threads = _compaction_thread_pool->max_threads();
- Status status = _compaction_thread_pool->set_max_threads(config::max_compaction_threads);
+ Status status =
+ _compaction_thread_pool->set_max_threads(config::max_compaction_threads);
if (status.ok()) {
LOG(INFO) << "update compaction thread pool max_threads from "
<< old_max_threads << " to " << config::max_compaction_threads;
}
}
- if(_compaction_thread_pool->min_threads() != config::max_compaction_threads) {
+ if (_compaction_thread_pool->min_threads() != config::max_compaction_threads) {
int old_min_threads = _compaction_thread_pool->min_threads();
- Status status = _compaction_thread_pool->set_min_threads(config::max_compaction_threads);
+ Status status =
+ _compaction_thread_pool->set_min_threads(config::max_compaction_threads);
if (status.ok()) {
LOG(INFO) << "update compaction thread pool min_threads from "
<< old_min_threads << " to " << config::max_compaction_threads;
@@ -399,20 +405,19 @@ void StorageEngine::_compaction_tasks_producer_callback() {
for (const auto& tablet : tablets_compaction) {
Status st = _submit_compaction_task(tablet, compaction_type);
if (!st.ok()) {
- LOG(WARNING) << "failed to submit compaction task for tablet: " << tablet->tablet_id()
- << ", err: " << st.get_error_msg();
+ LOG(WARNING) << "failed to submit compaction task for tablet: "
+ << tablet->tablet_id() << ", err: " << st.get_error_msg();
}
}
interval = config::generate_compaction_tasks_min_interval_ms;
} else {
interval = config::check_auto_compaction_interval_seconds * 1000;
}
- } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromMilliseconds(interval)));
+ } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
}
std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score) {
-
_update_cumulative_compaction_policy();
std::vector<TabletSharedPtr> tablets_compaction;
@@ -439,7 +444,8 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
// If so, the last Slot can be assigned to Base compaction,
// otherwise, this Slot needs to be reserved for cumulative compaction.
int count = copied_cumu_map[data_dir].size() + copied_base_map[data_dir].size();
- int thread_per_disk = data_dir->is_ssd_disk() ? config::compaction_task_num_per_fast_disk : config::compaction_task_num_per_disk;
+ int thread_per_disk = data_dir->is_ssd_disk() ? config::compaction_task_num_per_fast_disk
+ : config::compaction_task_num_per_disk;
if (count >= thread_per_disk) {
// Return if no available slot
need_pick_tablet = false;
@@ -517,10 +523,14 @@ bool StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr table
bool already_existed = false;
switch (compaction_type) {
case CompactionType::CUMULATIVE_COMPACTION:
- already_existed = !(_tablet_submitted_cumu_compaction[tablet->data_dir()].insert(tablet->tablet_id()).second);
+ already_existed = !(_tablet_submitted_cumu_compaction[tablet->data_dir()]
+ .insert(tablet->tablet_id())
+ .second);
break;
default:
- already_existed = !(_tablet_submitted_base_compaction[tablet->data_dir()].insert(tablet->tablet_id()).second);
+ already_existed = !(_tablet_submitted_base_compaction[tablet->data_dir()]
+ .insert(tablet->tablet_id())
+ .second);
break;
}
return already_existed;
@@ -546,7 +556,8 @@ void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet
}
}
-Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type) {
+Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
+ CompactionType compaction_type) {
bool already_exist = _push_tablet_into_submitted_compaction(tablet, compaction_type);
if (already_exist) {
return Status::AlreadyExist(strings::Substitute(
@@ -557,21 +568,22 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, Compaction
Status st = tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet, &permits);
if (st.ok() && permits > 0 && _permit_limiter.request(permits)) {
auto st = _compaction_thread_pool->submit_func([=]() {
- CgroupsMgr::apply_system_cgroup();
- tablet->execute_compaction(compaction_type);
- _permit_limiter.release(permits);
- // reset compaction
- tablet->reset_compaction(compaction_type);
- _pop_tablet_from_submitted_compaction(tablet, compaction_type);
+ CgroupsMgr::apply_system_cgroup();
+ tablet->execute_compaction(compaction_type);
+ _permit_limiter.release(permits);
+ // reset compaction
+ tablet->reset_compaction(compaction_type);
+ _pop_tablet_from_submitted_compaction(tablet, compaction_type);
});
if (!st.ok()) {
_permit_limiter.release(permits);
// reset compaction
tablet->reset_compaction(compaction_type);
_pop_tablet_from_submitted_compaction(tablet, compaction_type);
- return Status::InternalError(strings::Substitute(
- "failed to submit compaction task to thread pool, tablet_id=$0, compaction_type=$1.",
- tablet->tablet_id(), compaction_type));
+ return Status::InternalError(
+ strings::Substitute("failed to submit compaction task to thread pool, "
+ "tablet_id=$0, compaction_type=$1.",
+ tablet->tablet_id(), compaction_type));
}
return Status::OK();
} else {
@@ -588,10 +600,12 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, Compaction
}
}
-Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type) {
+Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet,
+ CompactionType compaction_type) {
_update_cumulative_compaction_policy();
if (tablet->get_cumulative_compaction_policy() == nullptr ||
- tablet->get_cumulative_compaction_policy()->name() != _cumulative_compaction_policy->name()) {
+ tablet->get_cumulative_compaction_policy()->name() !=
+ _cumulative_compaction_policy->name()) {
tablet->set_cumulative_compaction_policy(_cumulative_compaction_policy);
}
return _submit_compaction_task(tablet, compaction_type);
diff --git a/be/src/olap/out_stream.cpp b/be/src/olap/out_stream.cpp
index 2b7000ec23..3c032d4467 100644
--- a/be/src/olap/out_stream.cpp
+++ b/be/src/olap/out_stream.cpp
@@ -21,7 +21,6 @@
#include "olap/file_helper.h"
#include "olap/utils.h"
#include "util/mem_util.hpp"
-#include "util/monotime.h"
namespace doris {
@@ -228,9 +227,9 @@ OLAPStatus OutStream::_spill() {
_spilled_bytes += sizeof(StreamHead) + output_bytes;
} else {
- // directly output _current
- // If there is _compress before, output m_compress first
- // Note that there must be no _overflow at this time
+ // directly output _current
+ // If there is _compress before, output m_compress first
+ // Note that there must be no _overflow at this time
_compressed->set_position(head_pos);
if (head_pos != 0) {
@@ -252,11 +251,11 @@ OLAPStatus OutStream::write(const char* buffer, uint64_t length) {
uint64_t remain = length;
while (remain > 0) {
- // The reason why it was thrown in is because in the case of compression, _current will only be created once
- // It has been multiplexing since then, and the output is compress
- // In the case of uncompressed, current will be put into the list and cannot be reused. The reason is
- // If it is reused, the previous content will be modified, so it needs to be redistributed.
- // Only allocate once and the second block will hang up
+ // The reason why it was thrown in is because in the case of compression, _current will only be created once
+ // It has been multiplexing since then, and the output is compress
+ // In the case of uncompressed, current will be put into the list and cannot be reused. The reason is
+ // If it is reused, the previous content will be modified, so it needs to be redistributed.
+ // Only allocate once and the second block will hang up
if (nullptr == _current) {
res = _create_new_input_buffer();
if (OLAP_SUCCESS != res) {
@@ -360,7 +359,7 @@ OLAPStatus OutStream::write_to_file(FileHandler* file_handle, uint32_t write_mby
if (sleep_time > 0) {
VLOG_TRACE << "sleep to limit merge speed. time=" << sleep_time
<< ", bytes=" << total_stream_len;
- SleepFor(MonoDelta::FromMicroseconds(sleep_time));
+ std::this_thread::sleep_for(std::chrono::microseconds(sleep_time));
}
}
}
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 18da6064a4..234db7ad63 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -87,7 +87,7 @@ OLAPStatus PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TP
return OLAP_ERR_TABLE_NOT_FOUND;
}
- ReadLock base_migration_rlock(tablet->get_migration_lock(), std::try_to_lock);
+ std::shared_lock base_migration_rlock(tablet->get_migration_lock(), std::try_to_lock);
if (!base_migration_rlock.owns_lock()) {
return OLAP_ERR_RWLOCK_ERROR;
}
@@ -119,9 +119,9 @@ OLAPStatus PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TP
DeletePredicatePB del_pred;
DeleteConditionHandler del_cond_handler;
{
- ReadLock rdlock(tablet_var.tablet->get_header_lock());
- res = del_cond_handler.generate_delete_predicate(tablet_var.tablet->tablet_schema(),
- request.delete_conditions, &del_pred);
+ std::shared_lock rdlock(tablet_var.tablet->get_header_lock());
+ res = del_cond_handler.generate_delete_predicate(
+ tablet_var.tablet->tablet_schema(), request.delete_conditions, &del_pred);
del_preds.push(del_pred);
}
if (res != OLAP_SUCCESS) {
@@ -298,7 +298,7 @@ OLAPStatus PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr
if (reader->eof()) {
break;
}
- //if read row but fill tuple fails,
+ //if read row but fill tuple fails,
if (!reader->is_fill_tuple()) {
break;
}
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 07927ba10e..ddab183de4 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -120,7 +120,8 @@ OLAPStatus TabletReader::init(const ReaderParams& read_params) {
}
// When only one rowset has data, and this rowset is nonoverlapping, we can read directly without aggregation
-bool TabletReader::_optimize_for_single_rowset(const std::vector<RowsetReaderSharedPtr>& rs_readers) {
+bool TabletReader::_optimize_for_single_rowset(
+ const std::vector<RowsetReaderSharedPtr>& rs_readers) {
bool has_delete_rowset = false;
bool has_overlapping = false;
int nonoverlapping_count = 0;
@@ -144,7 +145,7 @@ bool TabletReader::_optimize_for_single_rowset(const std::vector<RowsetReaderSha
}
OLAPStatus TabletReader::_capture_rs_readers(const ReaderParams& read_params,
- std::vector<RowsetReaderSharedPtr>* valid_rs_readers) {
+ std::vector<RowsetReaderSharedPtr>* valid_rs_readers) {
const std::vector<RowsetReaderSharedPtr>* rs_readers = &read_params.rs_readers;
if (rs_readers->empty()) {
LOG(WARNING) << "fail to acquire data sources. tablet=" << _tablet->full_name();
@@ -452,102 +453,102 @@ void TabletReader::_init_conditions_param(const ReaderParams& read_params) {
}
}
-#define COMPARISON_PREDICATE_CONDITION_VALUE(NAME, PREDICATE) \
- ColumnPredicate* TabletReader::_new_##NAME##_pred(const TabletColumn& column, int index, \
- const std::string& cond, bool opposite) const { \
- ColumnPredicate* predicate = nullptr; \
- switch (column.type()) { \
- case OLAP_FIELD_TYPE_TINYINT: { \
- int8_t value = 0; \
- std::from_chars(cond.data(), cond.data() + cond.size(), value); \
- predicate = new PREDICATE<int8_t>(index, value, opposite); \
- break; \
- } \
- case OLAP_FIELD_TYPE_SMALLINT: { \
- int16_t value = 0; \
- std::from_chars(cond.data(), cond.data() + cond.size(), value); \
- predicate = new PREDICATE<int16_t>(index, value, opposite); \
- break; \
- } \
- case OLAP_FIELD_TYPE_INT: { \
- int32_t value = 0; \
- std::from_chars(cond.data(), cond.data() + cond.size(), value); \
- predicate = new PREDICATE<int32_t>(index, value, opposite); \
- break; \
- } \
- case OLAP_FIELD_TYPE_BIGINT: { \
- int64_t value = 0; \
- std::from_chars(cond.data(), cond.data() + cond.size(), value); \
- predicate = new PREDICATE<int64_t>(index, value, opposite); \
- break; \
- } \
- case OLAP_FIELD_TYPE_LARGEINT: { \
- int128_t value = 0; \
- StringParser::ParseResult result; \
- value = StringParser::string_to_int<__int128>(cond.data(), cond.size(), &result); \
- predicate = new PREDICATE<int128_t>(index, value, opposite); \
- break; \
- } \
- case OLAP_FIELD_TYPE_DECIMAL: { \
- decimal12_t value = {0, 0}; \
- value.from_string(cond); \
- predicate = new PREDICATE<decimal12_t>(index, value, opposite); \
- break; \
- } \
- case OLAP_FIELD_TYPE_CHAR: { \
- StringValue value; \
- size_t length = std::max(static_cast<size_t>(column.length()), cond.length()); \
- char* buffer = reinterpret_cast<char*>(_predicate_mem_pool->allocate(length)); \
- memset(buffer, 0, length); \
- memory_copy(buffer, cond.c_str(), cond.length()); \
- value.len = length; \
- value.ptr = buffer; \
- predicate = new PREDICATE<StringValue>(index, value, opposite); \
- break; \
- } \
- case OLAP_FIELD_TYPE_VARCHAR: \
- case OLAP_FIELD_TYPE_STRING: { \
- StringValue value; \
- int32_t length = cond.length(); \
- char* buffer = reinterpret_cast<char*>(_predicate_mem_pool->allocate(length)); \
- memory_copy(buffer, cond.c_str(), length); \
- value.len = length; \
- value.ptr = buffer; \
- predicate = new PREDICATE<StringValue>(index, value, opposite); \
- break; \
- } \
- case OLAP_FIELD_TYPE_DATE: { \
- uint24_t value = timestamp_from_date(cond); \
- predicate = new PREDICATE<uint24_t>(index, value, opposite); \
- break; \
- } \
- case OLAP_FIELD_TYPE_DATETIME: { \
- uint64_t value = timestamp_from_datetime(cond); \
- predicate = new PREDICATE<uint64_t>(index, value, opposite); \
- break; \
- } \
- case OLAP_FIELD_TYPE_BOOL: { \
- int32_t ivalue = 0; \
- auto result = std::from_chars(cond.data(), cond.data() + cond.size(), ivalue); \
- bool value = false; \
- if (result.ec == std::errc()) { \
- if (ivalue == 0) { \
- value = false; \
- } else { \
- value = true; \
- } \
- } else { \
- StringParser::ParseResult parse_result; \
- value = StringParser::string_to_bool(cond.data(), cond.size(), &parse_result); \
- } \
- predicate = new PREDICATE<bool>(index, value, opposite); \
- break; \
- } \
- default: \
- break; \
- } \
- \
- return predicate; \
+#define COMPARISON_PREDICATE_CONDITION_VALUE(NAME, PREDICATE) \
+ ColumnPredicate* TabletReader::_new_##NAME##_pred( \
+ const TabletColumn& column, int index, const std::string& cond, bool opposite) const { \
+ ColumnPredicate* predicate = nullptr; \
+ switch (column.type()) { \
+ case OLAP_FIELD_TYPE_TINYINT: { \
+ int8_t value = 0; \
+ std::from_chars(cond.data(), cond.data() + cond.size(), value); \
+ predicate = new PREDICATE<int8_t>(index, value, opposite); \
+ break; \
+ } \
+ case OLAP_FIELD_TYPE_SMALLINT: { \
+ int16_t value = 0; \
+ std::from_chars(cond.data(), cond.data() + cond.size(), value); \
+ predicate = new PREDICATE<int16_t>(index, value, opposite); \
+ break; \
+ } \
+ case OLAP_FIELD_TYPE_INT: { \
+ int32_t value = 0; \
+ std::from_chars(cond.data(), cond.data() + cond.size(), value); \
+ predicate = new PREDICATE<int32_t>(index, value, opposite); \
+ break; \
+ } \
+ case OLAP_FIELD_TYPE_BIGINT: { \
+ int64_t value = 0; \
+ std::from_chars(cond.data(), cond.data() + cond.size(), value); \
+ predicate = new PREDICATE<int64_t>(index, value, opposite); \
+ break; \
+ } \
+ case OLAP_FIELD_TYPE_LARGEINT: { \
+ int128_t value = 0; \
+ StringParser::ParseResult result; \
+ value = StringParser::string_to_int<__int128>(cond.data(), cond.size(), &result); \
+ predicate = new PREDICATE<int128_t>(index, value, opposite); \
+ break; \
+ } \
+ case OLAP_FIELD_TYPE_DECIMAL: { \
+ decimal12_t value = {0, 0}; \
+ value.from_string(cond); \
+ predicate = new PREDICATE<decimal12_t>(index, value, opposite); \
+ break; \
+ } \
+ case OLAP_FIELD_TYPE_CHAR: { \
+ StringValue value; \
+ size_t length = std::max(static_cast<size_t>(column.length()), cond.length()); \
+ char* buffer = reinterpret_cast<char*>(_predicate_mem_pool->allocate(length)); \
+ memset(buffer, 0, length); \
+ memory_copy(buffer, cond.c_str(), cond.length()); \
+ value.len = length; \
+ value.ptr = buffer; \
+ predicate = new PREDICATE<StringValue>(index, value, opposite); \
+ break; \
+ } \
+ case OLAP_FIELD_TYPE_VARCHAR: \
+ case OLAP_FIELD_TYPE_STRING: { \
+ StringValue value; \
+ int32_t length = cond.length(); \
+ char* buffer = reinterpret_cast<char*>(_predicate_mem_pool->allocate(length)); \
+ memory_copy(buffer, cond.c_str(), length); \
+ value.len = length; \
+ value.ptr = buffer; \
+ predicate = new PREDICATE<StringValue>(index, value, opposite); \
+ break; \
+ } \
+ case OLAP_FIELD_TYPE_DATE: { \
+ uint24_t value = timestamp_from_date(cond); \
+ predicate = new PREDICATE<uint24_t>(index, value, opposite); \
+ break; \
+ } \
+ case OLAP_FIELD_TYPE_DATETIME: { \
+ uint64_t value = timestamp_from_datetime(cond); \
+ predicate = new PREDICATE<uint64_t>(index, value, opposite); \
+ break; \
+ } \
+ case OLAP_FIELD_TYPE_BOOL: { \
+ int32_t ivalue = 0; \
+ auto result = std::from_chars(cond.data(), cond.data() + cond.size(), ivalue); \
+ bool value = false; \
+ if (result.ec == std::errc()) { \
+ if (ivalue == 0) { \
+ value = false; \
+ } else { \
+ value = true; \
+ } \
+ } else { \
+ StringParser::ParseResult parse_result; \
+ value = StringParser::string_to_bool(cond.data(), cond.size(), &parse_result); \
+ } \
+ predicate = new PREDICATE<bool>(index, value, opposite); \
+ break; \
+ } \
+ default: \
+ break; \
+ } \
+ \
+ return predicate; \
}
COMPARISON_PREDICATE_CONDITION_VALUE(eq, EqualPredicate)
@@ -568,7 +569,8 @@ ColumnPredicate* TabletReader::_parse_to_predicate(
column.type());
}
-ColumnPredicate* TabletReader::_parse_to_predicate(const TCondition& condition, bool opposite) const {
+ColumnPredicate* TabletReader::_parse_to_predicate(const TCondition& condition,
+ bool opposite) const {
// TODO: not equal and not in predicate is not pushed down
int32_t index = _tablet->field_index(condition.column_name);
if (index < 0) {
@@ -761,7 +763,7 @@ void TabletReader::_init_load_bf_columns(const ReaderParams& read_params) {
}
void TabletReader::_init_load_bf_columns(const ReaderParams& read_params, Conditions* conditions,
- std::set<uint32_t>* load_bf_columns) {
+ std::set<uint32_t>* load_bf_columns) {
// add all columns with condition to load_bf_columns
for (const auto& cond_column : conditions->columns()) {
if (!_tablet->tablet_schema().column(cond_column.first).is_bf_column()) {
@@ -820,9 +822,9 @@ OLAPStatus TabletReader::_init_delete_condition(const ReaderParams& read_params)
}
OLAPStatus ret;
{
- ReadLock rdlock(_tablet->get_header_lock());
+ std::shared_lock rdlock(_tablet->get_header_lock());
ret = _delete_handler.init(_tablet->tablet_schema(), _tablet->delete_predicates(),
- read_params.version.second, this);
+ read_params.version.second, this);
}
// Only BASE_COMPACTION need set filter_delete = true
// other reader type:
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index edbfa77c9c..ebe69f8b75 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -53,9 +53,7 @@ class RowBlockSorter {
public:
explicit RowBlockSorter(RowBlockAllocator* allocator);
virtual ~RowBlockSorter();
- size_t num_rows() {
- return _swap_row_block != nullptr ? _swap_row_block->capacity() : 0;
- }
+ size_t num_rows() { return _swap_row_block != nullptr ? _swap_row_block->capacity() : 0; }
bool sort(RowBlock** row_block);
@@ -690,7 +688,7 @@ bool RowBlockSorter::sort(RowBlock** row_block) {
}
if (_row_block_allocator->allocate(&_swap_row_block, row_num, null_supported) !=
- OLAP_SUCCESS) {
+ OLAP_SUCCESS) {
LOG(WARNING) << "fail to allocate memory.";
return false;
}
@@ -758,11 +756,12 @@ OLAPStatus RowBlockAllocator::allocate(RowBlock** row_block, size_t num_rows, bo
if (_memory_limitation > 0 &&
_mem_tracker->consumption() + row_block_size > _memory_limitation) {
- LOG(WARNING) << "RowBlockAllocator::alocate() memory exceeded. "
- << "m_memory_allocated=" << _mem_tracker->consumption() << " "
- << "mem limit for schema change=" << _memory_limitation << " "
- << "You can increase the memory "
- << "by changing the Config.memory_limitation_per_thread_for_schema_change_bytes";
+ LOG(WARNING)
+ << "RowBlockAllocator::alocate() memory exceeded. "
+ << "m_memory_allocated=" << _mem_tracker->consumption() << " "
+ << "mem limit for schema change=" << _memory_limitation << " "
+ << "You can increase the memory "
+ << "by changing the Config.memory_limitation_per_thread_for_schema_change_bytes";
*row_block = nullptr;
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}
@@ -924,13 +923,13 @@ void RowBlockMerger::_pop_heap() {
OLAPStatus LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader,
RowsetWriter* new_rowset_writer, TabletSharedPtr new_tablet,
TabletSharedPtr base_tablet) {
-
// In some cases, there may be more than one type of rowset in a tablet,
// in which case the conversion cannot be done directly by linked schema change,
// but requires direct schema change to rewrite the data.
if (rowset_reader->type() != new_rowset_writer->type()) {
- LOG(INFO) << "the type of rowset " << rowset_reader->rowset()->rowset_id() << " in base tablet " << base_tablet->tablet_id()
- << " is not same as type " << new_rowset_writer->type() << ", use direct schema change.";
+ LOG(INFO) << "the type of rowset " << rowset_reader->rowset()->rowset_id()
+ << " in base tablet " << base_tablet->tablet_id() << " is not same as type "
+ << new_rowset_writer->type() << ", use direct schema change.";
SchemaChangeDirectly scd(_row_block_changer);
return scd.process(rowset_reader, new_rowset_writer, new_tablet, base_tablet);
} else {
@@ -938,10 +937,10 @@ OLAPStatus LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader,
rowset_reader->rowset(), _row_block_changer.get_schema_mapping());
if (status != OLAP_SUCCESS) {
LOG(WARNING) << "fail to convert rowset."
- << ", new_tablet=" << new_tablet->full_name()
- << ", base_tablet=" << base_tablet->full_name()
- << ", version=" << new_rowset_writer->version().first << "-"
- << new_rowset_writer->version().second;
+ << ", new_tablet=" << new_tablet->full_name()
+ << ", base_tablet=" << base_tablet->full_name()
+ << ", version=" << new_rowset_writer->version().first << "-"
+ << new_rowset_writer->version().second;
}
return status;
}
@@ -1014,8 +1013,7 @@ OLAPStatus SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
}
OLAPStatus res = OLAP_SUCCESS;
- if (rowset_reader->rowset()->empty() ||
- rowset_reader->rowset()->num_rows() == 0) {
+ if (rowset_reader->rowset()->empty() || rowset_reader->rowset()->num_rows() == 0) {
res = rowset_writer->flush();
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "create empty version for schema change failed."
@@ -1146,7 +1144,7 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
// src_rowsets to store the rowset generated by internal sorting
std::vector<RowsetSharedPtr> src_rowsets;
- Defer defer{[&]() {
+ Defer defer {[&]() {
// remove the intermediate rowsets generated by internal sorting
for (auto& row_set : src_rowsets) {
StorageEngine::instance()->add_unused_rowset(row_set);
@@ -1177,10 +1175,10 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
LOG(WARNING) << "failed to allocate RowBlock.";
return OLAP_ERR_INPUT_PARAMETER_ERROR;
} else {
- // do memory check for sorting, in case schema change task fail at row block sorting because of
+ // do memory check for sorting, in case schema change task fail at row block sorting because of
// not doing internal sorting first
- if (!_row_block_allocator->is_memory_enough_for_sorting(ref_row_block->row_block_info().row_num,
- row_block_sorter.num_rows())) {
+ if (!_row_block_allocator->is_memory_enough_for_sorting(
+ ref_row_block->row_block_info().row_num, row_block_sorter.num_rows())) {
if (new_row_block != nullptr) {
_row_block_allocator->release(new_row_block);
new_row_block = nullptr;
@@ -1204,8 +1202,7 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
if (!_internal_sorting(
row_block_arr,
Version(_temp_delta_versions.second, _temp_delta_versions.second),
- new_tablet, new_rowset_type,
- segments_overlap, &rowset)) {
+ new_tablet, new_rowset_type, segments_overlap, &rowset)) {
LOG(WARNING) << "failed to sorting internally.";
return OLAP_ERR_ALTER_STATUS_ERR;
}
@@ -1260,8 +1257,7 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
}
if (!_internal_sorting(row_block_arr,
Version(_temp_delta_versions.second, _temp_delta_versions.second),
- new_tablet, new_rowset_type,
- segments_overlap, &rowset)) {
+ new_tablet, new_rowset_type, segments_overlap, &rowset)) {
LOG(WARNING) << "failed to sorting internally.";
return OLAP_ERR_ALTER_STATUS_ERR;
}
@@ -1312,8 +1308,7 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
}
bool SchemaChangeWithSorting::_internal_sorting(const std::vector<RowBlock*>& row_block_arr,
- const Version& version,
- TabletSharedPtr new_tablet,
+ const Version& version, TabletSharedPtr new_tablet,
RowsetTypePB new_rowset_type,
SegmentsOverlapPB segments_overlap,
RowsetSharedPtr* rowset) {
@@ -1398,7 +1393,8 @@ OLAPStatus SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2&
return OLAP_ERR_TABLE_NOT_FOUND;
}
// Lock schema_change_lock util schema change info is stored in tablet header
- std::unique_lock<std::mutex> schema_change_lock(base_tablet->get_schema_change_lock(), std::try_to_lock);
+ std::unique_lock<std::mutex> schema_change_lock(base_tablet->get_schema_change_lock(),
+ std::try_to_lock);
if (!schema_change_lock.owns_lock()) {
LOG(WARNING) << "failed to obtain schema change lock. "
<< "base_tablet=" << request.base_tablet_id;
@@ -1446,11 +1442,11 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
<< " base_tablet=" << base_tablet->full_name()
<< " new_tablet=" << new_tablet->full_name();
- ReadLock base_migration_rlock(base_tablet->get_migration_lock(), std::try_to_lock);
+ std::shared_lock base_migration_rlock(base_tablet->get_migration_lock(), std::try_to_lock);
if (!base_migration_rlock.owns_lock()) {
return OLAP_ERR_RWLOCK_ERROR;
}
- ReadLock new_migration_rlock(new_tablet->get_migration_lock(), std::try_to_lock);
+ std::shared_lock new_migration_rlock(new_tablet->get_migration_lock(), std::try_to_lock);
if (!new_migration_rlock.owns_lock()) {
return OLAP_ERR_RWLOCK_ERROR;
}
@@ -1466,8 +1462,8 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
{
std::lock_guard<std::mutex> base_tablet_lock(base_tablet->get_push_lock());
std::lock_guard<std::mutex> new_tablet_lock(new_tablet->get_push_lock());
- WriteLock base_tablet_rdlock(base_tablet->get_header_lock());
- WriteLock new_tablet_rdlock(new_tablet->get_header_lock());
+ std::lock_guard<std::shared_mutex> base_tablet_rdlock(base_tablet->get_header_lock());
+ std::lock_guard<std::shared_mutex> new_tablet_rdlock(new_tablet->get_header_lock());
// check if the tablet has alter task
// if it has alter task, it means it is under old alter process
size_t num_cols = base_tablet->tablet_schema().num_columns();
@@ -1537,11 +1533,11 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
}
}
- res = delete_handler.init(base_tablet->tablet_schema(), base_tablet->delete_predicates(),
- end_version);
+ res = delete_handler.init(base_tablet->tablet_schema(),
+ base_tablet->delete_predicates(), end_version);
if (res != OLAP_SUCCESS) {
- LOG(WARNING) << "init delete handler failed. base_tablet=" << base_tablet->full_name()
- << ", end_version=" << end_version;
+ LOG(WARNING) << "init delete handler failed. base_tablet="
+ << base_tablet->full_name() << ", end_version=" << end_version;
// release delete handlers which have been inited successfully.
delete_handler.finalize();
@@ -1615,7 +1611,7 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
break;
}
// set state to ready
- WriteLock new_wlock(new_tablet->get_header_lock());
+ std::lock_guard<std::shared_mutex> new_wlock(new_tablet->get_header_lock());
res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
if (res != OLAP_SUCCESS) {
break;
@@ -1669,8 +1665,7 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tabl
LOG(INFO) << "doing schema change with sorting for base_tablet "
<< base_tablet->full_name();
sc_procedure = new (nothrow) SchemaChangeWithSorting(
- rb_changer,
- config::memory_limitation_per_thread_for_schema_change_bytes);
+ rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes);
} else if (sc_directly) {
LOG(INFO) << "doing schema change directly for base_tablet " << base_tablet->full_name();
sc_procedure = new (nothrow) SchemaChangeDirectly(rb_changer);
@@ -1821,8 +1816,7 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
LOG(INFO) << "doing schema change with sorting for base_tablet "
<< sc_params.base_tablet->full_name();
sc_procedure = new (nothrow) SchemaChangeWithSorting(
- rb_changer,
- config::memory_limitation_per_thread_for_schema_change_bytes);
+ rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes);
} else if (sc_directly) {
LOG(INFO) << "doing schema change directly for base_tablet "
<< sc_params.base_tablet->full_name();
@@ -1923,7 +1917,7 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
// XXX:The SchemaChange state should not be canceled at this time, because the new Delta has to be converted to the old and new Schema version
PROCESS_ALTER_EXIT : {
// save tablet meta here because rowset meta is not saved during add rowset
- WriteLock new_wlock(sc_params.new_tablet->get_header_lock());
+ std::lock_guard<std::shared_mutex> new_wlock(sc_params.new_tablet->get_header_lock());
sc_params.new_tablet->save_meta();
}
if (res == OLAP_SUCCESS) {
@@ -2000,14 +1994,13 @@ OLAPStatus SchemaChangeHandler::_parse_request(
*sc_directly = true;
}
- if (OLAP_SUCCESS != (res = _init_column_mapping(column_mapping, new_column,
- new_column.default_value()))) {
- return res;
+ if (OLAP_SUCCESS !=
+ (res = _init_column_mapping(column_mapping, new_column, new_column.default_value()))) {
+ return res;
}
VLOG_TRACE << "A column with default value will be added after schema changing. "
- << "column=" << column_name
- << ", default_value=" << new_column.default_value();
+ << "column=" << column_name << ", default_value=" << new_column.default_value();
}
// Check if re-aggregation is needed.
@@ -2133,7 +2126,7 @@ OLAPStatus SchemaChangeHandler::_validate_alter_result(TabletSharedPtr new_table
std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets;
{
- ReadLock rdlock(new_tablet->get_header_lock());
+ std::shared_lock rdlock(new_tablet->get_header_lock());
new_tablet->acquire_version_and_rowsets(&version_rowsets);
}
for (auto& pair : version_rowsets) {
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 797cd59564..53e1718a23 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -128,7 +128,8 @@ OLAPStatus SnapshotManager::convert_rowset_ids(const FilePathDesc& clone_dir_des
// check clone dir existed
if (!FileUtils::check_exist(clone_dir_desc.filepath)) {
res = OLAP_ERR_DIR_NOT_EXIST;
- LOG(WARNING) << "clone dir not existed when convert rowsetids. clone_dir=" << clone_dir_desc.debug_string();
+ LOG(WARNING) << "clone dir not existed when convert rowsetids. clone_dir="
+ << clone_dir_desc.debug_string();
return res;
}
@@ -177,8 +178,8 @@ OLAPStatus SnapshotManager::convert_rowset_ids(const FilePathDesc& clone_dir_des
}
RowsetMetaPB* rowset_meta = new_tablet_meta_pb.add_stale_rs_metas();
RowsetId rowset_id = StorageEngine::instance()->next_rowset_id();
- RETURN_NOT_OK(
- _rename_rowset_id(stale_rowset, clone_dir_desc, tablet_schema, rowset_id, rowset_meta));
+ RETURN_NOT_OK(_rename_rowset_id(stale_rowset, clone_dir_desc, tablet_schema, rowset_id,
+ rowset_meta));
rowset_meta->set_tablet_id(tablet_id);
rowset_meta->set_tablet_schema_hash(schema_hash);
}
@@ -193,7 +194,8 @@ OLAPStatus SnapshotManager::convert_rowset_ids(const FilePathDesc& clone_dir_des
}
OLAPStatus SnapshotManager::_rename_rowset_id(const RowsetMetaPB& rs_meta_pb,
- const FilePathDesc& new_path_desc, TabletSchema& tablet_schema,
+ const FilePathDesc& new_path_desc,
+ TabletSchema& tablet_schema,
const RowsetId& rowset_id,
RowsetMetaPB* new_rs_meta_pb) {
OLAPStatus res = OLAP_SUCCESS;
@@ -206,8 +208,8 @@ OLAPStatus SnapshotManager::_rename_rowset_id(const RowsetMetaPB& rs_meta_pb,
RowsetMetaSharedPtr alpha_rowset_meta(new AlphaRowsetMeta());
alpha_rowset_meta->init_from_pb(rs_meta_pb);
RowsetSharedPtr org_rowset;
- RETURN_NOT_OK(
- RowsetFactory::create_rowset(&tablet_schema, new_path_desc, alpha_rowset_meta, &org_rowset));
+ RETURN_NOT_OK(RowsetFactory::create_rowset(&tablet_schema, new_path_desc, alpha_rowset_meta,
+ &org_rowset));
// do not use cache to load index
// because the index file may conflict
// and the cached fd may be invalid
@@ -265,7 +267,8 @@ OLAPStatus SnapshotManager::_calc_snapshot_id_path(const TabletSharedPtr& tablet
}
std::stringstream snapshot_id_path_stream;
- MutexLock auto_lock(&_snapshot_mutex); // will automatically unlock when function return.
+ std::unique_lock<std::mutex> auto_lock(
+ _snapshot_mutex); // will automatically unlock when function return.
snapshot_id_path_stream << tablet->data_dir()->path() << SNAPSHOT_PREFIX << "/" << time_str
<< "." << _snapshot_base_id++ << "." << timeout_s;
*out_path = snapshot_id_path_stream.str();
@@ -277,8 +280,8 @@ OLAPStatus SnapshotManager::_calc_snapshot_id_path(const TabletSharedPtr& tablet
FilePathDesc SnapshotManager::get_schema_hash_full_path(const TabletSharedPtr& ref_tablet,
const FilePathDesc& location_desc) const {
FilePathDescStream schema_full_path_desc_s;
- schema_full_path_desc_s << location_desc << "/" << ref_tablet->tablet_id()
- << "/" << ref_tablet->schema_hash();
+ schema_full_path_desc_s << location_desc << "/" << ref_tablet->tablet_id() << "/"
+ << ref_tablet->schema_hash();
return schema_full_path_desc_s.path_desc();
}
@@ -338,7 +341,8 @@ OLAPStatus SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_ta
FileUtils::remove_all(schema_full_path_desc.filepath);
}
- RETURN_WITH_WARN_IF_ERROR(FileUtils::create_dir(schema_full_path_desc.filepath), OLAP_ERR_CANNOT_CREATE_DIR,
+ RETURN_WITH_WARN_IF_ERROR(FileUtils::create_dir(schema_full_path_desc.filepath),
+ OLAP_ERR_CANNOT_CREATE_DIR,
"create path " + schema_full_path_desc.filepath + " failed");
string snapshot_id;
@@ -359,7 +363,7 @@ OLAPStatus SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_ta
/// If some of them not exist in tablet, we will fall back to
/// make the full snapshot of the tablet.
{
- ReadLock rdlock(ref_tablet->get_header_lock());
+ std::shared_lock rdlock(ref_tablet->get_header_lock());
if (request.__isset.missing_version) {
for (int64_t missed_version : request.missing_version) {
Version version = {missed_version, missed_version};
@@ -383,7 +387,7 @@ OLAPStatus SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_ta
res = OLAP_SUCCESS; // reset res
consistent_rowsets.clear(); // reset vector
- ReadLock rdlock(ref_tablet->get_header_lock());
+ std::shared_lock rdlock(ref_tablet->get_header_lock());
// get latest version
const RowsetSharedPtr last_version = ref_tablet->rowset_with_max_version();
if (last_version == nullptr) {
@@ -447,8 +451,8 @@ OLAPStatus SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_ta
if (snapshot_version == g_Types_constants.TSNAPSHOT_REQ_VERSION1) {
// convert beta rowset to alpha rowset
- res = _convert_beta_rowsets_to_alpha(
- new_tablet_meta, new_tablet_meta->all_rs_metas(), schema_full_path_desc);
+ res = _convert_beta_rowsets_to_alpha(new_tablet_meta, new_tablet_meta->all_rs_metas(),
+ schema_full_path_desc);
if (res != OLAP_SUCCESS) {
break;
}
@@ -499,8 +503,8 @@ OLAPStatus SnapshotManager::_convert_beta_rowsets_to_alpha(
if (rowset_meta->rowset_type() == BETA_ROWSET) {
modified = true;
RowsetMetaPB rowset_meta_pb;
- auto st =
- rowset_converter.convert_beta_to_alpha(rowset_meta, dst_path_desc, &rowset_meta_pb);
+ auto st = rowset_converter.convert_beta_to_alpha(rowset_meta, dst_path_desc,
+ &rowset_meta_pb);
if (st != OLAP_SUCCESS) {
res = st;
LOG(WARNING) << "convert beta to alpha failed"
diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h
index c13a133ab6..0ae77784ad 100644
--- a/be/src/olap/snapshot_manager.h
+++ b/be/src/olap/snapshot_manager.h
@@ -100,7 +100,7 @@ private:
static std::mutex _mlock;
// snapshot
- Mutex _snapshot_mutex;
+ std::mutex _snapshot_mutex;
uint64_t _snapshot_base_id;
std::shared_ptr<MemTracker> _mem_tracker = nullptr;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index c999e1dd80..821f2463e1 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -1024,13 +1024,13 @@ OLAPStatus StorageEngine::execute_task(EngineTask* task) {
task->get_related_tablets(&tablet_infos);
sort(tablet_infos.begin(), tablet_infos.end());
std::vector<TabletSharedPtr> related_tablets;
- std::vector<UniqueWriteLock> wrlocks;
+ std::vector<std::unique_lock<std::shared_mutex>> wrlocks;
for (TabletInfo& tablet_info : tablet_infos) {
TabletSharedPtr tablet =
_tablet_manager->get_tablet(tablet_info.tablet_id);
if (tablet != nullptr) {
related_tablets.push_back(tablet);
- wrlocks.push_back(UniqueWriteLock(tablet->get_header_lock()));
+ wrlocks.push_back(std::unique_lock<std::shared_mutex>(tablet->get_header_lock()));
} else {
LOG(WARNING) << "could not get tablet before prepare tabletid: "
<< tablet_info.tablet_id;
@@ -1055,13 +1055,13 @@ OLAPStatus StorageEngine::execute_task(EngineTask* task) {
task->get_related_tablets(&tablet_infos);
sort(tablet_infos.begin(), tablet_infos.end());
std::vector<TabletSharedPtr> related_tablets;
- std::vector<UniqueWriteLock> wrlocks;
+ std::vector<std::unique_lock<std::shared_mutex>> wrlocks;
for (TabletInfo& tablet_info : tablet_infos) {
TabletSharedPtr tablet =
_tablet_manager->get_tablet(tablet_info.tablet_id);
if (tablet != nullptr) {
related_tablets.push_back(tablet);
- wrlocks.push_back(UniqueWriteLock(tablet->get_header_lock()));
+ wrlocks.push_back(std::unique_lock<std::shared_mutex>(tablet->get_header_lock()));
} else {
LOG(WARNING) << "could not get tablet before finish tabletid: "
<< tablet_info.tablet_id;
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 4a73c21e79..5460f198a9 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -360,7 +360,7 @@ private:
std::mutex _report_mtx;
std::set<TaskWorkerPool*> _report_listeners;
- Mutex _engine_task_mutex;
+ std::mutex _engine_task_mutex;
std::unique_ptr<TabletManager> _tablet_manager;
std::unique_ptr<TxnManager> _txn_manager;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 9a4509e69d..35238819a1 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -207,7 +207,7 @@ OLAPStatus Tablet::revise_tablet_meta(const std::vector<RowsetMetaSharedPtr>& ro
OLAPStatus Tablet::add_rowset(RowsetSharedPtr rowset, bool need_persist) {
DCHECK(rowset != nullptr);
- WriteLock wrlock(_meta_lock);
+ std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
// If the rowset already exist, just return directly. The rowset_id is an unique-id,
// we can use it to check this situation.
if (_contains_rowset(rowset->rowset_id())) {
@@ -369,7 +369,7 @@ RowsetSharedPtr Tablet::_rowset_with_largest_size() {
// add inc rowset should not persist tablet meta, because it will be persisted when publish txn.
OLAPStatus Tablet::add_inc_rowset(const RowsetSharedPtr& rowset) {
DCHECK(rowset != nullptr);
- WriteLock wrlock(_meta_lock);
+ std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
if (_contains_rowset(rowset->rowset_id())) {
return OLAP_SUCCESS;
}
@@ -395,7 +395,7 @@ void Tablet::_delete_stale_rowset_by_version(const Version& version) {
void Tablet::delete_expired_stale_rowset() {
int64_t now = UnixSeconds();
- WriteLock wrlock(_meta_lock);
+ std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
// Compute the end time to delete rowsets, when a expired rowset createtime less then this time, it will be deleted.
double expired_stale_sweep_endtime =
::difftime(now, config::tablet_rowset_stale_sweep_time_sec);
@@ -513,10 +513,10 @@ void Tablet::delete_expired_stale_rowset() {
StorageEngine::instance()->add_unused_rowset(it->second);
_stale_rs_version_map.erase(it);
VLOG_NOTICE << "delete stale rowset tablet=" << full_name() << " version["
- << timestampedVersion->version().first << ","
- << timestampedVersion->version().second
- << "] move to unused_rowset success " << std::fixed
- << expired_stale_sweep_endtime;
+ << timestampedVersion->version().first << ","
+ << timestampedVersion->version().second
+ << "] move to unused_rowset success " << std::fixed
+ << expired_stale_sweep_endtime;
} else {
LOG(WARNING) << "delete stale rowset tablet=" << full_name() << " version["
<< timestampedVersion->version().first << ","
@@ -534,7 +534,7 @@ void Tablet::delete_expired_stale_rowset() {
<< " current_size=" << _stale_rs_version_map.size() << " old_size=" << old_size
<< " current_meta_size=" << _tablet_meta->all_stale_rs_metas().size()
<< " old_meta_size=" << old_meta_size << " sweep endtime " << std::fixed
- << expired_stale_sweep_endtime << ", reconstructed=" << reconstructed;
+ << expired_stale_sweep_endtime << ", reconstructed=" << reconstructed;
#ifndef BE_TEST
save_meta();
@@ -544,8 +544,8 @@ void Tablet::delete_expired_stale_rowset() {
bool Tablet::_reconstruct_version_tracker_if_necessary() {
double orphan_vertex_ratio = _timestamped_version_tracker.get_orphan_vertex_ratio();
if (orphan_vertex_ratio >= config::tablet_version_graph_orphan_vertex_ratio) {
- _timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas(),
- _tablet_meta->all_stale_rs_metas());
+ _timestamped_version_tracker.construct_versioned_tracker(
+ _tablet_meta->all_rs_metas(), _tablet_meta->all_stale_rs_metas());
return true;
}
return false;
@@ -564,13 +564,13 @@ OLAPStatus Tablet::capture_consistent_versions(const Version& spec_version,
// so to avoid print too many logs.
if (version_path != nullptr) {
LOG(WARNING) << "tablet:" << full_name()
- << ", version already has been merged. spec_version: " << spec_version;
+ << ", version already has been merged. spec_version: " << spec_version;
}
status = OLAP_ERR_VERSION_ALREADY_MERGED;
} else {
if (version_path != nullptr) {
LOG(WARNING) << "status:" << status << ", tablet:" << full_name()
- << ", missed version for version:" << spec_version;
+ << ", missed version for version:" << spec_version;
_print_missed_versions(missed_versions);
}
}
@@ -579,7 +579,7 @@ OLAPStatus Tablet::capture_consistent_versions(const Version& spec_version,
}
OLAPStatus Tablet::check_version_integrity(const Version& version, bool quiet) {
- ReadLock rdlock(_meta_lock);
+ std::shared_lock rdlock(_meta_lock);
return capture_consistent_versions(version, nullptr, quiet);
}
@@ -594,7 +594,8 @@ bool Tablet::check_version_exist(const Version& version) const {
}
// The meta read lock should be held before calling
-void Tablet::acquire_version_and_rowsets(std::vector<std::pair<Version, RowsetSharedPtr>>* version_rowsets) const {
+void Tablet::acquire_version_and_rowsets(
+ std::vector<std::pair<Version, RowsetSharedPtr>>* version_rowsets) const {
for (const auto& it : _rs_version_map) {
version_rowsets->emplace_back(it.first, it.second);
}
@@ -704,7 +705,7 @@ bool Tablet::can_do_compaction(size_t path_hash, CompactionType compaction_type)
if (tablet_state() == TABLET_RUNNING) {
// if tablet state is running, we need to check if it has consistent versions.
// tablet in other state such as TABLET_NOTREADY may not have complete versions.
- ReadLock rdlock(_meta_lock);
+ std::shared_lock rdlock(_meta_lock);
const RowsetSharedPtr lastest_delta = rowset_with_max_version();
if (lastest_delta == nullptr) {
return false;
@@ -722,7 +723,7 @@ uint32_t Tablet::calc_compaction_score(
CompactionType compaction_type,
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy) {
// Need meta lock, because it will iterator "all_rs_metas" of tablet meta.
- ReadLock rdlock(_meta_lock);
+ std::shared_lock rdlock(_meta_lock);
if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
return _calc_cumulative_compaction_score(cumulative_compaction_policy);
} else {
@@ -740,8 +741,8 @@ const uint32_t Tablet::_calc_cumulative_compaction_score(
}
#endif
uint32_t score = 0;
- _cumulative_compaction_policy->calc_cumulative_compaction_score(tablet_state(),
- _tablet_meta->all_rs_metas(), cumulative_layer_point(), &score);
+ _cumulative_compaction_policy->calc_cumulative_compaction_score(
+ tablet_state(), _tablet_meta->all_rs_metas(), cumulative_layer_point(), &score);
return score;
}
@@ -766,7 +767,7 @@ const uint32_t Tablet::_calc_base_compaction_score() const {
}
void Tablet::calc_missed_versions(int64_t spec_version, std::vector<Version>* missed_versions) {
- ReadLock rdlock(_meta_lock);
+ std::shared_lock rdlock(_meta_lock);
calc_missed_versions_unlocked(spec_version, missed_versions);
}
@@ -806,11 +807,12 @@ void Tablet::calc_missed_versions_unlocked(int64_t spec_version,
}
void Tablet::max_continuous_version_from_beginning(Version* version, Version* max_version) {
- ReadLock rdlock(_meta_lock);
+ std::shared_lock rdlock(_meta_lock);
_max_continuous_version_from_beginning_unlocked(version, max_version);
}
-void Tablet::_max_continuous_version_from_beginning_unlocked(Version* version, Version* max_version) const {
+void Tablet::_max_continuous_version_from_beginning_unlocked(Version* version,
+ Version* max_version) const {
std::vector<Version> existing_versions;
for (auto& rs : _tablet_meta->all_rs_metas()) {
existing_versions.emplace_back(rs->version());
@@ -838,7 +840,7 @@ void Tablet::_max_continuous_version_from_beginning_unlocked(Version* version, V
}
void Tablet::calculate_cumulative_point() {
- WriteLock wrlock(_meta_lock);
+ std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
int64_t ret_cumulative_point;
_cumulative_compaction_policy->calculate_cumulative_point(
this, _tablet_meta->all_rs_metas(), _cumulative_point, &ret_cumulative_point);
@@ -901,7 +903,7 @@ OLAPStatus Tablet::split_range(const OlapTuple& start_key_strings, const OlapTup
end_key.build_max_key();
}
- ReadLock rdlock(_meta_lock);
+ std::shared_lock rdlock(_meta_lock);
RowsetSharedPtr rowset = _rowset_with_largest_size();
// 如果找不到合适的rowset,就直接返回startkey,endkey
@@ -918,7 +920,7 @@ OLAPStatus Tablet::split_range(const OlapTuple& start_key_strings, const OlapTup
// NOTE: only used when create_table, so it is sure that there is no concurrent reader and writer.
void Tablet::delete_all_files() {
// Release resources like memory and disk space.
- ReadLock rdlock(_meta_lock);
+ std::shared_lock rdlock(_meta_lock);
for (auto it : _rs_version_map) {
it.second->remove();
}
@@ -931,7 +933,7 @@ void Tablet::delete_all_files() {
}
bool Tablet::check_path(const std::string& path_to_check) const {
- ReadLock rdlock(_meta_lock);
+ std::shared_lock rdlock(_meta_lock);
if (path_to_check == _tablet_path_desc.filepath) {
return true;
}
@@ -961,7 +963,7 @@ bool Tablet::check_path(const std::string& path_to_check) const {
// 4. check in rowset meta manager return false. so that the rowset maybe checked return false it means it is useless and
// will be treated as a garbage.
bool Tablet::check_rowset_id(const RowsetId& rowset_id) {
- ReadLock rdlock(_meta_lock);
+ std::shared_lock rdlock(_meta_lock);
if (StorageEngine::instance()->rowset_id_in_use(rowset_id)) {
return true;
}
@@ -1022,13 +1024,13 @@ void Tablet::pick_candidate_rowsets_to_cumulative_compaction(
if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) {
return;
}
- ReadLock rdlock(_meta_lock);
+ std::shared_lock rdlock(_meta_lock);
_cumulative_compaction_policy->pick_candidate_rowsets(skip_window_sec, _rs_version_map,
_cumulative_point, candidate_rowsets);
}
void Tablet::pick_candidate_rowsets_to_base_compaction(vector<RowsetSharedPtr>* candidate_rowsets) {
- ReadLock rdlock(_meta_lock);
+ std::shared_lock rdlock(_meta_lock);
for (auto& it : _rs_version_map) {
if (it.first.first < _cumulative_point) {
candidate_rowsets->push_back(it.second);
@@ -1048,7 +1050,7 @@ void Tablet::get_compaction_status(std::string* json_result) {
std::vector<RowsetSharedPtr> stale_rowsets;
std::vector<bool> delete_flags;
{
- ReadLock rdlock(_meta_lock);
+ std::shared_lock rdlock(_meta_lock);
rowsets.reserve(_rs_version_map.size());
for (auto& it : _rs_version_map) {
rowsets.push_back(it.second);
@@ -1104,7 +1106,9 @@ void Tablet::get_compaction_status(std::string* json_result) {
const Version& ver = rowsets[i]->version();
if (ver.first != last_version + 1) {
rapidjson::Value miss_value;
- miss_value.SetString(strings::Substitute("[$0-$1]", last_version + 1, ver.first).c_str(), missing_versions_arr.GetAllocator());
+ miss_value.SetString(
+ strings::Substitute("[$0-$1]", last_version + 1, ver.first).c_str(),
+ missing_versions_arr.GetAllocator());
missing_versions_arr.PushBack(miss_value, missing_versions_arr.GetAllocator());
}
rapidjson::Value value;
@@ -1150,7 +1154,7 @@ void Tablet::get_compaction_status(std::string* json_result) {
}
bool Tablet::do_tablet_meta_checkpoint() {
- WriteLock store_lock(_meta_store_lock);
+ std::lock_guard<std::shared_mutex> store_lock(_meta_store_lock);
if (_newly_created_rowset_num == 0) {
return false;
}
@@ -1161,7 +1165,7 @@ bool Tablet::do_tablet_meta_checkpoint() {
}
// hold read-lock other than write-lock, because it will not modify meta structure
- ReadLock rdlock(_meta_lock);
+ std::shared_lock rdlock(_meta_lock);
if (tablet_state() != TABLET_RUNNING) {
LOG(INFO) << "tablet is under state=" << tablet_state()
<< ", not running, skip do checkpoint"
@@ -1207,7 +1211,7 @@ bool Tablet::do_tablet_meta_checkpoint() {
}
bool Tablet::rowset_meta_is_useful(RowsetMetaSharedPtr rowset_meta) {
- ReadLock rdlock(_meta_lock);
+ std::shared_lock rdlock(_meta_lock);
bool find_version = false;
for (auto& version_rowset : _rs_version_map) {
if (version_rowset.second->rowset_id() == rowset_meta->rowset_id()) {
@@ -1243,7 +1247,7 @@ bool Tablet::_contains_rowset(const RowsetId rowset_id) {
}
void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) {
- ReadLock rdlock(_meta_lock);
+ std::shared_lock rdlock(_meta_lock);
tablet_info->tablet_id = _tablet_meta->tablet_id();
tablet_info->schema_hash = _tablet_meta->schema_hash();
tablet_info->row_count = _tablet_meta->num_rows();
@@ -1285,7 +1289,7 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) {
// there are some rowset meta in local meta store and in in-memory tablet meta
// but not in tablet meta in local meta store
void Tablet::generate_tablet_meta_copy(TabletMetaSharedPtr new_tablet_meta) const {
- ReadLock rdlock(_meta_lock);
+ std::shared_lock rdlock(_meta_lock);
generate_tablet_meta_copy_unlocked(new_tablet_meta);
}
@@ -1311,8 +1315,7 @@ double Tablet::calculate_scan_frequency() {
}
Status Tablet::prepare_compaction_and_calculate_permits(CompactionType compaction_type,
- TabletSharedPtr tablet,
- int64_t* permits) {
+ TabletSharedPtr tablet, int64_t* permits) {
std::vector<RowsetSharedPtr> compaction_rowsets;
if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
scoped_refptr<Trace> trace(new Trace);
@@ -1385,7 +1388,8 @@ void Tablet::execute_compaction(CompactionType compaction_type) {
MonotonicStopWatch watch;
watch.start();
SCOPED_CLEANUP({
- if (!config::disable_compaction_trace_log && watch.elapsed_time() / 1e9 > config::cumulative_compaction_trace_threshold) {
+ if (!config::disable_compaction_trace_log &&
+ watch.elapsed_time() / 1e9 > config::cumulative_compaction_trace_threshold) {
LOG(WARNING) << "Trace:" << std::endl << trace->DumpToString(Trace::INCLUDE_ALL);
}
});
@@ -1407,7 +1411,8 @@ void Tablet::execute_compaction(CompactionType compaction_type) {
MonotonicStopWatch watch;
watch.start();
SCOPED_CLEANUP({
- if (!config::disable_compaction_trace_log && watch.elapsed_time() / 1e9 > config::base_compaction_trace_threshold) {
+ if (!config::disable_compaction_trace_log &&
+ watch.elapsed_time() / 1e9 > config::base_compaction_trace_threshold) {
LOG(WARNING) << "Trace:" << std::endl << trace->DumpToString(Trace::INCLUDE_ALL);
}
});
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 700f7d34f7..75235c96c8 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -250,8 +250,8 @@ public:
return _cumulative_compaction_policy;
}
- bool all_beta() const {
- ReadLock rdlock(_meta_lock);
+ inline bool all_beta() const {
+ std::shared_lock rdlock(_meta_lock);
return _tablet_meta->all_beta();
}
@@ -381,14 +381,14 @@ inline void Tablet::set_cumulative_layer_point(int64_t new_point) {
// TODO(lingbin): Why other methods that need to get information from _tablet_meta
// are not locked, here needs a comment to explain.
inline size_t Tablet::tablet_footprint() {
- ReadLock rdlock(_meta_lock);
+ std::shared_lock rdlock(_meta_lock);
return _tablet_meta->tablet_footprint();
}
// TODO(lingbin): Why other methods which need to get information from _tablet_meta
// are not locked, here needs a comment to explain.
inline size_t Tablet::num_rows() {
- ReadLock rdlock(_meta_lock);
+ std::shared_lock rdlock(_meta_lock);
return _tablet_meta->num_rows();
}
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 59b8b6a96d..0d5b9714ab 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -86,12 +86,11 @@ TabletManager::~TabletManager() {
DEREGISTER_HOOK_METRIC(tablet_meta_mem_consumption);
}
-OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, const TabletSharedPtr& tablet,
- bool update_meta, bool force) {
+OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, const TabletSharedPtr& tablet,
+ bool update_meta, bool force) {
OLAPStatus res = OLAP_SUCCESS;
VLOG_NOTICE << "begin to add tablet to TabletManager. "
- << "tablet_id=" << tablet_id
- << ", force=" << force;
+ << "tablet_id=" << tablet_id << ", force=" << force;
TabletSharedPtr existed_tablet = nullptr;
tablet_map_t& tablet_map = _get_tablet_map(tablet_id);
@@ -101,9 +100,10 @@ OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, const Tablet
}
if (existed_tablet == nullptr) {
- return _add_tablet_to_map_unlocked(tablet_id, tablet, update_meta, false /*keep_files*/, false /*drop_old*/);
+ return _add_tablet_to_map_unlocked(tablet_id, tablet, update_meta, false /*keep_files*/,
+ false /*drop_old*/);
}
- // During restore process, the tablet is exist and snapshot loader will replace the tablet's rowsets
+ // During restore process, the tablet is exist and snapshot loader will replace the tablet's rowsets
// and then reload the tablet, the tablet's path will the same
if (!force) {
if (existed_tablet->tablet_path_desc().filepath == tablet->tablet_path_desc().filepath) {
@@ -123,7 +123,7 @@ OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, const Tablet
int64_t old_time, new_time;
int32_t old_version, new_version;
{
- ReadLock rdlock(existed_tablet->get_header_lock());
+ std::shared_lock rdlock(existed_tablet->get_header_lock());
const RowsetSharedPtr old_rowset = existed_tablet->rowset_with_max_version();
const RowsetSharedPtr new_rowset = tablet->rowset_with_max_version();
// If new tablet is empty, it is a newly created schema change tablet.
@@ -155,14 +155,15 @@ OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, const Tablet
if (force ||
(new_version > old_version || (new_version == old_version && new_time > old_time))) {
// check if new tablet's meta is in store and add new tablet's meta to meta store
- res = _add_tablet_to_map_unlocked(tablet_id, tablet, update_meta, keep_files, true /*drop_old*/);
+ res = _add_tablet_to_map_unlocked(tablet_id, tablet, update_meta, keep_files,
+ true /*drop_old*/);
} else {
res = OLAP_ERR_ENGINE_INSERT_OLD_TABLET;
}
LOG(WARNING) << "add duplicated tablet. force=" << force << ", res=" << res
- << ", tablet_id=" << tablet_id
- << ", old_version=" << old_version << ", new_version=" << new_version
- << ", old_time=" << old_time << ", new_time=" << new_time
+ << ", tablet_id=" << tablet_id << ", old_version=" << old_version
+ << ", new_version=" << new_version << ", old_time=" << old_time
+ << ", new_time=" << new_time
<< ", old_tablet_path=" << existed_tablet->tablet_path_desc().debug_string()
<< ", new_tablet_path=" << tablet->tablet_path_desc().debug_string();
@@ -184,7 +185,8 @@ OLAPStatus TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id,
// the existing tablet with the new one.
RETURN_NOT_OK_LOG(_drop_tablet_unlocked(tablet_id, keep_files),
strings::Substitute("failed to drop old tablet when add new tablet. "
- "tablet_id=$0", tablet_id));
+ "tablet_id=$0",
+ tablet_id));
}
// Register tablet into DataDir, so that we can manage tablet from
// the perspective of root path.
@@ -194,13 +196,14 @@ OLAPStatus TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id,
tablet_map[tablet_id] = tablet;
_add_tablet_to_partition(tablet);
- VLOG_NOTICE << "add tablet to map successfully." << " tablet_id=" << tablet_id ;
+ VLOG_NOTICE << "add tablet to map successfully."
+ << " tablet_id=" << tablet_id;
return res;
}
bool TabletManager::check_tablet_id_exist(TTabletId tablet_id) {
- ReadLock rdlock(_get_tablets_shard_lock(tablet_id));
+ std::shared_lock rdlock(_get_tablets_shard_lock(tablet_id));
return _check_tablet_id_exist_unlocked(tablet_id);
}
@@ -217,7 +220,7 @@ OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request,
int64_t tablet_id = request.tablet_id;
LOG(INFO) << "begin to create tablet. tablet_id=" << tablet_id;
- WriteLock wrlock(_get_tablets_shard_lock(tablet_id));
+ std::lock_guard<std::shared_mutex> wrlock(_get_tablets_shard_lock(tablet_id));
TRACE("got tablets shard lock");
// Make create_tablet operation to be idempotent:
// 1. Return true if tablet with same tablet_id and schema_hash exist;
@@ -239,7 +242,7 @@ OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request,
base_tablet = _get_tablet_unlocked(request.base_tablet_id);
if (base_tablet == nullptr) {
LOG(WARNING) << "fail to create tablet(change schema), base tablet does not exist. "
- << "new_tablet_id=" << tablet_id
+ << "new_tablet_id=" << tablet_id
<< ", base_tablet_id=" << request.base_tablet_id;
DorisMetrics::instance()->create_tablet_requests_failed->increment(1);
return OLAP_ERR_TABLE_CREATE_META_ERROR;
@@ -433,7 +436,7 @@ TabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked(
}
OLAPStatus TabletManager::drop_tablet(TTabletId tablet_id, bool keep_files) {
- WriteLock wrlock(_get_tablets_shard_lock(tablet_id));
+ std::lock_guard<std::shared_mutex> wrlock(_get_tablets_shard_lock(tablet_id));
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
return _drop_tablet_unlocked(tablet_id, keep_files);
}
@@ -476,14 +479,15 @@ OLAPStatus TabletManager::drop_tablets_on_error_root_path(
if (local_tmp_vector[i].empty()) {
continue;
}
- WriteLock wrlock(_tablets_shards[i].lock);
+ std::lock_guard<std::shared_mutex> wrlock(_tablets_shards[i].lock);
for (size_t idx : local_tmp_vector[i]) {
const TabletInfo& tablet_info = tablet_info_vec[idx];
TTabletId tablet_id = tablet_info.tablet_id;
VLOG_NOTICE << "drop_tablet begin. tablet_id=" << tablet_id;
TabletSharedPtr dropped_tablet = _get_tablet_unlocked(tablet_id);
if (dropped_tablet == nullptr) {
- LOG(WARNING) << "dropping tablet not exist, " << " tablet=" << tablet_id;
+ LOG(WARNING) << "dropping tablet not exist, "
+ << " tablet=" << tablet_id;
continue;
} else {
_remove_tablet_from_partition(dropped_tablet);
@@ -496,15 +500,16 @@ OLAPStatus TabletManager::drop_tablets_on_error_root_path(
}
TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, bool include_deleted, string* err) {
- ReadLock rdlock(_get_tablets_shard_lock(tablet_id));
+ std::shared_lock rdlock(_get_tablets_shard_lock(tablet_id));
return _get_tablet_unlocked(tablet_id, include_deleted, err);
}
-TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, bool include_deleted, string* err) {
+TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, bool include_deleted,
+ string* err) {
TabletSharedPtr tablet;
tablet = _get_tablet_unlocked(tablet_id);
if (tablet == nullptr && include_deleted) {
- ReadLock rdlock(_shutdown_tablets_lock);
+ std::shared_lock rdlock(_shutdown_tablets_lock);
for (auto& deleted_tablet : _shutdown_tablets) {
CHECK(deleted_tablet != nullptr) << "deleted tablet is nullptr";
if (deleted_tablet->tablet_id() == tablet_id) {
@@ -533,7 +538,7 @@ TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, bool in
}
TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, TabletUid tablet_uid, bool include_deleted, string* err) {
- ReadLock rdlock(_get_tablets_shard_lock(tablet_id));
+ std::shared_lock rdlock(_get_tablets_shard_lock(tablet_id));
TabletSharedPtr tablet = _get_tablet_unlocked(tablet_id, include_deleted, err);
if (tablet != nullptr && tablet->tablet_uid() == tablet_uid) {
return tablet;
@@ -596,7 +601,7 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
double tablet_scan_frequency = 0.0;
TabletSharedPtr best_tablet;
for (const auto& tablets_shard : _tablets_shards) {
- ReadLock rdlock(tablets_shard.lock);
+ std::shared_lock rdlock(tablets_shard.lock);
for (const auto& tablet_map : tablets_shard.tablet_map) {
const TabletSharedPtr& tablet_ptr = tablet_map.second;
if (!tablet_ptr->can_do_compaction(data_dir->path_hash(), compaction_type)) {
@@ -612,23 +617,24 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
if (compaction_type == CompactionType::BASE_COMPACTION) {
last_failure_ms = tablet_ptr->last_base_compaction_failure_time();
}
- if (now_ms - last_failure_ms <=
- config::min_compaction_failure_interval_sec * 1000) {
+ if (now_ms - last_failure_ms <= config::min_compaction_failure_interval_sec * 1000) {
VLOG_DEBUG << "Too often to check compaction, skip it. "
- << "compaction_type=" << compaction_type_str
- << ", last_failure_time_ms=" << last_failure_ms
- << ", tablet_id=" << tablet_ptr->tablet_id();
+ << "compaction_type=" << compaction_type_str
+ << ", last_failure_time_ms=" << last_failure_ms
+ << ", tablet_id=" << tablet_ptr->tablet_id();
continue;
}
if (compaction_type == CompactionType::BASE_COMPACTION) {
- std::unique_lock<std::mutex> lock(tablet_ptr->get_base_compaction_lock(), std::try_to_lock);
+ std::unique_lock<std::mutex> lock(tablet_ptr->get_base_compaction_lock(),
+ std::try_to_lock);
if (!lock.owns_lock()) {
LOG(INFO) << "can not get base lock: " << tablet_ptr->tablet_id();
continue;
}
} else {
- std::unique_lock<std::mutex> lock(tablet_ptr->get_cumulative_compaction_lock(), std::try_to_lock);
+ std::unique_lock<std::mutex> lock(tablet_ptr->get_cumulative_compaction_lock(),
+ std::try_to_lock);
if (!lock.owns_lock()) {
LOG(INFO) << "can not get cumu lock: " << tablet_ptr->tablet_id();
continue;
@@ -645,8 +651,7 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
double tablet_score =
config::compaction_tablet_scan_frequency_factor * scan_frequency +
- config::compaction_tablet_compaction_score_factor *
- current_compaction_score;
+ config::compaction_tablet_compaction_score_factor * current_compaction_score;
if (tablet_score > highest_score) {
highest_score = tablet_score;
compaction_score = current_compaction_score;
@@ -731,7 +736,7 @@ OLAPStatus TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tab
LOG(INFO) << "fail to load tablet because it is to be deleted. tablet_id=" << tablet_id
<< " schema_hash=" << schema_hash << ", path=" << data_dir->path();
{
- WriteLock shutdown_tablets_wrlock(_shutdown_tablets_lock);
+ std::lock_guard<std::shared_mutex> shutdown_tablets_wrlock(_shutdown_tablets_lock);
_shutdown_tablets.push_back(tablet);
}
return OLAP_ERR_TABLE_ALREADY_DELETED_ERROR;
@@ -748,7 +753,7 @@ OLAPStatus TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tab
RETURN_NOT_OK_LOG(tablet->init(),
strings::Substitute("tablet init failed. tablet=$0", tablet->full_name()));
- WriteLock wrlock(_get_tablets_shard_lock(tablet_id));
+ std::lock_guard<std::shared_mutex> wrlock(_get_tablets_shard_lock(tablet_id));
RETURN_NOT_OK_LOG(_add_tablet_unlocked(tablet_id, tablet, update_meta, force),
strings::Substitute("fail to add tablet. tablet=$0", tablet->full_name()));
@@ -831,10 +836,10 @@ OLAPStatus TabletManager::build_all_report_tablets_info(
HistogramStat tablet_version_num_hist;
auto local_cache = std::make_shared<std::vector<TTabletStat>>();
for (const auto& tablets_shard : _tablets_shards) {
- ReadLock rdlock(tablets_shard.lock);
+ std::shared_lock rdlock(tablets_shard.lock);
for (const auto& item : tablets_shard.tablet_map) {
uint64_t tablet_id = item.first;
- TabletSharedPtr tablet_ptr = item.second;
+ TabletSharedPtr tablet_ptr = item.second;
TTablet t_tablet;
TTabletInfo tablet_info;
tablet_ptr->build_tablet_report_info(&tablet_info);
@@ -868,11 +873,12 @@ OLAPStatus TabletManager::build_all_report_tablets_info(
OLAPStatus TabletManager::start_trash_sweep() {
{
- std::vector<TabletSharedPtr> all_tablets; // we use this vector to save all tablet ptr for saving lock time.
+ std::vector<TabletSharedPtr>
+ all_tablets; // we use this vector to save all tablet ptr for saving lock time.
for (auto& tablets_shard : _tablets_shards) {
tablet_map_t& tablet_map = tablets_shard.tablet_map;
{
- ReadLock rdlock(tablets_shard.lock);
+ std::shared_lock rdlock(tablets_shard.lock);
for (auto& item : tablet_map) {
// try to clean empty item
all_tablets.push_back(item.second);
@@ -894,7 +900,7 @@ OLAPStatus TabletManager::start_trash_sweep() {
clean_num = 0;
// should get write lock here, because it will remove tablet from shut_down_tablets
// and get tablet will access shut_down_tablets
- WriteLock wrlock(_shutdown_tablets_lock);
+ std::lock_guard<std::shared_mutex> wrlock(_shutdown_tablets_lock);
auto it = _shutdown_tablets.begin();
while (it != _shutdown_tablets.end()) {
// check if the meta has the tablet info and its state is shutdown
@@ -928,7 +934,8 @@ OLAPStatus TabletManager::start_trash_sweep() {
LOG(INFO) << "start to move tablet to trash. " << tablet_path_desc.debug_string();
OLAPStatus rm_st = (*it)->data_dir()->move_to_trash(tablet_path_desc);
if (rm_st != OLAP_SUCCESS) {
- LOG(WARNING) << "fail to move dir to trash. " << tablet_path_desc.debug_string();
+ LOG(WARNING)
+ << "fail to move dir to trash. " << tablet_path_desc.debug_string();
++it;
continue;
}
@@ -972,13 +979,13 @@ OLAPStatus TabletManager::start_trash_sweep() {
void TabletManager::register_clone_tablet(int64_t tablet_id) {
tablets_shard& shard = _get_tablets_shard(tablet_id);
- WriteLock wrlock(shard.lock);
+ std::lock_guard<std::shared_mutex> wrlock(shard.lock);
shard.tablets_under_clone.insert(tablet_id);
}
void TabletManager::unregister_clone_tablet(int64_t tablet_id) {
tablets_shard& shard = _get_tablets_shard(tablet_id);
- WriteLock wrlock(shard.lock);
+ std::lock_guard<std::shared_mutex> wrlock(shard.lock);
shard.tablets_under_clone.erase(tablet_id);
}
@@ -988,7 +995,7 @@ void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId t
// acquire the read lock, so that there is no creating tablet or load tablet from meta tasks
// create tablet and load tablet task should check whether the dir exists
tablets_shard& shard = _get_tablets_shard(tablet_id);
- ReadLock rdlock(shard.lock);
+ std::shared_lock rdlock(shard.lock);
// check if meta already exists
TabletMetaSharedPtr tablet_meta(new TabletMeta());
@@ -1023,7 +1030,7 @@ void TabletManager::update_root_path_info(std::map<string, DataDirInfo>* path_ma
DCHECK(tablet_count != 0);
*tablet_count = 0;
for (const auto& tablets_shard : _tablets_shards) {
- ReadLock rdlock(tablets_shard.lock);
+ std::shared_lock rdlock(tablets_shard.lock);
for (const auto& item : tablets_shard.tablet_map) {
TabletSharedPtr tablet = item.second;
++(*tablet_count);
@@ -1041,7 +1048,7 @@ void TabletManager::update_root_path_info(std::map<string, DataDirInfo>* path_ma
void TabletManager::get_partition_related_tablets(int64_t partition_id,
std::set<TabletInfo>* tablet_infos) {
- ReadLock rdlock(_partition_tablet_map_lock);
+ std::shared_lock rdlock(_partition_tablet_map_lock);
if (_partition_tablet_map.find(partition_id) != _partition_tablet_map.end()) {
*tablet_infos = _partition_tablet_map[partition_id];
}
@@ -1051,7 +1058,7 @@ void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) {
std::vector<TabletSharedPtr> related_tablets;
{
for (auto& tablets_shard : _tablets_shards) {
- ReadLock rdlock(tablets_shard.lock);
+ std::shared_lock rdlock(tablets_shard.lock);
for (auto& item : tablets_shard.tablet_map) {
TabletSharedPtr& tablet_ptr = item.second;
if (tablet_ptr->tablet_state() != TABLET_RUNNING) {
@@ -1209,10 +1216,10 @@ OLAPStatus TabletManager::_drop_tablet_directly_unlocked(TTabletId tablet_id, bo
tablet_map.erase(tablet_id);
if (!keep_files) {
// drop tablet will update tablet meta, should lock
- WriteLock wrlock(dropped_tablet->get_header_lock());
+ std::lock_guard<std::shared_mutex> wrlock(dropped_tablet->get_header_lock());
LOG(INFO) << "set tablet to shutdown state and remove it from memory. "
- << "tablet_id=" << tablet_id
- << ", tablet_path=" << dropped_tablet->tablet_path_desc().filepath;
+ << "tablet_id=" << tablet_id
+ << ", tablet_path=" << dropped_tablet->tablet_path_desc().filepath;
// NOTE: has to update tablet here, but must not update tablet meta directly.
// because other thread may hold the tablet object, they may save meta too.
// If update meta directly here, other thread may override the meta
@@ -1221,7 +1228,7 @@ OLAPStatus TabletManager::_drop_tablet_directly_unlocked(TTabletId tablet_id, bo
dropped_tablet->set_tablet_state(TABLET_SHUTDOWN);
dropped_tablet->save_meta();
{
- WriteLock wrdlock(_shutdown_tablets_lock);
+ std::lock_guard<std::shared_mutex> wrdlock(_shutdown_tablets_lock);
_shutdown_tablets.push_back(dropped_tablet);
}
}
@@ -1241,12 +1248,12 @@ TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id) {
}
void TabletManager::_add_tablet_to_partition(const TabletSharedPtr& tablet) {
- WriteLock wrlock(_partition_tablet_map_lock);
+ std::lock_guard<std::shared_mutex> wrlock(_partition_tablet_map_lock);
_partition_tablet_map[tablet->partition_id()].insert(tablet->get_tablet_info());
}
void TabletManager::_remove_tablet_from_partition(const TabletSharedPtr& tablet) {
- WriteLock wrlock(_partition_tablet_map_lock);
+ std::lock_guard<std::shared_mutex> wrlock(_partition_tablet_map_lock);
_partition_tablet_map[tablet->partition_id()].erase(tablet->get_tablet_info());
if (_partition_tablet_map[tablet->partition_id()].empty()) {
_partition_tablet_map.erase(tablet->partition_id());
@@ -1256,7 +1263,7 @@ void TabletManager::_remove_tablet_from_partition(const TabletSharedPtr& tablet)
void TabletManager::obtain_specific_quantity_tablets(vector<TabletInfo>& tablets_info,
int64_t num) {
for (const auto& tablets_shard : _tablets_shards) {
- ReadLock rdlock(tablets_shard.lock);
+ std::shared_lock rdlock(tablets_shard.lock);
for (const auto& item : tablets_shard.tablet_map) {
TabletSharedPtr tablet = item.second;
if (tablets_info.size() >= num) {
@@ -1291,7 +1298,7 @@ void TabletManager::get_tablets_distribution_on_different_disks(
// When drop tablet, '_partition_tablet_map_lock' is locked in 'tablet_shard_lock'.
// To avoid locking 'tablet_shard_lock' in '_partition_tablet_map_lock', we lock and
// copy _partition_tablet_map here.
- ReadLock rdlock(_partition_tablet_map_lock);
+ std::shared_lock rdlock(_partition_tablet_map_lock);
partition_tablet_map = _partition_tablet_map;
}
std::map<int64_t, std::set<TabletInfo>>::iterator partition_iter = partition_tablet_map.begin();
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 69417ced3e..71b6940b52 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -91,12 +91,12 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
}
schema->set_compress_kind(COMPRESS_LZ4);
- switch(tablet_schema.sort_type) {
- case TSortType::type::ZORDER:
- schema->set_sort_type(SortType::ZORDER);
- break;
- default:
- schema->set_sort_type(SortType::LEXICAL);
+ switch (tablet_schema.sort_type) {
+ case TSortType::type::ZORDER:
+ schema->set_sort_type(SortType::ZORDER);
+ break;
+ default:
+ schema->set_sort_type(SortType::LEXICAL);
}
schema->set_sort_col_num(tablet_schema.sort_col_num);
tablet_meta_pb.set_in_restore_mode(false);
@@ -301,7 +301,7 @@ OLAPStatus TabletMeta::save(const string& file_path, const TabletMetaPB& tablet_
}
OLAPStatus TabletMeta::save_meta(DataDir* data_dir) {
- WriteLock wrlock(_meta_lock);
+ std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
return _save_meta(data_dir);
}
@@ -546,7 +546,7 @@ void TabletMeta::modify_rs_metas(const std::vector<RowsetMetaSharedPtr>& to_add,
// an existing tablet before. Add after revise, only the passing "rs_metas"
// is needed.
void TabletMeta::revise_rs_metas(std::vector<RowsetMetaSharedPtr>&& rs_metas) {
- WriteLock wrlock(_meta_lock);
+ std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
_rs_metas = std::move(rs_metas);
_stale_rs_metas.clear();
}
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 1f83ccea45..1829ff44ba 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -31,7 +31,6 @@
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/tablet_schema.h"
-#include "util/mutex.h"
#include "util/uid_util.h"
namespace doris {
diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp
index 5068b72a29..d064906241 100644
--- a/be/src/olap/task/engine_checksum_task.cpp
+++ b/be/src/olap/task/engine_checksum_task.cpp
@@ -17,18 +17,15 @@
#include "olap/task/engine_checksum_task.h"
-#include "olap/tuple_reader.h"
#include "olap/row.h"
#include "runtime/thread_context.h"
+#include "olap/tuple_reader.h"
namespace doris {
EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash schema_hash,
TVersion version, uint32_t* checksum)
- : _tablet_id(tablet_id),
- _schema_hash(schema_hash),
- _version(version),
- _checksum(checksum) {
+ : _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version), _checksum(checksum) {
_mem_tracker = MemTracker::create_tracker(-1, "compute checksum: " + std::to_string(tablet_id),
StorageEngine::instance()->consistency_mem_tracker(),
MemTrackerLevel::TASK);
@@ -66,7 +63,7 @@ OLAPStatus EngineChecksumTask::_compute_checksum() {
reader_params.version = Version(0, _version);
{
- ReadLock rdlock(tablet->get_header_lock());
+ std::shared_lock rdlock(tablet->get_header_lock());
const RowsetSharedPtr message = tablet->rowset_with_max_version();
if (message == nullptr) {
LOG(FATAL) << "fail to get latest version. tablet_id=" << _tablet_id;
diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp
index 8fa5818ea8..4d1f6c44b8 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -81,7 +81,7 @@ OLAPStatus EngineCloneTask::_do_clone() {
bool is_new_tablet = tablet == nullptr;
// try to repair a tablet with missing version
if (tablet != nullptr) {
- ReadLock migration_rlock(tablet->get_migration_lock(), std::try_to_lock);
+ std::shared_lock migration_rlock(tablet->get_migration_lock(), std::try_to_lock);
if (!migration_rlock.owns_lock()) {
return OLAP_ERR_RWLOCK_ERROR;
}
@@ -268,10 +268,12 @@ void EngineCloneTask::_set_tablet_info(Status status, bool is_new_tablet) {
/// 2. Download all snapshots to CLONE dir.
/// 3. Convert rowset ids of downloaded snapshots.
/// 4. Release the snapshots on source BE.
-Status EngineCloneTask::_make_and_download_snapshots(
- DataDir& data_dir, const string& local_data_path, TBackend* src_host, string* snapshot_path,
- std::vector<string>* error_msgs, const std::vector<Version>* missed_versions,
- bool* allow_incremental_clone) {
+Status EngineCloneTask::_make_and_download_snapshots(DataDir& data_dir,
+ const string& local_data_path,
+ TBackend* src_host, string* snapshot_path,
+ std::vector<string>* error_msgs,
+ const std::vector<Version>* missed_versions,
+ bool* allow_incremental_clone) {
Status status = Status::OK();
std::string local_path = local_data_path + "/";
@@ -529,10 +531,11 @@ OLAPStatus EngineCloneTask::_finish_clone(Tablet* tablet, const string& clone_di
// clone and compaction operation should be performed sequentially
{
std::lock_guard<std::mutex> base_compaction_lock(tablet->get_base_compaction_lock());
- std::lock_guard<std::mutex> cumulative_compaction_lock(tablet->get_cumulative_compaction_lock());
+ std::lock_guard<std::mutex> cumulative_compaction_lock(
+ tablet->get_cumulative_compaction_lock());
tablet->set_clone_occurred(true);
std::lock_guard<std::mutex> push_lock(tablet->get_push_lock());
- WriteLock wrlock(tablet->get_header_lock());
+ std::lock_guard<std::shared_mutex> wrlock(tablet->get_header_lock());
do {
// check clone dir existed
if (!FileUtils::check_exist(clone_dir)) {
@@ -547,7 +550,8 @@ OLAPStatus EngineCloneTask::_finish_clone(Tablet* tablet, const string& clone_di
string cloned_tablet_meta_file =
clone_dir + "/" + std::to_string(tablet->tablet_id()) + ".hdr";
TabletMeta cloned_tablet_meta;
- if ((res = cloned_tablet_meta.create_from_file(cloned_tablet_meta_file)) != OLAP_SUCCESS) {
+ if ((res = cloned_tablet_meta.create_from_file(cloned_tablet_meta_file)) !=
+ OLAP_SUCCESS) {
LOG(WARNING) << "fail to load src header when clone. "
<< ", cloned_tablet_meta_file=" << cloned_tablet_meta_file;
break;
@@ -557,9 +561,11 @@ OLAPStatus EngineCloneTask::_finish_clone(Tablet* tablet, const string& clone_di
// check all files in /clone and /tablet
set<string> clone_files;
- Status ret = FileUtils::list_dirs_files(clone_dir, nullptr, &clone_files, Env::Default());
+ Status ret =
+ FileUtils::list_dirs_files(clone_dir, nullptr, &clone_files, Env::Default());
if (!ret.ok()) {
- LOG(WARNING) << "failed to list clone dir when clone. [clone_dir=" << clone_dir << "]"
+ LOG(WARNING) << "failed to list clone dir when clone. [clone_dir=" << clone_dir
+ << "]"
<< " error: " << ret.to_string();
res = OLAP_ERR_DISK_FAILURE;
break;
@@ -569,8 +575,8 @@ OLAPStatus EngineCloneTask::_finish_clone(Tablet* tablet, const string& clone_di
string tablet_dir = tablet->tablet_path_desc().filepath;
ret = FileUtils::list_dirs_files(tablet_dir, nullptr, &local_files, Env::Default());
if (!ret.ok()) {
- LOG(WARNING) << "failed to list local tablet dir when clone. [tablet_dir=" << tablet_dir
- << "]"
+ LOG(WARNING) << "failed to list local tablet dir when clone. [tablet_dir="
+ << tablet_dir << "]"
<< " error: " << ret.to_string();
res = OLAP_ERR_DISK_FAILURE;
break;
@@ -582,7 +588,8 @@ OLAPStatus EngineCloneTask::_finish_clone(Tablet* tablet, const string& clone_di
for (const string& clone_file : clone_files) {
if (local_files.find(clone_file) != local_files.end()) {
VLOG_NOTICE << "find same file when clone, skip it. "
- << "tablet=" << tablet->full_name() << ", clone_file=" << clone_file;
+ << "tablet=" << tablet->full_name()
+ << ", clone_file=" << clone_file;
continue;
}
@@ -758,9 +765,9 @@ OLAPStatus EngineCloneTask::_finish_full_clone(Tablet* tablet, TabletMeta* clone
// but some rowset is useless, so that remove them here
for (auto& rs_meta_ptr : rs_metas_found_in_src) {
RowsetSharedPtr rowset_to_remove;
- auto s =
- RowsetFactory::create_rowset(&(cloned_tablet_meta->tablet_schema()),
- tablet->tablet_path_desc().filepath, rs_meta_ptr, &rowset_to_remove);
+ auto s = RowsetFactory::create_rowset(&(cloned_tablet_meta->tablet_schema()),
+ tablet->tablet_path_desc().filepath, rs_meta_ptr,
+ &rowset_to_remove);
if (s != OLAP_SUCCESS) {
LOG(WARNING) << "failed to init rowset to remove: "
<< rs_meta_ptr->rowset_id().to_string();
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp
index 7316e59202..b0deb4595c 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -31,20 +31,20 @@ const int CHECK_TXNS_MAX_WAIT_TIME_SECS = 60;
EngineStorageMigrationTask::EngineStorageMigrationTask(const TabletSharedPtr& tablet,
DataDir* dest_store)
: _tablet(tablet), _dest_store(dest_store) {
- _task_start_time = time(nullptr);
- }
+ _task_start_time = time(nullptr);
+}
OLAPStatus EngineStorageMigrationTask::execute() {
return _migrate();
}
-OLAPStatus EngineStorageMigrationTask::_get_versions(int32_t start_version, int32_t* end_version,
- std::vector<RowsetSharedPtr> *consistent_rowsets) {
- ReadLock rdlock(_tablet->get_header_lock());
+OLAPStatus EngineStorageMigrationTask::_get_versions(
+ int32_t start_version, int32_t* end_version,
+ std::vector<RowsetSharedPtr>* consistent_rowsets) {
+ std::shared_lock rdlock(_tablet->get_header_lock());
const RowsetSharedPtr last_version = _tablet->rowset_with_max_version();
if (last_version == nullptr) {
- LOG(WARNING) << "failed to get rowset with max version, tablet="
- << _tablet->full_name();
+ LOG(WARNING) << "failed to get rowset with max version, tablet=" << _tablet->full_name();
return OLAP_ERR_VERSION_NOT_EXIST;
}
@@ -52,13 +52,13 @@ OLAPStatus EngineStorageMigrationTask::_get_versions(int32_t start_version, int3
if (*end_version < start_version) {
// rowsets are empty
VLOG_DEBUG << "consistent rowsets empty. tablet=" << _tablet->full_name()
- << ", start_version=" << start_version << ", end_version=" << *end_version;
+ << ", start_version=" << start_version << ", end_version=" << *end_version;
return OLAP_SUCCESS;
}
_tablet->capture_consistent_rowsets(Version(start_version, *end_version), consistent_rowsets);
if (consistent_rowsets->empty()) {
LOG(WARNING) << "fail to capture consistent rowsets. tablet=" << _tablet->full_name()
- << ", version=" << *end_version;
+ << ", version=" << *end_version;
return OLAP_ERR_VERSION_NOT_EXIST;
}
return OLAP_SUCCESS;
@@ -68,7 +68,7 @@ bool EngineStorageMigrationTask::_is_timeout() {
int64_t time_elapsed = time(nullptr) - _task_start_time;
if (time_elapsed > config::migration_task_timeout_secs) {
LOG(WARNING) << "migration failed due to timeout, time_eplapsed=" << time_elapsed
- << ", tablet=" << _tablet->full_name();
+ << ", tablet=" << _tablet->full_name();
return true;
}
return false;
@@ -80,14 +80,16 @@ OLAPStatus EngineStorageMigrationTask::_check_running_txns() {
std::set<int64_t> transaction_ids;
// check if this tablet has related running txns. if yes, can not do migration.
StorageEngine::instance()->txn_manager()->get_tablet_related_txns(
- _tablet->tablet_id(), _tablet->schema_hash(), _tablet->tablet_uid(), &partition_id, &transaction_ids);
+ _tablet->tablet_id(), _tablet->schema_hash(), _tablet->tablet_uid(), &partition_id,
+ &transaction_ids);
if (transaction_ids.size() > 0) {
return OLAP_ERR_HEADER_HAS_PENDING_DATA;
}
return OLAP_SUCCESS;
}
-OLAPStatus EngineStorageMigrationTask::_check_running_txns_until_timeout(UniqueWriteLock* migration_wlock) {
+OLAPStatus EngineStorageMigrationTask::_check_running_txns_until_timeout(
+ std::unique_lock<std::shared_mutex>* migration_wlock) {
// caller should not hold migration lock, and 'migration_wlock' should not be nullptr
// ownership of the migration_wlock is transferred to the caller if check succ
DCHECK_NE(migration_wlock, nullptr);
@@ -95,7 +97,7 @@ OLAPStatus EngineStorageMigrationTask::_check_running_txns_until_timeout(UniqueW
int try_times = 1;
do {
// to avoid invalid loops, the lock is guaranteed to be acquired here
- UniqueWriteLock wlock(_tablet->get_migration_lock());
+ std::unique_lock<std::shared_mutex> wlock(_tablet->get_migration_lock());
res = _check_running_txns();
if (res == OLAP_SUCCESS) {
// transfer the lock to the caller
@@ -103,9 +105,8 @@ OLAPStatus EngineStorageMigrationTask::_check_running_txns_until_timeout(UniqueW
return res;
}
LOG(INFO) << "check running txns fail, try again until timeout."
- << " tablet=" << _tablet->full_name()
- << ", try times=" << try_times
- << ", res=" << res;
+ << " tablet=" << _tablet->full_name() << ", try times=" << try_times
+ << ", res=" << res;
// unlock and sleep for a while, try again
wlock.unlock();
sleep(std::min(config::sleep_one_second * try_times, CHECK_TXNS_MAX_WAIT_TIME_SECS));
@@ -115,16 +116,15 @@ OLAPStatus EngineStorageMigrationTask::_check_running_txns_until_timeout(UniqueW
}
OLAPStatus EngineStorageMigrationTask::_gen_and_write_header_to_hdr_file(
- uint64_t shard,
- const std::string& full_path,
- const std::vector<RowsetSharedPtr>& consistent_rowsets) {
+ uint64_t shard, const std::string& full_path,
+ const std::vector<RowsetSharedPtr>& consistent_rowsets) {
// need hold migration lock and push lock outside
OLAPStatus res = OLAP_SUCCESS;
int64_t tablet_id = _tablet->tablet_id();
int32_t schema_hash = _tablet->schema_hash();
TabletMetaSharedPtr new_tablet_meta(new (std::nothrow) TabletMeta());
{
- ReadLock rdlock(_tablet->get_header_lock());
+ std::shared_lock rdlock(_tablet->get_header_lock());
_generate_new_header(shard, consistent_rowsets, new_tablet_meta);
}
std::string new_meta_file = full_path + "/" + std::to_string(tablet_id) + ".hdr";
@@ -145,14 +145,13 @@ OLAPStatus EngineStorageMigrationTask::_gen_and_write_header_to_hdr_file(
res = SnapshotManager::instance()->convert_rowset_ids(full_path, tablet_id, schema_hash);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to convert rowset id when do storage migration"
- << " path = " << full_path;
+ << " path = " << full_path;
return res;
}
return res;
}
-OLAPStatus EngineStorageMigrationTask::_reload_tablet(
- const std::string& full_path) {
+OLAPStatus EngineStorageMigrationTask::_reload_tablet(const std::string& full_path) {
// need hold migration lock and push lock outside
OLAPStatus res = OLAP_SUCCESS;
int64_t tablet_id = _tablet->tablet_id();
@@ -161,7 +160,7 @@ OLAPStatus EngineStorageMigrationTask::_reload_tablet(
_dest_store, tablet_id, schema_hash, full_path, false);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to load tablet from new path. tablet_id=" << tablet_id
- << " schema_hash=" << schema_hash << " path = " << full_path;
+ << " schema_hash=" << schema_hash << " path = " << full_path;
return res;
}
@@ -204,7 +203,8 @@ OLAPStatus EngineStorageMigrationTask::_migrate() {
uint64_t shard = 0;
string full_path;
{
- UniqueWriteLock migration_wlock(_tablet->get_migration_lock(), std::try_to_lock);
+ std::unique_lock<std::shared_mutex> migration_wlock(_tablet->get_migration_lock(),
+ std::try_to_lock);
if (!migration_wlock.owns_lock()) {
return OLAP_ERR_RWLOCK_ERROR;
}
@@ -213,7 +213,7 @@ OLAPStatus EngineStorageMigrationTask::_migrate() {
res = _check_running_txns();
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "could not migration because has unfinished txns, "
- << " tablet=" << _tablet->full_name();
+ << " tablet=" << _tablet->full_name();
return res;
}
@@ -239,7 +239,7 @@ OLAPStatus EngineStorageMigrationTask::_migrate() {
// should not remove the dir directly, for safety reason.
if (FileUtils::check_exist(full_path)) {
LOG(INFO) << "schema hash path already exist, skip this path. "
- << "full_path=" << full_path;
+ << "full_path=" << full_path;
return OLAP_ERR_FILE_ALREADY_EXIST;
}
@@ -247,7 +247,7 @@ OLAPStatus EngineStorageMigrationTask::_migrate() {
if (!st.ok()) {
res = OLAP_ERR_CANNOT_CREATE_DIR;
LOG(WARNING) << "fail to create path. path=" << full_path
- << ", error:" << st.to_string();
+ << ", error:" << st.to_string();
return res;
}
}
@@ -260,7 +260,7 @@ OLAPStatus EngineStorageMigrationTask::_migrate() {
LOG(WARNING) << "fail to copy index and data files when migrate. res=" << res;
break;
}
- UniqueWriteLock migration_wlock;
+ std::unique_lock<std::shared_mutex> migration_wlock;
res = _check_running_txns_until_timeout(&migration_wlock);
if (res != OLAP_SUCCESS) {
break;
@@ -276,17 +276,19 @@ OLAPStatus EngineStorageMigrationTask::_migrate() {
}
if (start_version < end_version) {
// we have remaining versions to be migrated
- consistent_rowsets.insert(consistent_rowsets.end(),
- temp_consistent_rowsets.begin(), temp_consistent_rowsets.end());
+ consistent_rowsets.insert(consistent_rowsets.end(), temp_consistent_rowsets.begin(),
+ temp_consistent_rowsets.end());
LOG(INFO) << "we have remaining versions to be migrated. start_version="
- << start_version << " end_version=" << end_version;
+ << start_version << " end_version=" << end_version;
// if the remaining size is less than config::migration_remaining_size_threshold_mb(default 10MB),
// we take the lock to complete it to avoid long-term competition with other tasks
if (_is_rowsets_size_less_than_threshold(temp_consistent_rowsets)) {
// force to copy the remaining data and index
res = _copy_index_and_data_files(full_path, temp_consistent_rowsets);
if (res != OLAP_SUCCESS) {
- LOG(WARNING) << "fail to copy the remaining index and data files when migrate. res=" << res;
+ LOG(WARNING)
+ << "fail to copy the remaining index and data files when migrate. res="
+ << res;
break;
}
} else {
diff --git a/be/src/olap/task/engine_storage_migration_task.h b/be/src/olap/task/engine_storage_migration_task.h
index ffd42a459c..0d67ca5b67 100644
--- a/be/src/olap/task/engine_storage_migration_task.h
+++ b/be/src/olap/task/engine_storage_migration_task.h
@@ -38,21 +38,21 @@ private:
OLAPStatus _migrate();
// check if task is timeout
bool _is_timeout();
- OLAPStatus _get_versions(int32_t start_version,
- int32_t* end_version,
- std::vector<RowsetSharedPtr> *consistent_rowsets);
+ OLAPStatus _get_versions(int32_t start_version, int32_t* end_version,
+ std::vector<RowsetSharedPtr>* consistent_rowsets);
OLAPStatus _check_running_txns();
// caller should not hold migration lock, and 'migration_wlock' should not be nullptr
// ownership of the migration lock is transferred to the caller if check succ
- OLAPStatus _check_running_txns_until_timeout(UniqueWriteLock* migration_wlock);
+ OLAPStatus _check_running_txns_until_timeout(
+ std::unique_lock<std::shared_mutex>* migration_wlock);
// if the size less than threshold, return true
- bool _is_rowsets_size_less_than_threshold(const std::vector<RowsetSharedPtr>& consistent_rowsets);
+ bool _is_rowsets_size_less_than_threshold(
+ const std::vector<RowsetSharedPtr>& consistent_rowsets);
OLAPStatus _gen_and_write_header_to_hdr_file(
- uint64_t shard,
- const std::string& full_path,
- const std::vector<RowsetSharedPtr>& consistent_rowsets);
+ uint64_t shard, const std::string& full_path,
+ const std::vector<RowsetSharedPtr>& consistent_rowsets);
OLAPStatus _reload_tablet(const std::string& full_path);
void _generate_new_header(uint64_t new_shard,
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 799aaffbda..f7b994beb2 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -79,7 +79,7 @@ TxnManager::TxnManager(int32_t txn_map_shard_size, int32_t txn_shard_size)
_txn_map_locks = new std::shared_mutex[_txn_map_shard_size];
_txn_tablet_maps = new txn_tablet_map_t[_txn_map_shard_size];
_txn_partition_maps = new txn_partition_map_t[_txn_map_shard_size];
- _txn_mutex = new Mutex[_txn_shard_size];
+ _txn_mutex = new std::mutex[_txn_shard_size];
}
OLAPStatus TxnManager::prepare_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
@@ -123,7 +123,7 @@ OLAPStatus TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId tra
TabletUid tablet_uid, const PUniqueId& load_id) {
TxnKey key(partition_id, transaction_id);
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
- WriteLock txn_wrlock(_get_txn_map_lock(transaction_id));
+ std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id));
txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
auto it = txn_tablet_map.find(key);
if (it != txn_tablet_map.end()) {
@@ -184,10 +184,10 @@ OLAPStatus TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
return OLAP_ERR_ROWSET_INVALID;
}
- MutexLock txn_lock(&_get_txn_lock(transaction_id));
+ std::unique_lock<std::mutex> txn_lock(_get_txn_lock(transaction_id));
{
// get tx
- ReadLock rdlock(_get_txn_map_lock(transaction_id));
+ std::shared_lock rdlock(_get_txn_map_lock(transaction_id));
txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
auto it = txn_tablet_map.find(key);
if (it != txn_tablet_map.end()) {
@@ -210,7 +210,8 @@ OLAPStatus TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
load_info.load_id.lo() == load_id.lo() && load_info.rowset != nullptr &&
load_info.rowset->rowset_id() != rowset_ptr->rowset_id()) {
// find a rowset with different rowset id, then it should not happen, just return errors
- LOG(WARNING) << "find rowset exists when commit transaction to engine. but rowset ids "
+ LOG(WARNING) << "find rowset exists when commit transaction to engine. but "
+ "rowset ids "
"are not same."
<< "partition_id: " << key.first
<< ", transaction_id: " << key.second
@@ -239,7 +240,7 @@ OLAPStatus TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
}
{
- WriteLock wrlock(_get_txn_map_lock(transaction_id));
+ std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
TabletTxnInfo load_info(load_id, rowset_ptr);
txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
txn_tablet_map[key][tablet_info] = load_info;
@@ -261,9 +262,9 @@ OLAPStatus TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
pair<int64_t, int64_t> key(partition_id, transaction_id);
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
RowsetSharedPtr rowset_ptr = nullptr;
- MutexLock txn_lock(&_get_txn_lock(transaction_id));
+ std::unique_lock<std::mutex> txn_lock(_get_txn_lock(transaction_id));
{
- ReadLock rlock(_get_txn_map_lock(transaction_id));
+ std::shared_lock rlock(_get_txn_map_lock(transaction_id));
txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
auto it = txn_tablet_map.find(key);
if (it != txn_tablet_map.end()) {
@@ -295,7 +296,7 @@ OLAPStatus TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
return OLAP_ERR_TRANSACTION_NOT_EXIST;
}
{
- WriteLock wrlock(_get_txn_map_lock(transaction_id));
+ std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
auto it = txn_tablet_map.find(key);
if (it != txn_tablet_map.end()) {
@@ -303,8 +304,8 @@ OLAPStatus TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
VLOG_NOTICE << "publish txn successfully."
<< " partition_id: " << key.first << ", txn_id: " << key.second
<< ", tablet: " << tablet_info.to_string()
- << ", rowsetid: " << rowset_ptr->rowset_id() << ", version: " << version.first
- << "," << version.second;
+ << ", rowsetid: " << rowset_ptr->rowset_id()
+ << ", version: " << version.first << "," << version.second;
if (it->second.empty()) {
txn_tablet_map.erase(it);
_clear_txn_partition_map_unlocked(transaction_id, partition_id);
@@ -323,7 +324,7 @@ OLAPStatus TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId tr
TabletUid tablet_uid) {
pair<int64_t, int64_t> key(partition_id, transaction_id);
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
- WriteLock wrlock(_get_txn_map_lock(transaction_id));
+ std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
auto it = txn_tablet_map.find(key);
if (it != txn_tablet_map.end()) {
@@ -357,7 +358,7 @@ OLAPStatus TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id,
SchemaHash schema_hash, TabletUid tablet_uid) {
pair<int64_t, int64_t> key(partition_id, transaction_id);
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
- WriteLock txn_wrlock(_get_txn_map_lock(transaction_id));
+ std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id));
txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
auto it = txn_tablet_map.find(key);
if (it == txn_tablet_map.end()) {
@@ -410,7 +411,7 @@ void TxnManager::get_tablet_related_txns(TTabletId tablet_id, SchemaHash schema_
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
for (int32_t i = 0; i < _txn_map_shard_size; i++) {
- ReadLock txn_rdlock(_txn_map_locks[i]);
+ std::shared_lock txn_rdlock(_txn_map_locks[i]);
txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i];
for (auto& it : txn_tablet_map) {
if (it.second.find(tablet_info) != it.second.end()) {
@@ -431,7 +432,7 @@ void TxnManager::force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId ta
SchemaHash schema_hash, TabletUid tablet_uid) {
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
for (int32_t i = 0; i < _txn_map_shard_size; i++) {
- WriteLock txn_wrlock(_txn_map_locks[i]);
+ std::lock_guard<std::shared_mutex> txn_wrlock(_txn_map_locks[i]);
txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i];
for (auto it = txn_tablet_map.begin(); it != txn_tablet_map.end();) {
auto load_itr = it->second.find(tablet_info);
@@ -467,7 +468,7 @@ void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id,
std::map<TabletInfo, RowsetSharedPtr>* tablet_infos) {
// get tablets in this transaction
pair<int64_t, int64_t> key(partition_id, transaction_id);
- ReadLock txn_rdlock(_get_txn_map_lock(transaction_id));
+ std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
auto it = txn_tablet_map.find(key);
if (it == txn_tablet_map.end()) {
@@ -488,7 +489,7 @@ void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id,
void TxnManager::get_all_related_tablets(std::set<TabletInfo>* tablet_infos) {
for (int32_t i = 0; i < _txn_map_shard_size; i++) {
- ReadLock txn_rdlock(_txn_map_locks[i]);
+ std::shared_lock txn_rdlock(_txn_map_locks[i]);
for (auto& it : _txn_tablet_maps[i]) {
for (auto& tablet_load_it : it.second) {
tablet_infos->emplace(tablet_load_it.first);
@@ -501,7 +502,7 @@ bool TxnManager::has_txn(TPartitionId partition_id, TTransactionId transaction_i
TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid) {
pair<int64_t, int64_t> key(partition_id, transaction_id);
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
- ReadLock txn_rdlock(_get_txn_map_lock(transaction_id));
+ std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
auto it = txn_tablet_map.find(key);
bool found = it != txn_tablet_map.end() && it->second.find(tablet_info) != it->second.end();
@@ -513,7 +514,7 @@ void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>
int64_t now = UnixSeconds();
// traverse the txn map, and get all expired txns
for (int32_t i = 0; i < _txn_map_shard_size; i++) {
- ReadLock txn_rdlock(_txn_map_locks[i]);
+ std::shared_lock txn_rdlock(_txn_map_locks[i]);
for (auto& it : _txn_tablet_maps[i]) {
auto txn_id = it.first.second;
for (auto& t_map : it.second) {
@@ -533,7 +534,7 @@ void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>
void TxnManager::get_partition_ids(const TTransactionId transaction_id,
std::vector<TPartitionId>* partition_ids) {
- ReadLock txn_rdlock(_get_txn_map_lock(transaction_id));
+ std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
auto it = txn_partition_map.find(transaction_id);
if (it != txn_partition_map.end()) {
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index 596b8aab1f..66a2eef6cb 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -166,7 +166,7 @@ private:
txn_partition_map_t& _get_txn_partition_map(TTransactionId transactionId);
- Mutex& _get_txn_lock(TTransactionId transactionId);
+ inline std::mutex& _get_txn_lock(TTransactionId transactionId);
// Insert or remove (transaction_id, partition_id) from _txn_partition_map
// get _txn_map_lock before calling.
@@ -188,7 +188,7 @@ private:
std::shared_mutex* _txn_map_locks;
- Mutex* _txn_mutex;
+ std::mutex* _txn_mutex;
DISALLOW_COPY_AND_ASSIGN(TxnManager);
}; // TxnManager
@@ -205,7 +205,7 @@ inline TxnManager::txn_partition_map_t& TxnManager::_get_txn_partition_map(
return _txn_partition_maps[transactionId & (_txn_map_shard_size - 1)];
}
-inline Mutex& TxnManager::_get_txn_lock(TTransactionId transactionId) {
+inline std::mutex& TxnManager::_get_txn_lock(TTransactionId transactionId) {
return _txn_mutex[transactionId & (_txn_shard_size - 1)];
}
diff --git a/be/src/olap/utils.cpp b/be/src/olap/utils.cpp
index 4695127dc4..e82e3e80f3 100644
--- a/be/src/olap/utils.cpp
+++ b/be/src/olap/utils.cpp
@@ -50,7 +50,6 @@
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "util/errno.h"
-#include "util/mutex.h"
#include "util/string_parser.hpp"
using std::string;
diff --git a/be/src/runtime/broker_mgr.cpp b/be/src/runtime/broker_mgr.cpp
index 220ce1abb0..f0c80d4cad 100644
--- a/be/src/runtime/broker_mgr.cpp
+++ b/be/src/runtime/broker_mgr.cpp
@@ -108,7 +108,7 @@ void BrokerMgr::ping_worker() {
for (auto& addr : addresses) {
ping(addr);
}
- } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(5)));
+ } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(5)));
}
} // namespace doris
diff --git a/be/src/runtime/bufferpool/buffer_pool_internal.h b/be/src/runtime/bufferpool/buffer_pool_internal.h
index 5c483c06b0..d7231b1157 100644
--- a/be/src/runtime/bufferpool/buffer_pool_internal.h
+++ b/be/src/runtime/bufferpool/buffer_pool_internal.h
@@ -25,7 +25,6 @@
#include "runtime/bufferpool/buffer_pool.h"
#include "runtime/bufferpool/buffer_pool_counters.h"
#include "runtime/bufferpool/reservation_tracker.h"
-#include "util/condition_variable.h"
// Ensure that DCheckConsistency() function calls get removed in release builds.
#ifndef NDEBUG
diff --git a/be/src/runtime/external_scan_context_mgr.cpp b/be/src/runtime/external_scan_context_mgr.cpp
index 71d79a6df4..f662c1b04c 100644
--- a/be/src/runtime/external_scan_context_mgr.cpp
+++ b/be/src/runtime/external_scan_context_mgr.cpp
@@ -108,7 +108,7 @@ Status ExternalScanContextMgr::clear_scan_context(const std::string& context_id)
void ExternalScanContextMgr::gc_expired_context() {
#ifndef BE_TEST
while (!_stop_background_threads_latch.wait_for(
- MonoDelta::FromSeconds(doris::config::scan_context_gc_interval_min * 60))) {
+ std::chrono::seconds(doris::config::scan_context_gc_interval_min * 60))) {
time_t current_time = time(nullptr);
std::vector<std::shared_ptr<ScanContext>> expired_contexts;
{
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 586f5c1cd2..34ac356909 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -695,7 +695,7 @@ void FragmentMgr::cancel_worker() {
LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout fragment "
<< print_id(id);
}
- } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(1)));
+ } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));
LOG(INFO) << "FragmentMgr cancel worker is going to exit.";
}
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index 1596903ac3..9de0736301 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -29,8 +29,8 @@
namespace doris {
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(load_channel_count, MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(load_channel_mem_consumption, MetricUnit::BYTES, "", mem_consumption,
- Labels({{"type", "load"}}));
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(load_channel_mem_consumption, MetricUnit::BYTES, "",
+ mem_consumption, Labels({{"type", "load"}}));
// Calculate the total memory limit of all load tasks on this BE
static int64_t calc_process_max_load_memory(int64_t process_mem_limit) {
@@ -243,7 +243,7 @@ Status LoadChannelMgr::_start_bg_worker() {
#else
uint32_t interval = 1;
#endif
- while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval))) {
+ while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) {
_start_load_channels_clean();
}
},
diff --git a/be/src/runtime/load_path_mgr.cpp b/be/src/runtime/load_path_mgr.cpp
index 1e8e5d2fa0..061beac605 100644
--- a/be/src/runtime/load_path_mgr.cpp
+++ b/be/src/runtime/load_path_mgr.cpp
@@ -71,7 +71,7 @@ Status LoadPathMgr::init() {
"LoadPathMgr", "clean_expired_temp_path",
[this]() {
// TODO(zc): add this thread to cgroup for control resource it use
- while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(3600))) {
+ while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(3600))) {
this->clean();
}
},
diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp
index a62a061ca5..aaaae29b09 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -162,7 +162,7 @@ void ResultBufferMgr::cancel_thread() {
for (int i = 0; i < query_to_cancel.size(); ++i) {
cancel(query_to_cancel[i]);
}
- } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(1)));
+ } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));
LOG(INFO) << "result buffer manager cancel thread finish.";
}
diff --git a/be/src/runtime/routine_load/data_consumer_pool.cpp b/be/src/runtime/routine_load/data_consumer_pool.cpp
index 5b0d9fabbe..c0f8a044b7 100644
--- a/be/src/runtime/routine_load/data_consumer_pool.cpp
+++ b/be/src/runtime/routine_load/data_consumer_pool.cpp
@@ -89,14 +89,14 @@ void DataConsumerPool::return_consumer(std::shared_ptr<DataConsumer> consumer) {
if (_pool.size() == _max_pool_size) {
VLOG_NOTICE << "data consumer pool is full: " << _pool.size() << "-" << _max_pool_size
- << ", discard the returned consumer: " << consumer->id();
+ << ", discard the returned consumer: " << consumer->id();
return;
}
consumer->reset();
_pool.push_back(consumer);
VLOG_NOTICE << "return the data consumer: " << consumer->id()
- << ", current pool size: " << _pool.size();
+ << ", current pool size: " << _pool.size();
return;
}
@@ -115,7 +115,7 @@ Status DataConsumerPool::start_bg_worker() {
#endif
do {
_clean_idle_consumer_bg();
- } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(60)));
+ } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(60)));
},
&_clean_idle_consumer_thread));
return Status::OK();
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index c4402fbece..98abb3ba6d 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -91,9 +91,6 @@ set(UTIL_FILES
cgroup_util.cpp
path_util.cpp
file_cache.cpp
- monotime.cpp
- mutex.cpp
- condition_variable.cpp
thread.cpp
threadpool.cpp
trace.cpp
diff --git a/be/src/util/barrier.h b/be/src/util/barrier.h
index 3f9835b409..40e42ea81c 100644
--- a/be/src/util/barrier.h
+++ b/be/src/util/barrier.h
@@ -18,10 +18,10 @@
#ifndef DORIS_BE_SRC_UTIL_BARRIER_H
#define DORIS_BE_SRC_UTIL_BARRIER_H
-#include "gutil/macros.h"
+#include <condition_variable>
+#include <mutex>
+
#include "olap/olap_define.h"
-#include "util/condition_variable.h"
-#include "util/mutex.h"
namespace doris {
@@ -29,17 +29,16 @@ namespace doris {
class Barrier {
public:
// Initialize the barrier with the given initial count.
- explicit Barrier(int count) : _cond(&_mutex), _count(count), _initial_count(count) {
- DCHECK_GT(count, 0);
- }
+ explicit Barrier(int count) : _count(count), _initial_count(count) { DCHECK_GT(count, 0); }
~Barrier() {}
-
+ Barrier(const Barrier&) = delete;
+ void operator=(const Barrier&) = delete;
// wait until all threads have reached the barrier.
// Once all threads have reached the barrier, the barrier is reset
// to the initial count.
void wait() {
- MutexLock l(&_mutex);
+ std::unique_lock<std::mutex> l(_mutex);
if (--_count == 0) {
_count = _initial_count;
_cycle_count++;
@@ -49,17 +48,16 @@ public:
int initial_cycle = _cycle_count;
while (_cycle_count == initial_cycle) {
- _cond.wait();
+ _cond.wait(l);
}
}
private:
- Mutex _mutex;
- ConditionVariable _cond;
int _count;
- uint32_t _cycle_count = 0;
const int _initial_count;
- DISALLOW_COPY_AND_ASSIGN(Barrier);
+ uint32_t _cycle_count = 0;
+ std::mutex _mutex;
+ std::condition_variable _cond;
};
#endif //DORIS_BE_SRC_UTIL_BARRIER_H
diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp
index c00f5510c4..6da6fb3fcb 100644
--- a/be/src/util/blocking_priority_queue.hpp
+++ b/be/src/util/blocking_priority_queue.hpp
@@ -78,8 +78,8 @@ public:
timer.start();
if (timeout_ms != 0) {
- auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
- if (_get_cv.wait_until(unique_lock, deadline) == std::cv_status::timeout) {
+ if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms)) ==
+ std::cv_status::timeout) {
return false;
}
} else {
diff --git a/be/src/util/condition_variable.cpp b/be/src/util/condition_variable.cpp
deleted file mode 100644
index ac763fce0f..0000000000
--- a/be/src/util/condition_variable.cpp
+++ /dev/null
@@ -1,86 +0,0 @@
-// Copyright (c) 2011 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "util/condition_variable.h"
-
-#include <sys/time.h>
-
-#include <cerrno>
-#include <cstdint>
-#include <ctime>
-
-#include "common/logging.h"
-#include "util/debug/sanitizer_scopes.h"
-#include "util/monotime.h"
-#include "util/mutex.h"
-
-namespace doris {
-
-ConditionVariable::ConditionVariable(Mutex* user_lock) : _user_mutex(&user_lock->_lock) {
- int rv = 0;
- pthread_condattr_t attrs;
- rv = pthread_condattr_init(&attrs);
- DCHECK_EQ(0, rv);
- pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC);
- rv = pthread_cond_init(&_condition, &attrs);
- pthread_condattr_destroy(&attrs);
- DCHECK_EQ(0, rv);
-}
-
-ConditionVariable::~ConditionVariable() {
- int rv = pthread_cond_destroy(&_condition);
- DCHECK_EQ(0, rv);
-}
-
-void ConditionVariable::wait() const {
- debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
- int rv = pthread_cond_wait(&_condition, _user_mutex);
- DCHECK_EQ(0, rv);
-}
-
-bool ConditionVariable::wait_until(const MonoTime& until) const {
- debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
- // Have we already timed out?
- MonoTime now = MonoTime::Now();
- if (now > until) {
- return false;
- }
-
- struct timespec absolute_time;
- until.ToTimeSpec(&absolute_time);
- int rv = pthread_cond_timedwait(&_condition, _user_mutex, &absolute_time);
- DCHECK(rv == 0 || rv == ETIMEDOUT) << "unexpected pthread_cond_timedwait return value: " << rv;
-
- return rv == 0;
-}
-
-bool ConditionVariable::wait_for(const MonoDelta& delta) const {
- debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
- // Negative delta means we've already timed out.
- int64_t nsecs = delta.ToNanoseconds();
- if (nsecs < 0) {
- return false;
- }
-
- // The timeout argument to pthread_cond_timedwait is in absolute time.
- struct timespec absolute_time;
- MonoTime deadline = MonoTime::Now() + delta;
- deadline.ToTimeSpec(&absolute_time);
- int rv = pthread_cond_timedwait(&_condition, _user_mutex, &absolute_time);
-
- DCHECK(rv == 0 || rv == ETIMEDOUT) << "unexpected pthread_cond_timedwait return value: " << rv;
- return rv == 0;
-}
-
-void ConditionVariable::notify_all() {
- int rv = pthread_cond_broadcast(&_condition);
- DCHECK_EQ(0, rv);
-}
-
-void ConditionVariable::notify_one() {
- int rv = pthread_cond_signal(&_condition);
- DCHECK_EQ(0, rv);
-}
-
-} // namespace doris
diff --git a/be/src/util/condition_variable.h b/be/src/util/condition_variable.h
deleted file mode 100644
index 7462893bd2..0000000000
--- a/be/src/util/condition_variable.h
+++ /dev/null
@@ -1,114 +0,0 @@
-// Copyright (c) 2011 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-// ConditionVariable wraps pthreads condition variable synchronization or, on
-// Windows, simulates it. This functionality is very helpful for having
-// several threads wait for an event, as is common with a thread pool managed
-// by a master. The meaning of such an event in the (worker) thread pool
-// scenario is that additional tasks are now available for processing. It is
-// used in Chrome in the DNS prefetching system to notify worker threads that
-// a queue now has items (tasks) which need to be tended to. A related use
-// would have a pool manager waiting on a ConditionVariable, waiting for a
-// thread in the pool to announce (signal) that there is now more room in a
-// (bounded size) communications queue for the manager to deposit tasks, or,
-// as a second example, that the queue of tasks is completely empty and all
-// workers are waiting.
-//
-// USAGE NOTE 1: spurious signal events are possible with this and
-// most implementations of condition variables. As a result, be
-// *sure* to retest your condition before proceeding. The following
-// is a good example of doing this correctly:
-//
-// while (!work_to_be_done()) Wait(...);
-//
-// In contrast do NOT do the following:
-//
-// if (!work_to_be_done()) Wait(...); // Don't do this.
-//
-// Especially avoid the above if you are relying on some other thread only
-// issuing a signal up *if* there is work-to-do. There can/will
-// be spurious signals. Recheck state on waiting thread before
-// assuming the signal was intentional. Caveat caller ;-).
-//
-// USAGE NOTE 2: notify_all() frees up all waiting threads at once,
-// which leads to contention for the locks they all held when they
-// called wait(). This results in POOR performance. A much better
-// approach to getting a lot of threads out of wait() is to have each
-// thread (upon exiting wait()) call notify_one() to free up another
-// Waiting thread. Look at condition_variable_unittest.cc for
-// both examples.
-//
-// notify_all() can be used nicely during teardown, as it gets the job
-// done, and leaves no sleeping threads... and performance is less
-// critical at that point.
-//
-// The semantics of notify_all() are carefully crafted so that *all*
-// threads that were waiting when the request was made will indeed
-// get signaled. Some implementations mess up, and don't signal them
-// all, while others allow the wait to be effectively turned off (for
-// a while while waiting threads come around). This implementation
-// appears correct, as it will not "lose" any signals, and will guarantee
-// that all threads get signaled by notify_all().
-//
-// This implementation offers support for "performance" in its selection of
-// which thread to revive. Performance, in direct contrast with "fairness,"
-// assures that the thread that most recently began to wait() is selected by
-// Signal to revive. Fairness would (if publicly supported) assure that the
-// thread that has wait()ed the longest is selected. The default policy
-// may improve performance, as the selected thread may have a greater chance of
-// having some of its stack data in various CPU caches.
-//
-// For a discussion of the many very subtle implementation details, see the FAQ
-// at the end of condition_variable_win.cc.
-
-#ifndef DORIS_BE_SRC_UTIL_CONDITION_VARIABLE_H
-#define DORIS_BE_SRC_UTIL_CONDITION_VARIABLE_H
-
-#include <pthread.h>
-
-#include "olap/olap_define.h"
-
-namespace doris {
-
-class MonoDelta;
-class MonoTime;
-class Mutex;
-
-class ConditionVariable {
-public:
- // Construct a cv for use with ONLY one user lock.
- explicit ConditionVariable(Mutex* user_lock);
-
- ~ConditionVariable();
-
- // wait() releases the caller's critical section atomically as it starts to
- // sleep, and the reacquires it when it is signaled.
- void wait() const;
-
- // Like wait(), but only waits up to a certain point in time.
- //
- // Returns true if we were notify_one()'ed, or false if we reached 'until'.
- bool wait_until(const MonoTime& until) const;
-
- // Like wait(), but only waits up to a limited amount of time.
- //
- // Returns true if we were notify_one()'ed, or false if 'delta' elapsed.
- bool wait_for(const MonoDelta& delta) const;
-
- // notify_all() revives all waiting threads.
- void notify_all();
-
- // notify_one() revives one waiting thread.
- void notify_one();
-
-private:
- mutable pthread_cond_t _condition;
- pthread_mutex_t* _user_mutex;
-
- DISALLOW_COPY_AND_ASSIGN(ConditionVariable);
-};
-
-} // namespace doris
-
-#endif // DORIS_BE_SRC_UTIL_CONDITION_VARIABLE_H
diff --git a/be/src/util/countdown_latch.h b/be/src/util/countdown_latch.h
index fbd43c9ffd..f8388b2676 100644
--- a/be/src/util/countdown_latch.h
+++ b/be/src/util/countdown_latch.h
@@ -18,11 +18,12 @@
#ifndef DORIS_BE_SRC_UTIL_COUNTDOWN_LATCH_H
#define DORIS_BE_SRC_UTIL_COUNTDOWN_LATCH_H
+#include <chrono>
+#include <condition_variable>
+#include <mutex>
+
#include "common/logging.h"
#include "olap/olap_define.h"
-#include "util/condition_variable.h"
-#include "util/monotime.h"
-#include "util/mutex.h"
namespace doris {
@@ -32,27 +33,27 @@ namespace doris {
class CountDownLatch {
public:
// Initialize the latch with the given initial count.
- explicit CountDownLatch(int count) : cond_(&lock_), count_(count) {}
+ explicit CountDownLatch(int count) : _count(count) {}
// Decrement the count of this latch by 'amount'
// If the new count is less than or equal to zero, then all waiting threads are woken up.
// If the count is already zero, this has no effect.
void count_down(int amount) {
DCHECK_GE(amount, 0);
- MutexLock lock(&lock_);
- if (count_ == 0) {
+ std::lock_guard<std::mutex> lock(_lock);
+ if (_count == 0) {
return;
}
- if (amount >= count_) {
- count_ = 0;
+ if (amount >= _count) {
+ _count = 0;
} else {
- count_ -= amount;
+ _count -= amount;
}
- if (count_ == 0) {
+ if (_count == 0) {
// Latch has triggered.
- cond_.notify_all();
+ _cond.notify_all();
}
}
@@ -63,64 +64,59 @@ public:
// Wait until the count on the latch reaches zero.
// If the count is already zero, this returns immediately.
- void wait() const {
- MutexLock lock(&lock_);
- while (count_ > 0) {
- cond_.wait();
+ void wait() {
+ std::unique_lock<std::mutex> lock(_lock);
+ while (_count > 0) {
+ _cond.wait(lock);
}
}
- // Waits for the count on the latch to reach zero, or until 'until' time is reached.
- // Returns true if the count became zero, false otherwise.
- bool wait_until(const MonoTime& when) const {
- MutexLock lock(&lock_);
- while (count_ > 0) {
- if (!cond_.wait_until(when)) {
- return false;
- }
- }
- return true;
- }
-
// Waits for the count on the latch to reach zero, or until 'delta' time elapses.
// Returns true if the count became zero, false otherwise.
- bool wait_for(const MonoDelta& delta) const { return wait_until(MonoTime::Now() + delta); }
+ template <class Rep, class Period>
+ bool wait_for(const std::chrono::duration<Rep, Period>& delta) {
+ std::unique_lock lock(_lock);
+ return _cond.wait_for(lock, delta, [&]() { return _count <= 0; });
+ }
// Reset the latch with the given count. This is equivalent to reconstructing
// the latch. If 'count' is 0, and there are currently waiters, those waiters
// will be triggered as if you counted down to 0.
void reset(uint64_t count) {
- MutexLock lock(&lock_);
- count_ = count;
- if (count_ == 0) {
+ std::lock_guard<std::mutex> lock(_lock);
+ _count = count;
+ if (_count == 0) {
// Awake any waiters if we reset to 0.
- cond_.notify_all();
+ _cond.notify_all();
}
}
uint64_t count() const {
- MutexLock lock(&lock_);
- return count_;
+ std::lock_guard<std::mutex> lock(_lock);
+ return _count;
}
private:
- mutable Mutex lock_;
- ConditionVariable cond_;
+ mutable std::mutex _lock;
+ mutable std::condition_variable _cond;
+
+ uint64_t _count;
- uint64_t count_;
- DISALLOW_COPY_AND_ASSIGN(CountDownLatch);
+ CountDownLatch(const CountDownLatch&) = delete;
+ void operator=(const CountDownLatch&) = delete;
};
// Utility class which calls latch->CountDown() in its destructor.
class CountDownOnScopeExit {
public:
- explicit CountDownOnScopeExit(CountDownLatch* latch) : latch_(latch) {}
- ~CountDownOnScopeExit() { latch_->count_down(); }
+ explicit CountDownOnScopeExit(CountDownLatch* latch) : _latch(latch) {}
+ ~CountDownOnScopeExit() { _latch->count_down(); }
private:
- DISALLOW_COPY_AND_ASSIGN(CountDownOnScopeExit);
+ CountDownLatch* _latch;
- CountDownLatch* latch_;
+ CountDownOnScopeExit(const CountDownOnScopeExit&) = delete;
+ void operator=(const CountDownOnScopeExit&) = delete;
};
} // namespace doris
diff --git a/be/src/util/monotime.cpp b/be/src/util/monotime.cpp
deleted file mode 100644
index e13cb5b596..0000000000
--- a/be/src/util/monotime.cpp
+++ /dev/null
@@ -1,305 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "util/monotime.h"
-
-#include <sys/time.h>
-
-#include <limits>
-
-#include "common/logging.h"
-#include "gutil/sysinfo.h"
-
-namespace doris {
-
-#define MAX_MONOTONIC_SECONDS (((1ULL << 63) - 1ULL) / (int64_t)MonoTime::kNanosecondsPerSecond)
-
-const int64_t MonoDelta::kUninitialized = kint64min;
-
-MonoDelta MonoDelta::FromSeconds(double seconds) {
- int64_t delta = seconds * MonoTime::kNanosecondsPerSecond;
- return MonoDelta(delta);
-}
-
-MonoDelta MonoDelta::FromMilliseconds(int64_t ms) {
- return MonoDelta(ms * MonoTime::kNanosecondsPerMillisecond);
-}
-
-MonoDelta MonoDelta::FromMicroseconds(int64_t us) {
- return MonoDelta(us * MonoTime::kNanosecondsPerMicrosecond);
-}
-
-MonoDelta MonoDelta::FromNanoseconds(int64_t ns) {
- return MonoDelta(ns);
-}
-
-MonoDelta::MonoDelta() : nano_delta_(kUninitialized) {}
-
-bool MonoDelta::Initialized() const {
- return nano_delta_ != kUninitialized;
-}
-
-bool MonoDelta::LessThan(const MonoDelta& rhs) const {
- DCHECK(Initialized());
- DCHECK(rhs.Initialized());
- return nano_delta_ < rhs.nano_delta_;
-}
-
-bool MonoDelta::MoreThan(const MonoDelta& rhs) const {
- DCHECK(Initialized());
- DCHECK(rhs.Initialized());
- return nano_delta_ > rhs.nano_delta_;
-}
-
-bool MonoDelta::Equals(const MonoDelta& rhs) const {
- DCHECK(Initialized());
- DCHECK(rhs.Initialized());
- return nano_delta_ == rhs.nano_delta_;
-}
-
-std::string MonoDelta::ToString() const {
- char buf[1024] = {'\0'};
- snprintf(buf, sizeof(buf), "%.3f", ToSeconds());
- return std::string(buf);
-}
-
-MonoDelta::MonoDelta(int64_t delta) : nano_delta_(delta) {}
-
-double MonoDelta::ToSeconds() const {
- DCHECK(Initialized());
- double d(nano_delta_);
- d /= MonoTime::kNanosecondsPerSecond;
- return d;
-}
-
-int64_t MonoDelta::ToNanoseconds() const {
- DCHECK(Initialized());
- return nano_delta_;
-}
-
-int64_t MonoDelta::ToMicroseconds() const {
- DCHECK(Initialized());
- return nano_delta_ / MonoTime::kNanosecondsPerMicrosecond;
-}
-
-int64_t MonoDelta::ToMilliseconds() const {
- DCHECK(Initialized());
- return nano_delta_ / MonoTime::kNanosecondsPerMillisecond;
-}
-
-void MonoDelta::ToTimeVal(struct timeval* tv) const {
- DCHECK(Initialized());
- tv->tv_sec = nano_delta_ / MonoTime::kNanosecondsPerSecond;
- tv->tv_usec = (nano_delta_ - (tv->tv_sec * MonoTime::kNanosecondsPerSecond)) /
- MonoTime::kNanosecondsPerMicrosecond;
-
- // tv_usec must be between 0 and 999999.
- // There is little use for negative timevals so wrap it in PREDICT_FALSE.
- if (PREDICT_FALSE(tv->tv_usec < 0)) {
- --(tv->tv_sec);
- tv->tv_usec += 1000000;
- }
-
- // Catch positive corner case where we "round down" and could potentially set a timeout of 0.
- // Make it 1 usec.
- if (PREDICT_FALSE(tv->tv_usec == 0 && tv->tv_sec == 0 && nano_delta_ > 0)) {
- tv->tv_usec = 1;
- }
-
- // Catch negative corner case where we "round down" and could potentially set a timeout of 0.
- // Make it -1 usec (but normalized, so tv_usec is not negative).
- if (PREDICT_FALSE(tv->tv_usec == 0 && tv->tv_sec == 0 && nano_delta_ < 0)) {
- tv->tv_sec = -1;
- tv->tv_usec = 999999;
- }
-}
-
-void MonoDelta::NanosToTimeSpec(int64_t nanos, struct timespec* ts) {
- ts->tv_sec = nanos / MonoTime::kNanosecondsPerSecond;
- ts->tv_nsec = nanos - (ts->tv_sec * MonoTime::kNanosecondsPerSecond);
-
- // tv_nsec must be between 0 and 999999999.
- // There is little use for negative timespecs so wrap it in PREDICT_FALSE.
- if (PREDICT_FALSE(ts->tv_nsec < 0)) {
- --(ts->tv_sec);
- ts->tv_nsec += MonoTime::kNanosecondsPerSecond;
- }
-}
-
-void MonoDelta::ToTimeSpec(struct timespec* ts) const {
- DCHECK(Initialized());
- NanosToTimeSpec(nano_delta_, ts);
-}
-
-MonoTime MonoTime::Now() {
- struct timespec ts;
- PCHECK(clock_gettime(CLOCK_MONOTONIC, &ts) == 0);
- return MonoTime(ts);
-}
-
-MonoTime MonoTime::Max() {
- return MonoTime(std::numeric_limits<int64_t>::max());
-}
-
-MonoTime MonoTime::Min() {
- return MonoTime(1);
-}
-
-const MonoTime& MonoTime::Earliest(const MonoTime& a, const MonoTime& b) {
- if (b.nanos_ < a.nanos_) {
- return b;
- }
- return a;
-}
-
-MonoTime::MonoTime() : nanos_(0) {}
-
-bool MonoTime::Initialized() const {
- return nanos_ != 0;
-}
-
-MonoDelta MonoTime::GetDeltaSince(const MonoTime& rhs) const {
- DCHECK(Initialized());
- DCHECK(rhs.Initialized());
- int64_t delta(nanos_);
- delta -= rhs.nanos_;
- return MonoDelta(delta);
-}
-
-void MonoTime::AddDelta(const MonoDelta& delta) {
- DCHECK(Initialized());
- nanos_ += delta.nano_delta_;
-}
-
-bool MonoTime::ComesBefore(const MonoTime& rhs) const {
- DCHECK(Initialized());
- DCHECK(rhs.Initialized());
- return nanos_ < rhs.nanos_;
-}
-
-std::string MonoTime::ToString() const {
- char buf[1024] = {'\0'};
- snprintf(buf, sizeof(buf), "%.3f", ToSeconds());
- return std::string(buf);
-}
-
-void MonoTime::ToTimeSpec(struct timespec* ts) const {
- DCHECK(Initialized());
- MonoDelta::NanosToTimeSpec(nanos_, ts);
-}
-
-bool MonoTime::Equals(const MonoTime& other) const {
- return nanos_ == other.nanos_;
-}
-
-MonoTime& MonoTime::operator+=(const MonoDelta& delta) {
- this->AddDelta(delta);
- return *this;
-}
-
-MonoTime& MonoTime::operator-=(const MonoDelta& delta) {
- this->AddDelta(MonoDelta(-1 * delta.nano_delta_));
- return *this;
-}
-
-MonoTime::MonoTime(const struct timespec& ts) {
- // Monotonic time resets when the machine reboots. The 64-bit limitation
- // means that we can't represent times larger than 292 years, which should be
- // adequate.
- CHECK_LT(ts.tv_sec, MAX_MONOTONIC_SECONDS);
- nanos_ = ts.tv_sec;
- nanos_ *= MonoTime::kNanosecondsPerSecond;
- nanos_ += ts.tv_nsec;
-}
-
-MonoTime::MonoTime(int64_t nanos) : nanos_(nanos) {}
-
-double MonoTime::ToSeconds() const {
- double d(nanos_);
- d /= MonoTime::kNanosecondsPerSecond;
- return d;
-}
-
-void SleepFor(const MonoDelta& delta) {
- base::SleepForNanoseconds(delta.ToNanoseconds());
-}
-
-bool operator==(const MonoDelta& lhs, const MonoDelta& rhs) {
- return lhs.Equals(rhs);
-}
-
-bool operator!=(const MonoDelta& lhs, const MonoDelta& rhs) {
- return !lhs.Equals(rhs);
-}
-
-bool operator<(const MonoDelta& lhs, const MonoDelta& rhs) {
- return lhs.LessThan(rhs);
-}
-
-bool operator<=(const MonoDelta& lhs, const MonoDelta& rhs) {
- return lhs.LessThan(rhs) || lhs.Equals(rhs);
-}
-
-bool operator>(const MonoDelta& lhs, const MonoDelta& rhs) {
- return lhs.MoreThan(rhs);
-}
-
-bool operator>=(const MonoDelta& lhs, const MonoDelta& rhs) {
- return lhs.MoreThan(rhs) || lhs.Equals(rhs);
-}
-
-bool operator==(const MonoTime& lhs, const MonoTime& rhs) {
- return lhs.Equals(rhs);
-}
-
-bool operator!=(const MonoTime& lhs, const MonoTime& rhs) {
- return !lhs.Equals(rhs);
-}
-
-bool operator<(const MonoTime& lhs, const MonoTime& rhs) {
- return lhs.ComesBefore(rhs);
-}
-
-bool operator<=(const MonoTime& lhs, const MonoTime& rhs) {
- return lhs.ComesBefore(rhs) || lhs.Equals(rhs);
-}
-
-bool operator>(const MonoTime& lhs, const MonoTime& rhs) {
- return rhs.ComesBefore(lhs);
-}
-
-bool operator>=(const MonoTime& lhs, const MonoTime& rhs) {
- return rhs.ComesBefore(lhs) || rhs.Equals(lhs);
-}
-
-MonoTime operator+(const MonoTime& t, const MonoDelta& delta) {
- MonoTime tmp(t);
- tmp.AddDelta(delta);
- return tmp;
-}
-
-MonoTime operator-(const MonoTime& t, const MonoDelta& delta) {
- MonoTime tmp(t);
- tmp.AddDelta(MonoDelta::FromNanoseconds(-delta.ToNanoseconds()));
- return tmp;
-}
-
-MonoDelta operator-(const MonoTime& t_end, const MonoTime& t_beg) {
- return t_end.GetDeltaSince(t_beg);
-}
-
-} // namespace doris
diff --git a/be/src/util/monotime.h b/be/src/util/monotime.h
deleted file mode 100644
index 76ccbfb23c..0000000000
--- a/be/src/util/monotime.h
+++ /dev/null
@@ -1,406 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef DORIS_BE_SRC_UTIL_MONOTIME_H
-#define DORIS_BE_SRC_UTIL_MONOTIME_H
-
-#include <cstdint>
-#include <string>
-
-#include "gutil/port.h"
-
-namespace doris {
-
-/// @brief A representation of a time interval.
-///
-/// The MonoDelta class represents an elapsed duration of time -- i.e.
-/// the delta between two MonoTime instances.
-class MonoDelta {
-public:
- /// @name Converters from seconds representation (and ubiquitous SI prefixes).
- ///
- /// @param [in] seconds/ms/us/ns
- /// Time interval representation in seconds (with ubiquitous SI prefixes).
- /// @return The resulting MonoDelta object initialized in accordance with
- /// the specified parameter.
- ///
- ///@{
- static MonoDelta FromSeconds(double seconds);
- static MonoDelta FromMilliseconds(int64_t ms);
- static MonoDelta FromMicroseconds(int64_t us);
- static MonoDelta FromNanoseconds(int64_t ns);
- ///@}
-
- /// Build a MonoDelta object.
- ///
- /// @note A MonoDelta instance built with the this default constructor is
- /// "uninitialized" and may not be used for any operation.
- MonoDelta();
-
- /// @return @c true iff this object is initialized.
- bool Initialized() const;
-
- /// Check whether this time interval is shorter than the specified one.
- ///
- /// @param [in] rhs
- /// A time interval for comparison.
- /// @return @c true iff this time interval is strictly shorter
- /// than the specified one.
- bool LessThan(const MonoDelta& rhs) const;
-
- /// Check whether this time interval is longer than the specified one.
- ///
- /// @param [in] rhs
- /// A time interval for comparison.
- /// @return @c true iff this time interval is strictly longer
- /// than the specified one.
- bool MoreThan(const MonoDelta& rhs) const;
-
- /// Check whether this time interval has the same duration
- /// as the specified one.
- ///
- /// @param [in] rhs
- /// A time interval for comparison.
- /// @return @c true iff this time interval has the same duration as the
- /// the specified one.
- bool Equals(const MonoDelta& rhs) const;
-
- /// @return String representation of this interval's duration (in seconds).
- std::string ToString() const;
-
- /// @name Converters into seconds representation (and ubiquitous SI prefixes).
- ///
- /// @return Representation of the time interval in appropriate SI units.
- ///
- ///@{
- double ToSeconds() const;
- int64_t ToMilliseconds() const;
- int64_t ToMicroseconds() const;
- int64_t ToNanoseconds() const;
- ///@}
-
- /// Represent this time interval as a timeval structure, with microsecond
- /// accuracy.
- ///
- /// @param [out] tv
- /// Placeholder for the result value.
- void ToTimeVal(struct timeval* tv) const;
-
- /// Represent this time interval as a timespec structure, with nanosecond
- /// accuracy.
- ///
- /// @param [out] ts
- /// Placeholder for the result value.
- void ToTimeSpec(struct timespec* ts) const;
-
- /// Convert a nanosecond value to a timespec.
- ///
- /// @param [in] nanos
- /// Representation of a relative point in time in nanoseconds.
- /// @param [out] ts
- /// Placeholder for the resulting timespec representation.
- static void NanosToTimeSpec(int64_t nanos, struct timespec* ts);
-
-private:
- static const int64_t kUninitialized;
-
- friend class MonoTime;
-
- explicit MonoDelta(int64_t delta);
- int64_t nano_delta_;
-};
-
-/// @brief Representation of a particular point in time.
-///
-/// The MonoTime class represents a particular point in time,
-/// relative to some fixed but unspecified reference point.
-///
-/// This time is monotonic, meaning that if the user changes his or her system
-/// clock, the monotime does not change.
-class MonoTime {
-public:
- /// @name Conversion constants for ubiquitous time units.
- ///
- ///@{
- static const int64_t kNanosecondsPerSecond = 1000000000L;
- static const int64_t kNanosecondsPerMillisecond = 1000000L;
- static const int64_t kNanosecondsPerMicrosecond = 1000L;
-
- static const int64_t kMicrosecondsPerSecond = 1000000L;
- ///@}
-
- /// Get current time in MonoTime representation.
- ///
- /// @return Time specification for the moment of the method's invocation.
- static MonoTime Now();
-
- /// @return MonoTime equal to farthest possible time into the future.
- static MonoTime Max();
-
- /// @return MonoTime equal to farthest possible time into the past.
- static MonoTime Min();
-
- /// Select the earliest between the specified time points.
- ///
- /// @param [in] a
- /// The first MonoTime object to select from.
- /// @param [in] b
- /// The second MonoTime object to select from.
- /// @return The earliest (minimum) of the two monotimes.
- static const MonoTime& Earliest(const MonoTime& a, const MonoTime& b)
- ATTRIBUTE_DEPRECATED("use std::min() instead");
-
- /// Build a MonoTime object. The resulting object is not initialized
- /// and not ready to use.
- MonoTime();
-
- /// @return @c true iff the object is initialized.
- bool Initialized() const;
-
- /// Compute time interval between the point in time specified by this
- /// and the specified object.
- ///
- /// @param [in] rhs
- /// The object that corresponds to the left boundary of the time interval,
- /// where this object corresponds to the right boundary of the interval.
- /// @return The resulting time interval represented as a MonoDelta object.
- MonoDelta GetDeltaSince(const MonoTime& rhs) const;
-
- /// Advance this object's time specification by the specified interval.
- ///
- /// @param [in] delta
- /// The time interval to add.
- void AddDelta(const MonoDelta& delta);
-
- /// Check whether the point in time specified by this object is earlier
- /// than the specified one.
- ///
- /// @param [in] rhs
- /// The other MonoTime object to compare with.
- /// @return @c true iff the point in time represented by this MonoTime object
- /// is earlier then the point in time represented by the parameter.
- bool ComesBefore(const MonoTime& rhs) const;
-
- /// @return String representation of the object (in seconds).
- std::string ToString() const;
-
- /// Represent this point in time as a timespec structure, with nanosecond
- /// accuracy.
- ///
- /// @param [out] ts
- /// Placeholder for the result value.
- void ToTimeSpec(struct timespec* ts) const;
-
- /// Check whether this object represents the same point in time as the other.
- ///
- /// @param [in] other
- /// The other MonoTime object to compare.
- /// @return @c true iff the point in time represented by this MonoTime object
- /// is the same as the one represented by the other.
- bool Equals(const MonoTime& other) const;
-
- /// @name Syntactic sugar: increment/decrement operators for MonoTime.
- ///@{
- ///
- /// Add a delta to the point in time represented by the object.
- ///
- /// @param [in] delta
- /// The delta to add.
- /// @return Reference to the modified object.
- MonoTime& operator+=(const MonoDelta& delta);
-
- /// Substract a delta from the point in time represented by the object.
- ///
- /// @param [in] delta
- /// The delta to substract.
- /// @return Reference to the modified object.
- MonoTime& operator-=(const MonoDelta& delta);
- ///@}
-
-private:
- friend class MonoDelta;
-
- explicit MonoTime(const struct timespec& ts);
- explicit MonoTime(int64_t nanos);
- double ToSeconds() const;
- int64_t nanos_;
-};
-
-/// Sleep for an interval specified by a MonoDelta instance.
-///
-/// This is preferred over sleep(3), usleep(3), and nanosleep(3).
-/// It's less prone to mixups with units since it uses the MonoDelta for
-/// interval specification.
-/// Besides, it ignores signals/EINTR, so will reliably sleep at least for the
-/// MonoDelta duration.
-///
-/// @param [in] delta
-/// The time interval to sleep for.
-void SleepFor(const MonoDelta& delta);
-
-/// @name Syntactic sugar: binary operators for MonoDelta.
-///@{
-///
-/// @param [in] lhs
-/// A time interval for comparison: the left-hand operand.
-/// @param [in] rhs
-/// A time interval for comparison: the right-hand operand.
-/// @return @c true iff the time interval represented by @c lhs is equal
-/// to the time interval represented by @c rhs.
-bool operator==(const MonoDelta& lhs, const MonoDelta& rhs);
-
-/// @param [in] lhs
-/// A time interval for comparison: the left-hand operand.
-/// @param [in] rhs
-/// A time interval for comparison: the right-hand operand.
-/// @return @c true iff the time interval represented by @c lhs is not equal
-/// to the time interval represented by @c rhs.
-bool operator!=(const MonoDelta& lhs, const MonoDelta& rhs);
-
-/// @param [in] lhs
-/// A time interval for comparison: the left-hand operand.
-/// @param [in] rhs
-/// A time interval for comparison: the right-hand operand.
-/// @return @c true iff the time interval represented by @c lhs is shorter
-/// than the time interval represented by @c rhs.
-bool operator<(const MonoDelta& lhs, const MonoDelta& rhs);
-
-/// @param [in] lhs
-/// A time interval for comparison: the left-hand operand.
-/// @param [in] rhs
-/// A time interval for comparison: the right-hand operand.
-/// @return @c true iff the time interval represented by @c lhs is shorter
-/// than or equal to the time interval represented by @c rhs.
-bool operator<=(const MonoDelta& lhs, const MonoDelta& rhs);
-
-/// @param [in] lhs
-/// A time interval for comparison: the left-hand operand.
-/// @param [in] rhs
-/// A time interval for comparison: the right-hand operand.
-/// @return @c true iff the time interval represented by @c lhs is longer
-/// than the time interval represented by @c rhs.
-bool operator>(const MonoDelta& lhs, const MonoDelta& rhs);
-
-/// @param [in] lhs
-/// A time interval for comparison: the left-hand operand.
-/// @param [in] rhs
-/// A time interval for comparison: the right-hand operand.
-/// @return @c true iff the time interval represented by @c lhs is longer
-/// than or equal to the time interval represented by @c rhs.
-bool operator>=(const MonoDelta& lhs, const MonoDelta& rhs);
-///@}
-
-/// @name Syntactic sugar: binary operators for MonoTime.
-///@{
-///
-/// Check if the specified objects represent the same point in time.
-///
-/// This is a handy operator which is semantically equivalent to
-/// MonoTime::Equals().
-///
-/// @param [in] lhs
-/// The left-hand operand.
-/// @param [in] rhs
-/// The right-hand operand.
-/// @return @c true iff the given objects represent the same point in time.
-bool operator==(const MonoTime& lhs, const MonoTime& rhs);
-
-/// Check if the specified objects represent different points in time.
-///
-/// This is a handy operator which is semantically equivalent to the negation of
-/// MonoTime::Equals().
-///
-/// @param [in] lhs
-/// The left-hand operand.
-/// @param [in] rhs
-/// The right-hand operand.
-/// @return @c true iff the given object represents a different point in time
-/// than the specified one.
-bool operator!=(const MonoTime& lhs, const MonoTime& rhs);
-
-/// @param [in] lhs
-/// The left-hand operand.
-/// @param [in] rhs
-/// The right-hand operand.
-/// @return @c true iff the @c lhs object represents an earlier point in time
-/// than the @c rhs object.
-bool operator<(const MonoTime& lhs, const MonoTime& rhs);
-
-/// @param [in] lhs
-/// The left-hand operand.
-/// @param [in] rhs
-/// The right-hand operand.
-/// @return @c true iff the @c lhs object represents an earlier than or
-/// the same point in time as the @c rhs object.
-bool operator<=(const MonoTime& lhs, const MonoTime& rhs);
-
-/// @param [in] lhs
-/// The left-hand operand.
-/// @param [in] rhs
-/// The right-hand operand.
-/// @return @c true iff the @c lhs object represents a later point in time
-/// than the @c rhs object.
-bool operator>(const MonoTime& lhs, const MonoTime& rhs);
-
-/// @param [in] lhs
-/// The left-hand operand.
-/// @param [in] rhs
-/// The right-hand operand.
-/// @return @c true iff the @c lhs object represents a later than or
-/// the same point in time as the @c rhs object.
-bool operator>=(const MonoTime& lhs, const MonoTime& rhs);
-///@}
-
-/// @name Syntactic sugar: mixed binary operators for MonoTime/MonoDelta.
-///@{
-///
-/// Add the specified time interval to the given point in time.
-///
-/// @param [in] t
-/// A MonoTime object representing the given point in time.
-/// @param [in] delta
-/// A MonoDelta object representing the specified time interval.
-/// @return A MonoTime object representing the resulting point in time.
-MonoTime operator+(const MonoTime& t, const MonoDelta& delta);
-
-/// Subtract the specified time interval from the given point in time.
-///
-/// @param [in] t
-/// A MonoTime object representing the given point in time.
-/// @param [in] delta
-/// A MonoDelta object representing the specified time interval.
-/// @return A MonoTime object representing the resulting point in time.
-MonoTime operator-(const MonoTime& t, const MonoDelta& delta);
-
-/// Compute the time interval between the specified points in time.
-///
-/// Semantically, this is equivalent to t0.GetDeltaSince(t1).
-///
-/// @param [in] t_end
-/// The second point in time. Semantically corresponds to the end
-/// of the resulting time interval.
-/// @param [in] t_beg
-/// The first point in time. Semantically corresponds to the beginning
-/// of the resulting time interval.
-/// @return A MonoDelta object representing the time interval between the
-/// specified points in time.
-MonoDelta operator-(const MonoTime& t_end, const MonoTime& t_begin);
-///@}
-
-} // namespace doris
-
-#endif //DORIS_BE_SRC_UTIL_MONOTIME_H
diff --git a/be/src/util/mutex.cpp b/be/src/util/mutex.cpp
deleted file mode 100644
index 921f7a24fb..0000000000
--- a/be/src/util/mutex.cpp
+++ /dev/null
@@ -1,85 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "util/mutex.h"
-
-#include <cerrno>
-
-#include "common/logging.h"
-
-namespace doris {
-
-#define PTHREAD_MUTEX_INIT_WITH_LOG(lockptr, param) \
- do { \
- int lock_ret = 0; \
- if (0 != (lock_ret = pthread_mutex_init(lockptr, param))) { \
- LOG(FATAL) << "fail to init mutex. err=" << strerror(lock_ret); \
- } \
- } while (0)
-
-#define PTHREAD_MUTEX_DESTROY_WITH_LOG(lockptr) \
- do { \
- int lock_ret = 0; \
- if (0 != (lock_ret = pthread_mutex_destroy(lockptr))) { \
- LOG(FATAL) << "fail to destroy mutex. err=" << strerror(lock_ret); \
- } \
- } while (0)
-
-#define PTHREAD_MUTEX_LOCK_WITH_LOG(lockptr) \
- do { \
- int lock_ret = 0; \
- if (0 != (lock_ret = pthread_mutex_lock(lockptr))) { \
- LOG(FATAL) << "fail to lock mutex. err=" << strerror(lock_ret); \
- } \
- } while (0)
-
-#define PTHREAD_MUTEX_UNLOCK_WITH_LOG(lockptr) \
- do { \
- int lock_ret = 0; \
- if (0 != (lock_ret = pthread_mutex_unlock(lockptr))) { \
- LOG(FATAL) << "fail to unlock mutex. err=" << strerror(lock_ret); \
- } \
- } while (0)
-
-Mutex::Mutex() {
- PTHREAD_MUTEX_INIT_WITH_LOG(&_lock, nullptr);
-}
-
-Mutex::~Mutex() {
- PTHREAD_MUTEX_DESTROY_WITH_LOG(&_lock);
-}
-
-OLAPStatus Mutex::lock() {
- PTHREAD_MUTEX_LOCK_WITH_LOG(&_lock);
- return OLAP_SUCCESS;
-}
-
-OLAPStatus Mutex::trylock() {
- int rv = pthread_mutex_trylock(&_lock);
- if (rv != 0) {
- VLOG_NOTICE << "failed to got the mutex lock. error=" << strerror(rv);
- return OLAP_ERR_RWLOCK_ERROR;
- }
- return OLAP_SUCCESS;
-}
-
-OLAPStatus Mutex::unlock() {
- PTHREAD_MUTEX_UNLOCK_WITH_LOG(&_lock);
- return OLAP_SUCCESS;
-}
-
-} // namespace doris
diff --git a/be/src/util/mutex.h b/be/src/util/mutex.h
deleted file mode 100644
index 86b6e81e59..0000000000
--- a/be/src/util/mutex.h
+++ /dev/null
@@ -1,96 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef DORIS_BE_SRC_UTIL_MUTEX_H
-#define DORIS_BE_SRC_UTIL_MUTEX_H
-#include <mutex>
-#include <shared_mutex>
-#include "olap/olap_define.h"
-
-namespace doris {
-
-using ReadLock = std::shared_lock<std::shared_mutex>;
-using WriteLock = std::lock_guard<std::shared_mutex>;
-using UniqueWriteLock = std::unique_lock<std::shared_mutex>;
-
-#define TRY_LOCK true
-
-// encapsulation of pthread_mutex to lock the critical sources.
-class Mutex {
-public:
- Mutex();
- ~Mutex();
-
- // wait until obtain the lock
- OLAPStatus lock();
-
- // try obtaining the lock
- OLAPStatus trylock();
-
- // unlock is called after lock()
- OLAPStatus unlock();
-
- pthread_mutex_t* getlock() { return &_lock; }
-
-private:
- friend class ConditionVariable;
-
- pthread_mutex_t _lock;
- DISALLOW_COPY_AND_ASSIGN(Mutex);
-};
-
-// Helper class than locks a mutex on construction
-// and unlocks the mutex on deconstruction.
-class MutexLock {
-public:
- // wait until obtain the lock
- explicit MutexLock(Mutex* mutex, bool try_lock = false) : _mutex(mutex), _locked(false) {
- if (try_lock) {
- _locked = (_mutex->trylock() == OLAP_SUCCESS);
- } else {
- _mutex->lock();
- _locked = true;
- }
- }
-
- // unlock is called after
- ~MutexLock() {
- if (_locked) {
- _mutex->unlock();
- }
- }
-
- void lock() {
- _mutex->lock();
- _locked = true;
- }
-
- void unlock() {
- _mutex->unlock();
- _locked = false;
- }
-
- bool own_lock() const { return _locked; }
-
-private:
- Mutex* _mutex;
- bool _locked;
- DISALLOW_COPY_AND_ASSIGN(MutexLock);
-};
-} //namespace doris
-
-#endif // DORIS_BE_SRC_UTIL_MUTEX_H
diff --git a/be/src/util/os_util.h b/be/src/util/os_util.h
index 7e0f514521..cb5d13058f 100644
--- a/be/src/util/os_util.h
+++ b/be/src/util/os_util.h
@@ -62,7 +62,6 @@ void disable_core_dumps();
//
// This may return false on unsupported (non-Linux) platforms.
bool is_being_debugged();
-
} // namespace doris
#endif
diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp
index 61c555dfc5..08d0d69f87 100644
--- a/be/src/util/runtime_profile.cpp
+++ b/be/src/util/runtime_profile.cpp
@@ -26,7 +26,6 @@
#include "util/container_util.hpp"
#include "util/cpu_info.h"
#include "util/debug_util.h"
-#include "util/monotime.h"
#include "util/pretty_printer.h"
#include "util/thrift_util.h"
#include "util/url_coding.h"
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index f351feda4b..b2b64cee14 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -34,16 +34,10 @@
namespace doris {
-// Define macros for updating counters. The macros make it very easy to disable
-// all counters at compile time. Set this to 0 to remove counters. This is useful
-// to do to make sure the counters aren't affecting the system.
-#define ENABLE_COUNTERS 1
-
// Some macro magic to generate unique ids using __COUNTER__
#define CONCAT_IMPL(x, y) x##y
#define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y)
-#if ENABLE_COUNTERS
#define ADD_COUNTER(profile, name, type) (profile)->add_counter(name, type)
#define ADD_TIMER(profile, name) (profile)->add_counter(name, TUnit::TIME_NS)
#define ADD_CHILD_TIMER(profile, name, parent) (profile)->add_counter(name, TUnit::TIME_NS, parent)
@@ -60,21 +54,6 @@ namespace doris {
__COUNTER__)(c)
#define COUNTER_UPDATE(c, v) (c)->update(v)
#define COUNTER_SET(c, v) (c)->set(v)
-#define ADD_THREAD_COUNTERS(profile, prefix) (profile)->add_thread_counters(prefix)
-#define SCOPED_THREAD_COUNTER_MEASUREMENT(c) \
- /*ThreadCounterMeasurement \
- MACRO_CONCAT(SCOPED_THREAD_COUNTER_MEASUREMENT, __COUNTER__)(c)*/
-#else
-#define ADD_COUNTER(profile, name, type) nullptr
-#define ADD_TIMER(profile, name) nullptr
-#define SCOPED_TIMER(c)
-#define SCOPED_RAW_TIMER(c)
-#define SCOPED_ATOMIC_TIMER(c)
-#define COUNTER_UPDATE(c, v)
-#define COUNTER_SET(c, v)
-#define ADD_THREADCOUNTERS(profile, prefix) nullptr
-#define SCOPED_THREAD_COUNTER_MEASUREMENT(c)
-#endif
class ObjectPool;
diff --git a/be/src/util/thread.cpp b/be/src/util/thread.cpp
index 9e35cc134b..2d1334611b 100644
--- a/be/src/util/thread.cpp
+++ b/be/src/util/thread.cpp
@@ -37,7 +37,6 @@
#include "olap/olap_define.h"
#include "util/debug/sanitizer_scopes.h"
#include "util/easy_json.h"
-#include "util/mutex.h"
#include "util/os_util.h"
#include "util/scoped_cleanup.h"
#include "util/url_coding.h"
@@ -65,7 +64,7 @@ public:
ThreadMgr() : _threads_started_metric(0), _threads_running_metric(0) {}
~ThreadMgr() {
- MutexLock lock(&_lock);
+ std::unique_lock<std::mutex> lock(_lock);
_thread_categories.clear();
}
@@ -112,7 +111,7 @@ private:
typedef std::map<std::string, ThreadCategory> ThreadCategoryMap;
// Protects _thread_categories and thread metrics.
- mutable Mutex _lock;
+ mutable std::mutex _lock;
// All thread categories that ever contained a thread, even if empty
ThreadCategoryMap _thread_categories;
@@ -152,7 +151,7 @@ void ThreadMgr::add_thread(const pthread_t& pthread_id, const std::string& name,
ANNOTATE_IGNORE_SYNC_BEGIN();
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
{
- MutexLock l(&_lock);
+ std::unique_lock<std::mutex> l(_lock);
_thread_categories[category][pthread_id] = ThreadDescriptor(category, name, tid);
_threads_running_metric++;
_threads_started_metric++;
@@ -164,7 +163,7 @@ void ThreadMgr::remove_thread(const pthread_t& pthread_id, const std::string& ca
ANNOTATE_IGNORE_SYNC_BEGIN();
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
{
- MutexLock l(&_lock);
+ std::unique_lock<std::mutex> l(_lock);
auto category_it = _thread_categories.find(category);
DCHECK(category_it != _thread_categories.end());
category_it->second.erase(pthread_id);
@@ -186,7 +185,7 @@ void ThreadMgr::display_thread_callback(const WebPageHandler::ArgumentMap& args,
// imposed on new threads that acquire the lock in write mode.
std::vector<ThreadDescriptor> descriptors_to_print;
if (!requested_all) {
- MutexLock l(&_lock);
+ std::unique_lock<std::mutex> l(_lock);
const auto* category = FindOrNull(_thread_categories, *category_name);
if (!category) {
return;
@@ -195,7 +194,7 @@ void ThreadMgr::display_thread_callback(const WebPageHandler::ArgumentMap& args,
descriptors_to_print.emplace_back(elem.second);
}
} else {
- MutexLock l(&_lock);
+ std::unique_lock<std::mutex> l(_lock);
for (const auto& category : _thread_categories) {
for (const auto& elem : category.second) {
descriptors_to_print.emplace_back(elem.second);
@@ -213,7 +212,7 @@ void ThreadMgr::display_thread_callback(const WebPageHandler::ArgumentMap& args,
std::vector<pair<string, uint64_t>> thread_categories_info;
uint64_t running;
{
- MutexLock l(&_lock);
+ std::unique_lock<std::mutex> l(_lock);
running = _threads_running_metric;
thread_categories_info.reserve(_thread_categories.size());
for (const auto& category : _thread_categories) {
@@ -483,7 +482,7 @@ Status ThreadJoiner::join() {
int wait_for = std::min(remaining_before_giveup, remaining_before_next_warn);
- if (_thread->_done.wait_for(MonoDelta::FromMilliseconds(wait_for))) {
+ if (_thread->_done.wait_for(std::chrono::milliseconds(wait_for))) {
// Unconditionally join before returning, to guarantee that any TLS
// has been destroyed (pthread_key_create() destructors only run
// after a pthread's user method has returned).
diff --git a/be/src/util/thread_group.h b/be/src/util/thread_group.h
index 7a13a7ce7f..b01a9943ee 100644
--- a/be/src/util/thread_group.h
+++ b/be/src/util/thread_group.h
@@ -18,10 +18,10 @@
#pragma once
#include <list>
+#include <shared_mutex>
#include <thread>
#include "common/status.h"
-#include "util/mutex.h"
namespace doris {
class ThreadGroup {
@@ -35,7 +35,7 @@ public:
bool is_this_thread_in() const {
std::thread::id id = std::this_thread::get_id();
- ReadLock rdlock(_mutex);
+ std::shared_lock rdlock(_mutex);
for (auto const& thrd : _threads) {
if (thrd->get_id() == id) {
return true;
@@ -47,7 +47,7 @@ public:
bool is_thread_in(std::thread* thrd) const {
if (thrd) {
std::thread::id id = thrd->get_id();
- ReadLock rdlock(_mutex);
+ std::shared_lock rdlock(_mutex);
for (auto const& th : _threads) {
if (th->get_id() == id) {
return true;
@@ -61,7 +61,7 @@ public:
template <typename F>
std::thread* create_thread(F threadfunc) {
- WriteLock wrlock(_mutex);
+ std::lock_guard<std::shared_mutex> wrlock(_mutex);
std::unique_ptr<std::thread> new_thread = std::make_unique<std::thread>(threadfunc);
_threads.push_back(new_thread.get());
return new_thread.release();
@@ -70,7 +70,7 @@ public:
Status add_thread(std::thread* thrd) {
if (thrd) {
if (!is_thread_in(thrd)) {
- WriteLock guard(_mutex);
+ std::lock_guard<std::shared_mutex> guard(_mutex);
_threads.push_back(thrd);
return Status::OK();
} else {
@@ -82,7 +82,7 @@ public:
}
void remove_thread(std::thread* thrd) {
- WriteLock wrlock(_mutex);
+ std::lock_guard<std::shared_mutex> wrlock(_mutex);
std::list<std::thread*>::const_iterator it =
std::find(_threads.begin(), _threads.end(), thrd);
if (it != _threads.end()) {
@@ -94,7 +94,7 @@ public:
if (is_this_thread_in()) {
return Status::RuntimeError("trying joining itself");
}
- ReadLock rdlock(_mutex);
+ std::shared_lock rdlock(_mutex);
for (auto thrd : _threads) {
if (thrd->joinable()) {
@@ -105,7 +105,7 @@ public:
}
size_t size() const {
- ReadLock rdlock(_mutex);
+ std::shared_lock rdlock(_mutex);
return _threads.size();
}
diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index f7f194f735..e41283595a 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -50,7 +50,7 @@ ThreadPoolBuilder::ThreadPoolBuilder(string name)
_min_threads(0),
_max_threads(base::NumCPUs()),
_max_queue_size(std::numeric_limits<int>::max()),
- _idle_timeout(MonoDelta::FromMilliseconds(500)) {}
+ _idle_timeout(std::chrono::milliseconds(500)) {}
ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) {
CHECK_GE(min_threads, 0);
@@ -69,27 +69,21 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
return *this;
}
-ThreadPoolBuilder& ThreadPoolBuilder::set_idle_timeout(const MonoDelta& idle_timeout) {
- _idle_timeout = idle_timeout;
- return *this;
-}
-
Status ThreadPoolBuilder::build(std::unique_ptr<ThreadPool>* pool) const {
pool->reset(new ThreadPool(*this));
RETURN_IF_ERROR((*pool)->init());
return Status::OK();
}
-ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode, int max_concurrency)
+ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode,
+ int max_concurrency)
: _mode(mode),
_pool(pool),
_state(State::IDLE),
- _not_running_cond(&pool->_lock),
_active_threads(0),
_max_concurrency(max_concurrency),
_num_submitted_tasks(0),
_num_unsubmitted_tasks(0) {
-
if (max_concurrency == 1 && mode != ThreadPool::ExecutionMode::SERIAL) {
_mode = ThreadPool::ExecutionMode::SERIAL;
}
@@ -109,7 +103,7 @@ Status ThreadPoolToken::submit_func(std::function<void()> f) {
}
void ThreadPoolToken::shutdown() {
- MutexLock unique_lock(&(_pool->_lock));
+ std::unique_lock<std::mutex> l(_pool->_lock);
_pool->check_not_pool_thread_unlocked();
// Clear the queue under the lock, but defer the releasing of the tasks
@@ -152,7 +146,7 @@ void ThreadPoolToken::shutdown() {
// The token is already quiescing. Just wait for a worker thread to
// switch it to QUIESCED.
while (state() != State::QUIESCED) {
- _not_running_cond.wait();
+ _not_running_cond.wait(l);
}
break;
default:
@@ -161,28 +155,13 @@ void ThreadPoolToken::shutdown() {
}
void ThreadPoolToken::wait() {
- MutexLock unique_lock(&(_pool->_lock));
+ std::unique_lock<std::mutex> l(_pool->_lock);
_pool->check_not_pool_thread_unlocked();
while (is_active()) {
- _not_running_cond.wait();
+ _not_running_cond.wait(l);
}
}
-bool ThreadPoolToken::wait_until(const MonoTime& until) {
- MutexLock unique_lock(&(_pool->_lock));
- _pool->check_not_pool_thread_unlocked();
- while (is_active()) {
- if (!_not_running_cond.wait_until(until)) {
- return false;
- }
- }
- return true;
-}
-
-bool ThreadPoolToken::wait_for(const MonoDelta& delta) {
- return wait_until(MonoTime::Now() + delta);
-}
-
void ThreadPoolToken::transition(State new_state) {
#ifndef NDEBUG
CHECK_NE(_state, new_state);
@@ -249,8 +228,9 @@ const char* ThreadPoolToken::state_to_string(State s) {
}
bool ThreadPoolToken::need_dispatch() {
- return _state == ThreadPoolToken::State::IDLE
- || (_mode == ThreadPool::ExecutionMode::CONCURRENT && _num_submitted_tasks < _max_concurrency);
+ return _state == ThreadPoolToken::State::IDLE ||
+ (_mode == ThreadPool::ExecutionMode::CONCURRENT &&
+ _num_submitted_tasks < _max_concurrency);
}
ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
@@ -260,8 +240,6 @@ ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
_max_queue_size(builder._max_queue_size),
_idle_timeout(builder._idle_timeout),
_pool_status(Status::Uninitialized("The pool was not initialized.")),
- _idle_cond(&_lock),
- _no_threads_cond(&_lock),
_num_threads(0),
_num_threads_pending_start(0),
_active_threads(0),
@@ -293,7 +271,7 @@ Status ThreadPool::init() {
void ThreadPool::shutdown() {
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
- MutexLock unique_lock(&_lock);
+ std::unique_lock<std::mutex> l(_lock);
check_not_pool_thread_unlocked();
// Note: this is the same error seen at submission if the pool is at
@@ -340,7 +318,7 @@ void ThreadPool::shutdown() {
_idle_threads.pop_front();
}
while (_num_threads + _num_threads_pending_start > 0) {
- _no_threads_cond.wait();
+ _no_threads_cond.wait(l);
}
// All the threads have exited. Check the state of each token.
@@ -351,14 +329,14 @@ void ThreadPool::shutdown() {
}
std::unique_ptr<ThreadPoolToken> ThreadPool::new_token(ExecutionMode mode, int max_concurrency) {
- MutexLock unique_lock(&_lock);
+ std::lock_guard<std::mutex> l(_lock);
std::unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this, mode, max_concurrency));
InsertOrDie(&_tokens, t.get());
return t;
}
void ThreadPool::release_token(ThreadPoolToken* t) {
- MutexLock unique_lock(&_lock);
+ std::lock_guard<std::mutex> l(_lock);
CHECK(!t->is_active()) << strings::Substitute("Token with state $0 may not be released",
ThreadPoolToken::state_to_string(t->state()));
CHECK_EQ(1, _tokens.erase(t));
@@ -374,9 +352,10 @@ Status ThreadPool::submit_func(std::function<void()> f) {
Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token) {
DCHECK(token);
- MonoTime submit_time = MonoTime::Now();
+ std::chrono::time_point<std::chrono::system_clock> submit_time =
+ std::chrono::system_clock::now();
- MutexLock unique_lock(&_lock);
+ std::unique_lock<std::mutex> l(_lock);
if (PREDICT_FALSE(!_pool_status.ok())) {
return _pool_status;
}
@@ -432,12 +411,12 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token
token->_entries.emplace_back(std::move(task));
// When we need to execute the task in the token, we submit the token object to the queue.
// There are currently two places where tokens will be submitted to the queue:
- // 1. When submitting a new task, if the token is still in the IDLE state,
+ // 1. When submitting a new task, if the token is still in the IDLE state,
// or the concurrency of the token has not reached the online level, it will be added to the queue.
// 2. When the dispatch thread finishes executing a task:
// 1. If it is a SERIAL token, and there are unsubmitted tasks, submit them to the queue.
// 2. If it is a CONCURRENT token, and there are still unsubmitted tasks, and the upper limit of concurrency is not reached,
- // then submitted to the queue.
+ // then submitted to the queue.
if (token->need_dispatch()) {
_queue.emplace_back(token);
++token->_num_submitted_tasks;
@@ -459,12 +438,12 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token
_idle_threads.front().not_empty.notify_one();
_idle_threads.pop_front();
}
- unique_lock.unlock();
+ l.unlock();
if (need_a_thread) {
Status status = create_thread();
if (!status.ok()) {
- unique_lock.lock();
+ l.lock();
_num_threads_pending_start--;
if (_num_threads + _num_threads_pending_start == 0) {
// If we have no threads, we can't do any work.
@@ -480,38 +459,23 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token
}
void ThreadPool::wait() {
- MutexLock unique_lock(&_lock);
- check_not_pool_thread_unlocked();
- while (_total_queued_tasks > 0 || _active_threads > 0) {
- _idle_cond.wait();
- }
-}
-
-bool ThreadPool::wait_until(const MonoTime& until) {
- MutexLock unique_lock(&_lock);
+ std::unique_lock<std::mutex> l(_lock);
check_not_pool_thread_unlocked();
while (_total_queued_tasks > 0 || _active_threads > 0) {
- if (!_idle_cond.wait_until(until)) {
- return false;
- }
+ _idle_cond.wait(l);
}
- return true;
-}
-
-bool ThreadPool::wait_for(const MonoDelta& delta) {
- return wait_until(MonoTime::Now() + delta);
}
void ThreadPool::dispatch_thread() {
+ std::unique_lock<std::mutex> l(_lock);
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
- MutexLock unique_lock(&_lock);
InsertOrDie(&_threads, Thread::current_thread());
DCHECK_GT(_num_threads_pending_start, 0);
_num_threads++;
_num_threads_pending_start--;
// Owned by this worker thread and added/removed from _idle_threads as needed.
- IdleThread me(&_lock);
+ IdleThread me;
while (true) {
// Note: Status::Aborted() is used to indicate normal shutdown.
@@ -537,7 +501,7 @@ void ThreadPool::dispatch_thread() {
_idle_threads.erase(_idle_threads.iterator_to(me));
}
});
- if (!me.not_empty.wait_for(_idle_timeout)) {
+ if (me.not_empty.wait_for(l, _idle_timeout) == std::cv_status::timeout) {
// After much investigation, it appears that pthread condition variables have
// a weird behavior in which they can return ETIMEDOUT from timed_wait even if
// another thread did in fact signal. Apparently after a timeout there is some
@@ -545,9 +509,12 @@ void ThreadPool::dispatch_thread() {
// protecting the state, signal, and release again before we get the mutex. So,
// we'll recheck the empty queue case regardless.
if (_queue.empty() && _num_threads + _num_threads_pending_start > _min_threads) {
- VLOG_NOTICE << "Releasing worker thread from pool " << _name << " after "
- << _idle_timeout.ToMilliseconds() << "ms of idle time.";
- break;
+ VLOG_NOTICE << "Releasing worker thread from pool " << _name << " after "
+ << std::chrono::duration_cast<std::chrono::milliseconds>(
+ _idle_timeout)
+ .count()
+ << "ms of idle time.";
+ break;
}
}
continue;
@@ -564,7 +531,7 @@ void ThreadPool::dispatch_thread() {
--_total_queued_tasks;
++_active_threads;
- unique_lock.unlock();
+ l.unlock();
// Execute the task
task.runnable->run();
@@ -576,7 +543,7 @@ void ThreadPool::dispatch_thread() {
// In the worst case, the destructor might even try to do something
// with this threadpool, and produce a deadlock.
task.runnable.reset();
- unique_lock.lock();
+ l.lock();
// Possible states:
// 1. The token was shut down while we ran its task. Transition to QUIESCED.
@@ -598,8 +565,9 @@ void ThreadPool::dispatch_thread() {
++token->_num_submitted_tasks;
--token->_num_unsubmitted_tasks;
}
- } else if (token->mode() == ExecutionMode::CONCURRENT && token->_num_submitted_tasks < token->_max_concurrency
- && token->_num_unsubmitted_tasks > 0) {
+ } else if (token->mode() == ExecutionMode::CONCURRENT &&
+ token->_num_submitted_tasks < token->_max_concurrency &&
+ token->_num_unsubmitted_tasks > 0) {
_queue.emplace_back(token);
++token->_num_submitted_tasks;
--token->_num_unsubmitted_tasks;
@@ -613,7 +581,7 @@ void ThreadPool::dispatch_thread() {
// It's important that we hold the lock between exiting the loop and dropping
// _num_threads. Otherwise it's possible someone else could come along here
// and add a new task just as the last running thread is about to exit.
- CHECK(unique_lock.own_lock());
+ CHECK(l.owns_lock());
CHECK_EQ(_threads.erase(Thread::current_thread()), 1);
_num_threads--;
@@ -643,12 +611,11 @@ void ThreadPool::check_not_pool_thread_unlocked() {
}
Status ThreadPool::set_min_threads(int min_threads) {
- MutexLock unique_lock(&_lock);
+ std::lock_guard<std::mutex> l(_lock);
if (min_threads > _max_threads) {
// min threads can not be set greater than max threads
return Status::InternalError("set thread pool min_threads failed");
}
-
_min_threads = min_threads;
if (min_threads > _num_threads + _num_threads_pending_start) {
int addition_threads = min_threads - _num_threads - _num_threads_pending_start;
@@ -666,7 +633,7 @@ Status ThreadPool::set_min_threads(int min_threads) {
}
Status ThreadPool::set_max_threads(int max_threads) {
- MutexLock unique_lock(&_lock);
+ std::lock_guard<std::mutex> l(_lock);
if (_min_threads > max_threads) {
// max threads can not be set less than min threads
return Status::InternalError("set thread pool max_threads failed");
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index 7e64362fda..0a7ad5ede1 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -20,6 +20,7 @@
#include <boost/intrusive/list.hpp>
#include <boost/intrusive/list_hook.hpp>
+#include <condition_variable>
#include <deque>
#include <functional>
#include <memory>
@@ -29,9 +30,6 @@
#include "common/status.h"
#include "gutil/ref_counted.h"
-#include "util/condition_variable.h"
-#include "util/monotime.h"
-#include "util/mutex.h"
namespace doris {
@@ -99,8 +97,11 @@ public:
ThreadPoolBuilder& set_min_threads(int min_threads);
ThreadPoolBuilder& set_max_threads(int max_threads);
ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
- ThreadPoolBuilder& set_idle_timeout(const MonoDelta& idle_timeout);
-
+ template <class Rep, class Period>
+ ThreadPoolBuilder& set_idle_timeout(const std::chrono::duration<Rep, Period>& idle_timeout) {
+ _idle_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(idle_timeout);
+ return *this;
+ }
// Instantiate a new ThreadPool with the existing builder arguments.
Status build(std::unique_ptr<ThreadPool>* pool) const;
@@ -110,9 +111,10 @@ private:
int _min_threads;
int _max_threads;
int _max_queue_size;
- MonoDelta _idle_timeout;
+ std::chrono::milliseconds _idle_timeout;
- DISALLOW_COPY_AND_ASSIGN(ThreadPoolBuilder);
+ ThreadPoolBuilder(const ThreadPoolBuilder&) = delete;
+ void operator=(const ThreadPoolBuilder&) = delete;
};
// Thread pool with a variable number of threads.
@@ -146,7 +148,7 @@ private:
// .set_min_threads(0)
// .set_max_threads(5)
// .set_max_queue_size(10)
-// .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
+// .set_idle_timeout(2000ms))
// .Build(&thread_pool));
// thread_pool->Submit(shared_ptr<Runnable>(new Task()));
// thread_pool->SubmitFunc(std::bind(&Func, 10));
@@ -171,14 +173,15 @@ public:
// Waits until all the tasks are completed.
void wait();
- // Waits for the pool to reach the idle state, or until 'until' time is reached.
- // Returns true if the pool reached the idle state, false otherwise.
- bool wait_until(const MonoTime& until);
-
// Waits for the pool to reach the idle state, or until 'delta' time elapses.
// Returns true if the pool reached the idle state, false otherwise.
- bool wait_for(const MonoDelta& delta);
-
+ template <class Rep, class Period>
+ bool wait_for(const std::chrono::duration<Rep, Period>& delta) {
+ std::unique_lock<std::mutex> l(_lock);
+ check_not_pool_thread_unlocked();
+ return _idle_cond.wait_for(
+ l, delta, [&]() { return _total_queued_tasks <= 0 && _active_threads <= 0; });
+ }
Status set_min_threads(int min_threads);
Status set_max_threads(int max_threads);
@@ -198,32 +201,32 @@ public:
// Return the number of threads currently running (or in the process of starting up)
// for this thread pool.
int num_threads() const {
- MutexLock l(&_lock);
+ std::lock_guard<std::mutex> l(_lock);
return _num_threads + _num_threads_pending_start;
}
int max_threads() const {
- MutexLock l(&_lock);
+ std::lock_guard<std::mutex> l(_lock);
return _max_threads;
}
int min_threads() const {
- MutexLock l(&_lock);
+ std::lock_guard<std::mutex> l(_lock);
return _min_threads;
}
int num_threads_pending_start() const {
- MutexLock l(&_lock);
+ std::lock_guard<std::mutex> l(_lock);
return _num_threads_pending_start;
}
int num_active_threads() const {
- MutexLock l(&_lock);
+ std::lock_guard<std::mutex> l(_lock);
return _active_threads;
}
int get_queue_size() const {
- MutexLock l(&_lock);
+ std::lock_guard<std::mutex> l(_lock);
return _total_queued_tasks;
}
@@ -236,7 +239,7 @@ private:
std::shared_ptr<Runnable> runnable;
// Time at which the entry was submitted to the pool.
- MonoTime submit_time;
+ std::chrono::time_point<std::chrono::system_clock> submit_time;
};
// Creates a new thread pool using a builder.
@@ -267,7 +270,7 @@ private:
int _min_threads;
int _max_threads;
const int _max_queue_size;
- const MonoDelta _idle_timeout;
+ const std::chrono::milliseconds _idle_timeout;
// Overall status of the pool. Set to an error when the pool is shut down.
//
@@ -276,15 +279,15 @@ private:
// Synchronizes many of the members of the pool and all of its
// condition variables.
- mutable Mutex _lock;
+ mutable std::mutex _lock;
// Condition variable for "pool is idling". Waiters wake up when
// _active_threads reaches zero.
- ConditionVariable _idle_cond;
+ std::condition_variable _idle_cond;
// Condition variable for "pool has no threads". Waiters wake up when
// _num_threads and num_pending_threads_ are both 0.
- ConditionVariable _no_threads_cond;
+ std::condition_variable _no_threads_cond;
// Number of threads currently running.
//
@@ -331,24 +334,24 @@ private:
// A thread is added to the front of the list when it goes idle and is
// removed from the front and signaled when new work arrives. This produces a
// LIFO usage pattern that is more efficient than idling on a single
- // ConditionVariable (which yields FIFO semantics).
//
// Protected by _lock.
struct IdleThread : public boost::intrusive::list_base_hook<> {
- explicit IdleThread(Mutex* m) : not_empty(m) {}
+ explicit IdleThread() {}
// Condition variable for "queue is not empty". Waiters wake up when a new
// task is queued.
- ConditionVariable not_empty;
-
- DISALLOW_COPY_AND_ASSIGN(IdleThread);
+ std::condition_variable not_empty;
+ IdleThread(const IdleThread&) = delete;
+ void operator=(const IdleThread&) = delete;
};
boost::intrusive::list<IdleThread> _idle_threads; // NOLINT(build/include_what_you_use)
// ExecutionMode::CONCURRENT token used by the pool for tokenless submission.
std::unique_ptr<ThreadPoolToken> _tokenless;
- DISALLOW_COPY_AND_ASSIGN(ThreadPool);
+ ThreadPool(const ThreadPool&) = delete;
+ void operator=(const ThreadPool&) = delete;
};
// Entry point for token-based task submission and blocking for a particular
@@ -378,22 +381,21 @@ public:
// Waits until all the tasks submitted via this token are completed.
void wait();
- // Waits for all submissions using this token are complete, or until 'until'
- // time is reached.
- //
- // Returns true if all submissions are complete, false otherwise.
- bool wait_until(const MonoTime& until);
-
// Waits for all submissions using this token are complete, or until 'delta'
// time elapses.
//
// Returns true if all submissions are complete, false otherwise.
- bool wait_for(const MonoDelta& delta);
+ template <class Rep, class Period>
+ bool wait_for(const std::chrono::duration<Rep, Period>& delta) {
+ std::unique_lock<std::mutex> l(_pool->_lock);
+ _pool->check_not_pool_thread_unlocked();
+ return _not_running_cond.wait_for(l, delta, [&]() { return !is_active(); });
+ }
bool need_dispatch();
size_t num_tasks() {
- MutexLock l(&_pool->_lock);
+ std::lock_guard<std::mutex> l(_pool->_lock);
return _entries.size();
}
@@ -467,7 +469,7 @@ private:
// Condition variable for "token is idle". Waiters wake up when the token
// transitions to IDLE or QUIESCED.
- ConditionVariable _not_running_cond;
+ std::condition_variable _not_running_cond;
// Number of worker threads currently executing tasks belonging to this
// token.
@@ -480,7 +482,8 @@ private:
// Number of tasks which has not been submitted to the thread pool's queue.
int _num_unsubmitted_tasks;
- DISALLOW_COPY_AND_ASSIGN(ThreadPoolToken);
+ ThreadPoolToken(const ThreadPoolToken&) = delete;
+ void operator=(const ThreadPoolToken&) = delete;
};
} // namespace doris
diff --git a/be/src/util/thrift_client.cpp b/be/src/util/thrift_client.cpp
index 76e780c581..4ef7c6380c 100644
--- a/be/src/util/thrift_client.cpp
+++ b/be/src/util/thrift_client.cpp
@@ -21,7 +21,6 @@
#include <string>
#include "gutil/strings/substitute.h"
-#include "util/monotime.h"
namespace doris {
@@ -69,7 +68,7 @@ Status ThriftClientImpl::open_with_retry(int num_tries, int wait_ms) {
LOG(INFO) << "(Attempt " << try_count << " of " << num_tries << ")";
}
- SleepFor(MonoDelta::FromMilliseconds(wait_ms));
+ std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
}
return status;
diff --git a/be/src/util/thrift_rpc_helper.cpp b/be/src/util/thrift_rpc_helper.cpp
index 8e456c2596..1083df6bb4 100644
--- a/be/src/util/thrift_rpc_helper.cpp
+++ b/be/src/util/thrift_rpc_helper.cpp
@@ -23,7 +23,6 @@
#include "common/status.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/FrontendService_types.h"
-#include "monotime.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
@@ -63,7 +62,8 @@ Status ThriftRpcHelper::rpc(const std::string& ip, const int32_t port,
LOG(WARNING) << "retrying call frontend service after "
<< config::thrift_client_retry_interval_ms << " ms, address=" << address
<< ", reason=" << e.what();
- SleepFor(MonoDelta::FromMilliseconds(config::thrift_client_retry_interval_ms));
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(config::thrift_client_retry_interval_ms));
status = client.reopen(timeout_ms);
if (!status.ok()) {
LOG(WARNING) << "client reopen failed. address=" << address
@@ -75,7 +75,8 @@ Status ThriftRpcHelper::rpc(const std::string& ip, const int32_t port,
} catch (apache::thrift::TException& e) {
LOG(WARNING) << "call frontend service failed, address=" << address
<< ", reason=" << e.what();
- SleepFor(MonoDelta::FromMilliseconds(config::thrift_client_retry_interval_ms * 2));
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(config::thrift_client_retry_interval_ms * 2));
// just reopen to disable this connection
client.reopen(timeout_ms);
return Status::ThriftRpcError("failed to call frontend service");
diff --git a/be/src/util/thrift_util.cpp b/be/src/util/thrift_util.cpp
index 1e57ee8ccf..925d4aac32 100644
--- a/be/src/util/thrift_util.cpp
+++ b/be/src/util/thrift_util.cpp
@@ -27,7 +27,6 @@
#include "gen_cpp/Data_types.h"
#include "gen_cpp/Types_types.h"
#include "util/hash_util.hpp"
-#include "util/monotime.h"
#include "util/thrift_server.h"
// TCompactProtocol requires some #defines to work right. They also define UNLIKLEY
@@ -111,7 +110,7 @@ Status wait_for_server(const std::string& host, int port, int num_retries, int r
VLOG_QUERY << "Waiting " << retry_interval_ms << "ms for Thrift server at " << host << ":"
<< port << " to come up, failed attempt " << retry_count << " of "
<< num_retries;
- SleepFor(MonoDelta::FromMilliseconds(retry_interval_ms));
+ std::this_thread::sleep_for(std::chrono::milliseconds(retry_interval_ms));
}
return Status::InternalError("Server did not come up");
diff --git a/be/src/vec/functions/function_utility.cpp b/be/src/vec/functions/function_utility.cpp
index 6c7da89a3a..a42918ed08 100644
--- a/be/src/vec/functions/function_utility.cpp
+++ b/be/src/vec/functions/function_utility.cpp
@@ -14,8 +14,8 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+#include <thread>
-#include "util/monotime.h"
#include "vec/data_types/data_type_number.h"
#include "vec/data_types/data_type_string.h"
#include "vec/functions/simple_function_factory.h"
@@ -60,7 +60,7 @@ public:
null_map_column->insert(1);
} else {
int seconds = data_column->get_data()[i];
- SleepFor(MonoDelta::FromSeconds(seconds));
+ std::this_thread::sleep_for(std::chrono::seconds(seconds));
res_column->insert(1);
null_map_column->insert(0);
}
@@ -73,7 +73,7 @@ public:
for (int i = 0; i < input_rows_count; i++) {
int seconds = data_column->get_element(i);
- SleepFor(MonoDelta::FromSeconds(seconds));
+ std::this_thread::sleep_for(std::chrono::seconds(seconds));
res_column->insert(1);
}
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index a97a0b0407..07d4093387 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -310,7 +310,6 @@ set(UTIL_TEST_FILES
util/file_cache_test.cpp
util/parse_util_test.cpp
util/countdown_latch_test.cpp
- util/monotime_test.cpp
util/scoped_cleanup_test.cpp
util/thread_test.cpp
util/threadpool_test.cpp
diff --git a/be/test/exprs/runtime_filter_test.cpp b/be/test/exprs/runtime_filter_test.cpp
index a3e8f6b672..561845106e 100644
--- a/be/test/exprs/runtime_filter_test.cpp
+++ b/be/test/exprs/runtime_filter_test.cpp
@@ -107,7 +107,7 @@ IRuntimeFilter* create_runtime_filter(TRuntimeFilterType::type type, TQueryOptio
Status status = IRuntimeFilter::create(_runtime_stat, _obj_pool, &desc, options,
RuntimeFilterRole::PRODUCER, -1, &runtime_filter);
- assert(status.ok());
+ EXPECT_TRUE(status.ok()) << status.to_string();
return status.ok() ? runtime_filter : nullptr;
}
diff --git a/be/test/olap/lru_cache_test.cpp b/be/test/olap/lru_cache_test.cpp
index 247fd107ec..d34723640c 100644
--- a/be/test/olap/lru_cache_test.cpp
+++ b/be/test/olap/lru_cache_test.cpp
@@ -49,7 +49,7 @@ const CacheKey EncodeKey(std::string* result, int k) {
}
static int DecodeKey(const CacheKey& k) {
- assert(k.size() == 4);
+ EXPECT_EQ(k.size(), 4);
return DecodeFixed32(k.data());
}
static void* EncodeValue(uintptr_t v) {
diff --git a/be/test/olap/skiplist_test.cpp b/be/test/olap/skiplist_test.cpp
index 76d670d27c..c0999a76fa 100644
--- a/be/test/olap/skiplist_test.cpp
+++ b/be/test/olap/skiplist_test.cpp
@@ -26,9 +26,7 @@
#include "runtime/mem_pool.h"
#include "runtime/mem_tracker.h"
#include "testutil/test_util.h"
-#include "util/condition_variable.h"
#include "util/hash_util.hpp"
-#include "util/mutex.h"
#include "util/priority_thread_pool.hpp"
#include "util/random.h"
@@ -222,9 +220,9 @@ private:
}
static Key make_key(uint64_t k, uint64_t g) {
- assert(sizeof(Key) == sizeof(uint64_t));
- assert(k <= K); // We sometimes pass K to seek to the end of the skiplist
- assert(g <= 0xffffffffu);
+ EXPECT_EQ(sizeof(Key), sizeof(uint64_t));
+ EXPECT_LE(k, K); // We sometimes pass K to seek to the end of the skiplist
+ EXPECT_LE(g, 0xffffffffu);
return ((k << 40) | (g << 8) | (hash_numbers(k, g) & 0xff));
}
@@ -362,27 +360,25 @@ public:
enum ReaderState { STARTING, RUNNING, DONE };
- explicit TestState(int s) : _seed(s), _quit_flag(false), _state(STARTING), _cv_state(&_mu) {}
+ explicit TestState(int s) : _seed(s), _quit_flag(false), _state(STARTING) {}
void wait(ReaderState s) {
- _mu.lock();
+ std::unique_lock l(_mu);
while (_state != s) {
- _cv_state.wait();
+ _cv_state.wait(l);
}
- _mu.unlock();
}
void change(ReaderState s) {
- _mu.lock();
+ std::lock_guard l(_mu);
_state = s;
_cv_state.notify_one();
- _mu.unlock();
}
private:
- Mutex _mu;
+ std::mutex _mu;
ReaderState _state;
- ConditionVariable _cv_state;
+ std::condition_variable _cv_state;
};
static void concurrent_reader(void* arg) {
diff --git a/be/test/runtime/fragment_mgr_test.cpp b/be/test/runtime/fragment_mgr_test.cpp
index 4bb2cb58c5..bcf1b9f250 100644
--- a/be/test/runtime/fragment_mgr_test.cpp
+++ b/be/test/runtime/fragment_mgr_test.cpp
@@ -23,7 +23,6 @@
#include "exec/data_sink.h"
#include "runtime/plan_fragment_executor.h"
#include "runtime/row_batch.h"
-#include "util/monotime.h"
namespace doris {
@@ -43,7 +42,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
}
Status PlanFragmentExecutor::open() {
- SleepFor(MonoDelta::FromMilliseconds(50));
+ std::this_thread::sleep_for(std::chrono::milliseconds(50));
return s_open_status;
}
diff --git a/be/test/runtime/stream_load_pipe_test.cpp b/be/test/runtime/stream_load_pipe_test.cpp
index 2f50e85517..93c2cd91c2 100644
--- a/be/test/runtime/stream_load_pipe_test.cpp
+++ b/be/test/runtime/stream_load_pipe_test.cpp
@@ -21,8 +21,6 @@
#include <thread>
-#include "util/monotime.h"
-
namespace doris {
class StreamLoadPipeTest : public testing::Test {
@@ -210,7 +208,7 @@ TEST_F(StreamLoadPipeTest, cancel) {
char buf = '0' + (k++ % 10);
pipe.append(&buf, 1);
}
- SleepFor(MonoDelta::FromMilliseconds(100));
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
pipe.cancel("test");
};
std::thread t1(appender);
@@ -253,7 +251,7 @@ TEST_F(StreamLoadPipeTest, close) {
};
std::thread t1(appender);
- SleepFor(MonoDelta::FromMilliseconds(10));
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
pipe.close();
diff --git a/be/test/udf/uda_test.cpp b/be/test/udf/uda_test.cpp
index 0c025fcffa..79fc599762 100644
--- a/be/test/udf/uda_test.cpp
+++ b/be/test/udf/uda_test.cpp
@@ -192,7 +192,7 @@ BigIntVal XorFinalize(FunctionContext* context, const BigIntVal& val) {
// the return type is bigint
void DistinctEstimateInit(FunctionContext* context, StringVal* val) {
// Since this is known, this will be allocated to 256 bytes.
- assert(val->len == 256);
+ EXPECT_EQ(val->len, 256);
memset(val->ptr, 0, 256);
}
diff --git a/be/test/util/countdown_latch_test.cpp b/be/test/util/countdown_latch_test.cpp
index e4835bf7e3..b4e25da9f2 100644
--- a/be/test/util/countdown_latch_test.cpp
+++ b/be/test/util/countdown_latch_test.cpp
@@ -20,9 +20,9 @@
#include <gtest/gtest.h>
#include <functional>
+#include <thread>
#include "gutil/ref_counted.h"
-#include "util/monotime.h"
#include "util/thread.h"
#include "util/threadpool.h"
@@ -47,7 +47,7 @@ TEST(TestCountDownLatch, TestLatch) {
// Decrement the count by 1 in another thread, this should not fire the
// latch.
EXPECT_TRUE(pool->submit_func(std::bind(decrement_latch, &latch, 1)).ok());
- EXPECT_FALSE(latch.wait_for(MonoDelta::FromMilliseconds(200)));
+ EXPECT_FALSE(latch.wait_for(std::chrono::milliseconds(200)));
EXPECT_EQ(999, latch.count());
// Now decrement by 1000 this should decrement to 0 and fire the latch
@@ -65,7 +65,7 @@ TEST(TestCountDownLatch, TestResetToZero) {
EXPECT_TRUE(Thread::create("test", "cdl-test", &CountDownLatch::wait, &cdl, &t).ok());
// Sleep for a bit until it's likely the other thread is waiting on the latch.
- SleepFor(MonoDelta::FromMilliseconds(10));
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
cdl.reset(0);
t->join();
}
diff --git a/be/test/util/monotime_test.cpp b/be/test/util/monotime_test.cpp
deleted file mode 100644
index 8f66e3bbe0..0000000000
--- a/be/test/util/monotime_test.cpp
+++ /dev/null
@@ -1,404 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "util/monotime.h"
-
-#include <gtest/gtest.h>
-#include <sys/time.h>
-#include <unistd.h>
-
-#include <cstdint>
-#include <ctime>
-#include <ostream>
-#include <string>
-
-#include "common/logging.h"
-
-namespace doris {
-
-TEST(TestMonoTime, TestMonotonicity) {
- alarm(360);
- MonoTime prev(MonoTime::Now());
- MonoTime next;
-
- do {
- next = MonoTime::Now();
- //LOG(INFO) << " next = " << next.ToString();
- } while (!prev.ComesBefore(next));
- EXPECT_FALSE(next.ComesBefore(prev));
- alarm(0);
-}
-
-TEST(TestMonoTime, TestComparison) {
- MonoTime now(MonoTime::Now());
- MonoTime future(now);
- future.AddDelta(MonoDelta::FromNanoseconds(1L));
-
- EXPECT_GT((future - now).ToNanoseconds(), 0);
- EXPECT_LT((now - future).ToNanoseconds(), 0);
- EXPECT_EQ((now - now).ToNanoseconds(), 0);
-
- MonoDelta nano(MonoDelta::FromNanoseconds(1L));
- MonoDelta mil(MonoDelta::FromMilliseconds(1L));
- MonoDelta sec(MonoDelta::FromSeconds(1.0));
-
- EXPECT_TRUE(nano.LessThan(mil));
- EXPECT_TRUE(mil.LessThan(sec));
- EXPECT_TRUE(mil.MoreThan(nano));
- EXPECT_TRUE(sec.MoreThan(mil));
-}
-
-TEST(TestMonoTime, TestTimeVal) {
- struct timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = 0;
-
- // Normal conversion case.
- MonoDelta one_sec_one_micro(MonoDelta::FromNanoseconds(1000001000L));
- one_sec_one_micro.ToTimeVal(&tv);
- EXPECT_EQ(1, tv.tv_sec);
- EXPECT_EQ(1, tv.tv_usec);
-
- // Case where we are still positive but sub-micro.
- // Round up to nearest microsecond. This is to avoid infinite timeouts
- // in APIs that take a struct timeval.
- MonoDelta zero_sec_one_nano(MonoDelta::FromNanoseconds(1L));
- zero_sec_one_nano.ToTimeVal(&tv);
- EXPECT_EQ(0, tv.tv_sec);
- EXPECT_EQ(1, tv.tv_usec); // Special case: 1ns rounds up to
-
- // Negative conversion case. Ensure the timeval is normalized.
- // That means sec is negative and usec is positive.
- MonoDelta neg_micro(MonoDelta::FromMicroseconds(-1L));
- EXPECT_EQ(-1000, neg_micro.ToNanoseconds());
- neg_micro.ToTimeVal(&tv);
- EXPECT_EQ(-1, tv.tv_sec);
- EXPECT_EQ(999999, tv.tv_usec);
-
- // Case where we are still negative but sub-micro.
- // Round up to nearest microsecond. This is to avoid infinite timeouts
- // in APIs that take a struct timeval and for consistency.
- MonoDelta zero_sec_neg_one_nano(MonoDelta::FromNanoseconds(-1L));
- zero_sec_neg_one_nano.ToTimeVal(&tv);
- EXPECT_EQ(-1, tv.tv_sec);
- EXPECT_EQ(999999, tv.tv_usec);
-}
-
-TEST(TestMonoTime, TestTimeSpec) {
- MonoTime one_sec_one_nano_expected(1000000001L);
- struct timespec ts;
- ts.tv_sec = 1;
- ts.tv_nsec = 1;
- MonoTime one_sec_one_nano_actual(ts);
- EXPECT_EQ(0, one_sec_one_nano_expected.GetDeltaSince(one_sec_one_nano_actual).ToNanoseconds());
-
- MonoDelta zero_sec_two_nanos(MonoDelta::FromNanoseconds(2L));
- zero_sec_two_nanos.ToTimeSpec(&ts);
- EXPECT_EQ(0, ts.tv_sec);
- EXPECT_EQ(2, ts.tv_nsec);
-
- // Negative conversion case. Ensure the timespec is normalized.
- // That means sec is negative and nsec is positive.
- MonoDelta neg_nano(MonoDelta::FromNanoseconds(-1L));
- EXPECT_EQ(-1, neg_nano.ToNanoseconds());
- neg_nano.ToTimeSpec(&ts);
- EXPECT_EQ(-1, ts.tv_sec);
- EXPECT_EQ(999999999, ts.tv_nsec);
-}
-
-TEST(TestMonoTime, TestDeltas) {
- alarm(360);
- const MonoDelta max_delta(MonoDelta::FromSeconds(0.1));
- MonoTime prev(MonoTime::Now());
- MonoTime next;
- MonoDelta cur_delta;
- do {
- next = MonoTime::Now();
- cur_delta = next.GetDeltaSince(prev);
- } while (cur_delta.LessThan(max_delta));
- alarm(0);
-}
-
-TEST(TestMonoTime, TestDeltaConversions) {
- // TODO: Reliably test MonoDelta::FromSeconds() considering floating-point rounding errors
-
- MonoDelta mil(MonoDelta::FromMilliseconds(500));
- EXPECT_EQ(500 * MonoTime::kNanosecondsPerMillisecond, mil.nano_delta_);
-
- MonoDelta micro(MonoDelta::FromMicroseconds(500));
- EXPECT_EQ(500 * MonoTime::kNanosecondsPerMicrosecond, micro.nano_delta_);
-
- MonoDelta nano(MonoDelta::FromNanoseconds(500));
- EXPECT_EQ(500, nano.nano_delta_);
-}
-
-static void DoTestMonoTimePerf() {
- const MonoDelta max_delta(MonoDelta::FromMilliseconds(500));
- uint64_t num_calls = 0;
- MonoTime prev(MonoTime::Now());
- MonoTime next;
- MonoDelta cur_delta;
- do {
- next = MonoTime::Now();
- cur_delta = next.GetDeltaSince(prev);
- num_calls++;
- } while (cur_delta.LessThan(max_delta));
- LOG(INFO) << "DoTestMonoTimePerf():" << num_calls << " in " << max_delta.ToString()
- << " seconds.";
-}
-
-TEST(TestMonoTime, TestSleepFor) {
- MonoTime start = MonoTime::Now();
- MonoDelta sleep = MonoDelta::FromMilliseconds(100);
- SleepFor(sleep);
- MonoTime end = MonoTime::Now();
- MonoDelta actualSleep = end.GetDeltaSince(start);
- EXPECT_GE(actualSleep.ToNanoseconds(), sleep.ToNanoseconds());
-}
-
-// Test functionality of the handy operators for MonoTime/MonoDelta objects.
-// The test assumes that the core functionality provided by the
-// MonoTime/MonoDelta objects are in place, and it tests that the operators
-// have the expected behavior expressed in terms of already existing,
-// semantically equivalent methods.
-TEST(TestMonoTime, TestOperators) {
- // MonoTime& MonoTime::operator+=(const MonoDelta& delta);
- {
- MonoTime tmp = MonoTime::Now();
- MonoTime start = tmp;
- MonoDelta delta = MonoDelta::FromMilliseconds(100);
- MonoTime o_end = start;
- o_end += delta;
- tmp.AddDelta(delta);
- MonoTime m_end = tmp;
- EXPECT_TRUE(m_end.Equals(o_end));
- }
-
- // MonoTime& MonoTime::operator-=(const MonoDelta& delta);
- {
- MonoTime tmp = MonoTime::Now();
- MonoTime start = tmp;
- MonoDelta delta = MonoDelta::FromMilliseconds(100);
- MonoTime o_end = start;
- o_end -= delta;
- tmp.AddDelta(MonoDelta::FromNanoseconds(-delta.ToNanoseconds()));
- MonoTime m_end = tmp;
- EXPECT_TRUE(m_end.Equals(o_end));
- }
-
- // bool operator==(const MonoDelta& lhs, const MonoDelta& rhs);
- {
- MonoDelta dn = MonoDelta::FromNanoseconds(0);
- MonoDelta dm = MonoDelta::FromMicroseconds(0);
- EXPECT_TRUE(dn.Equals(dm));
- EXPECT_TRUE(dn == dm);
- EXPECT_TRUE(dm == dn);
- }
-
- // bool operator!=(const MonoDelta& lhs, const MonoDelta& rhs);
- {
- MonoDelta dn = MonoDelta::FromNanoseconds(1);
- MonoDelta dm = MonoDelta::FromMicroseconds(1);
- EXPECT_FALSE(dn.Equals(dm));
- EXPECT_TRUE(dn != dm);
- EXPECT_TRUE(dm != dn);
- }
-
- // bool operator<(const MonoDelta& lhs, const MonoDelta& rhs);
- {
- MonoDelta d0 = MonoDelta::FromNanoseconds(0);
- MonoDelta d1 = MonoDelta::FromNanoseconds(1);
- EXPECT_TRUE(d0.LessThan(d1));
- EXPECT_TRUE(d0 < d1);
- }
-
- // bool operator<=(const MonoDelta& lhs, const MonoDelta& rhs);
- {
- MonoDelta d0 = MonoDelta::FromNanoseconds(0);
- MonoDelta d1 = MonoDelta::FromNanoseconds(1);
- EXPECT_TRUE(d0.LessThan(d1));
- EXPECT_TRUE(d0 <= d1);
-
- MonoDelta d20 = MonoDelta::FromNanoseconds(2);
- MonoDelta d21 = MonoDelta::FromNanoseconds(2);
- EXPECT_TRUE(d20.Equals(d21));
- EXPECT_TRUE(d20 <= d21);
- }
-
- // bool operator>(const MonoDelta& lhs, const MonoDelta& rhs);
- {
- MonoDelta d0 = MonoDelta::FromNanoseconds(0);
- MonoDelta d1 = MonoDelta::FromNanoseconds(1);
- EXPECT_TRUE(d1.MoreThan(d0));
- EXPECT_TRUE(d1 > d0);
- }
-
- // bool operator>=(const MonoDelta& lhs, const MonoDelta& rhs);
- {
- MonoDelta d0 = MonoDelta::FromNanoseconds(0);
- MonoDelta d1 = MonoDelta::FromNanoseconds(1);
- EXPECT_TRUE(d1.MoreThan(d0));
- EXPECT_TRUE(d1 >= d1);
-
- MonoDelta d20 = MonoDelta::FromNanoseconds(2);
- MonoDelta d21 = MonoDelta::FromNanoseconds(2);
- EXPECT_TRUE(d20.Equals(d21));
- EXPECT_TRUE(d21 >= d20);
- }
-
- // bool operator==(const MonoTime& lhs, const MonoTime& rhs);
- {
- MonoTime t0 = MonoTime::Now();
- MonoTime t1(t0);
- EXPECT_TRUE(t0.Equals(t1));
- EXPECT_TRUE(t1.Equals(t0));
- EXPECT_TRUE(t0 == t1);
- EXPECT_TRUE(t1 == t0);
- }
-
- // bool operator!=(const MonoTime& lhs, const MonoTime& rhs);
- {
- MonoTime t0 = MonoTime::Now();
- MonoTime t1(t0 + MonoDelta::FromMilliseconds(100));
- EXPECT_TRUE(!t0.Equals(t1));
- EXPECT_TRUE(!t1.Equals(t0));
- EXPECT_TRUE(t0 != t1);
- EXPECT_TRUE(t1 != t0);
- }
-
- // bool operator<(const MonoTime& lhs, const MonoTime& rhs);
- {
- MonoTime t0 = MonoTime::Now();
- MonoTime t1(t0 + MonoDelta::FromMilliseconds(100));
- EXPECT_TRUE(t0.ComesBefore(t1));
- EXPECT_FALSE(t1.ComesBefore(t0));
- EXPECT_TRUE(t0 < t1);
- EXPECT_FALSE(t1 < t0);
- }
-
- // bool operator<=(const MonoTime& lhs, const MonoTime& rhs);
- {
- MonoTime t00 = MonoTime::Now();
- MonoTime t01(t00);
- EXPECT_TRUE(t00.Equals(t00));
- EXPECT_TRUE(t00.Equals(t01));
- EXPECT_TRUE(t01.Equals(t00));
- EXPECT_TRUE(t01.Equals(t01));
- EXPECT_TRUE(t00 <= t00);
- EXPECT_TRUE(t00 <= t01);
- EXPECT_TRUE(t01 <= t00);
- EXPECT_TRUE(t01 <= t01);
-
- MonoTime t1(t00 + MonoDelta::FromMilliseconds(100));
- EXPECT_TRUE(t00.ComesBefore(t1));
- EXPECT_TRUE(t01.ComesBefore(t1));
- EXPECT_FALSE(t1.ComesBefore(t00));
- EXPECT_FALSE(t1.ComesBefore(t01));
- EXPECT_TRUE(t00 <= t1);
- EXPECT_TRUE(t01 <= t1);
- EXPECT_FALSE(t1 <= t00);
- EXPECT_FALSE(t1 <= t01);
- }
-
- // bool operator>(const MonoTime& lhs, const MonoTime& rhs);
- {
- MonoTime t0 = MonoTime::Now();
- MonoTime t1(t0 + MonoDelta::FromMilliseconds(100));
- EXPECT_TRUE(t0.ComesBefore(t1));
- EXPECT_FALSE(t1.ComesBefore(t0));
- EXPECT_TRUE(t0 < t1);
- EXPECT_FALSE(t1 < t0);
- }
-
- // bool operator>=(const MonoTime& lhs, const MonoTime& rhs);
- {
- MonoTime t00 = MonoTime::Now();
- MonoTime t01(t00);
- EXPECT_TRUE(t00.Equals(t00));
- EXPECT_TRUE(t00.Equals(t01));
- EXPECT_TRUE(t01.Equals(t00));
- EXPECT_TRUE(t01.Equals(t01));
- EXPECT_TRUE(t00 >= t00);
- EXPECT_TRUE(t00 >= t01);
- EXPECT_TRUE(t01 >= t00);
- EXPECT_TRUE(t01 >= t01);
-
- MonoTime t1(t00 + MonoDelta::FromMilliseconds(100));
- EXPECT_TRUE(t00.ComesBefore(t1));
- EXPECT_TRUE(t01.ComesBefore(t1));
- EXPECT_FALSE(t1.ComesBefore(t00));
- EXPECT_FALSE(t1.ComesBefore(t01));
- EXPECT_FALSE(t00 >= t1);
- EXPECT_FALSE(t01 >= t1);
- EXPECT_TRUE(t1 >= t00);
- EXPECT_TRUE(t1 >= t01);
- }
-
- // MonoDelta operator-(const MonoTime& t0, const MonoTime& t1);
- {
- const int64_t deltas[] = {100, -100};
-
- MonoTime tmp = MonoTime::Now();
- for (auto d : deltas) {
- MonoDelta delta = MonoDelta::FromMilliseconds(d);
-
- MonoTime start = tmp;
- tmp.AddDelta(delta);
- MonoTime end = tmp;
- MonoDelta delta_o = end - start;
- EXPECT_TRUE(delta.Equals(delta_o));
- }
- }
-
- // MonoTime operator+(const MonoTime& t, const MonoDelta& delta);
- {
- MonoTime start = MonoTime::Now();
-
- MonoDelta delta_0 = MonoDelta::FromMilliseconds(0);
- MonoTime end_0 = start + delta_0;
- EXPECT_TRUE(end_0.Equals(start));
-
- MonoDelta delta_1 = MonoDelta::FromMilliseconds(1);
- MonoTime end_1 = start + delta_1;
- EXPECT_TRUE(end_1 > end_0);
- end_0.AddDelta(delta_1);
- EXPECT_TRUE(end_0.Equals(end_1));
- }
-
- // MonoTime operator-(const MonoTime& t, const MonoDelta& delta);
- {
- MonoTime start = MonoTime::Now();
-
- MonoDelta delta_0 = MonoDelta::FromMilliseconds(0);
- MonoTime end_0 = start - delta_0;
- EXPECT_TRUE(end_0.Equals(start));
-
- MonoDelta delta_1 = MonoDelta::FromMilliseconds(1);
- MonoTime end_1 = start - delta_1;
- EXPECT_TRUE(end_1 < end_0);
- end_1.AddDelta(delta_1);
- EXPECT_TRUE(end_1.Equals(end_0));
- }
-}
-
-TEST(TestMonoTimePerf, TestMonoTimePerf) {
- alarm(360);
- DoTestMonoTimePerf();
- alarm(0);
-}
-
-} // namespace doris
diff --git a/be/test/util/thread_test.cpp b/be/test/util/thread_test.cpp
index 56f191d3a4..47c8b25382 100644
--- a/be/test/util/thread_test.cpp
+++ b/be/test/util/thread_test.cpp
@@ -31,9 +31,9 @@
#include "gutil/ref_counted.h"
#include "util/countdown_latch.h"
#include "util/runtime_profile.h"
+#include "util/time.h"
using std::string;
-
namespace doris {
class ThreadTest : public ::testing::Test {
@@ -46,8 +46,7 @@ public:
// This has to be manually verified.
TEST_F(ThreadTest, TestJoinAndWarn) {
scoped_refptr<Thread> holder;
- Status status =
- Thread::create("test", "sleeper thread", SleepFor, MonoDelta::FromSeconds(1), &holder);
+ Status status = Thread::create("test", "sleeper thread", SleepForMs, 1000, &holder);
EXPECT_TRUE(status.ok());
status = ThreadJoiner(holder.get()).warn_after_ms(10).warn_every_ms(100).join();
EXPECT_TRUE(status.ok());
@@ -55,8 +54,7 @@ TEST_F(ThreadTest, TestJoinAndWarn) {
TEST_F(ThreadTest, TestFailedJoin) {
scoped_refptr<Thread> holder;
- Status status =
- Thread::create("test", "sleeper thread", SleepFor, MonoDelta::FromSeconds(1), &holder);
+ Status status = Thread::create("test", "sleeper thread", SleepForMs, 1000, &holder);
EXPECT_TRUE(status.ok());
status = ThreadJoiner(holder.get()).give_up_after_ms(50).join();
EXPECT_TRUE(status.is_aborted());
@@ -78,8 +76,7 @@ TEST_F(ThreadTest, TestJoinOnSelf) {
TEST_F(ThreadTest, TestDoubleJoinIsNoOp) {
scoped_refptr<Thread> holder;
- Status status =
- Thread::create("test", "sleeper thread", SleepFor, MonoDelta::FromSeconds(0), &holder);
+ Status status = Thread::create("test", "sleeper thread", SleepForMs, 0, &holder);
EXPECT_TRUE(status.ok());
ThreadJoiner joiner(holder.get());
status = joiner.join();
@@ -95,8 +92,7 @@ TEST_F(ThreadTest, ThreadStartBenchmark) {
{
SCOPED_RAW_TIMER(&thread_creation_ns);
for (auto& t : threads) {
- Status status = Thread::create("test", "TestCallOnExit", SleepFor,
- MonoDelta::FromSeconds(0), &t);
+ Status status = Thread::create("test", "TestCallOnExit", SleepForMs, 0, &t);
EXPECT_TRUE(status.ok());
}
}
diff --git a/be/test/util/threadpool_test.cpp b/be/test/util/threadpool_test.cpp
index 77f48348ee..eceda73f55 100644
--- a/be/test/util/threadpool_test.cpp
+++ b/be/test/util/threadpool_test.cpp
@@ -45,7 +45,6 @@
#include "util/barrier.h"
#include "util/countdown_latch.h"
#include "util/metrics.h"
-#include "util/monotime.h"
#include "util/random.h"
#include "util/scoped_cleanup.h"
#include "util/spinlock.h"
@@ -155,7 +154,7 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
EXPECT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
.set_min_threads(0)
.set_max_threads(3)
- .set_idle_timeout(MonoDelta::FromMilliseconds(1)))
+ .set_idle_timeout(std::chrono::milliseconds(1)))
.ok());
// There are no threads to start with.
@@ -228,7 +227,7 @@ TEST_F(ThreadPoolTest, TestRace) {
EXPECT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
.set_min_threads(0)
.set_max_threads(1)
- .set_idle_timeout(MonoDelta::FromMicroseconds(1)))
+ .set_idle_timeout(std::chrono::microseconds(1)))
.ok());
for (int i = 0; i < 500; i++) {
@@ -242,7 +241,7 @@ TEST_F(ThreadPoolTest, TestRace) {
l.wait();
// Sleeping a different amount in each iteration makes it more likely to hit
// the bug.
- SleepFor(MonoDelta::FromMicroseconds(i));
+ std::this_thread::sleep_for(std::chrono::microseconds(i));
}
}
@@ -250,7 +249,7 @@ TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) {
EXPECT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
.set_min_threads(1)
.set_max_threads(4)
- .set_idle_timeout(MonoDelta::FromMilliseconds(1)))
+ .set_idle_timeout(std::chrono::milliseconds(1)))
.ok());
// There is 1 thread to start with.
@@ -354,20 +353,25 @@ class SlowDestructorRunnable : public Runnable {
public:
void run() override {}
- virtual ~SlowDestructorRunnable() { SleepFor(MonoDelta::FromMilliseconds(100)); }
+ virtual ~SlowDestructorRunnable() {
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
};
// Test that if a tasks's destructor is slow, it doesn't cause serialization of the tasks
// in the queue.
TEST_F(ThreadPoolTest, TestSlowDestructor) {
EXPECT_TRUE(rebuild_pool_with_min_max(1, 20).ok());
- MonoTime start = MonoTime::Now();
+ auto start = std::chrono::system_clock::now();
for (int i = 0; i < 100; i++) {
shared_ptr<Runnable> task(new SlowDestructorRunnable());
EXPECT_TRUE(_pool->submit(std::move(task)).ok());
}
_pool->wait();
- EXPECT_LT((MonoTime::Now() - start).ToSeconds(), 5);
+ EXPECT_LT(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() -
+ start)
+ .count(),
+ 5);
}
// For test cases that should run with both kinds of tokens.
@@ -382,7 +386,7 @@ TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitAndWait) {
std::unique_ptr<ThreadPoolToken> t = _pool->new_token(GetParam());
int i = 0;
Status status = t->submit_func([&]() {
- SleepFor(MonoDelta::FromMilliseconds(1));
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
i++;
});
EXPECT_TRUE(status.ok());
@@ -401,7 +405,7 @@ TEST_F(ThreadPoolTest, TestTokenSubmitsProcessedSerially) {
// appends if the submissions did execute in parallel.
int sleep_ms = r.Next() % 5;
Status status = t->submit_func([&result, c, sleep_ms]() {
- SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+ std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
result += c;
});
EXPECT_TRUE(status.ok());
@@ -511,7 +515,7 @@ TEST_P(ThreadPoolTestTokenTypes, TestTokenWaitForAll) {
int sleep_ms = r.Next() % 5;
auto task = [&v, sleep_ms]() {
- SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+ std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
v++;
};
@@ -550,7 +554,7 @@ TEST_F(ThreadPoolTest, TestFuzz) {
int sleep_ms = r.Next() % 5;
EXPECT_TRUE(_pool->submit_func([sleep_ms]() {
// Sleep a little first to increase task overlap.
- SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+ std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
}).ok());
} else if (op < 75) {
// submit with a randomly selected token.
@@ -561,7 +565,7 @@ TEST_F(ThreadPoolTest, TestFuzz) {
int token_idx = r.Next() % tokens.size();
Status s = tokens[token_idx]->submit_func([sleep_ms]() {
// Sleep a little first to increase task overlap.
- SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+ std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
});
EXPECT_TRUE(s.ok() || s.is_service_unavailable());
} else if (op < 85) {
@@ -687,7 +691,7 @@ TEST_F(ThreadPoolTest, TestTokenConcurrency) {
// Sleep a bit, otherwise this thread outpaces the other threads and
// nothing interesting happens to most tokens.
- SleepFor(MonoDelta::FromMicroseconds(10));
+ std::this_thread::sleep_for(std::chrono::microseconds(10));
}
total_num_tokens_cycled += num_tokens_cycled;
});
@@ -729,7 +733,7 @@ TEST_F(ThreadPoolTest, TestTokenConcurrency) {
int sleep_ms = rng.Next() % 5;
Status s = GetRandomToken()->submit_func([sleep_ms]() {
// Sleep a little first so that tasks are running during other events.
- SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+ std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
});
CHECK(s.ok() || s.is_service_unavailable());
num_tokens_submitted++;
@@ -738,7 +742,7 @@ TEST_F(ThreadPoolTest, TestTokenConcurrency) {
});
}
- SleepFor(MonoDelta::FromSeconds(kTestRuntimeSecs));
+ std::this_thread::sleep_for(std::chrono::seconds(kTestRuntimeSecs));
latch.count_down();
for (auto& t : threads) {
t.join();
@@ -766,7 +770,7 @@ TEST_F(ThreadPoolTest, TestNormal) {
.set_min_threads(0)
.set_max_threads(5)
.set_max_queue_size(10)
- .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
+ .set_idle_timeout(std::chrono::milliseconds(2000))
.build(&thread_pool);
std::unique_ptr<ThreadPoolToken> token1 =
@@ -814,7 +818,7 @@ TEST_F(ThreadPoolTest, TestThreadPoolDynamicAdjustMaximumMinimum) {
EXPECT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
.set_min_threads(3)
.set_max_threads(3)
- .set_idle_timeout(MonoDelta::FromMilliseconds(1)))
+ .set_idle_timeout(std::chrono::milliseconds(1)))
.ok());
EXPECT_EQ(3, _pool->min_threads());
@@ -865,7 +869,7 @@ TEST_F(ThreadPoolTest, TestThreadPoolDynamicAdjustMaximumMinimum) {
latch_1.count_down();
latch_2.count_down();
latch_3.count_down();
- SleepFor(MonoDelta::FromMilliseconds(500));
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
EXPECT_EQ(4, _pool->num_threads());
EXPECT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_8)).ok());
@@ -881,7 +885,7 @@ TEST_F(ThreadPoolTest, TestThreadPoolDynamicAdjustMaximumMinimum) {
latch_7.count_down();
latch_8.count_down();
latch_9.count_down();
- SleepFor(MonoDelta::FromMilliseconds(500));
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
EXPECT_EQ(2, _pool->num_threads());
_pool->wait();
diff --git a/be/test/util/trace_test.cpp b/be/test/util/trace_test.cpp
index d8f61f49cf..986f9ba08a 100644
--- a/be/test/util/trace_test.cpp
+++ b/be/test/util/trace_test.cpp
@@ -37,7 +37,6 @@
#include "gutil/ref_counted.h"
#include "gutil/walltime.h"
#include "util/countdown_latch.h"
-#include "util/monotime.h"
#include "util/scoped_cleanup.h"
#include "util/stopwatch.hpp"
#include "util/thread.h"
@@ -131,7 +130,7 @@ TEST_F(TraceTest, TestTraceMetrics) {
{
ADOPT_TRACE(trace.get());
TRACE_COUNTER_SCOPE_LATENCY_US("test_scope_us");
- SleepFor(MonoDelta::FromMilliseconds(100));
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
auto m = trace->metrics()->Get();
EXPECT_GE(m["test_scope_us"], 80 * 1000);
diff --git a/build.sh b/build.sh
index faa2d34350..d3a76e802b 100755
--- a/build.sh
+++ b/build.sh
@@ -158,7 +158,7 @@ else
*) echo "Internal error" ; exit 1 ;;
esac
done
- #only ./build.sh -j xx then build all
+ #only ./build.sh -j xx then build all
if [[ ${PARAMETER_COUNT} -eq 3 ]] && [[ ${PARAMETER_FLAG} -eq 1 ]];then
BUILD_FE=1
BUILD_BE=1
@@ -263,7 +263,7 @@ if [ ${BUILD_HIVE_UDF} -eq 1 ]; then
modules+=("hive-udf")
fi
FE_MODULES=$(IFS=, ; echo "${modules[*]}")
-
+
# Clean and build Backend
if [ ${BUILD_BE} -eq 1 ] ; then
CMAKE_BUILD_TYPE=${BUILD_TYPE:-Release}
@@ -387,7 +387,7 @@ if [ ${BUILD_BE} -eq 1 ]; then
cp -r -p ${DORIS_HOME}/be/output/conf/* ${DORIS_OUTPUT}/be/conf/
cp -r -p ${DORIS_HOME}/be/output/lib/palo_be ${DORIS_OUTPUT}/be/lib/
- if [ "${BUILD_META_TOOL}" = "ON" ] ; then
+ if [ "${BUILD_META_TOOL}" = "ON" ]; then
cp -r -p ${DORIS_HOME}/be/output/lib/meta_tool ${DORIS_OUTPUT}/be/lib/
fi
@@ -397,7 +397,7 @@ if [ ${BUILD_BE} -eq 1 ]; then
if [ "${STRIP_DEBUG_INFO}" = "ON" ]; then
cp -r -p ${DORIS_HOME}/be/output/lib/debug_info ${DORIS_OUTPUT}/be/lib/
fi
-
+
java_udf_path=${DORIS_HOME}/fe/java-udf/target/java-udf-jar-with-dependencies.jar
if [ -f ${java_udf_path} ];then
cp ${java_udf_path} ${DORIS_OUTPUT}/be/lib/
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org