You are viewing a plain-text version of this content. The canonical link (a hyperlink in the original page) is not preserved in this version.
Posted to commits@doris.apache.org by da...@apache.org on 2024/01/03 06:46:03 UTC
(doris) branch master updated: [refactor](wal) refactor some wal code (#29434)
This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e3c9f535dc8 [refactor](wal) refactor some wal code (#29434)
e3c9f535dc8 is described below
commit e3c9f535dc8c350f40b755618a019104659dbb05
Author: abmdocrt <Yu...@gmail.com>
AuthorDate: Wed Jan 3 14:45:57 2024 +0800
[refactor](wal) refactor some wal code (#29434)
---
be/src/http/action/http_stream.cpp | 22 ++++++++++------------
be/src/http/action/stream_load.cpp | 24 +++++++++++-------------
be/src/io/fs/local_file_system.cpp | 2 +-
be/src/olap/wal/wal_dirs_info.cpp | 26 +++++++++++---------------
be/src/olap/wal/wal_dirs_info.h | 9 ++++-----
be/src/olap/wal/wal_manager.cpp | 22 +++++++++-------------
be/src/olap/wal/wal_manager.h | 10 ++++------
be/src/runtime/group_commit_mgr.cpp | 6 +++---
be/test/vec/exec/vwal_scanner_test.cpp | 2 +-
9 files changed, 54 insertions(+), 69 deletions(-)
diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index b97ce2976eb..f1017703570 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -334,19 +334,17 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,
ctx->label = ctx->put_result.params.import_label;
ctx->put_result.params.__set_wal_id(ctx->wal_id);
if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
- if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
- size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
- if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
- ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
- ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
- ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
- ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
- ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
- ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
- content_length *= 3;
- }
- ctx->put_result.params.__set_content_length(content_length);
+ size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
+ if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
+ ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
+ ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
+ ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
+ ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
+ ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
+ ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
+ content_length *= 3;
}
+ ctx->put_result.params.__set_content_length(content_length);
}
return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 88e12e19dca..db8e56ec77b 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -633,19 +633,17 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
return plan_status;
}
if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
- if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
- size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
- if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
- ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
- ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
- ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
- ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
- ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
- ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
- content_length *= 3;
- }
- ctx->put_result.params.__set_content_length(content_length);
- }
+ size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
+ if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
+ ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
+ ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
+ ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
+ ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
+ ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
+ ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
+ content_length *= 3;
+ }
+ ctx->put_result.params.__set_content_length(content_length);
}
VLOG_NOTICE << "params is " << apache::thrift::ThriftDebugString(ctx->put_result.params);
diff --git a/be/src/io/fs/local_file_system.cpp b/be/src/io/fs/local_file_system.cpp
index 78ba222d1ff..1828c7c3d2c 100644
--- a/be/src/io/fs/local_file_system.cpp
+++ b/be/src/io/fs/local_file_system.cpp
@@ -186,7 +186,7 @@ Status LocalFileSystem::directory_size(const Path& dir_path, size_t* dir_size) {
try {
*dir_size += std::filesystem::file_size(entry);
} catch (const std::exception& e) {
- LOG(INFO) << "{}", e.what();
+ LOG(INFO) << "failed to get file size, err: {}", e.what();
}
}
}
diff --git a/be/src/olap/wal/wal_dirs_info.cpp b/be/src/olap/wal/wal_dirs_info.cpp
index 19ad2562778..79bd024e7d0 100644
--- a/be/src/olap/wal/wal_dirs_info.cpp
+++ b/be/src/olap/wal/wal_dirs_info.cpp
@@ -43,13 +43,10 @@ void WalDirInfo::set_used(size_t used) {
_used = used;
}
-void WalDirInfo::set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) {
+void WalDirInfo::set_pre_allocated(size_t increase_pre_allocated, size_t decrease_pre_allocated) {
std::unique_lock wlock(_lock);
- if (is_add_pre_allocated) {
- _pre_allocated += pre_allocated;
- } else {
- _pre_allocated -= pre_allocated;
- }
+ _pre_allocated += increase_pre_allocated;
+ _pre_allocated -= decrease_pre_allocated;
}
size_t WalDirInfo::available() {
@@ -72,9 +69,6 @@ Status WalDirInfo::update_wal_dir_limit(size_t limit) {
if (wal_disk_limit <= 0) {
return Status::InternalError("Disk full! Please check your disk usage!");
}
- size_t wal_dir_size = 0;
- RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size));
- // TODO should be wal_disk_limit + wal_dir_size
set_limit(wal_disk_limit);
}
return Status::OK();
@@ -91,9 +85,9 @@ Status WalDirInfo::update_wal_dir_used(size_t used) {
return Status::OK();
}
-Status WalDirInfo::update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) {
- set_pre_allocated(pre_allocated, is_add_pre_allocated);
- return Status::OK();
+void WalDirInfo::update_wal_dir_pre_allocated(size_t increase_pre_allocated,
+ size_t decrease_pre_allocated) {
+ set_pre_allocated(increase_pre_allocated, decrease_pre_allocated);
}
Status WalDirsInfo::add(const std::string& wal_dir, size_t limit, size_t used,
@@ -178,11 +172,13 @@ Status WalDirsInfo::update_all_wal_dir_used() {
return Status::OK();
}
-Status WalDirsInfo::update_wal_dir_pre_allocated(std::string wal_dir, size_t pre_allocated,
- bool is_add_pre_allocated) {
+Status WalDirsInfo::update_wal_dir_pre_allocated(std::string wal_dir, size_t increase_pre_allocated,
+ size_t decrease_pre_allocated) {
for (const auto& wal_dir_info : _wal_dirs_info_vec) {
if (wal_dir_info->get_wal_dir() == wal_dir) {
- return wal_dir_info->update_wal_dir_pre_allocated(pre_allocated, is_add_pre_allocated);
+ wal_dir_info->update_wal_dir_pre_allocated(increase_pre_allocated,
+ decrease_pre_allocated);
+ return Status::OK();
}
}
return Status::InternalError("Can not find wal dir in wal disks info.");
diff --git a/be/src/olap/wal/wal_dirs_info.h b/be/src/olap/wal/wal_dirs_info.h
index 91a26af2b5d..eda9cc72d30 100644
--- a/be/src/olap/wal/wal_dirs_info.h
+++ b/be/src/olap/wal/wal_dirs_info.h
@@ -42,12 +42,11 @@ public:
size_t get_limit();
void set_limit(size_t limit);
void set_used(size_t used);
- // TODO increase_pre_allocated and decrease_pre_allocated
- void set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated);
+ void set_pre_allocated(size_t increase_pre_allocated, size_t decrease_pre_allocated);
size_t available();
Status update_wal_dir_limit(size_t limit = -1);
Status update_wal_dir_used(size_t used = -1);
- Status update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated = true);
+ void update_wal_dir_pre_allocated(size_t increase_pre_allocated, size_t decrease_pre_allocated);
private:
std::string _wal_dir;
@@ -70,8 +69,8 @@ public:
Status update_all_wal_dir_limit();
Status update_wal_dir_used(std::string wal_dir, size_t used = -1);
Status update_all_wal_dir_used();
- Status update_wal_dir_pre_allocated(std::string wal_dir, size_t pre_allocated,
- bool is_add_pre_allocated);
+ Status update_wal_dir_pre_allocated(std::string wal_dir, size_t increase_pre_allocated,
+ size_t decrease_pre_allocated);
Status get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes);
private:
diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp
index 9af91cf7b55..9d92f5c9e6a 100644
--- a/be/src/olap/wal/wal_manager.cpp
+++ b/be/src/olap/wal/wal_manager.cpp
@@ -232,8 +232,8 @@ void WalManager::print_wal_status_queue() {
LOG(INFO) << ss.str();
}
-Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id,
- const std::string& label, std::string& base_path) {
+Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id,
+ const std::string& label, std::string& base_path) {
base_path = _wal_dirs_info->get_available_random_wal_dir();
std::stringstream ss;
ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/"
@@ -286,11 +286,6 @@ Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>&
LOG(INFO) << "create wal " << wal_path;
wal_writer = std::make_shared<WalWriter>(wal_path);
RETURN_IF_ERROR(wal_writer->init());
- {
- // TODO no use, should remove it
- std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
- _wal_id_to_writer_map.emplace(wal_id, wal_writer);
- }
return Status::OK();
}
@@ -431,8 +426,8 @@ Status WalManager::delete_wal(int64_t wal_id, size_t block_queue_pre_allocated)
_wal_path_map.erase(wal_id);
}
}
- RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path),
- block_queue_pre_allocated, false));
+ RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path), 0,
+ block_queue_pre_allocated));
return Status::OK();
}
@@ -481,10 +476,11 @@ Status WalManager::update_wal_dir_used(const std::string& wal_dir, size_t used)
return _wal_dirs_info->update_wal_dir_used(wal_dir, used);
}
-Status WalManager::update_wal_dir_pre_allocated(const std::string& wal_dir, size_t pre_allocated,
- bool is_add_pre_allocated) {
- return _wal_dirs_info->update_wal_dir_pre_allocated(wal_dir, pre_allocated,
- is_add_pre_allocated);
+Status WalManager::update_wal_dir_pre_allocated(const std::string& wal_dir,
+ size_t increase_pre_allocated,
+ size_t decrease_pre_allocated) {
+ return _wal_dirs_info->update_wal_dir_pre_allocated(wal_dir, increase_pre_allocated,
+ decrease_pre_allocated);
}
Status WalManager::_update_wal_dir_info_thread() {
diff --git a/be/src/olap/wal/wal_manager.h b/be/src/olap/wal/wal_manager.h
index f4e28445f8f..f6a3bfef798 100644
--- a/be/src/olap/wal/wal_manager.h
+++ b/be/src/olap/wal/wal_manager.h
@@ -63,14 +63,14 @@ public:
// wal back pressure
Status update_wal_dir_limit(const std::string& wal_dir, size_t limit = -1);
Status update_wal_dir_used(const std::string& wal_dir, size_t used = -1);
- Status update_wal_dir_pre_allocated(const std::string& wal_dir, size_t pre_allocated,
- bool is_add_pre_allocated);
+ Status update_wal_dir_pre_allocated(const std::string& wal_dir, size_t increase_pre_allocated,
+ size_t decrease_pre_allocated);
Status get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes);
size_t get_max_available_size();
// replay wal
- Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const std::string& label,
- std::string& base_path);
+ Status create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id,
+ const std::string& label, std::string& base_path);
Status get_wal_path(int64_t wal_id, std::string& wal_path);
Status delete_wal(int64_t wal_id, size_t block_queue_pre_allocated = 0);
Status add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, std::string wal);
@@ -130,8 +130,6 @@ private:
std::shared_mutex _wal_lock;
std::unordered_map<int64_t, std::string> _wal_path_map;
- // TODO no use? need remove it. And the map dose not clear
- std::unordered_map<int64_t, std::shared_ptr<WalWriter>> _wal_id_to_writer_map;
// TODO Now only used for debug wal status, consider remove it
std::shared_mutex _wal_status_lock;
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 2971138d5b6..8a81388ecd2 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -491,8 +491,8 @@ Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& i
Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
const std::string& import_label, WalManager* wal_manager,
std::vector<TSlotDescriptor>& slot_desc, int be_exe_version) {
- RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->add_wal_path(db_id, tb_id, wal_id,
- import_label, _wal_base_path));
+ RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->create_wal_path(
+ db_id, tb_id, wal_id, import_label, _wal_base_path));
_v_wal_writer = std::make_shared<vectorized::VWalWriter>(
tb_id, wal_id, import_label, wal_manager, slot_desc, be_exe_version);
return _v_wal_writer->init();
@@ -515,7 +515,7 @@ bool LoadBlockQueue::has_enough_wal_disk_space(size_t pre_allocated) {
}
}
if (pre_allocated < available_bytes) {
- Status st = wal_mgr->update_wal_dir_pre_allocated(_wal_base_path, pre_allocated, true);
+ Status st = wal_mgr->update_wal_dir_pre_allocated(_wal_base_path, pre_allocated, 0);
if (!st.ok()) {
LOG(WARNING) << "update wal dir pre_allocated failed, reason: " << st.to_string();
}
diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp
index 2d786679d4e..cf9e1733643 100644
--- a/be/test/vec/exec/vwal_scanner_test.cpp
+++ b/be/test/vec/exec/vwal_scanner_test.cpp
@@ -218,7 +218,7 @@ void VWalScannerTest::init() {
_env->_wal_manager = WalManager::create_shared(_env, wal_dir);
std::string base_path;
auto st = _env->_wal_manager->_init_wal_dirs_info();
- st = _env->_wal_manager->add_wal_path(db_id, tb_id, txn_id, label, base_path);
+ st = _env->_wal_manager->create_wal_path(db_id, tb_id, txn_id, label, base_path);
}
TEST_F(VWalScannerTest, normal) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org