You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@doris.apache.org by da...@apache.org on 2024/01/03 06:46:03 UTC

(doris) branch master updated: [refactor](wal) refactor some wal code (#29434)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e3c9f535dc8 [refactor](wal) refactor some wal code (#29434)
e3c9f535dc8 is described below

commit e3c9f535dc8c350f40b755618a019104659dbb05
Author: abmdocrt <Yu...@gmail.com>
AuthorDate: Wed Jan 3 14:45:57 2024 +0800

    [refactor](wal) refactor some wal code (#29434)
---
 be/src/http/action/http_stream.cpp     | 22 ++++++++++------------
 be/src/http/action/stream_load.cpp     | 24 +++++++++++-------------
 be/src/io/fs/local_file_system.cpp     |  2 +-
 be/src/olap/wal/wal_dirs_info.cpp      | 26 +++++++++++---------------
 be/src/olap/wal/wal_dirs_info.h        |  9 ++++-----
 be/src/olap/wal/wal_manager.cpp        | 22 +++++++++-------------
 be/src/olap/wal/wal_manager.h          | 10 ++++------
 be/src/runtime/group_commit_mgr.cpp    |  6 +++---
 be/test/vec/exec/vwal_scanner_test.cpp |  2 +-
 9 files changed, 54 insertions(+), 69 deletions(-)

diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index b97ce2976eb..f1017703570 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -334,19 +334,17 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,
     ctx->label = ctx->put_result.params.import_label;
     ctx->put_result.params.__set_wal_id(ctx->wal_id);
     if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
-        if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
-            size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
-            if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
-                ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
-                ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
-                ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
-                ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
-                ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
-                ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
-                content_length *= 3;
-            }
-            ctx->put_result.params.__set_content_length(content_length);
+        size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
+        if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
+            ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
+            ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
+            ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
+            ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
+            ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
+            ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
+            content_length *= 3;
         }
+        ctx->put_result.params.__set_content_length(content_length);
     }
 
     return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 88e12e19dca..db8e56ec77b 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -633,19 +633,17 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
         return plan_status;
     }
     if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
-        if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
-            size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
-            if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
-                ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
-                ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
-                ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
-                ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
-                ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
-                ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
-                content_length *= 3;
-            }
-            ctx->put_result.params.__set_content_length(content_length);
-        }
+        size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
+        if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
+            ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
+            ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
+            ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
+            ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
+            ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
+            ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
+            content_length *= 3;
+        }
+        ctx->put_result.params.__set_content_length(content_length);
     }
 
     VLOG_NOTICE << "params is " << apache::thrift::ThriftDebugString(ctx->put_result.params);
diff --git a/be/src/io/fs/local_file_system.cpp b/be/src/io/fs/local_file_system.cpp
index 78ba222d1ff..1828c7c3d2c 100644
--- a/be/src/io/fs/local_file_system.cpp
+++ b/be/src/io/fs/local_file_system.cpp
@@ -186,7 +186,7 @@ Status LocalFileSystem::directory_size(const Path& dir_path, size_t* dir_size) {
                 try {
                     *dir_size += std::filesystem::file_size(entry);
                 } catch (const std::exception& e) {
-                    LOG(INFO) << "{}", e.what();
+                    LOG(INFO) << "failed to get file size, err: {}", e.what();
                 }
             }
         }
diff --git a/be/src/olap/wal/wal_dirs_info.cpp b/be/src/olap/wal/wal_dirs_info.cpp
index 19ad2562778..79bd024e7d0 100644
--- a/be/src/olap/wal/wal_dirs_info.cpp
+++ b/be/src/olap/wal/wal_dirs_info.cpp
@@ -43,13 +43,10 @@ void WalDirInfo::set_used(size_t used) {
     _used = used;
 }
 
-void WalDirInfo::set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) {
+void WalDirInfo::set_pre_allocated(size_t increase_pre_allocated, size_t decrease_pre_allocated) {
     std::unique_lock wlock(_lock);
-    if (is_add_pre_allocated) {
-        _pre_allocated += pre_allocated;
-    } else {
-        _pre_allocated -= pre_allocated;
-    }
+    _pre_allocated += increase_pre_allocated;
+    _pre_allocated -= decrease_pre_allocated;
 }
 
 size_t WalDirInfo::available() {
@@ -72,9 +69,6 @@ Status WalDirInfo::update_wal_dir_limit(size_t limit) {
         if (wal_disk_limit <= 0) {
             return Status::InternalError("Disk full! Please check your disk usage!");
         }
-        size_t wal_dir_size = 0;
-        RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size));
-        // TODO should be wal_disk_limit + wal_dir_size
         set_limit(wal_disk_limit);
     }
     return Status::OK();
