You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@doris.apache.org by da...@apache.org on 2024/01/07 10:54:48 UTC
(doris) branch master updated: [enhancement](group_commit) refector wal manager code (#29560)
This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0b731800a09 [enhancement](group_commit) refector wal manager code (#29560)
0b731800a09 is described below
commit 0b731800a09e6761452f456b30c2061a75ad911d
Author: huanghaibin <28...@qq.com>
AuthorDate: Sun Jan 7 18:54:41 2024 +0800
[enhancement](group_commit) refector wal manager code (#29560)
---
be/src/olap/wal/wal_manager.cpp | 216 ++++++++-------------
be/src/olap/wal/wal_manager.h | 45 ++---
be/src/olap/wal/wal_reader.cpp | 4 +-
be/src/olap/wal/wal_reader.h | 3 +-
be/src/olap/wal/wal_table.cpp | 123 ++++--------
be/src/olap/wal/wal_table.h | 18 +-
be/src/olap/wal/wal_writer.cpp | 8 +
be/src/runtime/group_commit_mgr.cpp | 11 +-
be/src/service/internal_service.cpp | 4 +-
be/src/vec/exec/format/wal/wal_reader.cpp | 49 ++---
be/src/vec/exec/format/wal/wal_reader.h | 15 +-
be/src/vec/exec/scan/vfile_scanner.cpp | 2 +-
be/src/vec/sink/writer/vwal_writer.cpp | 12 +-
be/src/vec/sink/writer/vwal_writer.h | 3 +
be/test/vec/exec/vwal_scanner_test.cpp | 1 -
.../org/apache/doris/alter/SchemaChangeJobV2.java | 19 +-
.../org/apache/doris/load/GroupCommitManager.java | 10 +-
.../ExternalFileTableValuedFunction.java | 6 +-
gensrc/proto/internal_service.proto | 1 -
19 files changed, 217 insertions(+), 333 deletions(-)
diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp
index 19b92c38594..b1931f62a62 100644
--- a/be/src/olap/wal/wal_manager.cpp
+++ b/be/src/olap/wal/wal_manager.cpp
@@ -28,9 +28,9 @@
#include "common/config.h"
#include "common/status.h"
+#include "gutil/strings/split.h"
#include "io/fs/local_file_system.h"
#include "olap/wal/wal_dirs_info.h"
-#include "olap/wal/wal_writer.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/plan_fragment_executor.h"
@@ -40,7 +40,7 @@
namespace doris {
WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list)
: _exec_env(exec_env), _stop(false), _stop_background_threads_latch(1) {
- doris::vectorized::WalReader::string_split(wal_dir_list, ";", _wal_dirs);
+ _wal_dirs = strings::Split(wal_dir_list, ";", strings::SkipWhitespace());
static_cast<void>(ThreadPoolBuilder("GroupCommitReplayWalThreadPool")
.set_min_threads(1)
.set_max_threads(config::group_commit_relay_wal_threads)
@@ -109,7 +109,7 @@ Status WalManager::_init_wal_dirs_conf() {
Status WalManager::_init_wal_dirs() {
bool exists = false;
for (auto wal_dir : _wal_dirs) {
- std::string tmp_dir = wal_dir + "/" + tmp;
+ std::string tmp_dir = wal_dir + "/" + _tmp;
LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir;
RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir, &exists));
if (!exists) {
@@ -160,76 +160,50 @@ Status WalManager::_init_wal_dirs_info() {
&_update_wal_dirs_info_thread);
}
-void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status) {
- std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
+void WalManager::add_wal_queue(int64_t table_id, int64_t wal_id) {
+ std::lock_guard<std::shared_mutex> wrlock(_wal_queue_lock);
LOG(INFO) << "add wal queue "
- << ",table_id:" << table_id << ",wal_id:" << wal_id << ",status:" << wal_status;
- auto it = _wal_status_queues.find(table_id);
- if (it == _wal_status_queues.end()) {
- std::unordered_map<int64_t, WalStatus> tmp_map;
- tmp_map.emplace(wal_id, wal_status);
- _wal_status_queues.emplace(table_id, tmp_map);
+ << ",table_id:" << table_id << ",wal_id:" << wal_id;
+ auto it = _wal_queues.find(table_id);
+ if (it == _wal_queues.end()) {
+ std::set<int64_t> tmp_set;
+ tmp_set.insert(wal_id);
+ _wal_queues.emplace(table_id, tmp_set);
} else {
- it->second.emplace(wal_id, wal_status);
+ it->second.insert(wal_id);
}
}
-Status WalManager::erase_wal_status_queue(int64_t table_id, int64_t wal_id) {
- std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
- auto it = _wal_status_queues.find(table_id);
- LOG(INFO) << "remove wal queue "
- << ",table_id:" << table_id << ",wal_id:" << wal_id;
- if (it == _wal_status_queues.end()) {
- return Status::InternalError("table_id " + std::to_string(table_id) +
- " not found in wal status queue");
- } else {
+void WalManager::erase_wal_queue(int64_t table_id, int64_t wal_id) {
+ std::lock_guard<std::shared_mutex> wrlock(_wal_queue_lock);
+ auto it = _wal_queues.find(table_id);
+ if (it != _wal_queues.end()) {
+ LOG(INFO) << "remove wal queue "
+ << ",table_id:" << table_id << ",wal_id:" << wal_id;
it->second.erase(wal_id);
if (it->second.empty()) {
- _wal_status_queues.erase(table_id);
+ _wal_queues.erase(table_id);
}
}
- return Status::OK();
}
-Status WalManager::get_wal_status_queue_size(const PGetWalQueueSizeRequest* request,
- PGetWalQueueSizeResponse* response) {
- std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
+size_t WalManager::get_wal_queue_size(int64_t table_id) {
+ std::lock_guard<std::shared_mutex> wrlock(_wal_queue_lock);
size_t count = 0;
- auto table_id = request->table_id();
- auto txn_id = request->txn_id();
- if (table_id > 0 && txn_id > 0) {
- auto it = _wal_status_queues.find(table_id);
- if (it == _wal_status_queues.end()) {
- LOG(INFO) << ("table_id " + std::to_string(table_id) +
- " not found in wal status queue");
+ if (table_id > 0) {
+ auto it = _wal_queues.find(table_id);
+ if (it != _wal_queues.end()) {
+ return it->second.size();
} else {
- for (auto wal_it = it->second.begin(); wal_it != it->second.end(); ++wal_it) {
- if (wal_it->first <= txn_id) {
- count += 1;
- }
- }
+ return 0;
}
} else {
- for (auto it = _wal_status_queues.begin(); it != _wal_status_queues.end(); it++) {
+ //table_id is -1 meaning get all table wal size
+ for (auto it = _wal_queues.begin(); it != _wal_queues.end(); it++) {
count += it->second.size();
}
}
- response->set_size(count);
- if (count > 0) {
- print_wal_status_queue();
- }
- return Status::OK();
-}
-
-void WalManager::print_wal_status_queue() {
- std::stringstream ss;
- for (auto it = _wal_status_queues.begin(); it != _wal_status_queues.end(); ++it) {
- ss << "table_id:" << it->first << std::endl;
- for (auto wal_it = it->second.begin(); wal_it != it->second.end(); ++wal_it) {
- ss << "wal_id:" << wal_it->first << ",status:" << wal_it->second << std::endl;
- }
- }
- LOG(INFO) << ss.str();
+ return count;
}
Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id,
@@ -239,7 +213,7 @@ Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_
ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/"
<< std::to_string(wal_id) << "_" << label;
{
- std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
+ std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
auto it = _wal_path_map.find(wal_id);
if (it != _wal_path_map.end()) {
return Status::InternalError("wal_id {} already in wal_path_map", wal_id);
@@ -250,7 +224,7 @@ Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_
}
Status WalManager::get_wal_path(int64_t wal_id, std::string& wal_path) {
- std::shared_lock rdlock(_wal_lock);
+ std::shared_lock rdlock(_wal_path_lock);
auto it = _wal_path_map.find(wal_id);
if (it != _wal_path_map.end()) {
wal_path = _wal_path_map[wal_id];
@@ -260,35 +234,6 @@ Status WalManager::get_wal_path(int64_t wal_id, std::string& wal_path) {
return Status::OK();
}
-Status WalManager::create_wal_reader(const std::string& wal_path,
- std::shared_ptr<WalReader>& wal_reader) {
- wal_reader = std::make_shared<WalReader>(wal_path);
- RETURN_IF_ERROR(wal_reader->init());
- return Status::OK();
-}
-
-Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>& wal_writer) {
- std::string wal_path;
- RETURN_IF_ERROR(get_wal_path(wal_id, wal_path));
- // TODO move the create_dir into wal_writer::init
- std::vector<std::string> path_element;
- doris::vectorized::WalReader::string_split(wal_path, "/", path_element);
- std::stringstream ss;
- for (int i = 0; i < path_element.size() - 1; i++) {
- ss << path_element[i] << "/";
- }
- std::string base_path = ss.str();
- bool exists = false;
- RETURN_IF_ERROR(io::global_local_filesystem()->exists(base_path, &exists));
- if (!exists) {
- RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(base_path));
- }
- LOG(INFO) << "create wal " << wal_path;
- wal_writer = std::make_shared<WalWriter>(wal_path);
- RETURN_IF_ERROR(wal_writer->init());
- return Status::OK();
-}
-
Status WalManager::_scan_wals(const std::string& wal_path) {
size_t count = 0;
bool exists = true;
@@ -299,7 +244,7 @@ Status WalManager::_scan_wals(const std::string& wal_path) {
return st;
}
for (const auto& database_id : dbs) {
- if (database_id.is_file || database_id.file_name == tmp) {
+ if (database_id.is_file || database_id.file_name == _tmp) {
continue;
}
std::vector<io::FileInfo> tables;
@@ -329,7 +274,7 @@ Status WalManager::_scan_wals(const std::string& wal_path) {
auto wal_file = table_path + "/" + wal.file_name;
res.emplace_back(wal_file);
{
- std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
+ std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
auto pos = wal.file_name.find("_");
try {
int64_t wal_id =
@@ -337,7 +282,6 @@ Status WalManager::_scan_wals(const std::string& wal_path) {
_wal_path_map.emplace(wal_id, wal_file);
int64_t db_id = std::strtoll(database_id.file_name.c_str(), NULL, 10);
int64_t tb_id = std::strtoll(table_id.file_name.c_str(), NULL, 10);
- add_wal_status_queue(tb_id, wal_id, WalManager::WalStatus::REPLAY);
if (config::group_commit_wait_replay_wal_finish) {
std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
std::shared_ptr<std::condition_variable> cv =
@@ -396,6 +340,7 @@ Status WalManager::_replay() {
Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id,
std::string wal) {
+ add_wal_queue(table_id, wal_id);
std::lock_guard<std::shared_mutex> wrlock(_table_lock);
std::shared_ptr<WalTable> table_ptr;
auto it = _table_map.find(table_id);
@@ -423,23 +368,6 @@ size_t WalManager::get_wal_table_size(int64_t table_id) {
}
}
-Status WalManager::delete_wal(int64_t wal_id, size_t block_queue_pre_allocated) {
- std::string wal_path;
- {
- std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
- auto it = _wal_path_map.find(wal_id);
- if (it != _wal_path_map.end()) {
- wal_path = it->second;
- RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path));
- LOG(INFO) << "delete file=" << wal_path;
- _wal_path_map.erase(wal_id);
- }
- }
- RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path), 0,
- block_queue_pre_allocated));
- return Status::OK();
-}
-
void WalManager::_stop_relay_wal() {
std::lock_guard<std::shared_mutex> wrlock(_table_lock);
for (auto it = _table_map.begin(); it != _table_map.end(); it++) {
@@ -447,32 +375,6 @@ void WalManager::_stop_relay_wal() {
}
}
-void WalManager::add_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index) {
- std::lock_guard<std::shared_mutex> wrlock(_wal_column_id_map_lock);
- _wal_column_id_map.emplace(wal_id, column_index);
- LOG(INFO) << "add " << wal_id << " to wal_column_id_map";
-}
-
-void WalManager::erase_wal_column_index(int64_t wal_id) {
- std::lock_guard<std::shared_mutex> wrlock(_wal_column_id_map_lock);
- if (_wal_column_id_map.erase(wal_id)) {
- LOG(INFO) << "erase " << wal_id << " from wal_column_id_map";
- } else {
- LOG(WARNING) << "fail to erase wal " << wal_id << " from wal_column_id_map";
- }
-}
-
-Status WalManager::get_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index) {
- std::lock_guard<std::shared_mutex> wrlock(_wal_column_id_map_lock);
- auto it = _wal_column_id_map.find(wal_id);
- if (it != _wal_column_id_map.end()) {
- column_index = it->second;
- } else {
- return Status::InternalError("cannot find wal {} in wal_column_id_map", wal_id);
- }
- return Status::OK();
-}
-
size_t WalManager::get_max_available_size() {
return _wal_dirs_info->get_max_available_size();
}
@@ -580,4 +482,54 @@ Status WalManager::get_lock_and_cv(int64_t wal_id, std::shared_ptr<std::mutex>&
return Status::OK();
}
+Status WalManager::delete_wal(int64_t table_id, int64_t wal_id, size_t block_queue_pre_allocated) {
+ std::string wal_path;
+ {
+ std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
+ auto it = _wal_path_map.find(wal_id);
+ if (it != _wal_path_map.end()) {
+ wal_path = it->second;
+ auto st = io::global_local_filesystem()->delete_file(wal_path);
+ if (st.ok()) {
+ LOG(INFO) << "delete file=" << wal_path;
+ } else {
+ LOG(WARNING) << "fail to delete file=" << wal_path;
+ }
+ _wal_path_map.erase(wal_id);
+ }
+ }
+ erase_wal_queue(table_id, wal_id);
+ RETURN_IF_ERROR(update_wal_dir_pre_allocated(_get_base_wal_path(wal_path), 0,
+ block_queue_pre_allocated));
+ return Status::OK();
+}
+
+Status WalManager::rename_to_tmp_path(const std::string wal, int64_t table_id, int64_t wal_id) {
+ io::Path wal_path = wal;
+ std::list<std::string> path_element;
+ for (int i = 0; i < 3; ++i) {
+ if (!wal_path.has_parent_path()) {
+ return Status::InternalError("parent path is not enough when rename " + wal);
+ }
+ path_element.push_front(wal_path.filename().string());
+ wal_path = wal_path.parent_path();
+ }
+ wal_path.append(_tmp);
+ for (auto path : path_element) {
+ wal_path.append(path);
+ }
+ bool exists = false;
+ RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_path.parent_path(), &exists));
+ if (!exists) {
+ RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_path.parent_path()));
+ }
+ auto res = std::rename(wal.c_str(), wal_path.string().c_str());
+ if (res < 0) {
+ return Status::InternalError("rename fail on path " + wal);
+ }
+ LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string();
+ erase_wal_queue(table_id, wal_id);
+ return Status::OK();
+}
+
} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal/wal_manager.h b/be/src/olap/wal/wal_manager.h
index b0e4d1d1960..44fdef2e6c0 100644
--- a/be/src/olap/wal/wal_manager.h
+++ b/be/src/olap/wal/wal_manager.h
@@ -24,6 +24,7 @@
#include <cstddef>
#include <cstdint>
#include <memory>
+#include <set>
#include <shared_mutex>
#include <thread>
#include <unordered_map>
@@ -46,13 +47,6 @@ namespace doris {
class WalManager {
ENABLE_FACTORY_CREATOR(WalManager);
-public:
- enum WalStatus {
- PREPARE = 0,
- REPLAY = 1,
- CREATE = 2,
- };
-
public:
WalManager(ExecEnv* exec_env, const std::string& wal_dir);
~WalManager();
@@ -72,25 +66,14 @@ public:
Status create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id,
const std::string& label, std::string& base_path);
Status get_wal_path(int64_t wal_id, std::string& wal_path);
- Status delete_wal(int64_t wal_id, size_t block_queue_pre_allocated = 0);
+ Status delete_wal(int64_t table_id, int64_t wal_id, size_t block_queue_pre_allocated = 0);
+ Status rename_to_tmp_path(const std::string wal, int64_t table_id, int64_t wal_id);
Status add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, std::string wal);
- // used for ut
+ void add_wal_queue(int64_t table_id, int64_t wal_id);
+ void erase_wal_queue(int64_t table_id, int64_t wal_id);
+ size_t get_wal_queue_size(int64_t table_id);
+ // fot ut
size_t get_wal_table_size(int64_t table_id);
- // TODO util function, should remove
- Status create_wal_reader(const std::string& wal_path, std::shared_ptr<WalReader>& wal_reader);
- Status create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>& wal_writer);
-
- // for wal status, can be removed
- Status get_wal_status_queue_size(const PGetWalQueueSizeRequest* request,
- PGetWalQueueSizeResponse* response);
- void add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status);
- Status erase_wal_status_queue(int64_t table_id, int64_t wal_id);
- void print_wal_status_queue();
-
- // for _wal_column_id_map, can be removed
- void add_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index);
- void erase_wal_column_index(int64_t wal_id);
- Status get_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index);
//for test relay
Status add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> lock,
@@ -118,12 +101,11 @@ public:
// used for be ut
size_t wal_limit_test_bytes;
- const std::string tmp = "tmp";
-
private:
ExecEnv* _exec_env = nullptr;
std::atomic<bool> _stop;
CountDownLatch _stop_background_threads_latch;
+ const std::string _tmp = "tmp";
// wal back pressure
std::vector<std::string> _wal_dirs;
@@ -137,16 +119,11 @@ private:
std::shared_mutex _table_lock;
std::map<int64_t, std::shared_ptr<WalTable>> _table_map;
- std::shared_mutex _wal_lock;
+ std::shared_mutex _wal_path_lock;
std::unordered_map<int64_t, std::string> _wal_path_map;
- // TODO Now only used for debug wal status, consider remove it
- std::shared_mutex _wal_status_lock;
- std::unordered_map<int64_t, std::unordered_map<int64_t, WalStatus>> _wal_status_queues;
-
- // TODO should remove
- std::shared_mutex _wal_column_id_map_lock;
- std::unordered_map<int64_t, std::vector<size_t>&> _wal_column_id_map;
+ std::shared_mutex _wal_queue_lock;
+ std::unordered_map<int64_t, std::set<int64_t>> _wal_queues;
// for test relay
// <lock, condition_variable>
diff --git a/be/src/olap/wal/wal_reader.cpp b/be/src/olap/wal/wal_reader.cpp
index 75bbaf10947..36a4c15aa2b 100644
--- a/be/src/olap/wal/wal_reader.cpp
+++ b/be/src/olap/wal/wal_reader.cpp
@@ -70,7 +70,7 @@ Status WalReader::read_block(PBlock& block) {
return Status::OK();
}
-Status WalReader::read_header(uint32_t& version, std::string& col_ids) {
+Status WalReader::read_header(std::string& col_ids) {
size_t bytes_read = 0;
std::string magic_str;
magic_str.resize(k_wal_magic_length);
@@ -83,7 +83,7 @@ Status WalReader::read_header(uint32_t& version, std::string& col_ids) {
RETURN_IF_ERROR(
file_reader->read_at(_offset, {version_buf, WalWriter::VERSION_SIZE}, &bytes_read));
_offset += WalWriter::VERSION_SIZE;
- version = decode_fixed32_le(version_buf);
+ _version = decode_fixed32_le(version_buf);
uint8_t len_buf[WalWriter::LENGTH_SIZE];
RETURN_IF_ERROR(file_reader->read_at(_offset, {len_buf, WalWriter::LENGTH_SIZE}, &bytes_read));
_offset += WalWriter::LENGTH_SIZE;
diff --git a/be/src/olap/wal/wal_reader.h b/be/src/olap/wal/wal_reader.h
index f68a031c09b..7c852ee18f6 100644
--- a/be/src/olap/wal/wal_reader.h
+++ b/be/src/olap/wal/wal_reader.h
@@ -32,7 +32,7 @@ public:
Status finalize();
Status read_block(PBlock& block);
- Status read_header(uint32_t& version, std::string& col_ids);
+ Status read_header(std::string& col_ids);
private:
Status _deserialize(PBlock& block, std::string& buf);
@@ -40,6 +40,7 @@ private:
private:
std::string _file_name;
+ uint32_t _version = 0;
size_t _offset;
io::FileReaderSPtr file_reader;
};
diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp
index 5187e594adc..54500273daa 100644
--- a/be/src/olap/wal/wal_table.cpp
+++ b/be/src/olap/wal/wal_table.cpp
@@ -19,6 +19,7 @@
#include <thrift/protocol/TDebugProtocol.h>
+#include "gutil/strings/split.h"
#include "http/action/http_stream.h"
#include "http/action/stream_load.h"
#include "http/ev_http_server.h"
@@ -33,12 +34,11 @@
#include "runtime/plan_fragment_executor.h"
#include "util/path_util.h"
#include "util/thrift_rpc_helper.h"
-#include "vec/exec/format/wal/wal_reader.h"
namespace doris {
WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id)
- : _exec_env(exec_env), _db_id(db_id), _table_id(table_id), _stop(false) {
+ : _exec_env(exec_env), _db_id(db_id), _table_id(table_id) {
_http_stream_action = std::make_shared<HttpStreamAction>(exec_env);
}
WalTable::~WalTable() {}
@@ -63,7 +63,8 @@ void WalTable::_pick_relay_wals() {
if (wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) {
LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", table=" << _table_id
<< ", wal=" << it->first << ", retry_num=" << wal_info->get_retry_num();
- auto st = _rename_to_tmp_path(it->first);
+ auto st = _exec_env->wal_mgr()->rename_to_tmp_path(it->first, _table_id,
+ wal_info->get_wal_id());
if (!st.ok()) {
LOG(WARNING) << "rename " << it->first << " fail"
<< ",st:" << st.to_string();
@@ -124,7 +125,7 @@ Status WalTable::_relay_wal_one_by_one() {
}
}
for (auto delete_wal_info : need_delete_wals) {
- auto st = _delete_wal(delete_wal_info->get_wal_id());
+ auto st = _exec_env->wal_mgr()->delete_wal(_table_id, delete_wal_info->get_wal_id());
if (!st.ok()) {
LOG(WARNING) << "fail to delete wal " << delete_wal_info->get_wal_path();
}
@@ -154,33 +155,6 @@ Status WalTable::replay_wals() {
return Status::OK();
}
-Status WalTable::_rename_to_tmp_path(const std::string wal) {
- io::Path wal_path = wal;
- std::list<std::string> path_element;
- for (int i = 0; i < 3; ++i) {
- if (!wal_path.has_parent_path()) {
- return Status::InternalError("parent path is not enough when rename " + wal);
- }
- path_element.push_front(wal_path.filename().string());
- wal_path = wal_path.parent_path();
- }
- wal_path.append(_exec_env->wal_mgr()->tmp);
- for (auto path : path_element) {
- wal_path.append(path);
- }
- bool exists = false;
- RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_path.parent_path(), &exists));
- if (!exists) {
- RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_path.parent_path()));
- }
- auto res = std::rename(wal.c_str(), wal_path.string().c_str());
- if (res < 0) {
- return Status::InternalError("rename fail on path " + wal);
- }
- LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string();
- return Status::OK();
-}
-
bool WalTable::_need_replay(std::shared_ptr<WalInfo> wal_info) {
if (config::group_commit_wait_replay_wal_finish) {
return true;
@@ -217,10 +191,9 @@ Status WalTable::_try_abort_txn(int64_t db_id, int64_t wal_id) {
Status WalTable::_replay_wal_internal(const std::string& wal) {
LOG(INFO) << "Start replay wal for db=" << _db_id << ", table=" << _table_id << ", wal=" << wal;
- std::shared_ptr<std::pair<int64_t, std::string>> pair = nullptr;
- RETURN_IF_ERROR(_parse_wal_path(wal, pair));
- auto wal_id = pair->first;
- auto label = pair->second;
+ int64_t wal_id = 0;
+ std::string label = "";
+ RETURN_IF_ERROR(_parse_wal_path(wal, wal_id, label));
#ifndef BE_TEST
if (!config::group_commit_wait_replay_wal_finish) {
auto st = _try_abort_txn(_db_id, wal_id);
@@ -228,22 +201,18 @@ Status WalTable::_replay_wal_internal(const std::string& wal) {
LOG(WARNING) << "abort txn " << wal_id << " fail";
}
}
- RETURN_IF_ERROR(_get_column_info(_db_id, _table_id));
#endif
RETURN_IF_ERROR(_replay_one_txn_with_stremaload(wal_id, wal, label));
return Status::OK();
}
-Status WalTable::_parse_wal_path(const std::string& wal,
- std::shared_ptr<std::pair<int64_t, std::string>>& pair) {
- std::vector<std::string> path_element;
- doris::vectorized::WalReader::string_split(wal, "/", path_element);
- auto pos = path_element[path_element.size() - 1].find("_");
+Status WalTable::_parse_wal_path(const std::string& wal, int64_t& wal_id, std::string& label) {
+ io::Path wal_path = wal;
+ auto file_name = wal_path.filename().string();
+ auto pos = file_name.find("_");
try {
- int64_t wal_id = std::strtoll(path_element[path_element.size() - 1].substr(0, pos).c_str(),
- NULL, 10);
- auto label = path_element[path_element.size() - 1].substr(pos + 1);
- pair = std::make_shared<std::pair<int64_t, std::string>>(std::make_pair(wal_id, label));
+ wal_id = std::strtoll(file_name.substr(0, pos).c_str(), NULL, 10);
+ label = file_name.substr(pos + 1);
} catch (const std::invalid_argument& e) {
return Status::InvalidArgument("Invalid format, {}", e.what());
}
@@ -251,33 +220,29 @@ Status WalTable::_parse_wal_path(const std::string& wal,
}
Status WalTable::_construct_sql_str(const std::string& wal, const std::string& label,
- std::string& sql_str, std::vector<size_t>& index_vector) {
+ std::string& sql_str) {
std::string columns;
RETURN_IF_ERROR(_read_wal_header(wal, columns));
- std::vector<std::string> column_id_element;
- doris::vectorized::WalReader::string_split(columns, ",", column_id_element);
+ std::vector<std::string> column_id_vector =
+ strings::Split(columns, ",", strings::SkipWhitespace());
+ std::map<int64_t, std::string> column_info_map;
+ RETURN_IF_ERROR(_get_column_info(_db_id, _table_id, column_info_map));
std::stringstream ss_name;
- std::stringstream ss_id;
- int index_raw = 0;
- for (auto column_id_str : column_id_element) {
+ for (auto column_id_str : column_id_vector) {
try {
int64_t column_id = std::strtoll(column_id_str.c_str(), NULL, 10);
- auto it = _column_id_info_map.find(column_id);
- if (it != _column_id_info_map.end()) {
- ss_name << "`" << it->second->first << "`,";
- ss_id << "c" << std::to_string(it->second->second) << ",";
- index_vector.emplace_back(index_raw);
+ auto it = column_info_map.find(column_id);
+ if (it != column_info_map.end()) {
+ ss_name << "`" << it->second << "`,";
}
- index_raw++;
} catch (const std::invalid_argument& e) {
return Status::InvalidArgument("Invalid format, {}", e.what());
}
}
auto name = ss_name.str().substr(0, ss_name.str().size() - 1);
- auto id = ss_id.str().substr(0, ss_id.str().size() - 1);
std::stringstream ss;
ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label << " ("
- << name << ") select " << id << " from http_stream(\"format\" = \"wal\", \"table_id\" = \""
+ << name << ") select " << name << " from http_stream(\"format\" = \"wal\", \"table_id\" = \""
<< std::to_string(_table_id) << "\")";
sql_str = ss.str().data();
return Status::OK();
@@ -286,9 +251,7 @@ Status WalTable::_construct_sql_str(const std::string& wal, const std::string& l
Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal,
const std::string& label) {
std::string sql_str;
- std::vector<size_t> index_vector;
- RETURN_IF_ERROR(_construct_sql_str(wal, label, sql_str, index_vector));
- _exec_env->wal_mgr()->add_wal_column_index(wal_id, index_vector);
+ RETURN_IF_ERROR(_construct_sql_str(wal, label, sql_str));
std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
ctx->sql_str = sql_str;
ctx->wal_id = wal_id;
@@ -304,13 +267,13 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal,
auto commit_st = _exec_env->stream_load_executor()->commit_txn(ctx.get());
st = commit_st;
} else if (!ctx->status.ok()) {
- LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
- << ", errmsg=" << ctx->status;
- _exec_env->stream_load_executor()->rollback_txn(ctx.get());
st = ctx->status;
}
}
- _exec_env->wal_mgr()->erase_wal_column_index(wal_id);
+ if (!st.ok()) {
+ LOG(WARNING) << "handle streaming load failed, id=" << ctx->id << ", errmsg=" << st;
+ _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+ }
LOG(INFO) << "relay wal id=" << wal_id << ",st=" << st.to_string();
return st;
}
@@ -339,9 +302,6 @@ void WalTable::stop() {
do {
{
std::lock_guard<std::mutex> lock(_replay_wal_lock);
- if (!this->_stop.load()) {
- this->_stop.store(true);
- }
if (_replay_wal_map.empty() && _replaying_queue.empty()) {
break;
}
@@ -355,10 +315,11 @@ void WalTable::stop() {
size_t WalTable::size() {
std::lock_guard<std::mutex> lock(_replay_wal_lock);
- return _replay_wal_map.size();
+ return _replay_wal_map.size() + _replaying_queue.size();
}
-Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id) {
+Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id,
+ std::map<int64_t, std::string>& column_info_map) {
TGetColumnInfoRequest request;
request.__set_db_id(db_id);
request.__set_table_id(tb_id);
@@ -378,33 +339,21 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id) {
return status;
}
std::vector<TColumnInfo> column_element = result.columns;
- int64_t column_index = 1;
- _column_id_info_map.clear();
for (auto column : column_element) {
auto column_name = column.column_name;
auto column_id = column.column_id;
- std::shared_ptr<ColumnInfo> column_pair =
- std::make_shared<ColumnInfo>(std::make_pair(column_name, column_index));
- _column_id_info_map.emplace(column_id, column_pair);
- column_index++;
+ column_info_map.emplace(column_id, column_name);
}
}
return status;
}
Status WalTable::_read_wal_header(const std::string& wal_path, std::string& columns) {
- std::shared_ptr<doris::WalReader> wal_reader;
- RETURN_IF_ERROR(_exec_env->wal_mgr()->create_wal_reader(wal_path, wal_reader));
- uint32_t version = 0;
- RETURN_IF_ERROR(wal_reader->read_header(version, columns));
+ std::shared_ptr<doris::WalReader> wal_reader = std::make_shared<WalReader>(wal_path);
+ RETURN_IF_ERROR(wal_reader->init());
+ RETURN_IF_ERROR(wal_reader->read_header(columns));
RETURN_IF_ERROR(wal_reader->finalize());
return Status::OK();
}
-Status WalTable::_delete_wal(int64_t wal_id) {
- RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id));
- RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, wal_id));
- return Status::OK();
-}
-
} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal/wal_table.h b/be/src/olap/wal/wal_table.h
index 66ee4fd3721..07287d8f7e6 100644
--- a/be/src/olap/wal/wal_table.h
+++ b/be/src/olap/wal/wal_table.h
@@ -41,41 +41,31 @@ public:
void stop();
private:
- // <column_name, column_index>
- using ColumnInfo = std::pair<std::string, int64_t>;
-
void _pick_relay_wals();
bool _need_replay(std::shared_ptr<WalInfo>);
Status _relay_wal_one_by_one();
- Status _delete_wal(int64_t wal_id);
- Status _rename_to_tmp_path(const std::string wal);
Status _replay_wal_internal(const std::string& wal);
- // TODO change the param: (wal, int64_t* wal_id, std::string* label)
- Status _parse_wal_path(const std::string& wal,
- std::shared_ptr<std::pair<int64_t, std::string>>&);
+ Status _parse_wal_path(const std::string& wal, int64_t& wal_id, std::string& label);
Status _try_abort_txn(int64_t db_id, int64_t wal_id);
- Status _get_column_info(int64_t db_id, int64_t tb_id);
+ Status _get_column_info(int64_t db_id, int64_t tb_id,
+ std::map<int64_t, std::string>& column_info_map);
Status _replay_one_txn_with_stremaload(int64_t wal_id, const std::string& wal,
const std::string& label);
Status _handle_stream_load(int64_t wal_id, const std::string& wal, const std::string& label);
Status _construct_sql_str(const std::string& wal, const std::string& label,
- std::string& sql_str, std::vector<size_t>& index_vector);
+ std::string& sql_str);
Status _read_wal_header(const std::string& wal, std::string& columns);
private:
ExecEnv* _exec_env;
int64_t _db_id;
int64_t _table_id;
- // TODO the stop is not used?
- std::atomic<bool> _stop;
std::shared_ptr<HttpStreamAction> _http_stream_action;
mutable std::mutex _replay_wal_lock;
// key is wal_path
std::map<std::string, std::shared_ptr<WalInfo>> _replay_wal_map;
std::list<std::shared_ptr<WalInfo>> _replaying_queue;
- // TODO should not use this map
- std::map<int64_t, std::shared_ptr<ColumnInfo>> _column_id_info_map;
};
} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal/wal_writer.cpp b/be/src/olap/wal/wal_writer.cpp
index 25f7cc0870b..8edc6b3b860 100644
--- a/be/src/olap/wal/wal_writer.cpp
+++ b/be/src/olap/wal/wal_writer.cpp
@@ -35,7 +35,15 @@ WalWriter::WalWriter(const std::string& file_name) : _file_name(file_name) {}
WalWriter::~WalWriter() {}
Status WalWriter::init() {
+ io::Path wal_path = _file_name;
+ auto parent_path = wal_path.parent_path();
+ bool exists = false;
+ RETURN_IF_ERROR(io::global_local_filesystem()->exists(parent_path, &exists));
+ if (!exists) {
+ RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(parent_path));
+ }
RETURN_IF_ERROR(io::global_local_filesystem()->create_file(_file_name, &_file_writer));
+ LOG(INFO) << "create wal " << _file_name;
return Status::OK();
}
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 2838ebbed51..fc4a2df427f 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -297,8 +297,6 @@ Status GroupCommitTable::_create_group_commit_load(
std::unique_lock l(_lock);
_load_block_queues.emplace(instance_id, load_block_queue);
_need_plan_fragment = false;
- _exec_env->wal_mgr()->add_wal_status_queue(_table_id, txn_id,
- WalManager::WalStatus::PREPARE);
//create wal
if (!is_pipeline) {
RETURN_IF_ERROR(load_block_queue->create_wal(
@@ -388,15 +386,16 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
if (status.ok() && st.ok() &&
(result_status.ok() || result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) {
if (!config::group_commit_wait_replay_wal_finish) {
- RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(
- txn_id, load_block_queue->block_queue_pre_allocated()));
- RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(table_id, txn_id));
+ auto delete_st = _exec_env->wal_mgr()->delete_wal(
+ table_id, txn_id, load_block_queue->block_queue_pre_allocated());
+ if (!delete_st.ok()) {
+ LOG(WARNING) << "fail to delete wal " << txn_id;
+ }
}
} else {
std::string wal_path;
RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path));
RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal(db_id, table_id, txn_id, wal_path));
- _exec_env->wal_mgr()->add_wal_status_queue(table_id, txn_id, WalManager::WalStatus::REPLAY);
}
std::stringstream ss;
ss << "finish group commit, db_id=" << db_id << ", table_id=" << table_id << ", label=" << label
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 4943c0177ea..18a8325e4cb 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -2047,7 +2047,9 @@ void PInternalServiceImpl::get_wal_queue_size(google::protobuf::RpcController* c
bool ret = _light_work_pool.try_offer([this, request, response, done]() {
brpc::ClosureGuard closure_guard(done);
Status st = Status::OK();
- st = _exec_env->wal_mgr()->get_wal_status_queue_size(request, response);
+ auto table_id = request->table_id();
+ auto count = _exec_env->wal_mgr()->get_wal_queue_size(table_id);
+ response->set_size(count);
response->mutable_status()->set_status_code(st.code());
});
if (!ret) {
diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp b/be/src/vec/exec/format/wal/wal_reader.cpp
index 7344b4cbd9b..98bb35c4bc9 100644
--- a/be/src/vec/exec/format/wal/wal_reader.cpp
+++ b/be/src/vec/exec/format/wal/wal_reader.cpp
@@ -18,6 +18,7 @@
#include "wal_reader.h"
#include "common/logging.h"
+#include "gutil/strings/split.h"
#include "olap/wal/wal_manager.h"
#include "runtime/runtime_state.h"
#include "vec/data_types/data_type_string.h"
@@ -33,9 +34,11 @@ WalReader::~WalReader() {
}
}
-Status WalReader::init_reader() {
+// Resolves the WAL path for _wal_id via the WalManager and opens an olap-level
+// doris::WalReader on it. tuple_descriptor is stored as a non-owning pointer;
+// get_next_block() dereferences it (without a null check) to order the output
+// columns by slot, so the caller presumably must keep it alive — TODO confirm.
+Status WalReader::init_reader(const TupleDescriptor* tuple_descriptor) {
+ _tuple_descriptor = tuple_descriptor;
RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_path(_wal_id, _wal_path));
- RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_reader(_wal_path, _wal_reader));
+ _wal_reader = std::make_shared<doris::WalReader>(_wal_path);
+ RETURN_IF_ERROR(_wal_reader->init());
return Status::OK();
}
@@ -59,14 +62,16 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
vectorized::Block dst_block;
int index = 0;
auto columns = block->get_columns_with_type_and_name();
- for (auto column : columns) {
- auto pos = _column_index[index];
+ CHECK(columns.size() == _tuple_descriptor->slots().size());
+ for (auto slot_desc : _tuple_descriptor->slots()) {
+ auto pos = _column_pos_map[slot_desc->col_unique_id()];
vectorized::ColumnPtr column_ptr = src_block.get_by_position(pos).column;
- if (column_ptr != nullptr && column.column->is_nullable()) {
+ if (column_ptr != nullptr && slot_desc->is_nullable()) {
column_ptr = make_nullable(column_ptr);
}
- dst_block.insert(index, vectorized::ColumnWithTypeAndName(std::move(column_ptr),
- column.type, column.name));
+ dst_block.insert(
+ index, vectorized::ColumnWithTypeAndName(std::move(column_ptr), columns[index].type,
+ columns[index].name));
index++;
}
block->swap(dst_block);
@@ -75,24 +80,22 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
return Status::OK();
}
-void WalReader::string_split(const std::string& str, const std::string& splits,
- std::vector<std::string>& res) {
- if (str == "") return;
- std::string strs = str + splits;
- size_t pos = strs.find(splits);
- int step = splits.size();
- while (pos != strs.npos) {
- std::string temp = strs.substr(0, pos);
- res.push_back(temp);
- strs = strs.substr(pos + step, strs.size());
- pos = strs.find(splits);
- }
-}
-
+// Reads the comma-separated column unique ids from the WAL header and records
+// each id's position in _column_pos_map, which get_next_block() uses to map
+// slots to WAL columns. Returns InvalidArgument if an id is not a complete,
+// in-range integer. name_to_type and missing_cols are not filled in here.
Status WalReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) {
- RETURN_IF_ERROR(_wal_reader->read_header(_version, _col_ids));
- RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_column_index(_wal_id, _column_index));
+ std::string col_ids;
+ RETURN_IF_ERROR(_wal_reader->read_header(col_ids));
+ std::vector<std::string> column_id_vector =
+ strings::Split(col_ids, ",", strings::SkipWhitespace());
+ // std::strtoll never throws, so wrapping it in try/catch cannot detect bad
+ // input (it silently yields 0). std::stoll throws on non-numeric or
+ // out-of-range text and reports how many characters it consumed, which also
+ // lets us reject trailing garbage such as "12abc".
+ int64_t pos = 0;
+ for (const auto& col_id_str : column_id_vector) {
+ size_t parsed = 0;
+ int64_t col_id = 0;
+ try {
+ col_id = std::stoll(col_id_str, &parsed);
+ } catch (const std::exception& e) {
+ return Status::InvalidArgument("Invalid column id '{}' in wal header, {}", col_id_str,
+ e.what());
+ }
+ if (parsed != col_id_str.size()) {
+ return Status::InvalidArgument("Invalid column id '{}' in wal header", col_id_str);
+ }
+ _column_pos_map.emplace(col_id, pos);
+ pos++;
+ }
return Status::OK();
}
diff --git a/be/src/vec/exec/format/wal/wal_reader.h b/be/src/vec/exec/format/wal/wal_reader.h
index 78f2c31d925..e65af10016a 100644
--- a/be/src/vec/exec/format/wal/wal_reader.h
+++ b/be/src/vec/exec/format/wal/wal_reader.h
@@ -17,6 +17,7 @@
#pragma once
#include "olap/wal/wal_reader.h"
+#include "runtime/descriptors.h"
#include "vec/exec/format/generic_reader.h"
namespace doris {
@@ -26,23 +27,19 @@ class WalReader : public GenericReader {
public:
WalReader(RuntimeState* state);
~WalReader() override;
- Status init_reader();
+ Status init_reader(const TupleDescriptor* tuple_descriptor);
Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;
- // TODO move it
- static void string_split(const std::string& str, const std::string& splits,
- std::vector<std::string>& res);
private:
RuntimeState* _state = nullptr;
int64_t _wal_id;
std::string _wal_path;
- std::shared_ptr<doris::WalReader> _wal_reader;
- // TODO version should in olap/wal_reader
- uint32_t _version = 0;
- std::string _col_ids;
- std::vector<size_t> _column_index;
+ std::shared_ptr<doris::WalReader> _wal_reader = nullptr;
+ const TupleDescriptor* _tuple_descriptor = nullptr;
+ // column_id, column_pos
+ std::map<int64_t, int64_t> _column_pos_map;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index c70295e1313..57f079d77af 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -882,7 +882,7 @@ Status VFileScanner::_get_next_reader() {
}
case TFileFormatType::FORMAT_WAL: {
_cur_reader.reset(new WalReader(_state));
- init_status = ((WalReader*)(_cur_reader.get()))->init_reader();
+ init_status = ((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc);
break;
}
case TFileFormatType::FORMAT_ARROW: {
diff --git a/be/src/vec/sink/writer/vwal_writer.cpp b/be/src/vec/sink/writer/vwal_writer.cpp
index 2dfd32c14f8..c19f56fd079 100644
--- a/be/src/vec/sink/writer/vwal_writer.cpp
+++ b/be/src/vec/sink/writer/vwal_writer.cpp
@@ -38,8 +38,6 @@ VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id,
VWalWriter::~VWalWriter() {}
Status VWalWriter::init() {
- RETURN_IF_ERROR(_wal_manager->create_wal_writer(_wal_id, _wal_writer));
- _wal_manager->add_wal_status_queue(_tb_id, _wal_id, WalManager::WalStatus::CREATE);
#ifndef BE_TEST
if (config::group_commit_wait_replay_wal_finish) {
std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
@@ -50,6 +48,8 @@ Status VWalWriter::init() {
}
}
#endif
+ RETURN_IF_ERROR(_create_wal_writer(_wal_id, _wal_writer));
+ _wal_manager->add_wal_queue(_tb_id, _wal_id);
std::stringstream ss;
for (auto slot_desc : _slot_descs) {
if (slot_desc.col_unique_id < 0) {
@@ -84,5 +84,13 @@ Status VWalWriter::close() {
}
return Status::OK();
}
+
+// Resolves the WAL file path for wal_id via WalManager::get_wal_path, then
+// constructs a WalWriter on that path and initializes it. Note that wal_writer
+// is assigned before init() runs, so on an init() failure the caller's
+// wal_writer already points at the uninitialized writer.
+Status VWalWriter::_create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>& wal_writer) {
+ std::string wal_path;
+ RETURN_IF_ERROR(_wal_manager->get_wal_path(wal_id, wal_path));
+ wal_writer = std::make_shared<WalWriter>(wal_path);
+ RETURN_IF_ERROR(wal_writer->init());
+ return Status::OK();
+}
} // namespace vectorized
} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/sink/writer/vwal_writer.h b/be/src/vec/sink/writer/vwal_writer.h
index cbd369f7cbf..f22250cb5d4 100644
--- a/be/src/vec/sink/writer/vwal_writer.h
+++ b/be/src/vec/sink/writer/vwal_writer.h
@@ -37,6 +37,9 @@ public:
Status write_wal(vectorized::Block* block);
Status close();
+private:
+ Status _create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>& wal_writer);
+
private:
int64_t _db_id;
int64_t _tb_id;
diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp
index cf9e1733643..6f983ca5bf4 100644
--- a/be/test/vec/exec/vwal_scanner_test.cpp
+++ b/be/test/vec/exec/vwal_scanner_test.cpp
@@ -226,7 +226,6 @@ TEST_F(VWalScannerTest, normal) {
index_vector.emplace_back(0);
index_vector.emplace_back(1);
index_vector.emplace_back(2);
- _env->_wal_manager->add_wal_column_index(txn_id, index_vector);
// config::group_commit_replay_wal_dir = wal_dir;
NewFileScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
scan_node._output_tuple_desc = _runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 9e7174752a8..d40020fd57e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -537,9 +537,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
}
return;
}
- long maxWalId = Env.getCurrentGlobalTransactionMgr()
- .getTransactionIDGenerator().getNextTransactionId();
- waitWalFinished(maxWalId);
+ waitWalFinished();
/*
* all tasks are finished. check the integrity.
* we just check whether all new replicas are healthy.
@@ -602,20 +600,22 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
LOG.info("set table's state to NORMAL, table id: {}, job id: {}", tableId, jobId);
}
- private void waitWalFinished(long maxWalId) {
+ private void waitWalFinished() {
// wait wal done here
Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, SchemaChangeStatus.BLOCK);
LOG.info("block table {}", tableId);
List<Long> aliveBeIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold;
- boolean walFinished = false;
- while (System.currentTimeMillis() < expireTime) {
+ while (true) {
LOG.info("wai for wal queue size to be empty");
- walFinished = Env.getCurrentEnv().getGroupCommitManager()
- .isPreviousWalFinished(tableId, maxWalId, aliveBeIds);
+ boolean walFinished = Env.getCurrentEnv().getGroupCommitManager()
+ .isPreviousWalFinished(tableId, aliveBeIds);
if (walFinished) {
LOG.info("all wal is finished");
break;
+ } else if (System.currentTimeMillis() > expireTime) {
+ LOG.warn("waitWalFinished time out");
+ break;
} else {
try {
Thread.sleep(100);
@@ -624,9 +624,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
}
}
}
- if (!walFinished) {
- LOG.warn("waitWalFinished time out");
- }
Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, SchemaChangeStatus.NORMAL);
LOG.info("release table {}", tableId);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
index ec4e01a0b4d..3b9719b2594 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
@@ -60,7 +60,7 @@ public class GroupCommitManager {
/**
* Check the wal before the endTransactionId is finished or not.
*/
- public boolean isPreviousWalFinished(long tableId, long endTransactionId, List<Long> aliveBeIds) {
+ public boolean isPreviousWalFinished(long tableId, List<Long> aliveBeIds) {
boolean empty = true;
for (int i = 0; i < aliveBeIds.size(); i++) {
Backend backend = Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i));
@@ -70,9 +70,8 @@ public class GroupCommitManager {
}
PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder()
.setTableId(tableId)
- .setTxnId(endTransactionId)
.build();
- long size = getWallQueueSize(backend, request);
+ long size = getWalQueueSize(backend, request);
if (size > 0) {
LOG.info("backend id:" + backend.getId() + ",wal size:" + size);
empty = false;
@@ -84,16 +83,15 @@ public class GroupCommitManager {
+/**
+ * Queries one backend's WAL queue size with tableId set to -1 (presumably
+ * interpreted by the BE as "all tables" — confirm against the BE handler)
+ * and logs the result when it is non-zero.
+ *
+ * @param backend the backend to query
+ * @return the size reported via getWalQueueSize
+ */
public long getAllWalQueueSize(Backend backend) {
PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder()
.setTableId(-1)
- .setTxnId(-1)
.build();
- long size = getWallQueueSize(backend, request);
+ long size = getWalQueueSize(backend, request);
if (size > 0) {
LOG.info("backend id:" + backend.getId() + ",all wal size:" + size);
}
return size;
}
- public long getWallQueueSize(Backend backend, PGetWalQueueSizeRequest request) {
+ public long getWalQueueSize(Backend backend, PGetWalQueueSizeRequest request) {
PGetWalQueueSizeResponse response = null;
long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold;
long size = 0;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 5fe89a07f13..09de78a5e68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -330,8 +330,10 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
List<Column> fileColumns = new ArrayList<>();
Table table = Env.getCurrentInternalCatalog().getTableByTableId(tableId);
List<Column> tableColumns = table.getBaseSchema(true);
- for (int i = 1; i <= tableColumns.size(); i++) {
- fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getType(), true));
+ for (int i = 0; i < tableColumns.size(); i++) {
+ Column column = new Column(tableColumns.get(i).getName(), tableColumns.get(i).getType(), true);
+ column.setUniqueId(tableColumns.get(i).getUniqueId());
+ fileColumns.add(column);
}
return fileColumns;
}
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 351bf5caf82..c91a4865ca7 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -804,7 +804,6 @@ message PStreamHeader {
message PGetWalQueueSizeRequest{
optional int64 table_id = 1;
- optional int64 txn_id = 2;
+ // Field 2 was txn_id. Keep the number reserved so it is never reused with a
+ // different meaning while older FE/BE versions may still send it.
+ reserved 2;
}
message PGetWalQueueSizeResponse{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org