You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2024/01/07 10:54:48 UTC

(doris) branch master updated: [enhancement](group_commit) refector wal manager code (#29560)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b731800a09 [enhancement](group_commit) refector wal manager code (#29560)
0b731800a09 is described below

commit 0b731800a09e6761452f456b30c2061a75ad911d
Author: huanghaibin <28...@qq.com>
AuthorDate: Sun Jan 7 18:54:41 2024 +0800

    [enhancement](group_commit) refector wal manager code (#29560)
---
 be/src/olap/wal/wal_manager.cpp                    | 216 ++++++++-------------
 be/src/olap/wal/wal_manager.h                      |  45 ++---
 be/src/olap/wal/wal_reader.cpp                     |   4 +-
 be/src/olap/wal/wal_reader.h                       |   3 +-
 be/src/olap/wal/wal_table.cpp                      | 123 ++++--------
 be/src/olap/wal/wal_table.h                        |  18 +-
 be/src/olap/wal/wal_writer.cpp                     |   8 +
 be/src/runtime/group_commit_mgr.cpp                |  11 +-
 be/src/service/internal_service.cpp                |   4 +-
 be/src/vec/exec/format/wal/wal_reader.cpp          |  49 ++---
 be/src/vec/exec/format/wal/wal_reader.h            |  15 +-
 be/src/vec/exec/scan/vfile_scanner.cpp             |   2 +-
 be/src/vec/sink/writer/vwal_writer.cpp             |  12 +-
 be/src/vec/sink/writer/vwal_writer.h               |   3 +
 be/test/vec/exec/vwal_scanner_test.cpp             |   1 -
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |  19 +-
 .../org/apache/doris/load/GroupCommitManager.java  |  10 +-
 .../ExternalFileTableValuedFunction.java           |   6 +-
 gensrc/proto/internal_service.proto                |   1 -
 19 files changed, 217 insertions(+), 333 deletions(-)

diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp
index 19b92c38594..b1931f62a62 100644
--- a/be/src/olap/wal/wal_manager.cpp
+++ b/be/src/olap/wal/wal_manager.cpp
@@ -28,9 +28,9 @@
 
 #include "common/config.h"
 #include "common/status.h"
+#include "gutil/strings/split.h"
 #include "io/fs/local_file_system.h"
 #include "olap/wal/wal_dirs_info.h"
-#include "olap/wal/wal_writer.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
 #include "runtime/plan_fragment_executor.h"
@@ -40,7 +40,7 @@
 namespace doris {
 WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list)
         : _exec_env(exec_env), _stop(false), _stop_background_threads_latch(1) {
-    doris::vectorized::WalReader::string_split(wal_dir_list, ";", _wal_dirs);
+    _wal_dirs = strings::Split(wal_dir_list, ";", strings::SkipWhitespace());
     static_cast<void>(ThreadPoolBuilder("GroupCommitReplayWalThreadPool")
                               .set_min_threads(1)
                               .set_max_threads(config::group_commit_relay_wal_threads)
@@ -109,7 +109,7 @@ Status WalManager::_init_wal_dirs_conf() {
 Status WalManager::_init_wal_dirs() {
     bool exists = false;
     for (auto wal_dir : _wal_dirs) {
-        std::string tmp_dir = wal_dir + "/" + tmp;
+        std::string tmp_dir = wal_dir + "/" + _tmp;
         LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir;
         RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir, &exists));
         if (!exists) {
@@ -160,76 +160,50 @@ Status WalManager::_init_wal_dirs_info() {
             &_update_wal_dirs_info_thread);
 }
 
-void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status) {
-    std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
+void WalManager::add_wal_queue(int64_t table_id, int64_t wal_id) {
+    std::lock_guard<std::shared_mutex> wrlock(_wal_queue_lock);
     LOG(INFO) << "add wal queue "
-              << ",table_id:" << table_id << ",wal_id:" << wal_id << ",status:" << wal_status;
-    auto it = _wal_status_queues.find(table_id);
-    if (it == _wal_status_queues.end()) {
-        std::unordered_map<int64_t, WalStatus> tmp_map;
-        tmp_map.emplace(wal_id, wal_status);
-        _wal_status_queues.emplace(table_id, tmp_map);
+              << ",table_id:" << table_id << ",wal_id:" << wal_id;
+    auto it = _wal_queues.find(table_id);
+    if (it == _wal_queues.end()) {
+        std::set<int64_t> tmp_set;
+        tmp_set.insert(wal_id);
+        _wal_queues.emplace(table_id, tmp_set);
     } else {
-        it->second.emplace(wal_id, wal_status);
+        it->second.insert(wal_id);
     }
 }
 
-Status WalManager::erase_wal_status_queue(int64_t table_id, int64_t wal_id) {
-    std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
-    auto it = _wal_status_queues.find(table_id);
-    LOG(INFO) << "remove wal queue "
-              << ",table_id:" << table_id << ",wal_id:" << wal_id;
-    if (it == _wal_status_queues.end()) {
-        return Status::InternalError("table_id " + std::to_string(table_id) +
-                                     " not found in wal status queue");
-    } else {
+void WalManager::erase_wal_queue(int64_t table_id, int64_t wal_id) {
+    std::lock_guard<std::shared_mutex> wrlock(_wal_queue_lock);
+    auto it = _wal_queues.find(table_id);
+    if (it != _wal_queues.end()) {
+        LOG(INFO) << "remove wal queue "
+                  << ",table_id:" << table_id << ",wal_id:" << wal_id;
         it->second.erase(wal_id);
         if (it->second.empty()) {
-            _wal_status_queues.erase(table_id);
+            _wal_queues.erase(table_id);
         }
     }
-    return Status::OK();
 }
 
-Status WalManager::get_wal_status_queue_size(const PGetWalQueueSizeRequest* request,
-                                             PGetWalQueueSizeResponse* response) {
-    std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
+size_t WalManager::get_wal_queue_size(int64_t table_id) {
+    std::lock_guard<std::shared_mutex> wrlock(_wal_queue_lock);
     size_t count = 0;
-    auto table_id = request->table_id();
-    auto txn_id = request->txn_id();
-    if (table_id > 0 && txn_id > 0) {
-        auto it = _wal_status_queues.find(table_id);
-        if (it == _wal_status_queues.end()) {
-            LOG(INFO) << ("table_id " + std::to_string(table_id) +
-                          " not found in wal status queue");
+    if (table_id > 0) {
+        auto it = _wal_queues.find(table_id);
+        if (it != _wal_queues.end()) {
+            return it->second.size();
         } else {
-            for (auto wal_it = it->second.begin(); wal_it != it->second.end(); ++wal_it) {
-                if (wal_it->first <= txn_id) {
-                    count += 1;
-                }
-            }
+            return 0;
         }
     } else {
-        for (auto it = _wal_status_queues.begin(); it != _wal_status_queues.end(); it++) {
+        // table_id <= 0 (e.g. -1) means: return the total wal queue size of all tables
+        for (auto it = _wal_queues.begin(); it != _wal_queues.end(); it++) {
             count += it->second.size();
         }
     }
-    response->set_size(count);
-    if (count > 0) {
-        print_wal_status_queue();
-    }
-    return Status::OK();
-}
-
-void WalManager::print_wal_status_queue() {
-    std::stringstream ss;
-    for (auto it = _wal_status_queues.begin(); it != _wal_status_queues.end(); ++it) {
-        ss << "table_id:" << it->first << std::endl;
-        for (auto wal_it = it->second.begin(); wal_it != it->second.end(); ++wal_it) {
-            ss << "wal_id:" << wal_it->first << ",status:" << wal_it->second << std::endl;
-        }
-    }
-    LOG(INFO) << ss.str();
+    return count;
 }
 
 Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id,
@@ -239,7 +213,7 @@ Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_
     ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/"
        << std::to_string(wal_id) << "_" << label;
     {
-        std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
+        std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
         auto it = _wal_path_map.find(wal_id);
         if (it != _wal_path_map.end()) {
             return Status::InternalError("wal_id {} already in wal_path_map", wal_id);
@@ -250,7 +224,7 @@ Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_
 }
 
 Status WalManager::get_wal_path(int64_t wal_id, std::string& wal_path) {
-    std::shared_lock rdlock(_wal_lock);
+    std::shared_lock rdlock(_wal_path_lock);
     auto it = _wal_path_map.find(wal_id);
     if (it != _wal_path_map.end()) {
         wal_path = _wal_path_map[wal_id];
@@ -260,35 +234,6 @@ Status WalManager::get_wal_path(int64_t wal_id, std::string& wal_path) {
     return Status::OK();
 }
 
-Status WalManager::create_wal_reader(const std::string& wal_path,
-                                     std::shared_ptr<WalReader>& wal_reader) {
-    wal_reader = std::make_shared<WalReader>(wal_path);
-    RETURN_IF_ERROR(wal_reader->init());
-    return Status::OK();
-}
-
-Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>& wal_writer) {
-    std::string wal_path;
-    RETURN_IF_ERROR(get_wal_path(wal_id, wal_path));
-    // TODO move the create_dir into wal_writer::init
-    std::vector<std::string> path_element;
-    doris::vectorized::WalReader::string_split(wal_path, "/", path_element);
-    std::stringstream ss;
-    for (int i = 0; i < path_element.size() - 1; i++) {
-        ss << path_element[i] << "/";
-    }
-    std::string base_path = ss.str();
-    bool exists = false;
-    RETURN_IF_ERROR(io::global_local_filesystem()->exists(base_path, &exists));
-    if (!exists) {
-        RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(base_path));
-    }
-    LOG(INFO) << "create wal " << wal_path;
-    wal_writer = std::make_shared<WalWriter>(wal_path);
-    RETURN_IF_ERROR(wal_writer->init());
-    return Status::OK();
-}
-
 Status WalManager::_scan_wals(const std::string& wal_path) {
     size_t count = 0;
     bool exists = true;
@@ -299,7 +244,7 @@ Status WalManager::_scan_wals(const std::string& wal_path) {
         return st;
     }
     for (const auto& database_id : dbs) {
-        if (database_id.is_file || database_id.file_name == tmp) {
+        if (database_id.is_file || database_id.file_name == _tmp) {
             continue;
         }
         std::vector<io::FileInfo> tables;
@@ -329,7 +274,7 @@ Status WalManager::_scan_wals(const std::string& wal_path) {
                 auto wal_file = table_path + "/" + wal.file_name;
                 res.emplace_back(wal_file);
                 {
-                    std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
+                    std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
                     auto pos = wal.file_name.find("_");
                     try {
                         int64_t wal_id =
@@ -337,7 +282,6 @@ Status WalManager::_scan_wals(const std::string& wal_path) {
                         _wal_path_map.emplace(wal_id, wal_file);
                         int64_t db_id = std::strtoll(database_id.file_name.c_str(), NULL, 10);
                         int64_t tb_id = std::strtoll(table_id.file_name.c_str(), NULL, 10);
-                        add_wal_status_queue(tb_id, wal_id, WalManager::WalStatus::REPLAY);
                         if (config::group_commit_wait_replay_wal_finish) {
                             std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
                             std::shared_ptr<std::condition_variable> cv =
@@ -396,6 +340,7 @@ Status WalManager::_replay() {
 
 Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id,
                                    std::string wal) {
+    add_wal_queue(table_id, wal_id);
     std::lock_guard<std::shared_mutex> wrlock(_table_lock);
     std::shared_ptr<WalTable> table_ptr;
     auto it = _table_map.find(table_id);
@@ -423,23 +368,6 @@ size_t WalManager::get_wal_table_size(int64_t table_id) {
     }
 }
 
-Status WalManager::delete_wal(int64_t wal_id, size_t block_queue_pre_allocated) {
-    std::string wal_path;
-    {
-        std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
-        auto it = _wal_path_map.find(wal_id);
-        if (it != _wal_path_map.end()) {
-            wal_path = it->second;
-            RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path));
-            LOG(INFO) << "delete file=" << wal_path;
-            _wal_path_map.erase(wal_id);
-        }
-    }
-    RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path), 0,
-                                                 block_queue_pre_allocated));
-    return Status::OK();
-}
-
 void WalManager::_stop_relay_wal() {
     std::lock_guard<std::shared_mutex> wrlock(_table_lock);
     for (auto it = _table_map.begin(); it != _table_map.end(); it++) {
@@ -447,32 +375,6 @@ void WalManager::_stop_relay_wal() {
     }
 }
 
-void WalManager::add_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index) {
-    std::lock_guard<std::shared_mutex> wrlock(_wal_column_id_map_lock);
-    _wal_column_id_map.emplace(wal_id, column_index);
-    LOG(INFO) << "add " << wal_id << " to wal_column_id_map";
-}
-
-void WalManager::erase_wal_column_index(int64_t wal_id) {
-    std::lock_guard<std::shared_mutex> wrlock(_wal_column_id_map_lock);
-    if (_wal_column_id_map.erase(wal_id)) {
-        LOG(INFO) << "erase " << wal_id << " from wal_column_id_map";
-    } else {
-        LOG(WARNING) << "fail to erase wal " << wal_id << " from wal_column_id_map";
-    }
-}
-
-Status WalManager::get_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index) {
-    std::lock_guard<std::shared_mutex> wrlock(_wal_column_id_map_lock);
-    auto it = _wal_column_id_map.find(wal_id);
-    if (it != _wal_column_id_map.end()) {
-        column_index = it->second;
-    } else {
-        return Status::InternalError("cannot find wal {} in wal_column_id_map", wal_id);
-    }
-    return Status::OK();
-}
-
 size_t WalManager::get_max_available_size() {
     return _wal_dirs_info->get_max_available_size();
 }
@@ -580,4 +482,54 @@ Status WalManager::get_lock_and_cv(int64_t wal_id, std::shared_ptr<std::mutex>&
     return Status::OK();
 }
 
+Status WalManager::delete_wal(int64_t table_id, int64_t wal_id, size_t block_queue_pre_allocated) {
+    std::string wal_path;
+    {
+        std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
+        auto it = _wal_path_map.find(wal_id);
+        if (it != _wal_path_map.end()) {
+            wal_path = it->second;
+            auto st = io::global_local_filesystem()->delete_file(wal_path);
+            if (st.ok()) {
+                LOG(INFO) << "delete file=" << wal_path;
+            } else {
+                LOG(WARNING) << "fail to delete file=" << wal_path;
+            }
+            _wal_path_map.erase(wal_id);
+        }
+    }
+    erase_wal_queue(table_id, wal_id);
+    RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path), 0,
+                                                 block_queue_pre_allocated));
+    return Status::OK();
+}
+
+Status WalManager::rename_to_tmp_path(const std::string wal, int64_t table_id, int64_t wal_id) {
+    io::Path wal_path = wal;
+    std::list<std::string> path_element;
+    for (int i = 0; i < 3; ++i) {
+        if (!wal_path.has_parent_path()) {
+            return Status::InternalError("parent path is not enough when rename " + wal);
+        }
+        path_element.push_front(wal_path.filename().string());
+        wal_path = wal_path.parent_path();
+    }
+    wal_path.append(_tmp);
+    for (auto path : path_element) {
+        wal_path.append(path);
+    }
+    bool exists = false;
+    RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_path.parent_path(), &exists));
+    if (!exists) {
+        RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_path.parent_path()));
+    }
+    auto res = std::rename(wal.c_str(), wal_path.string().c_str());
+    if (res < 0) {
+        return Status::InternalError("rename fail on path " + wal);
+    }
+    LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string();
+    erase_wal_queue(table_id, wal_id);
+    return Status::OK();
+}
+
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal/wal_manager.h b/be/src/olap/wal/wal_manager.h
index b0e4d1d1960..44fdef2e6c0 100644
--- a/be/src/olap/wal/wal_manager.h
+++ b/be/src/olap/wal/wal_manager.h
@@ -24,6 +24,7 @@
 #include <cstddef>
 #include <cstdint>
 #include <memory>
+#include <set>
 #include <shared_mutex>
 #include <thread>
 #include <unordered_map>
@@ -46,13 +47,6 @@ namespace doris {
 class WalManager {
     ENABLE_FACTORY_CREATOR(WalManager);
 
-public:
-    enum WalStatus {
-        PREPARE = 0,
-        REPLAY = 1,
-        CREATE = 2,
-    };
-
 public:
     WalManager(ExecEnv* exec_env, const std::string& wal_dir);
     ~WalManager();
@@ -72,25 +66,14 @@ public:
     Status create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id,
                            const std::string& label, std::string& base_path);
     Status get_wal_path(int64_t wal_id, std::string& wal_path);
-    Status delete_wal(int64_t wal_id, size_t block_queue_pre_allocated = 0);
+    Status delete_wal(int64_t table_id, int64_t wal_id, size_t block_queue_pre_allocated = 0);
+    Status rename_to_tmp_path(const std::string wal, int64_t table_id, int64_t wal_id);
     Status add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, std::string wal);
-    // used for ut
+    void add_wal_queue(int64_t table_id, int64_t wal_id);
+    void erase_wal_queue(int64_t table_id, int64_t wal_id);
+    size_t get_wal_queue_size(int64_t table_id);
+    // for ut
     size_t get_wal_table_size(int64_t table_id);
-    // TODO util function, should remove
-    Status create_wal_reader(const std::string& wal_path, std::shared_ptr<WalReader>& wal_reader);
-    Status create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>& wal_writer);
-
-    // for wal status, can be removed
-    Status get_wal_status_queue_size(const PGetWalQueueSizeRequest* request,
-                                     PGetWalQueueSizeResponse* response);
-    void add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status);
-    Status erase_wal_status_queue(int64_t table_id, int64_t wal_id);
-    void print_wal_status_queue();
-
-    // for _wal_column_id_map, can be removed
-    void add_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index);
-    void erase_wal_column_index(int64_t wal_id);
-    Status get_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index);
 
     //for test relay
     Status add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> lock,
@@ -118,12 +101,11 @@ public:
     // used for be ut
     size_t wal_limit_test_bytes;
 
-    const std::string tmp = "tmp";
-
 private:
     ExecEnv* _exec_env = nullptr;
     std::atomic<bool> _stop;
     CountDownLatch _stop_background_threads_latch;
+    const std::string _tmp = "tmp";
 
     // wal back pressure
     std::vector<std::string> _wal_dirs;
@@ -137,16 +119,11 @@ private:
     std::shared_mutex _table_lock;
     std::map<int64_t, std::shared_ptr<WalTable>> _table_map;
 
-    std::shared_mutex _wal_lock;
+    std::shared_mutex _wal_path_lock;
     std::unordered_map<int64_t, std::string> _wal_path_map;
 
-    // TODO Now only used for debug wal status, consider remove it
-    std::shared_mutex _wal_status_lock;
-    std::unordered_map<int64_t, std::unordered_map<int64_t, WalStatus>> _wal_status_queues;
-
-    // TODO should remove
-    std::shared_mutex _wal_column_id_map_lock;
-    std::unordered_map<int64_t, std::vector<size_t>&> _wal_column_id_map;
+    std::shared_mutex _wal_queue_lock;
+    std::unordered_map<int64_t, std::set<int64_t>> _wal_queues;
 
     // for test relay
     // <lock, condition_variable>
diff --git a/be/src/olap/wal/wal_reader.cpp b/be/src/olap/wal/wal_reader.cpp
index 75bbaf10947..36a4c15aa2b 100644
--- a/be/src/olap/wal/wal_reader.cpp
+++ b/be/src/olap/wal/wal_reader.cpp
@@ -70,7 +70,7 @@ Status WalReader::read_block(PBlock& block) {
     return Status::OK();
 }
 
-Status WalReader::read_header(uint32_t& version, std::string& col_ids) {
+Status WalReader::read_header(std::string& col_ids) {
     size_t bytes_read = 0;
     std::string magic_str;
     magic_str.resize(k_wal_magic_length);
@@ -83,7 +83,7 @@ Status WalReader::read_header(uint32_t& version, std::string& col_ids) {
     RETURN_IF_ERROR(
             file_reader->read_at(_offset, {version_buf, WalWriter::VERSION_SIZE}, &bytes_read));
     _offset += WalWriter::VERSION_SIZE;
-    version = decode_fixed32_le(version_buf);
+    _version = decode_fixed32_le(version_buf);
     uint8_t len_buf[WalWriter::LENGTH_SIZE];
     RETURN_IF_ERROR(file_reader->read_at(_offset, {len_buf, WalWriter::LENGTH_SIZE}, &bytes_read));
     _offset += WalWriter::LENGTH_SIZE;
diff --git a/be/src/olap/wal/wal_reader.h b/be/src/olap/wal/wal_reader.h
index f68a031c09b..7c852ee18f6 100644
--- a/be/src/olap/wal/wal_reader.h
+++ b/be/src/olap/wal/wal_reader.h
@@ -32,7 +32,7 @@ public:
     Status finalize();
 
     Status read_block(PBlock& block);
-    Status read_header(uint32_t& version, std::string& col_ids);
+    Status read_header(std::string& col_ids);
 
 private:
     Status _deserialize(PBlock& block, std::string& buf);
@@ -40,6 +40,7 @@ private:
 
 private:
     std::string _file_name;
+    uint32_t _version = 0;
     size_t _offset;
     io::FileReaderSPtr file_reader;
 };
diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp
index 5187e594adc..54500273daa 100644
--- a/be/src/olap/wal/wal_table.cpp
+++ b/be/src/olap/wal/wal_table.cpp
@@ -19,6 +19,7 @@
 
 #include <thrift/protocol/TDebugProtocol.h>
 
+#include "gutil/strings/split.h"
 #include "http/action/http_stream.h"
 #include "http/action/stream_load.h"
 #include "http/ev_http_server.h"
@@ -33,12 +34,11 @@
 #include "runtime/plan_fragment_executor.h"
 #include "util/path_util.h"
 #include "util/thrift_rpc_helper.h"
-#include "vec/exec/format/wal/wal_reader.h"
 
 namespace doris {
 
 WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id)
-        : _exec_env(exec_env), _db_id(db_id), _table_id(table_id), _stop(false) {
+        : _exec_env(exec_env), _db_id(db_id), _table_id(table_id) {
     _http_stream_action = std::make_shared<HttpStreamAction>(exec_env);
 }
 WalTable::~WalTable() {}
@@ -63,7 +63,8 @@ void WalTable::_pick_relay_wals() {
         if (wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) {
             LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", table=" << _table_id
                          << ", wal=" << it->first << ", retry_num=" << wal_info->get_retry_num();
-            auto st = _rename_to_tmp_path(it->first);
+            auto st = _exec_env->wal_mgr()->rename_to_tmp_path(it->first, _table_id,
+                                                               wal_info->get_wal_id());
             if (!st.ok()) {
                 LOG(WARNING) << "rename " << it->first << " fail"
                              << ",st:" << st.to_string();
@@ -124,7 +125,7 @@ Status WalTable::_relay_wal_one_by_one() {
         }
     }
     for (auto delete_wal_info : need_delete_wals) {
-        auto st = _delete_wal(delete_wal_info->get_wal_id());
+        auto st = _exec_env->wal_mgr()->delete_wal(_table_id, delete_wal_info->get_wal_id());
         if (!st.ok()) {
             LOG(WARNING) << "fail to delete wal " << delete_wal_info->get_wal_path();
         }
@@ -154,33 +155,6 @@ Status WalTable::replay_wals() {
     return Status::OK();
 }
 
-Status WalTable::_rename_to_tmp_path(const std::string wal) {
-    io::Path wal_path = wal;
-    std::list<std::string> path_element;
-    for (int i = 0; i < 3; ++i) {
-        if (!wal_path.has_parent_path()) {
-            return Status::InternalError("parent path is not enough when rename " + wal);
-        }
-        path_element.push_front(wal_path.filename().string());
-        wal_path = wal_path.parent_path();
-    }
-    wal_path.append(_exec_env->wal_mgr()->tmp);
-    for (auto path : path_element) {
-        wal_path.append(path);
-    }
-    bool exists = false;
-    RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_path.parent_path(), &exists));
-    if (!exists) {
-        RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_path.parent_path()));
-    }
-    auto res = std::rename(wal.c_str(), wal_path.string().c_str());
-    if (res < 0) {
-        return Status::InternalError("rename fail on path " + wal);
-    }
-    LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string();
-    return Status::OK();
-}
-
 bool WalTable::_need_replay(std::shared_ptr<WalInfo> wal_info) {
     if (config::group_commit_wait_replay_wal_finish) {
         return true;
@@ -217,10 +191,9 @@ Status WalTable::_try_abort_txn(int64_t db_id, int64_t wal_id) {
 
 Status WalTable::_replay_wal_internal(const std::string& wal) {
     LOG(INFO) << "Start replay wal for db=" << _db_id << ", table=" << _table_id << ", wal=" << wal;
-    std::shared_ptr<std::pair<int64_t, std::string>> pair = nullptr;
-    RETURN_IF_ERROR(_parse_wal_path(wal, pair));
-    auto wal_id = pair->first;
-    auto label = pair->second;
+    int64_t wal_id = 0;
+    std::string label = "";
+    RETURN_IF_ERROR(_parse_wal_path(wal, wal_id, label));
 #ifndef BE_TEST
     if (!config::group_commit_wait_replay_wal_finish) {
         auto st = _try_abort_txn(_db_id, wal_id);
@@ -228,22 +201,18 @@ Status WalTable::_replay_wal_internal(const std::string& wal) {
             LOG(WARNING) << "abort txn " << wal_id << " fail";
         }
     }
-    RETURN_IF_ERROR(_get_column_info(_db_id, _table_id));
 #endif
     RETURN_IF_ERROR(_replay_one_txn_with_stremaload(wal_id, wal, label));
     return Status::OK();
 }
 
-Status WalTable::_parse_wal_path(const std::string& wal,
-                                 std::shared_ptr<std::pair<int64_t, std::string>>& pair) {
-    std::vector<std::string> path_element;
-    doris::vectorized::WalReader::string_split(wal, "/", path_element);
-    auto pos = path_element[path_element.size() - 1].find("_");
+Status WalTable::_parse_wal_path(const std::string& wal, int64_t& wal_id, std::string& label) {
+    io::Path wal_path = wal;
+    auto file_name = wal_path.filename().string();
+    auto pos = file_name.find("_");
     try {
-        int64_t wal_id = std::strtoll(path_element[path_element.size() - 1].substr(0, pos).c_str(),
-                                      NULL, 10);
-        auto label = path_element[path_element.size() - 1].substr(pos + 1);
-        pair = std::make_shared<std::pair<int64_t, std::string>>(std::make_pair(wal_id, label));
+        wal_id = std::strtoll(file_name.substr(0, pos).c_str(), NULL, 10);
+        label = file_name.substr(pos + 1);
     } catch (const std::invalid_argument& e) {
         return Status::InvalidArgument("Invalid format, {}", e.what());
     }
@@ -251,33 +220,29 @@ Status WalTable::_parse_wal_path(const std::string& wal,
 }
 
 Status WalTable::_construct_sql_str(const std::string& wal, const std::string& label,
-                                    std::string& sql_str, std::vector<size_t>& index_vector) {
+                                    std::string& sql_str) {
     std::string columns;
     RETURN_IF_ERROR(_read_wal_header(wal, columns));
-    std::vector<std::string> column_id_element;
-    doris::vectorized::WalReader::string_split(columns, ",", column_id_element);
+    std::vector<std::string> column_id_vector =
+            strings::Split(columns, ",", strings::SkipWhitespace());
+    std::map<int64_t, std::string> column_info_map;
+    RETURN_IF_ERROR(_get_column_info(_db_id, _table_id, column_info_map));
     std::stringstream ss_name;
-    std::stringstream ss_id;
-    int index_raw = 0;
-    for (auto column_id_str : column_id_element) {
+    for (auto column_id_str : column_id_vector) {
         try {
             int64_t column_id = std::strtoll(column_id_str.c_str(), NULL, 10);
-            auto it = _column_id_info_map.find(column_id);
-            if (it != _column_id_info_map.end()) {
-                ss_name << "`" << it->second->first << "`,";
-                ss_id << "c" << std::to_string(it->second->second) << ",";
-                index_vector.emplace_back(index_raw);
+            auto it = column_info_map.find(column_id);
+            if (it != column_info_map.end()) {
+                ss_name << "`" << it->second << "`,";
             }
-            index_raw++;
         } catch (const std::invalid_argument& e) {
             return Status::InvalidArgument("Invalid format, {}", e.what());
         }
     }
     auto name = ss_name.str().substr(0, ss_name.str().size() - 1);
-    auto id = ss_id.str().substr(0, ss_id.str().size() - 1);
     std::stringstream ss;
     ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label << " ("
-       << name << ") select " << id << " from http_stream(\"format\" = \"wal\", \"table_id\" = \""
+       << name << ") select " << name << " from http_stream(\"format\" = \"wal\", \"table_id\" = \""
        << std::to_string(_table_id) << "\")";
     sql_str = ss.str().data();
     return Status::OK();
@@ -286,9 +251,7 @@ Status WalTable::_construct_sql_str(const std::string& wal, const std::string& l
 Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal,
                                      const std::string& label) {
     std::string sql_str;
-    std::vector<size_t> index_vector;
-    RETURN_IF_ERROR(_construct_sql_str(wal, label, sql_str, index_vector));
-    _exec_env->wal_mgr()->add_wal_column_index(wal_id, index_vector);
+    RETURN_IF_ERROR(_construct_sql_str(wal, label, sql_str));
     std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
     ctx->sql_str = sql_str;
     ctx->wal_id = wal_id;
@@ -304,13 +267,13 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal,
             auto commit_st = _exec_env->stream_load_executor()->commit_txn(ctx.get());
             st = commit_st;
         } else if (!ctx->status.ok()) {
-            LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
-                         << ", errmsg=" << ctx->status;
-            _exec_env->stream_load_executor()->rollback_txn(ctx.get());
             st = ctx->status;
         }
     }
-    _exec_env->wal_mgr()->erase_wal_column_index(wal_id);
+    if (!st.ok()) {
+        LOG(WARNING) << "handle streaming load failed, id=" << ctx->id << ", errmsg=" << st;
+        _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+    }
     LOG(INFO) << "relay wal id=" << wal_id << ",st=" << st.to_string();
     return st;
 }
@@ -339,9 +302,6 @@ void WalTable::stop() {
     do {
         {
             std::lock_guard<std::mutex> lock(_replay_wal_lock);
-            if (!this->_stop.load()) {
-                this->_stop.store(true);
-            }
             if (_replay_wal_map.empty() && _replaying_queue.empty()) {
                 break;
             }
@@ -355,10 +315,11 @@ void WalTable::stop() {
 
 size_t WalTable::size() {
     std::lock_guard<std::mutex> lock(_replay_wal_lock);
-    return _replay_wal_map.size();
+    return _replay_wal_map.size() + _replaying_queue.size();
 }
 
-Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id) {
+Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id,
+                                  std::map<int64_t, std::string>& column_info_map) {
     TGetColumnInfoRequest request;
     request.__set_db_id(db_id);
     request.__set_table_id(tb_id);
@@ -378,33 +339,21 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id) {
             return status;
         }
         std::vector<TColumnInfo> column_element = result.columns;
-        int64_t column_index = 1;
-        _column_id_info_map.clear();
         for (auto column : column_element) {
             auto column_name = column.column_name;
             auto column_id = column.column_id;
-            std::shared_ptr<ColumnInfo> column_pair =
-                    std::make_shared<ColumnInfo>(std::make_pair(column_name, column_index));
-            _column_id_info_map.emplace(column_id, column_pair);
-            column_index++;
+            column_info_map.emplace(column_id, column_name);
         }
     }
     return status;
 }
 
 Status WalTable::_read_wal_header(const std::string& wal_path, std::string& columns) {
-    std::shared_ptr<doris::WalReader> wal_reader;
-    RETURN_IF_ERROR(_exec_env->wal_mgr()->create_wal_reader(wal_path, wal_reader));
-    uint32_t version = 0;
-    RETURN_IF_ERROR(wal_reader->read_header(version, columns));
+    std::shared_ptr<doris::WalReader> wal_reader = std::make_shared<WalReader>(wal_path);
+    RETURN_IF_ERROR(wal_reader->init());
+    RETURN_IF_ERROR(wal_reader->read_header(columns));
     RETURN_IF_ERROR(wal_reader->finalize());
     return Status::OK();
 }
 
-Status WalTable::_delete_wal(int64_t wal_id) {
-    RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id));
-    RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, wal_id));
-    return Status::OK();
-}
-
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal/wal_table.h b/be/src/olap/wal/wal_table.h
index 66ee4fd3721..07287d8f7e6 100644
--- a/be/src/olap/wal/wal_table.h
+++ b/be/src/olap/wal/wal_table.h
@@ -41,41 +41,31 @@ public:
     void stop();
 
 private:
-    // <column_name, column_index>
-    using ColumnInfo = std::pair<std::string, int64_t>;
-
     void _pick_relay_wals();
     bool _need_replay(std::shared_ptr<WalInfo>);
     Status _relay_wal_one_by_one();
-    Status _delete_wal(int64_t wal_id);
-    Status _rename_to_tmp_path(const std::string wal);
 
     Status _replay_wal_internal(const std::string& wal);
-    // TODO change the param: (wal, int64_t* wal_id, std::string* label)
-    Status _parse_wal_path(const std::string& wal,
-                           std::shared_ptr<std::pair<int64_t, std::string>>&);
+    Status _parse_wal_path(const std::string& wal, int64_t& wal_id, std::string& label);
     Status _try_abort_txn(int64_t db_id, int64_t wal_id);
-    Status _get_column_info(int64_t db_id, int64_t tb_id);
+    Status _get_column_info(int64_t db_id, int64_t tb_id,
+                            std::map<int64_t, std::string>& column_info_map);
 
     Status _replay_one_txn_with_stremaload(int64_t wal_id, const std::string& wal,
                                            const std::string& label);
     Status _handle_stream_load(int64_t wal_id, const std::string& wal, const std::string& label);
     Status _construct_sql_str(const std::string& wal, const std::string& label,
-                              std::string& sql_str, std::vector<size_t>& index_vector);
+                              std::string& sql_str);
     Status _read_wal_header(const std::string& wal, std::string& columns);
 
 private:
     ExecEnv* _exec_env;
     int64_t _db_id;
     int64_t _table_id;
-    // TODO the stop is not used?
-    std::atomic<bool> _stop;
     std::shared_ptr<HttpStreamAction> _http_stream_action;
     mutable std::mutex _replay_wal_lock;
     // key is wal_path
     std::map<std::string, std::shared_ptr<WalInfo>> _replay_wal_map;
     std::list<std::shared_ptr<WalInfo>> _replaying_queue;
-    // TODO should not use this map
-    std::map<int64_t, std::shared_ptr<ColumnInfo>> _column_id_info_map;
 };
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal/wal_writer.cpp b/be/src/olap/wal/wal_writer.cpp
index 25f7cc0870b..8edc6b3b860 100644
--- a/be/src/olap/wal/wal_writer.cpp
+++ b/be/src/olap/wal/wal_writer.cpp
@@ -35,7 +35,15 @@ WalWriter::WalWriter(const std::string& file_name) : _file_name(file_name) {}
 WalWriter::~WalWriter() {}
 
 Status WalWriter::init() {
+    io::Path wal_path = _file_name;
+    auto parent_path = wal_path.parent_path();
+    bool exists = false;
+    RETURN_IF_ERROR(io::global_local_filesystem()->exists(parent_path, &exists));
+    if (!exists) {
+        RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(parent_path));
+    }
     RETURN_IF_ERROR(io::global_local_filesystem()->create_file(_file_name, &_file_writer));
+    LOG(INFO) << "create wal " << _file_name;
     return Status::OK();
 }
 
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 2838ebbed51..fc4a2df427f 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -297,8 +297,6 @@ Status GroupCommitTable::_create_group_commit_load(
         std::unique_lock l(_lock);
         _load_block_queues.emplace(instance_id, load_block_queue);
         _need_plan_fragment = false;
-        _exec_env->wal_mgr()->add_wal_status_queue(_table_id, txn_id,
-                                                   WalManager::WalStatus::PREPARE);
         //create wal
         if (!is_pipeline) {
             RETURN_IF_ERROR(load_block_queue->create_wal(
@@ -388,15 +386,16 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
     if (status.ok() && st.ok() &&
         (result_status.ok() || result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) {
         if (!config::group_commit_wait_replay_wal_finish) {
-            RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(
-                    txn_id, load_block_queue->block_queue_pre_allocated()));
-            RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(table_id, txn_id));
+            auto delete_st = _exec_env->wal_mgr()->delete_wal(
+                    table_id, txn_id, load_block_queue->block_queue_pre_allocated());
+            if (!delete_st.ok()) {
+                LOG(WARNING) << "fail to delete wal " << txn_id;
+            }
         }
     } else {
         std::string wal_path;
         RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path));
         RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal(db_id, table_id, txn_id, wal_path));
-        _exec_env->wal_mgr()->add_wal_status_queue(table_id, txn_id, WalManager::WalStatus::REPLAY);
     }
     std::stringstream ss;
     ss << "finish group commit, db_id=" << db_id << ", table_id=" << table_id << ", label=" << label
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 4943c0177ea..18a8325e4cb 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -2047,7 +2047,9 @@ void PInternalServiceImpl::get_wal_queue_size(google::protobuf::RpcController* c
     bool ret = _light_work_pool.try_offer([this, request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
         Status st = Status::OK();
-        st = _exec_env->wal_mgr()->get_wal_status_queue_size(request, response);
+        auto table_id = request->table_id();
+        auto count = _exec_env->wal_mgr()->get_wal_queue_size(table_id);
+        response->set_size(count);
         response->mutable_status()->set_status_code(st.code());
     });
     if (!ret) {
diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp b/be/src/vec/exec/format/wal/wal_reader.cpp
index 7344b4cbd9b..98bb35c4bc9 100644
--- a/be/src/vec/exec/format/wal/wal_reader.cpp
+++ b/be/src/vec/exec/format/wal/wal_reader.cpp
@@ -18,6 +18,7 @@
 #include "wal_reader.h"
 
 #include "common/logging.h"
+#include "gutil/strings/split.h"
 #include "olap/wal/wal_manager.h"
 #include "runtime/runtime_state.h"
 #include "vec/data_types/data_type_string.h"
@@ -33,9 +34,11 @@ WalReader::~WalReader() {
     }
 }
 
-Status WalReader::init_reader() {
+Status WalReader::init_reader(const TupleDescriptor* tuple_descriptor) {
+    _tuple_descriptor = tuple_descriptor;
     RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_path(_wal_id, _wal_path));
-    RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_reader(_wal_path, _wal_reader));
+    _wal_reader = std::make_shared<doris::WalReader>(_wal_path);
+    RETURN_IF_ERROR(_wal_reader->init());
     return Status::OK();
 }
 
@@ -59,14 +62,16 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
     vectorized::Block dst_block;
     int index = 0;
     auto columns = block->get_columns_with_type_and_name();
-    for (auto column : columns) {
-        auto pos = _column_index[index];
+    CHECK(columns.size() == _tuple_descriptor->slots().size());
+    for (auto slot_desc : _tuple_descriptor->slots()) {
+        auto pos = _column_pos_map[slot_desc->col_unique_id()];
         vectorized::ColumnPtr column_ptr = src_block.get_by_position(pos).column;
-        if (column_ptr != nullptr && column.column->is_nullable()) {
+        if (column_ptr != nullptr && slot_desc->is_nullable()) {
             column_ptr = make_nullable(column_ptr);
         }
-        dst_block.insert(index, vectorized::ColumnWithTypeAndName(std::move(column_ptr),
-                                                                  column.type, column.name));
+        dst_block.insert(
+                index, vectorized::ColumnWithTypeAndName(std::move(column_ptr), columns[index].type,
+                                                         columns[index].name));
         index++;
     }
     block->swap(dst_block);
@@ -75,24 +80,22 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
     return Status::OK();
 }
 
-void WalReader::string_split(const std::string& str, const std::string& splits,
-                             std::vector<std::string>& res) {
-    if (str == "") return;
-    std::string strs = str + splits;
-    size_t pos = strs.find(splits);
-    int step = splits.size();
-    while (pos != strs.npos) {
-        std::string temp = strs.substr(0, pos);
-        res.push_back(temp);
-        strs = strs.substr(pos + step, strs.size());
-        pos = strs.find(splits);
-    }
-}
-
 Status WalReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                               std::unordered_set<std::string>* missing_cols) {
-    RETURN_IF_ERROR(_wal_reader->read_header(_version, _col_ids));
-    RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_column_index(_wal_id, _column_index));
+    std::string col_ids;
+    RETURN_IF_ERROR(_wal_reader->read_header(col_ids));
+    std::vector<std::string> column_id_vector =
+            strings::Split(col_ids, ",", strings::SkipWhitespace());
+    try {
+        int64_t pos = 0;
+        for (auto col_id_str : column_id_vector) {
+            auto col_id = std::strtoll(col_id_str.c_str(), NULL, 10);
+            _column_pos_map.emplace(col_id, pos);
+            pos++;
+        }
+    } catch (const std::invalid_argument& e) {
+        return Status::InvalidArgument("Invalid format, {}", e.what());
+    }
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/format/wal/wal_reader.h b/be/src/vec/exec/format/wal/wal_reader.h
index 78f2c31d925..e65af10016a 100644
--- a/be/src/vec/exec/format/wal/wal_reader.h
+++ b/be/src/vec/exec/format/wal/wal_reader.h
@@ -17,6 +17,7 @@
 
 #pragma once
 #include "olap/wal/wal_reader.h"
+#include "runtime/descriptors.h"
 #include "vec/exec/format/generic_reader.h"
 
 namespace doris {
@@ -26,23 +27,19 @@ class WalReader : public GenericReader {
 public:
     WalReader(RuntimeState* state);
     ~WalReader() override;
-    Status init_reader();
+    Status init_reader(const TupleDescriptor* tuple_descriptor);
     Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
-    // TODO move it
-    static void string_split(const std::string& str, const std::string& splits,
-                             std::vector<std::string>& res);
 
 private:
     RuntimeState* _state = nullptr;
     int64_t _wal_id;
     std::string _wal_path;
-    std::shared_ptr<doris::WalReader> _wal_reader;
-    // TODO version should in olap/wal_reader
-    uint32_t _version = 0;
-    std::string _col_ids;
-    std::vector<size_t> _column_index;
+    std::shared_ptr<doris::WalReader> _wal_reader = nullptr;
+    const TupleDescriptor* _tuple_descriptor = nullptr;
+    // column_id, column_pos
+    std::map<int64_t, int64_t> _column_pos_map;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index c70295e1313..57f079d77af 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -882,7 +882,7 @@ Status VFileScanner::_get_next_reader() {
         }
         case TFileFormatType::FORMAT_WAL: {
             _cur_reader.reset(new WalReader(_state));
-            init_status = ((WalReader*)(_cur_reader.get()))->init_reader();
+            init_status = ((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc);
             break;
         }
         case TFileFormatType::FORMAT_ARROW: {
diff --git a/be/src/vec/sink/writer/vwal_writer.cpp b/be/src/vec/sink/writer/vwal_writer.cpp
index 2dfd32c14f8..c19f56fd079 100644
--- a/be/src/vec/sink/writer/vwal_writer.cpp
+++ b/be/src/vec/sink/writer/vwal_writer.cpp
@@ -38,8 +38,6 @@ VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id,
 VWalWriter::~VWalWriter() {}
 
 Status VWalWriter::init() {
-    RETURN_IF_ERROR(_wal_manager->create_wal_writer(_wal_id, _wal_writer));
-    _wal_manager->add_wal_status_queue(_tb_id, _wal_id, WalManager::WalStatus::CREATE);
 #ifndef BE_TEST
     if (config::group_commit_wait_replay_wal_finish) {
         std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
@@ -50,6 +48,8 @@ Status VWalWriter::init() {
         }
     }
 #endif
+    RETURN_IF_ERROR(_create_wal_writer(_wal_id, _wal_writer));
+    _wal_manager->add_wal_queue(_tb_id, _wal_id);
     std::stringstream ss;
     for (auto slot_desc : _slot_descs) {
         if (slot_desc.col_unique_id < 0) {
@@ -84,5 +84,13 @@ Status VWalWriter::close() {
     }
     return Status::OK();
 }
+
+Status VWalWriter::_create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>& wal_writer) {
+    std::string wal_path;
+    RETURN_IF_ERROR(_wal_manager->get_wal_path(wal_id, wal_path));
+    wal_writer = std::make_shared<WalWriter>(wal_path);
+    RETURN_IF_ERROR(wal_writer->init());
+    return Status::OK();
+}
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/sink/writer/vwal_writer.h b/be/src/vec/sink/writer/vwal_writer.h
index cbd369f7cbf..f22250cb5d4 100644
--- a/be/src/vec/sink/writer/vwal_writer.h
+++ b/be/src/vec/sink/writer/vwal_writer.h
@@ -37,6 +37,9 @@ public:
     Status write_wal(vectorized::Block* block);
     Status close();
 
+private:
+    Status _create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>& wal_writer);
+
 private:
     int64_t _db_id;
     int64_t _tb_id;
diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp
index cf9e1733643..6f983ca5bf4 100644
--- a/be/test/vec/exec/vwal_scanner_test.cpp
+++ b/be/test/vec/exec/vwal_scanner_test.cpp
@@ -226,7 +226,6 @@ TEST_F(VWalScannerTest, normal) {
     index_vector.emplace_back(0);
     index_vector.emplace_back(1);
     index_vector.emplace_back(2);
-    _env->_wal_manager->add_wal_column_index(txn_id, index_vector);
     //    config::group_commit_replay_wal_dir = wal_dir;
     NewFileScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
     scan_node._output_tuple_desc = _runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 9e7174752a8..d40020fd57e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -537,9 +537,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
             }
             return;
         }
-        long maxWalId = Env.getCurrentGlobalTransactionMgr()
-                .getTransactionIDGenerator().getNextTransactionId();
-        waitWalFinished(maxWalId);
+        waitWalFinished();
         /*
          * all tasks are finished. check the integrity.
          * we just check whether all new replicas are healthy.
@@ -602,20 +600,22 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         LOG.info("set table's state to NORMAL, table id: {}, job id: {}", tableId, jobId);
     }
 
-    private void waitWalFinished(long maxWalId) {
+    private void waitWalFinished() {
         // wait wal done here
         Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, SchemaChangeStatus.BLOCK);
         LOG.info("block table {}", tableId);
         List<Long> aliveBeIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
         long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold;
-        boolean walFinished = false;
-        while (System.currentTimeMillis() < expireTime) {
+        while (true) {
             LOG.info("wai for wal queue size to be empty");
-            walFinished = Env.getCurrentEnv().getGroupCommitManager()
-                    .isPreviousWalFinished(tableId, maxWalId, aliveBeIds);
+            boolean walFinished = Env.getCurrentEnv().getGroupCommitManager()
+                    .isPreviousWalFinished(tableId, aliveBeIds);
             if (walFinished) {
                 LOG.info("all wal is finished");
                 break;
+            } else if (System.currentTimeMillis() > expireTime) {
+                LOG.warn("waitWalFinished time out");
+                break;
             } else {
                 try {
                     Thread.sleep(100);
@@ -624,9 +624,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                 }
             }
         }
-        if (!walFinished) {
-            LOG.warn("waitWalFinished time out");
-        }
         Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, SchemaChangeStatus.NORMAL);
         LOG.info("release table {}", tableId);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
index ec4e01a0b4d..3b9719b2594 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
@@ -60,7 +60,7 @@ public class GroupCommitManager {
     /**
      * Check the wal before the endTransactionId is finished or not.
      */
-    public boolean isPreviousWalFinished(long tableId, long endTransactionId, List<Long> aliveBeIds) {
+    public boolean isPreviousWalFinished(long tableId, List<Long> aliveBeIds) {
         boolean empty = true;
         for (int i = 0; i < aliveBeIds.size(); i++) {
             Backend backend = Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i));
@@ -70,9 +70,8 @@ public class GroupCommitManager {
             }
             PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder()
                     .setTableId(tableId)
-                    .setTxnId(endTransactionId)
                     .build();
-            long size = getWallQueueSize(backend, request);
+            long size = getWalQueueSize(backend, request);
             if (size > 0) {
                 LOG.info("backend id:" + backend.getId() + ",wal size:" + size);
                 empty = false;
@@ -84,16 +83,15 @@ public class GroupCommitManager {
     public long getAllWalQueueSize(Backend backend) {
         PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder()
                 .setTableId(-1)
-                .setTxnId(-1)
                 .build();
-        long size = getWallQueueSize(backend, request);
+        long size = getWalQueueSize(backend, request);
         if (size > 0) {
             LOG.info("backend id:" + backend.getId() + ",all wal size:" + size);
         }
         return size;
     }
 
-    public long getWallQueueSize(Backend backend, PGetWalQueueSizeRequest request) {
+    public long getWalQueueSize(Backend backend, PGetWalQueueSizeRequest request) {
         PGetWalQueueSizeResponse response = null;
         long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold;
         long size = 0;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 5fe89a07f13..09de78a5e68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -330,8 +330,10 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
             List<Column> fileColumns = new ArrayList<>();
             Table table = Env.getCurrentInternalCatalog().getTableByTableId(tableId);
             List<Column> tableColumns = table.getBaseSchema(true);
-            for (int i = 1; i <= tableColumns.size(); i++) {
-                fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getType(), true));
+            for (int i = 0; i < tableColumns.size(); i++) {
+                Column column = new Column(tableColumns.get(i).getName(), tableColumns.get(i).getType(), true);
+                column.setUniqueId(tableColumns.get(i).getUniqueId());
+                fileColumns.add(column);
             }
             return fileColumns;
         }
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 351bf5caf82..c91a4865ca7 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -804,7 +804,6 @@ message PStreamHeader {
 
 message PGetWalQueueSizeRequest{
     optional int64 table_id = 1;
-    optional int64 txn_id = 2;
 }
 
 message PGetWalQueueSizeResponse{


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org