@@ -91,9 +85,9 @@ Status WalDirInfo::update_wal_dir_used(size_t used) {
     return Status::OK();
 }
 
-Status WalDirInfo::update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) {
-    set_pre_allocated(pre_allocated, is_add_pre_allocated);
-    return Status::OK();
+void WalDirInfo::update_wal_dir_pre_allocated(size_t increase_pre_allocated,
+                                              size_t decrease_pre_allocated) {
+    set_pre_allocated(increase_pre_allocated, decrease_pre_allocated);
 }
 
 Status WalDirsInfo::add(const std::string& wal_dir, size_t limit, size_t used,
@@ -178,11 +172,13 @@ Status WalDirsInfo::update_all_wal_dir_used() {
     return Status::OK();
 }
 
-Status WalDirsInfo::update_wal_dir_pre_allocated(std::string wal_dir, size_t pre_allocated,
-                                                 bool is_add_pre_allocated) {
+Status WalDirsInfo::update_wal_dir_pre_allocated(std::string wal_dir, size_t increase_pre_allocated,
+                                                 size_t decrease_pre_allocated) {
     for (const auto& wal_dir_info : _wal_dirs_info_vec) {
         if (wal_dir_info->get_wal_dir() == wal_dir) {
-            return wal_dir_info->update_wal_dir_pre_allocated(pre_allocated, is_add_pre_allocated);
+            wal_dir_info->update_wal_dir_pre_allocated(increase_pre_allocated,
+                                                       decrease_pre_allocated);
+            return Status::OK();
         }
     }
     return Status::InternalError("Can not find wal dir in wal disks info.");
diff --git a/be/src/olap/wal/wal_dirs_info.h b/be/src/olap/wal/wal_dirs_info.h
index 91a26af2b5d..eda9cc72d30 100644
--- a/be/src/olap/wal/wal_dirs_info.h
+++ b/be/src/olap/wal/wal_dirs_info.h
@@ -42,12 +42,11 @@ public:
     size_t get_limit();
     void set_limit(size_t limit);
     void set_used(size_t used);
-    // TODO increase_pre_allocated and decrease_pre_allocated
-    void set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated);
+    void set_pre_allocated(size_t increase_pre_allocated, size_t decrease_pre_allocated);
     size_t available();
     Status update_wal_dir_limit(size_t limit = -1);
     Status update_wal_dir_used(size_t used = -1);
-    Status update_wal_dir_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated = true);
+    void update_wal_dir_pre_allocated(size_t increase_pre_allocated, size_t decrease_pre_allocated);
 
 private:
     std::string _wal_dir;
@@ -70,8 +69,8 @@ public:
     Status update_all_wal_dir_limit();
     Status update_wal_dir_used(std::string wal_dir, size_t used = -1);
     Status update_all_wal_dir_used();
-    Status update_wal_dir_pre_allocated(std::string wal_dir, size_t pre_allocated,
-                                        bool is_add_pre_allocated);
+    Status update_wal_dir_pre_allocated(std::string wal_dir, size_t increase_pre_allocated,
+                                        size_t decrease_pre_allocated);
     Status get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes);
 
 private:
diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp
index 9af91cf7b55..9d92f5c9e6a 100644
--- a/be/src/olap/wal/wal_manager.cpp
+++ b/be/src/olap/wal/wal_manager.cpp
@@ -232,8 +232,8 @@ void WalManager::print_wal_status_queue() {
     LOG(INFO) << ss.str();
 }
 
-Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id,
-                                const std::string& label, std::string& base_path) {
+Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id,
+                                   const std::string& label, std::string& base_path) {
     base_path = _wal_dirs_info->get_available_random_wal_dir();
     std::stringstream ss;
     ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/"
@@ -286,11 +286,6 @@ Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>&
     LOG(INFO) << "create wal " << wal_path;
     wal_writer = std::make_shared<WalWriter>(wal_path);
     RETURN_IF_ERROR(wal_writer->init());
-    {
-        // TODO no use, should remove it
-        std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
-        _wal_id_to_writer_map.emplace(wal_id, wal_writer);
-    }
     return Status::OK();
 }
 
@@ -431,8 +426,8 @@ Status WalManager::delete_wal(int64_t wal_id, size_t block_queue_pre_allocated)
             _wal_path_map.erase(wal_id);
         }
     }
-    RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path),
-                                                 block_queue_pre_allocated, false));
+    RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path), 0,
+                                                 block_queue_pre_allocated));
     return Status::OK();
 }
 
@@ -481,10 +476,11 @@ Status WalManager::update_wal_dir_used(const std::string& wal_dir, size_t used)
     return _wal_dirs_info->update_wal_dir_used(wal_dir, used);
 }
 
-Status WalManager::update_wal_dir_pre_allocated(const std::string& wal_dir, size_t pre_allocated,
-                                                bool is_add_pre_allocated) {
-    return _wal_dirs_info->update_wal_dir_pre_allocated(wal_dir, pre_allocated,
-                                                        is_add_pre_allocated);
+Status WalManager::update_wal_dir_pre_allocated(const std::string& wal_dir,
+                                                size_t increase_pre_allocated,
+                                                size_t decrease_pre_allocated) {
+    return _wal_dirs_info->update_wal_dir_pre_allocated(wal_dir, increase_pre_allocated,
+                                                        decrease_pre_allocated);
 }
 
 Status WalManager::_update_wal_dir_info_thread() {
diff --git a/be/src/olap/wal/wal_manager.h b/be/src/olap/wal/wal_manager.h
index f4e28445f8f..f6a3bfef798 100644
--- a/be/src/olap/wal/wal_manager.h
+++ b/be/src/olap/wal/wal_manager.h
@@ -63,14 +63,14 @@ public:
     // wal back pressure
     Status update_wal_dir_limit(const std::string& wal_dir, size_t limit = -1);
     Status update_wal_dir_used(const std::string& wal_dir, size_t used = -1);
-    Status update_wal_dir_pre_allocated(const std::string& wal_dir, size_t pre_allocated,
-                                        bool is_add_pre_allocated);
+    Status update_wal_dir_pre_allocated(const std::string& wal_dir, size_t increase_pre_allocated,
+                                        size_t decrease_pre_allocated);
     Status get_wal_dir_available_size(const std::string& wal_dir, size_t* available_bytes);
     size_t get_max_available_size();
 
     // replay wal
-    Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const std::string& label,
-                        std::string& base_path);
+    Status create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id,
+                           const std::string& label, std::string& base_path);
     Status get_wal_path(int64_t wal_id, std::string& wal_path);
     Status delete_wal(int64_t wal_id, size_t block_queue_pre_allocated = 0);
     Status add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, std::string wal);
@@ -130,8 +130,6 @@ private:
 
     std::shared_mutex _wal_lock;
     std::unordered_map<int64_t, std::string> _wal_path_map;
-    // TODO no use? need remove it. And the map dose not clear
-    std::unordered_map<int64_t, std::shared_ptr<WalWriter>> _wal_id_to_writer_map;
 
     // TODO Now only used for debug wal status, consider remove it
     std::shared_mutex _wal_status_lock;
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 2971138d5b6..8a81388ecd2 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -491,8 +491,8 @@ Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& i
 Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
                                   const std::string& import_label, WalManager* wal_manager,
                                   std::vector<TSlotDescriptor>& slot_desc, int be_exe_version) {
-    RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->add_wal_path(db_id, tb_id, wal_id,
-                                                                    import_label, _wal_base_path));
+    RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->create_wal_path(
+            db_id, tb_id, wal_id, import_label, _wal_base_path));
     _v_wal_writer = std::make_shared<vectorized::VWalWriter>(
             tb_id, wal_id, import_label, wal_manager, slot_desc, be_exe_version);
     return _v_wal_writer->init();
@@ -515,7 +515,7 @@ bool LoadBlockQueue::has_enough_wal_disk_space(size_t pre_allocated) {
         }
     }
     if (pre_allocated < available_bytes) {
-        Status st = wal_mgr->update_wal_dir_pre_allocated(_wal_base_path, pre_allocated, true);
+        Status st = wal_mgr->update_wal_dir_pre_allocated(_wal_base_path, pre_allocated, 0);
         if (!st.ok()) {
             LOG(WARNING) << "update wal dir pre_allocated failed, reason: " << st.to_string();
         }
diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp
index 2d786679d4e..cf9e1733643 100644
--- a/be/test/vec/exec/vwal_scanner_test.cpp
+++ b/be/test/vec/exec/vwal_scanner_test.cpp
@@ -218,7 +218,7 @@ void VWalScannerTest::init() {
     _env->_wal_manager = WalManager::create_shared(_env, wal_dir);
     std::string base_path;
     auto st = _env->_wal_manager->_init_wal_dirs_info();
-    st = _env->_wal_manager->add_wal_path(db_id, tb_id, txn_id, label, base_path);
+    st = _env->_wal_manager->create_wal_path(db_id, tb_id, txn_id, label, base_path);
 }
 
 TEST_F(VWalScannerTest, normal) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org