You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2024/01/10 13:26:26 UTC
(doris) branch master updated: [feature](merge-cloud) Decouple Tablet/TabletManager/TxnManager from global StorageEngine instance (#29736)
This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 98938168c5d [feature](merge-cloud) Decouple Tablet/TabletManager/TxnManager from global StorageEngine instance (#29736)
98938168c5d is described below
commit 98938168c5dbfd509e225beebd2953eb7871d2ce
Author: plat1ko <pl...@gmail.com>
AuthorDate: Wed Jan 10 21:26:18 2024 +0800
[feature](merge-cloud) Decouple Tablet/TabletManager/TxnManager from global StorageEngine instance (#29736)
---
be/src/olap/cold_data_compaction.cpp | 2 +-
be/src/olap/olap_server.cpp | 167 +++++++++++++-
be/src/olap/storage_engine.cpp | 5 +-
be/src/olap/storage_engine.h | 1 +
be/src/olap/tablet.cpp | 244 ++++-----------------
be/src/olap/tablet.h | 27 ++-
be/src/olap/tablet_manager.cpp | 33 ++-
be/src/olap/tablet_manager.h | 4 +-
be/src/olap/task/engine_clone_task.cpp | 2 +-
be/src/olap/txn_manager.cpp | 14 +-
be/src/olap/txn_manager.h | 4 +-
.../compaction_delete_bitmap_calculator_test.cpp | 5 +-
be/test/olap/cumulative_compaction_policy_test.cpp | 67 ++++--
...mulative_compaction_time_series_policy_test.cpp | 58 +++--
be/test/olap/ordered_data_compaction_test.cpp | 2 +-
be/test/olap/path_gc_test.cpp | 2 +-
be/test/olap/rowid_conversion_test.cpp | 2 +-
be/test/olap/segcompaction_test.cpp | 2 +-
be/test/olap/tablet_test.cpp | 6 +-
be/test/olap/txn_manager_test.cpp | 6 +-
be/test/vec/olap/vertical_compaction_test.cpp | 2 +-
21 files changed, 356 insertions(+), 299 deletions(-)
diff --git a/be/src/olap/cold_data_compaction.cpp b/be/src/olap/cold_data_compaction.cpp
index 656ead0a1d4..b4bd95ea80f 100644
--- a/be/src/olap/cold_data_compaction.cpp
+++ b/be/src/olap/cold_data_compaction.cpp
@@ -61,7 +61,7 @@ Status ColdDataCompaction::execute_compact_impl() {
SCOPED_ATTACH_TASK(_mem_tracker);
int64_t permits = get_compaction_permits();
std::shared_lock cooldown_conf_rlock(_tablet->get_cooldown_conf_lock());
- if (_tablet->cooldown_conf_unlocked().first != _tablet->replica_id()) {
+ if (_tablet->cooldown_conf_unlocked().cooldown_replica_id != _tablet->replica_id()) {
return Status::Aborted<false>("this replica is not cooldown replica");
}
RETURN_IF_ERROR(do_compaction(permits));
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 60c309584a2..1fa99fcd31b 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -39,6 +39,7 @@
#include <utility>
#include <vector>
+#include "agent/utils.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
@@ -59,6 +60,7 @@
#include "olap/schema_change.h"
#include "olap/single_replica_compaction.h"
#include "olap/storage_engine.h"
+#include "olap/storage_policy.h"
#include "olap/tablet.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_meta.h"
@@ -1130,7 +1132,168 @@ void StorageEngine::_remove_unused_remote_files_callback() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::seconds(config::remove_unused_remote_files_interval_sec))) {
LOG(INFO) << "begin to remove unused remote files";
- Tablet::remove_unused_remote_files();
+ do_remove_unused_remote_files();
+ }
+}
+
+// Removes remote files that are no longer referenced by tablets for which this replica is the
+// cooldown replica.
+//
+// For each candidate tablet the files under its remote tablet path are listed, and the files
+// that belong to cooldowned rowsets, to pending remote rowsets, or that are the current
+// `{cooldown_replica_id}.{cooldown_term}.meta` file are dropped from the candidate list. The
+// remaining files are buffered per tablet and confirmed in batches through
+// `MasterServerClient::confirm_unused_remote_files`; only tablets returned in
+// `confirmed_tablets` have their buffered files deleted.
+void StorageEngine::do_remove_unused_remote_files() {
+    auto tablets = tablet_manager()->get_all_tablet([](Tablet* t) {
+        return t->tablet_meta()->cooldown_meta_id().initialized() && t->is_used() &&
+               t->tablet_state() == TABLET_RUNNING &&
+               t->cooldown_conf_unlocked().cooldown_replica_id == t->replica_id();
+    });
+    TConfirmUnusedRemoteFilesRequest req;
+    req.__isset.confirm_list = true;
+    // tablet_id -> [fs, unused_remote_files]
+    using unused_remote_files_buffer_t = std::unordered_map<
+            int64_t, std::pair<std::shared_ptr<io::RemoteFileSystem>, std::vector<io::FileInfo>>>;
+    unused_remote_files_buffer_t buffer;
+    int64_t num_files_in_buffer = 0;
+    // assume a filename is 0.1KB, buffer size should not be larger than 100MB
+    constexpr int64_t max_files_in_buffer = 1000000;
+
+    // Collects the unused remote files of one tablet into `buffer` and appends the matching
+    // entry to `req.confirm_list`. Returns without touching either when nothing can be removed.
+    auto calc_unused_remote_files = [&req, &buffer, &num_files_in_buffer, this](Tablet* t) {
+        auto storage_policy = get_storage_policy(t->storage_policy_id());
+        if (storage_policy == nullptr) {
+            LOG(WARNING) << "could not find storage_policy, storage_policy_id="
+                         << t->storage_policy_id();
+            return;
+        }
+        auto resource = get_storage_resource(storage_policy->resource_id);
+        auto dest_fs = std::static_pointer_cast<io::RemoteFileSystem>(resource.fs);
+        if (dest_fs == nullptr) {
+            LOG(WARNING) << "could not find resource, resouce_id=" << storage_policy->resource_id;
+            return;
+        }
+        DCHECK(atol(dest_fs->id().c_str()) == storage_policy->resource_id);
+        DCHECK(dest_fs->type() != io::FileSystemType::LOCAL);
+
+        std::shared_ptr<io::RemoteFileSystem> fs;
+        auto st = get_remote_file_system(t->storage_policy_id(), &fs);
+        if (!st.ok()) {
+            LOG(WARNING) << "encounter error when remove unused remote files, tablet_id="
+                         << t->tablet_id() << " : " << st;
+            return;
+        }
+
+        std::vector<io::FileInfo> files;
+        // FIXME(plat1ko): What if user reset resource in storage policy to another resource?
+        //  Maybe we should also list files in previously uploaded resources.
+        bool exists = true;
+        st = dest_fs->list(io::Path(remote_tablet_path(t->tablet_id())), true, &files, &exists);
+        if (!st.ok()) {
+            LOG(WARNING) << "encounter error when remove unused remote files, tablet_id="
+                         << t->tablet_id() << " : " << st;
+            return;
+        }
+        if (!exists || files.empty()) {
+            return;
+        }
+        // get all cooldowned rowsets
+        RowsetIdUnorderedSet cooldowned_rowsets;
+        UniqueId cooldown_meta_id;
+        {
+            std::shared_lock rlock(t->get_header_lock());
+            for (auto&& rs_meta : t->tablet_meta()->all_rs_metas()) {
+                if (!rs_meta->is_local()) {
+                    cooldowned_rowsets.insert(rs_meta->rowset_id());
+                }
+            }
+            if (cooldowned_rowsets.empty()) {
+                return;
+            }
+            cooldown_meta_id = t->tablet_meta()->cooldown_meta_id();
+        }
+        // `Tablet::CooldownConf` declares `term` before `cooldown_replica_id`, and structured
+        // bindings follow member declaration order, so the term must be bound first.
+        auto [cooldown_term, cooldown_replica_id] = t->cooldown_conf();
+        if (cooldown_replica_id != t->replica_id()) {
+            return;
+        }
+        // {cooldown_replica_id}.{cooldown_term}.meta
+        std::string remote_meta_path =
+                fmt::format("{}.{}.meta", cooldown_replica_id, cooldown_term);
+        // filter out the paths that should be reserved
+        // (the predicate returns true for files to keep, so `remove_if` drops them from the
+        // list of deletion candidates)
+        auto filter = [&, this](io::FileInfo& info) {
+            std::string_view filename = info.file_name;
+            if (filename.ends_with(".meta")) {
+                return filename == remote_meta_path;
+            }
+            auto rowset_id = extract_rowset_id(filename);
+            if (rowset_id.hi == 0) {
+                return false;
+            }
+            return cooldowned_rowsets.contains(rowset_id) ||
+                   pending_remote_rowsets().contains(rowset_id);
+        };
+        files.erase(std::remove_if(files.begin(), files.end(), std::move(filter)), files.end());
+        if (files.empty()) {
+            return;
+        }
+        files.shrink_to_fit();
+        num_files_in_buffer += files.size();
+        buffer.insert({t->tablet_id(), {std::move(dest_fs), std::move(files)}});
+        auto& info = req.confirm_list.emplace_back();
+        info.__set_tablet_id(t->tablet_id());
+        info.__set_cooldown_replica_id(cooldown_replica_id);
+        info.__set_cooldown_meta_id(cooldown_meta_id.to_thrift());
+    };
+
+    // Sends the buffered request for confirmation and deletes the buffered files of every
+    // tablet listed in `result.confirmed_tablets`. Nothing is deleted when the call fails.
+    auto confirm_and_remove_files = [&buffer, &req, &num_files_in_buffer]() {
+        TConfirmUnusedRemoteFilesResult result;
+        LOG(INFO) << "begin to confirm unused remote files. num_tablets=" << buffer.size()
+                  << " num_files=" << num_files_in_buffer;
+        auto st = MasterServerClient::instance()->confirm_unused_remote_files(req, &result);
+        if (!st.ok()) {
+            LOG(WARNING) << st;
+            return;
+        }
+        for (auto id : result.confirmed_tablets) {
+            if (auto it = buffer.find(id); LIKELY(it != buffer.end())) {
+                auto& fs = it->second.first;
+                auto& files = it->second.second;
+                std::vector<io::Path> paths;
+                paths.reserve(files.size());
+                // delete unused files
+                LOG(INFO) << "delete unused files. root_path=" << fs->root_path()
+                          << " tablet_id=" << id;
+                io::Path dir = remote_tablet_path(id);
+                for (auto& file : files) {
+                    auto file_path = dir / file.file_name;
+                    LOG(INFO) << "delete unused file: " << file_path.native();
+                    paths.push_back(std::move(file_path));
+                }
+                st = fs->batch_delete(paths);
+                if (!st.ok()) {
+                    LOG(WARNING) << "failed to delete unused files, tablet_id=" << id << " : "
+                                 << st;
+                }
+                buffer.erase(it);
+            }
+        }
+    };
+
+    // batch confirm to reduce FE's overhead
+    auto next_confirm_time = std::chrono::steady_clock::now() +
+                             std::chrono::seconds(config::confirm_unused_remote_files_interval_sec);
+    for (auto& t : tablets) {
+        if (t.use_count() <= 1 // this means tablet has been dropped
+            || t->cooldown_conf_unlocked().cooldown_replica_id != t->replica_id() ||
+            t->tablet_state() != TABLET_RUNNING) {
+            continue;
+        }
+        calc_unused_remote_files(t.get());
+        // Flush once the buffer is too large or the confirm interval has elapsed.
+        if (num_files_in_buffer > 0 && (num_files_in_buffer > max_files_in_buffer ||
+                                        std::chrono::steady_clock::now() > next_confirm_time)) {
+            confirm_and_remove_files();
+            buffer.clear();
+            req.confirm_list.clear();
+            num_files_in_buffer = 0;
+            next_confirm_time =
+                    std::chrono::steady_clock::now() +
+                    std::chrono::seconds(config::confirm_unused_remote_files_interval_sec);
+        }
+    }
+    if (num_files_in_buffer > 0) {
+        confirm_and_remove_files();
}
}
@@ -1166,7 +1329,7 @@ void StorageEngine::_cold_data_compaction_producer_callback() {
tablet_to_follow.reserve(n + 1);
for (auto& t : tablets) {
- if (t->replica_id() == t->cooldown_conf_unlocked().first) {
+ if (t->replica_id() == t->cooldown_conf_unlocked().cooldown_replica_id) {
auto score = t->calc_cold_data_compaction_score();
if (score < 4) {
continue;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 33844e89929..60a0622ff90 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -123,13 +123,12 @@ StorageEngine::StorageEngine(const EngineOptions& options)
_segment_meta_mem_tracker(std::make_shared<MemTracker>(
"SegmentMeta", ExecEnv::GetInstance()->experimental_mem_tracker())),
_stop_background_threads_latch(1),
- _tablet_manager(new TabletManager(config::tablet_map_shard_size)),
- _txn_manager(new TxnManager(config::txn_map_shard_size, config::txn_shard_size)),
+ _tablet_manager(new TabletManager(*this, config::tablet_map_shard_size)),
+ _txn_manager(new TxnManager(*this, config::txn_map_shard_size, config::txn_shard_size)),
_rowset_id_generator(new UniqueRowsetIdGenerator(options.backend_uid)),
_memtable_flush_executor(nullptr),
_calc_delete_bitmap_executor(nullptr),
_default_rowset_type(BETA_ROWSET),
- _heartbeat_flags(nullptr),
_stream_load_recorder(nullptr) {
REGISTER_HOOK_METRIC(unused_rowsets_count, [this]() {
// std::lock_guard<std::mutex> lock(_gc_mutex);
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 5d71a99617f..e424068f6a7 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -311,6 +311,7 @@ private:
void _cooldown_tasks_producer_callback();
void _remove_unused_remote_files_callback();
+ void do_remove_unused_remote_files();
void _cold_data_compaction_producer_callback();
Status _handle_seg_compaction(std::shared_ptr<SegcompactionWorker> worker,
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 8b10e5e6fdb..ef0c41a01f3 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -255,9 +255,10 @@ void WriteCooldownMetaExecutors::WriteCooldownMetaExecutors::submit(TabletShared
[task = std::move(async_write_task)]() { task(); });
}
-Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir,
+Tablet::Tablet(StorageEngine& engine, TabletMetaSharedPtr tablet_meta, DataDir* data_dir,
const std::string_view& cumulative_compaction_type)
: BaseTablet(std::move(tablet_meta)),
+ _engine(engine),
_data_dir(data_dir),
_is_bad(false),
_last_cumu_compaction_failure_millis(0),
@@ -416,7 +417,7 @@ Status Tablet::revise_tablet_meta(const std::vector<RowsetSharedPtr>& to_add,
}
// clear stale rowset
for (auto& [v, rs] : _stale_rs_version_map) {
- StorageEngine::instance()->add_unused_rowset(rs);
+ _engine.add_unused_rowset(rs);
}
_stale_rs_version_map.clear();
_tablet_meta->clear_stale_rowset();
@@ -556,7 +557,7 @@ Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
// delete rowset in "to_delete" directly
for (auto& rs : to_delete) {
LOG(INFO) << "add unused rowset " << rs->rowset_id() << " because of same version";
- StorageEngine::instance()->add_unused_rowset(rs);
+ _engine.add_unused_rowset(rs);
}
}
return Status::OK();
@@ -595,7 +596,7 @@ void Tablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete, bool
} else {
for (auto& rs : to_delete) {
_timestamped_version_tracker.delete_version(rs->version());
- StorageEngine::instance()->add_unused_rowset(rs);
+ _engine.add_unused_rowset(rs);
}
}
}
@@ -832,7 +833,7 @@ void Tablet::delete_expired_stale_rowset() {
auto it = _stale_rs_version_map.find(timestampedVersion->version());
if (it != _stale_rs_version_map.end()) {
// delete rowset
- StorageEngine::instance()->add_unused_rowset(it->second);
+ _engine.add_unused_rowset(it->second);
_stale_rs_version_map.erase(it);
VLOG_NOTICE << "delete stale rowset tablet=" << tablet_id() << " version["
<< timestampedVersion->version().first << ","
@@ -1507,8 +1508,7 @@ void Tablet::get_compaction_status(std::string* json_result) {
std::string dummp_token;
rapidjson::Value fetch_addr;
if (tablet_meta()->tablet_schema()->enable_single_replica_compaction() &&
- StorageEngine::instance()->get_peer_replica_info(tablet_id(), &replica_info,
- &dummp_token)) {
+ _engine.get_peer_replica_info(tablet_id(), &replica_info, &dummp_token)) {
std::string addr = replica_info.host + ":" + std::to_string(replica_info.brpc_port);
fetch_addr.SetString(addr.c_str(), addr.length(), root.GetAllocator());
} else {
@@ -1790,12 +1790,12 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info,
tablet_info->__set_replica_id(replica_id());
tablet_info->__set_remote_data_size(_tablet_meta->tablet_remote_size());
if (_tablet_meta->cooldown_meta_id().initialized()) { // has cooldowned data
- tablet_info->__set_cooldown_term(_cooldown_term);
+ tablet_info->__set_cooldown_term(_cooldown_conf.term);
tablet_info->__set_cooldown_meta_id(_tablet_meta->cooldown_meta_id().to_thrift());
}
if (tablet_state() == TABLET_RUNNING && _tablet_meta->storage_policy_id() > 0) {
// tablet may not have cooldowned data, but the storage policy is set
- tablet_info->__set_cooldown_term(_cooldown_term);
+ tablet_info->__set_cooldown_term(_cooldown_conf.term);
}
}
@@ -1986,7 +1986,7 @@ Status Tablet::create_initial_rowset(const int64_t req_version) {
Result<std::unique_ptr<RowsetWriter>> Tablet::create_rowset_writer(RowsetWriterContext& context,
bool vertical) {
- context.rowset_id = StorageEngine::instance()->next_rowset_id();
+ context.rowset_id = _engine.next_rowset_id();
_init_context_common_fields(context);
std::unique_ptr<RowsetWriter> rowset_writer;
if (auto st = RowsetFactory::create_rowset_writer(context, vertical, &rowset_writer); !st.ok())
@@ -2011,7 +2011,7 @@ Status Tablet::create_transient_rowset_writer(
context.enable_segcompaction = false;
// ATTN: context.tablet is a shared_ptr, can't simply set it's value to `this`. We should
// get the shared_ptr from tablet_manager.
- auto tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id());
+ auto tablet = _engine.tablet_manager()->get_tablet(tablet_id());
if (!tablet) {
LOG(WARNING) << "cant find tablet by tablet_id=" << tablet_id();
return Status::NotFound(fmt::format("cant find tablet by tablet_id= {}", tablet_id()));
@@ -2044,7 +2044,7 @@ void Tablet::_init_context_common_fields(RowsetWriterContext& context) {
// Alpha Rowset will be removed in the future, so that if the tablet's default rowset type is
// alpah rowset, then set the newly created rowset to storage engine's default rowset.
if (context.rowset_type == ALPHA_ROWSET) {
- context.rowset_type = StorageEngine::instance()->default_rowset_type();
+ context.rowset_type = _engine.default_rowset_type();
}
if (context.fs != nullptr && context.fs->type() != io::FileSystemType::LOCAL) {
context.rowset_dir = remote_tablet_path(tablet_id());
@@ -2077,11 +2077,11 @@ Status Tablet::cooldown(RowsetSharedPtr rowset) {
return Status::Error<TRY_LOCK_FAILED>("try cumu_compaction_lock failed");
}
std::shared_lock cooldown_conf_rlock(_cooldown_conf_lock);
- if (_cooldown_replica_id <= 0) { // wait for FE to push cooldown conf
+ if (_cooldown_conf.cooldown_replica_id <= 0) { // wait for FE to push cooldown conf
return Status::InternalError("invalid cooldown_replica_id");
}
- if (_cooldown_replica_id == replica_id()) {
+ if (_cooldown_conf.cooldown_replica_id == replica_id()) {
// this replica is cooldown replica
RETURN_IF_ERROR(_cooldown_data(std::move(rowset)));
} else {
@@ -2097,7 +2097,7 @@ Status Tablet::cooldown(RowsetSharedPtr rowset) {
// hold SHARED `cooldown_conf_lock`
Status Tablet::_cooldown_data(RowsetSharedPtr rowset) {
- DCHECK(_cooldown_replica_id == replica_id());
+ DCHECK(_cooldown_conf.cooldown_replica_id == replica_id());
std::shared_ptr<io::RemoteFileSystem> dest_fs;
RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &dest_fs));
@@ -2122,8 +2122,8 @@ Status Tablet::_cooldown_data(RowsetSharedPtr rowset) {
return Status::OK();
}
- RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
- auto pending_rs_guard = StorageEngine::instance()->pending_remote_rowsets().add(new_rowset_id);
+ RowsetId new_rowset_id = _engine.next_rowset_id();
+ auto pending_rs_guard = _engine.pending_remote_rowsets().add(new_rowset_id);
Status st;
Defer defer {[&] {
if (!st.ok()) {
@@ -2171,7 +2171,7 @@ Status Tablet::_cooldown_data(RowsetSharedPtr rowset) {
// Upload cooldowned rowset meta to remote fs
// ATTN: Even if it is an empty rowset, in order for the followers to synchronize, the coolown meta must be
// uploaded, otherwise followers may never completely cooldown.
- if (auto t = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id());
+ if (auto t = _engine.tablet_manager()->get_tablet(tablet_id());
t != nullptr) { // `t` can be nullptr if it has been dropped
async_write_cooldown_meta(std::move(t));
}
@@ -2181,8 +2181,8 @@ Status Tablet::_cooldown_data(RowsetSharedPtr rowset) {
// hold SHARED `cooldown_conf_lock`
Status Tablet::_read_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs,
TabletMetaPB* tablet_meta_pb) {
- std::string remote_meta_path =
- remote_tablet_meta_path(tablet_id(), _cooldown_replica_id, _cooldown_term);
+ std::string remote_meta_path = remote_tablet_meta_path(
+ tablet_id(), _cooldown_conf.cooldown_replica_id, _cooldown_conf.term);
io::FileReaderSPtr tablet_meta_reader;
RETURN_IF_ERROR(fs->open_file(remote_meta_path, &tablet_meta_reader));
auto file_size = tablet_meta_reader->size();
@@ -2226,23 +2226,24 @@ bool Tablet::update_cooldown_conf(int64_t cooldown_term, int64_t cooldown_replic
LOG(INFO) << "try cooldown_conf_lock failed, tablet_id=" << tablet_id();
return false;
}
- if (cooldown_term <= _cooldown_term) {
+ if (cooldown_term <= _cooldown_conf.term) {
return false;
}
LOG(INFO) << "update cooldown conf. tablet_id=" << tablet_id()
- << " cooldown_replica_id: " << _cooldown_replica_id << " -> " << cooldown_replica_id
- << ", cooldown_term: " << _cooldown_term << " -> " << cooldown_term;
- _cooldown_replica_id = cooldown_replica_id;
- _cooldown_term = cooldown_term;
+ << " cooldown_replica_id: " << _cooldown_conf.cooldown_replica_id << " -> "
+ << cooldown_replica_id << ", cooldown_term: " << _cooldown_conf.term << " -> "
+ << cooldown_term;
+ _cooldown_conf.cooldown_replica_id = cooldown_replica_id;
+ _cooldown_conf.term = cooldown_term;
return true;
}
Status Tablet::write_cooldown_meta() {
std::shared_lock rlock(_cooldown_conf_lock);
- if (_cooldown_replica_id != _tablet_meta->replica_id()) {
+ if (_cooldown_conf.cooldown_replica_id != _tablet_meta->replica_id()) {
return Status::Aborted<false>("not cooldown replica({} vs {}) tablet_id={}",
- _tablet_meta->replica_id(), _cooldown_replica_id,
- tablet_id());
+ _tablet_meta->replica_id(),
+ _cooldown_conf.cooldown_replica_id, tablet_id());
}
std::shared_ptr<io::RemoteFileSystem> fs;
@@ -2281,8 +2282,8 @@ Status Tablet::write_cooldown_meta() {
tablet_meta_pb.mutable_cooldown_meta_id()->set_hi(cooldown_meta_id.hi);
tablet_meta_pb.mutable_cooldown_meta_id()->set_lo(cooldown_meta_id.lo);
- std::string remote_meta_path =
- remote_tablet_meta_path(tablet_id(), _cooldown_replica_id, _cooldown_term);
+ std::string remote_meta_path = remote_tablet_meta_path(
+ tablet_id(), _cooldown_conf.cooldown_replica_id, _cooldown_conf.term);
io::FileWriterPtr tablet_meta_writer;
// FIXME(plat1ko): What if object store permanently unavailable?
RETURN_IF_ERROR(fs->create_file(remote_meta_path, &tablet_meta_writer));
@@ -2293,9 +2294,9 @@ Status Tablet::write_cooldown_meta() {
// hold SHARED `cooldown_conf_lock`
Status Tablet::_follow_cooldowned_data() {
- DCHECK(_cooldown_replica_id != replica_id());
+ DCHECK(_cooldown_conf.cooldown_replica_id != replica_id());
LOG(INFO) << "try to follow cooldowned data. tablet_id=" << tablet_id()
- << " cooldown_replica_id=" << _cooldown_replica_id
+ << " cooldown_replica_id=" << _cooldown_conf.cooldown_replica_id
<< " local replica=" << replica_id();
std::shared_ptr<io::RemoteFileSystem> fs;
@@ -2495,165 +2496,6 @@ Status Tablet::remove_all_remote_rowsets() {
gc_pb.SerializeAsString());
}
-void Tablet::remove_unused_remote_files() {
- auto tablets = StorageEngine::instance()->tablet_manager()->get_all_tablet([](Tablet* t) {
- return t->tablet_meta()->cooldown_meta_id().initialized() && t->is_used() &&
- t->tablet_state() == TABLET_RUNNING && t->_cooldown_replica_id == t->replica_id();
- });
- TConfirmUnusedRemoteFilesRequest req;
- req.__isset.confirm_list = true;
- // tablet_id -> [fs, unused_remote_files]
- using unused_remote_files_buffer_t = std::unordered_map<
- int64_t, std::pair<std::shared_ptr<io::RemoteFileSystem>, std::vector<io::FileInfo>>>;
- unused_remote_files_buffer_t buffer;
- int64_t num_files_in_buffer = 0;
- // assume a filename is 0.1KB, buffer size should not larger than 100MB
- constexpr int64_t max_files_in_buffer = 1000000;
-
- auto calc_unused_remote_files = [&req, &buffer, &num_files_in_buffer](Tablet* t) {
- auto storage_policy = get_storage_policy(t->storage_policy_id());
- if (storage_policy == nullptr) {
- LOG(WARNING) << "could not find storage_policy, storage_policy_id="
- << t->storage_policy_id();
- return;
- }
- auto resource = get_storage_resource(storage_policy->resource_id);
- auto dest_fs = std::static_pointer_cast<io::RemoteFileSystem>(resource.fs);
- if (dest_fs == nullptr) {
- LOG(WARNING) << "could not find resource, resouce_id=" << storage_policy->resource_id;
- return;
- }
- DCHECK(atol(dest_fs->id().c_str()) == storage_policy->resource_id);
- DCHECK(dest_fs->type() != io::FileSystemType::LOCAL);
-
- std::shared_ptr<io::RemoteFileSystem> fs;
- auto st = get_remote_file_system(t->storage_policy_id(), &fs);
- if (!st.ok()) {
- LOG(WARNING) << "encounter error when remove unused remote files, tablet_id="
- << t->tablet_id() << " : " << st;
- return;
- }
-
- std::vector<io::FileInfo> files;
- // FIXME(plat1ko): What if user reset resource in storage policy to another resource?
- // Maybe we should also list files in previously uploaded resources.
- bool exists = true;
- st = dest_fs->list(io::Path(remote_tablet_path(t->tablet_id())), true, &files, &exists);
- if (!st.ok()) {
- LOG(WARNING) << "encounter error when remove unused remote files, tablet_id="
- << t->tablet_id() << " : " << st;
- return;
- }
- if (!exists || files.empty()) {
- return;
- }
- // get all cooldowned rowsets
- RowsetIdUnorderedSet cooldowned_rowsets;
- UniqueId cooldown_meta_id;
- {
- std::shared_lock rlock(t->_meta_lock);
- for (auto&& rs_meta : t->_tablet_meta->all_rs_metas()) {
- if (!rs_meta->is_local()) {
- cooldowned_rowsets.insert(rs_meta->rowset_id());
- }
- }
- if (cooldowned_rowsets.empty()) {
- return;
- }
- cooldown_meta_id = t->_tablet_meta->cooldown_meta_id();
- }
- auto [cooldown_replica_id, cooldown_term] = t->cooldown_conf();
- if (cooldown_replica_id != t->replica_id()) {
- return;
- }
- // {cooldown_replica_id}.{cooldown_term}.meta
- std::string remote_meta_path =
- fmt::format("{}.{}.meta", cooldown_replica_id, cooldown_term);
- // filter out the paths that should be reserved
- auto filter = [&](io::FileInfo& info) {
- std::string_view filename = info.file_name;
- if (filename.ends_with(".meta")) {
- return filename == remote_meta_path;
- }
- auto rowset_id = extract_rowset_id(filename);
- if (rowset_id.hi == 0) {
- return false;
- }
- return cooldowned_rowsets.contains(rowset_id) ||
- StorageEngine::instance()->pending_remote_rowsets().contains(rowset_id);
- };
- files.erase(std::remove_if(files.begin(), files.end(), std::move(filter)), files.end());
- if (files.empty()) {
- return;
- }
- files.shrink_to_fit();
- num_files_in_buffer += files.size();
- buffer.insert({t->tablet_id(), {std::move(dest_fs), std::move(files)}});
- auto& info = req.confirm_list.emplace_back();
- info.__set_tablet_id(t->tablet_id());
- info.__set_cooldown_replica_id(cooldown_replica_id);
- info.__set_cooldown_meta_id(cooldown_meta_id.to_thrift());
- };
-
- auto confirm_and_remove_files = [&buffer, &req, &num_files_in_buffer]() {
- TConfirmUnusedRemoteFilesResult result;
- LOG(INFO) << "begin to confirm unused remote files. num_tablets=" << buffer.size()
- << " num_files=" << num_files_in_buffer;
- auto st = MasterServerClient::instance()->confirm_unused_remote_files(req, &result);
- if (!st.ok()) {
- LOG(WARNING) << st;
- return;
- }
- for (auto id : result.confirmed_tablets) {
- if (auto it = buffer.find(id); LIKELY(it != buffer.end())) {
- auto& fs = it->second.first;
- auto& files = it->second.second;
- std::vector<io::Path> paths;
- paths.reserve(files.size());
- // delete unused files
- LOG(INFO) << "delete unused files. root_path=" << fs->root_path()
- << " tablet_id=" << id;
- io::Path dir = remote_tablet_path(id);
- for (auto& file : files) {
- auto file_path = dir / file.file_name;
- LOG(INFO) << "delete unused file: " << file_path.native();
- paths.push_back(std::move(file_path));
- }
- st = fs->batch_delete(paths);
- if (!st.ok()) {
- LOG(WARNING) << "failed to delete unused files, tablet_id=" << id << " : "
- << st;
- }
- buffer.erase(it);
- }
- }
- };
-
- // batch confirm to reduce FE's overhead
- auto next_confirm_time = std::chrono::steady_clock::now() +
- std::chrono::seconds(config::confirm_unused_remote_files_interval_sec);
- for (auto& t : tablets) {
- if (t.use_count() <= 1 // this means tablet has been dropped
- || t->_cooldown_replica_id != t->replica_id() || t->tablet_state() != TABLET_RUNNING) {
- continue;
- }
- calc_unused_remote_files(t.get());
- if (num_files_in_buffer > 0 && (num_files_in_buffer > max_files_in_buffer ||
- std::chrono::steady_clock::now() > next_confirm_time)) {
- confirm_and_remove_files();
- buffer.clear();
- req.confirm_list.clear();
- num_files_in_buffer = 0;
- next_confirm_time =
- std::chrono::steady_clock::now() +
- std::chrono::seconds(config::confirm_unused_remote_files_interval_sec);
- }
- }
- if (num_files_in_buffer > 0) {
- confirm_and_remove_files();
- }
-}
-
void Tablet::update_max_version_schema(const TabletSchemaSPtr& tablet_schema) {
std::lock_guard wrlock(_meta_lock);
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
@@ -3130,19 +2972,18 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset,
}
OlapStopWatch watch;
- doris::TabletSharedPtr tablet_ptr =
- StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id());
+ doris::TabletSharedPtr tablet_ptr = _engine.tablet_manager()->get_tablet(tablet_id());
if (tablet_ptr == nullptr) {
return Status::InternalError("Can't find tablet id: {}, maybe already dropped.",
tablet_id());
}
- for (size_t i = 0; i < segments.size(); i++) {
- auto& seg = segments[i];
+ for (const auto& segment : segments) {
+ auto& seg = segment;
if (token != nullptr) {
RETURN_IF_ERROR(token->submit(tablet_ptr, rowset, seg, specified_rowsets, end_version,
delete_bitmap, rowset_writer));
} else {
- RETURN_IF_ERROR(calc_segment_delete_bitmap(rowset, segments[i], specified_rowsets,
+ RETURN_IF_ERROR(calc_segment_delete_bitmap(rowset, segment, specified_rowsets,
delete_bitmap, end_version, rowset_writer));
}
}
@@ -3317,7 +3158,7 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset)
std::vector<RowsetSharedPtr> specified_rowsets = get_rowset_by_ids(&cur_rowset_ids);
OlapStopWatch watch;
- auto token = StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
+ auto token = _engine.calc_delete_bitmap_executor()->create_token();
RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap,
cur_version - 1, token.get()));
RETURN_IF_ERROR(token->wait());
@@ -3422,7 +3263,7 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
}
auto t3 = watch.get_elapse_time_us();
- auto token = StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
+ auto token = _engine.calc_delete_bitmap_executor()->create_token();
RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap,
cur_version - 1, token.get(), rowset_writer));
RETURN_IF_ERROR(token->wait());
@@ -3459,10 +3300,9 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
// update version without write lock, compaction and publish_txn
// will update delete bitmap, handle compaction with _rowset_update_lock
// and publish_txn runs sequential so no need to lock here
- for (auto iter = delete_bitmap->delete_bitmap.begin();
- iter != delete_bitmap->delete_bitmap.end(); ++iter) {
- _tablet_meta->delete_bitmap().merge(
- {std::get<0>(iter->first), std::get<1>(iter->first), cur_version}, iter->second);
+ for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) {
+ _tablet_meta->delete_bitmap().merge({std::get<0>(key), std::get<1>(key), cur_version},
+ bitmap);
}
return Status::OK();
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index d55662d75df..068cbae6978 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -19,10 +19,10 @@
#include <butil/macros.h>
#include <glog/logging.h>
-#include <stddef.h>
-#include <stdint.h>
#include <atomic>
+#include <cstddef>
+#include <cstdint>
#include <functional>
#include <limits>
#include <list>
@@ -87,7 +87,7 @@ static inline constexpr auto TRACE_TABLET_LOCK_THRESHOLD = std::chrono::seconds(
class Tablet final : public BaseTablet {
public:
- Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir,
+ Tablet(StorageEngine& engine, TabletMetaSharedPtr tablet_meta, DataDir* data_dir,
const std::string_view& cumulative_compaction_type = "");
DataDir* data_dir() const { return _data_dir; }
@@ -363,14 +363,17 @@ public:
RowsetSharedPtr need_cooldown(int64_t* cooldown_timestamp, size_t* file_size);
- std::pair<int64_t, int64_t> cooldown_conf() const {
+ struct CooldownConf {
+ int64_t term = -1;
+ int64_t cooldown_replica_id = -1;
+ };
+
+ CooldownConf cooldown_conf() const {
std::shared_lock rlock(_cooldown_conf_lock);
- return {_cooldown_replica_id, _cooldown_term};
+ return _cooldown_conf;
}
- std::pair<int64_t, int64_t> cooldown_conf_unlocked() const {
- return {_cooldown_replica_id, _cooldown_term};
- }
+ CooldownConf cooldown_conf_unlocked() const { return _cooldown_conf; }
// Return `true` if update success
bool update_cooldown_conf(int64_t cooldown_term, int64_t cooldown_replica_id);
@@ -380,8 +383,6 @@ public:
void record_unused_remote_rowset(const RowsetId& rowset_id, const std::string& resource,
int64_t num_segments);
- static void remove_unused_remote_files();
-
uint32_t calc_cold_data_compaction_score() const;
std::mutex& get_cold_compaction_lock() { return _cold_compaction_lock; }
@@ -610,6 +611,7 @@ public:
static const int64_t K_INVALID_CUMULATIVE_POINT = -1;
private:
+ StorageEngine& _engine;
DataDir* _data_dir = nullptr;
TimestampedVersionTracker _timestamped_version_tracker;
@@ -676,8 +678,7 @@ private:
int64_t _skip_base_compaction_ts;
// cooldown related
- int64_t _cooldown_replica_id = -1;
- int64_t _cooldown_term = -1;
+ CooldownConf _cooldown_conf;
// `_cooldown_conf_lock` is used to serialize update cooldown conf and all operations that:
// 1. read cooldown conf
// 2. upload rowsets to remote storage
@@ -690,8 +691,6 @@ private:
// `_alter_failed` is used to indicate whether the tablet failed to perform a schema change
std::atomic<bool> _alter_failed = false;
- DISALLOW_COPY_AND_ASSIGN(Tablet);
-
int64_t _io_error_times = 0;
};
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index b5eacce1a38..52fdec5b14b 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -85,8 +85,9 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(tablet_meta_mem_consumption, MetricUnit::BYTE
bvar::Adder<int64_t> g_tablet_meta_schema_columns_count("tablet_meta_schema_columns_count");
-TabletManager::TabletManager(int32_t tablet_map_lock_shard_size)
- : _tablet_meta_mem_tracker(std::make_shared<MemTracker>(
+TabletManager::TabletManager(StorageEngine& engine, int32_t tablet_map_lock_shard_size)
+ : _engine(engine),
+ _tablet_meta_mem_tracker(std::make_shared<MemTracker>(
"TabletMeta", ExecEnv::GetInstance()->experimental_mem_tracker())),
_tablets_shards_size(tablet_map_lock_shard_size),
_tablets_shards_mask(tablet_map_lock_shard_size - 1) {
@@ -507,7 +508,8 @@ TabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked(
LOG(WARNING) << "invalid partition id " << tablet_meta->partition_id() << ", tablet "
<< tablet_meta->tablet_id();
}
- TabletSharedPtr new_tablet = std::make_shared<Tablet>(std::move(tablet_meta), data_dir);
+ TabletSharedPtr new_tablet =
+ std::make_shared<Tablet>(_engine, std::move(tablet_meta), data_dir);
COUNTER_UPDATE(ADD_CHILD_TIMER(profile, "CreateTabletFromMeta", parent_timer_name),
static_cast<int64_t>(watch.reset()));
return new_tablet;
@@ -836,7 +838,7 @@ Status TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tablet_
LOG(WARNING) << "tablet=" << tablet_id << " load from meta but partition id eq 0";
}
- TabletSharedPtr tablet = std::make_shared<Tablet>(std::move(tablet_meta), data_dir);
+ TabletSharedPtr tablet = std::make_shared<Tablet>(_engine, std::move(tablet_meta), data_dir);
// NOTE: method load_tablet_from_meta could be called by two cases as below
// case 1: BE start;
@@ -998,7 +1000,7 @@ Status TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>
// build the expired txn map first, outside the tablet map lock
std::map<TabletInfo, std::vector<int64_t>> expire_txn_map;
- StorageEngine::instance()->txn_manager()->build_expire_txn_map(&expire_txn_map);
+ _engine.txn_manager()->build_expire_txn_map(&expire_txn_map);
LOG(INFO) << "find expired transactions for " << expire_txn_map.size() << " tablets";
HistogramStat tablet_version_num_hist;
@@ -1319,8 +1321,7 @@ Status TabletManager::_create_tablet_meta_unlocked(const TCreateTabletReq& reque
col_idx_to_unique_id);
if (request.__isset.storage_format) {
if (request.storage_format == TStorageFormat::DEFAULT) {
- (*tablet_meta)
- ->set_preferred_rowset_type(StorageEngine::instance()->default_rowset_type());
+ (*tablet_meta)->set_preferred_rowset_type(_engine.default_rowset_type());
} else if (request.storage_format == TStorageFormat::V1) {
(*tablet_meta)->set_preferred_rowset_type(ALPHA_ROWSET);
} else if (request.storage_format == TStorageFormat::V2) {
@@ -1388,7 +1389,7 @@ TabletManager::tablets_shard& TabletManager::_get_tablets_shard(TTabletId tablet
void TabletManager::get_tablets_distribution_on_different_disks(
std::map<int64_t, std::map<DataDir*, int64_t>>& tablets_num_on_disk,
std::map<int64_t, std::map<DataDir*, std::vector<TabletSize>>>& tablets_info_on_disk) {
- std::vector<DataDir*> data_dirs = StorageEngine::instance()->get_stores();
+ std::vector<DataDir*> data_dirs = _engine.get_stores();
std::map<int64_t, std::set<TabletInfo>> partition_tablet_map;
{
// When drop tablet, '_partition_tablet_map_lock' is locked in 'tablet_shard_lock'.
@@ -1397,25 +1398,23 @@ void TabletManager::get_tablets_distribution_on_different_disks(
std::shared_lock rdlock(_partition_tablet_map_lock);
partition_tablet_map = _partition_tablet_map;
}
- std::map<int64_t, std::set<TabletInfo>>::iterator partition_iter = partition_tablet_map.begin();
- for (; partition_iter != partition_tablet_map.end(); ++partition_iter) {
+ for (auto& [partition_id, tablet_infos] : partition_tablet_map) {
std::map<DataDir*, int64_t> tablets_num;
std::map<DataDir*, std::vector<TabletSize>> tablets_info;
- for (int i = 0; i < data_dirs.size(); i++) {
- tablets_num[data_dirs[i]] = 0;
+ for (auto* data_dir : data_dirs) {
+ tablets_num[data_dir] = 0;
}
- int64_t partition_id = partition_iter->first;
- std::set<TabletInfo>::iterator tablet_info_iter = (partition_iter->second).begin();
- for (; tablet_info_iter != (partition_iter->second).end(); ++tablet_info_iter) {
+
+ for (const auto& tablet_info : tablet_infos) {
// get_tablet() will hold 'tablet_shard_lock'
- TabletSharedPtr tablet = get_tablet(tablet_info_iter->tablet_id);
+ TabletSharedPtr tablet = get_tablet(tablet_info.tablet_id);
if (tablet == nullptr) {
continue;
}
DataDir* data_dir = tablet->data_dir();
size_t tablet_footprint = tablet->tablet_footprint();
tablets_num[data_dir]++;
- TabletSize tablet_size(tablet_info_iter->tablet_id, tablet_footprint);
+ TabletSize tablet_size(tablet_info.tablet_id, tablet_footprint);
tablets_info[data_dir].push_back(tablet_size);
}
tablets_num_on_disk[partition_id] = tablets_num;
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 04131b8d3bb..04f79b4f0f0 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -54,7 +54,7 @@ class TTabletInfo;
// please uniformly name the method in "xxx_unlocked()" mode
class TabletManager {
public:
- TabletManager(int32_t tablet_map_lock_shard_size);
+ TabletManager(StorageEngine& engine, int32_t tablet_map_lock_shard_size);
~TabletManager();
bool check_tablet_id_exist(TTabletId tablet_id);
@@ -226,6 +226,8 @@ private:
std::set<int64_t> tablets_under_clone;
};
+ StorageEngine& _engine;
+
// TODO: memory size of TabletSchema cannot be accurately tracked.
// trace the memory use by meta of tablet
std::shared_ptr<MemTracker> _tablet_meta_mem_tracker;
diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp
index dc851928577..75509bb2635 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -816,7 +816,7 @@ Status EngineCloneTask::_finish_full_clone(Tablet* tablet,
}
{
std::shared_lock cooldown_conf_rlock(tablet->get_cooldown_conf_lock());
- if (tablet->cooldown_conf_unlocked().first == tablet->replica_id()) {
+ if (tablet->cooldown_conf_unlocked().cooldown_replica_id == tablet->replica_id()) {
// If this replica is cooldown replica, MUST generate a new `cooldown_meta_id` to avoid use `cooldown_meta_id`
// generated in old cooldown term which may lead to such situation:
// Replica A is cooldown replica, cooldown_meta_id=2,
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 5cec68d75be..161c39fdd20 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -103,8 +103,10 @@ struct TabletTxnInfo {
}
};
-TxnManager::TxnManager(int32_t txn_map_shard_size, int32_t txn_shard_size)
- : _txn_map_shard_size(txn_map_shard_size), _txn_shard_size(txn_shard_size) {
+TxnManager::TxnManager(StorageEngine& engine, int32_t txn_map_shard_size, int32_t txn_shard_size)
+ : _engine(engine),
+ _txn_map_shard_size(txn_map_shard_size),
+ _txn_shard_size(txn_shard_size) {
DCHECK_GT(_txn_map_shard_size, 0);
DCHECK_GT(_txn_shard_size, 0);
DCHECK_EQ(_txn_map_shard_size & (_txn_map_shard_size - 1), 0);
@@ -401,8 +403,8 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
auto load_info = std::make_shared<TabletTxnInfo>(load_id, rowset_ptr);
load_info->pending_rs_guard = std::move(guard);
if (is_recovery) {
- TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
- tablet_info.tablet_id, tablet_info.tablet_uid);
+ TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_info.tablet_id,
+ tablet_info.tablet_uid);
if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) {
load_info->unique_key_merge_on_write = true;
load_info->delete_bitmap.reset(new DeleteBitmap(tablet->tablet_id()));
@@ -427,7 +429,7 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
TTransactionId transaction_id, TTabletId tablet_id,
TabletUid tablet_uid, const Version& version,
TabletPublishStatistics* stats) {
- auto tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
+ auto tablet = _engine.tablet_manager()->get_tablet(tablet_id);
if (tablet == nullptr) {
return Status::OK();
}
@@ -648,7 +650,7 @@ Status TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id,
} else {
static_cast<void>(RowsetMetaManager::remove(meta, tablet_uid, rowset->rowset_id()));
#ifndef BE_TEST
- StorageEngine::instance()->add_unused_rowset(rowset);
+ _engine.add_unused_rowset(rowset);
#endif
VLOG_NOTICE << "delete transaction from engine successfully."
<< " partition_id: " << key.first << ", transaction_id: " << key.second
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index 25b8da90b71..f7e67d0a462 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -77,7 +77,7 @@ using CommitTabletTxnInfoVec = std::vector<CommitTabletTxnInfo>;
// txn manager is used to manage mapping between tablet and txns
class TxnManager {
public:
- TxnManager(int32_t txn_map_shard_size, int32_t txn_shard_size);
+ TxnManager(StorageEngine& engine, int32_t txn_map_shard_size, int32_t txn_shard_size);
~TxnManager() {
delete[] _txn_tablet_maps;
@@ -230,6 +230,8 @@ private:
};
private:
+ StorageEngine& _engine;
+
const int32_t _txn_map_shard_size;
const int32_t _txn_shard_size;
diff --git a/be/test/olap/compaction_delete_bitmap_calculator_test.cpp b/be/test/olap/compaction_delete_bitmap_calculator_test.cpp
index 4399553f25e..41dbd42f61a 100644
--- a/be/test/olap/compaction_delete_bitmap_calculator_test.cpp
+++ b/be/test/olap/compaction_delete_bitmap_calculator_test.cpp
@@ -154,7 +154,7 @@ public:
void SetUp() override {
config::max_runnings_transactions_per_txn_map = 500;
- _txn_mgr.reset(new TxnManager(1, 1));
+ _txn_mgr.reset(new TxnManager(*k_engine, 1, 1));
config::tablet_map_shard_size = 1;
config::txn_map_shard_size = 1;
@@ -213,7 +213,8 @@ public:
static_cast<void>(_tablet_meta->add_rs_meta(rowset_meta1));
static_cast<void>(_tablet_meta->add_rs_meta(rowset_meta2));
static_cast<void>(_tablet_meta->add_rs_meta(rowset_meta3));
- _tablet = std::make_shared<Tablet>(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY);
+ _tablet = std::make_shared<Tablet>(*k_engine, _tablet_meta, nullptr,
+ CUMULATIVE_SIZE_BASED_POLICY);
static_cast<void>(_tablet->init());
}
diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp b/be/test/olap/cumulative_compaction_policy_test.cpp
index d715fd7b9ae..08d479727e1 100644
--- a/be/test/olap/cumulative_compaction_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_policy_test.cpp
@@ -27,6 +27,7 @@
#include "olap/cumulative_compaction.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset_meta.h"
+#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "util/uid_util.h"
@@ -35,16 +36,17 @@ namespace doris {
class TestSizeBasedCumulativeCompactionPolicy : public testing::Test {
public:
- TestSizeBasedCumulativeCompactionPolicy() {}
+ TestSizeBasedCumulativeCompactionPolicy() : _engine(StorageEngine({})) {}
+
void SetUp() {
config::compaction_promotion_size_mbytes = 1024;
config::compaction_promotion_ratio = 0.05;
config::compaction_promotion_min_size_mbytes = 64;
config::compaction_min_size_mbytes = 64;
- _tablet_meta = static_cast<TabletMetaSharedPtr>(new TabletMeta(
- 1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10),
- TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
+ _tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
+ UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
+ TCompressionType::LZ4F));
_json_rowset_meta = R"({
"rowset_id": 540081,
@@ -326,6 +328,9 @@ public:
protected:
std::string _json_rowset_meta;
TabletMetaSharedPtr _tablet_meta;
+
+private:
+ StorageEngine _engine;
};
TEST_F(TestSizeBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score) {
@@ -336,7 +341,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
static_cast<void>(_tablet->init());
;
_tablet->calculate_cumulative_point();
@@ -358,7 +364,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
static_cast<void>(_tablet->init());
;
_tablet->calculate_cumulative_point();
@@ -379,7 +386,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, calculate_cumulative_point_big_b
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
static_cast<void>(_tablet->init());
;
_tablet->calculate_cumulative_point();
@@ -395,7 +403,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, calculate_cumulative_point_overl
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
static_cast<void>(_tablet->init());
;
_tablet->calculate_cumulative_point();
@@ -411,7 +420,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_candidate_rowsets) {
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
static_cast<void>(_tablet->init());
;
_tablet->calculate_cumulative_point();
@@ -428,7 +438,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_candidate_rowsets_big_base)
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
static_cast<void>(_tablet->init());
;
_tablet->calculate_cumulative_point();
@@ -445,7 +456,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_normal) {
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
static_cast<void>(_tablet->init());
_tablet->calculate_cumulative_point();
@@ -472,7 +484,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_big_base) {
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
static_cast<void>(_tablet->init());
;
_tablet->calculate_cumulative_point();
@@ -500,7 +513,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_promotion) {
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
static_cast<void>(_tablet->init());
_tablet->calculate_cumulative_point();
@@ -527,7 +541,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_not_same_leve
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
static_cast<void>(_tablet->init());
_tablet->calculate_cumulative_point();
@@ -554,7 +569,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_empty) {
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
static_cast<void>(_tablet->init());
_tablet->calculate_cumulative_point();
@@ -581,7 +597,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_not_reach_min
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
static_cast<void>(_tablet->init());
;
_tablet->calculate_cumulative_point();
@@ -610,7 +627,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_delete_in_cum
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
static_cast<void>(_tablet->init());
;
_tablet->calculate_cumulative_point();
@@ -641,7 +659,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_delete) {
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
static_cast<void>(_tablet->init());
;
_tablet->calculate_cumulative_point();
@@ -670,7 +689,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, _calc_promotion_size_big) {
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
static_cast<void>(_tablet->init());
;
_tablet->calculate_cumulative_point();
@@ -686,7 +706,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, _calc_promotion_size_small) {
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
static_cast<void>(_tablet->init());
;
_tablet->calculate_cumulative_point();
@@ -702,7 +723,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, _level_size) {
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
config::compaction_promotion_size_mbytes = 1024;
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
static_cast<void>(_tablet->init());
;
@@ -724,7 +746,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, _pick_missing_version_cumulative
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
static_cast<void>(_tablet->init());
;
diff --git a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
index 3df0e1946db..1390f1deb14 100644
--- a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
@@ -27,6 +27,7 @@
#include "olap/cumulative_compaction.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset_meta.h"
+#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "util/time.h"
@@ -36,11 +37,11 @@ namespace doris {
class TestTimeSeriesCumulativeCompactionPolicy : public testing::Test {
public:
- TestTimeSeriesCumulativeCompactionPolicy() = default;
+ TestTimeSeriesCumulativeCompactionPolicy() : _engine(StorageEngine({})) {}
void SetUp() {
- _tablet_meta = static_cast<TabletMetaSharedPtr>(new TabletMeta(
- 1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10),
- TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
+ _tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
+ UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
+ TCompressionType::LZ4F));
_tablet_meta->set_compaction_policy(std::string(CUMULATIVE_TIME_SERIES_POLICY));
_tablet_meta->set_time_series_compaction_goal_size_mbytes(100);
@@ -292,6 +293,9 @@ public:
protected:
std::string _json_rowset_meta;
TabletMetaSharedPtr _tablet_meta;
+
+private:
+ StorageEngine _engine;
};
TEST_F(TestTimeSeriesCumulativeCompactionPolicy, calculate_cumulative_point_overlap) {
@@ -302,7 +306,9 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, calculate_cumulative_point_over
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
+ StorageEngine engine({});
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
static_cast<void>(_tablet->init());
_tablet->calculate_cumulative_point();
@@ -317,7 +323,9 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, calculate_cumulative_point_big_
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
+ StorageEngine engine({});
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
static_cast<void>(_tablet->init());
_tablet->calculate_cumulative_point();
@@ -332,7 +340,9 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, calc_cumulative_compaction_scor
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
+ StorageEngine engine({});
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
static_cast<void>(_tablet->init());
_tablet->calculate_cumulative_point();
@@ -353,7 +363,9 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, calc_cumulative_compaction_scor
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
+ StorageEngine engine({});
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
static_cast<void>(_tablet->init());
_tablet->calculate_cumulative_point();
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
@@ -373,7 +385,9 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, pick_candidate_rowsets) {
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
+ StorageEngine engine({});
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
static_cast<void>(_tablet->init());
_tablet->calculate_cumulative_point();
@@ -389,7 +403,9 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, pick_candidate_rowsets_big_rows
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
+ StorageEngine engine({});
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
static_cast<void>(_tablet->init());
_tablet->calculate_cumulative_point();
@@ -405,7 +421,9 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, pick_input_rowsets_goal_size) {
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
+ StorageEngine engine({});
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
static_cast<void>(_tablet->init());
_tablet->calculate_cumulative_point();
@@ -432,7 +450,9 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, pick_input_rowsets_file_count)
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
+ StorageEngine engine({});
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
static_cast<void>(_tablet->init());
_tablet->calculate_cumulative_point();
@@ -459,7 +479,9 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, pick_input_rowsets_time_interva
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
+ StorageEngine engine({});
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
static_cast<void>(_tablet->init());
_tablet->calculate_cumulative_point();
int64_t now = UnixMillis();
@@ -488,7 +510,9 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, pick_input_rowsets_empty) {
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
+ StorageEngine engine({});
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
static_cast<void>(_tablet->init());
_tablet->calculate_cumulative_point();
int64_t now = UnixMillis();
@@ -517,7 +541,8 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, pick_input_rowsets_delete) {
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
static_cast<void>(_tablet->init());
_tablet->calculate_cumulative_point();
@@ -545,7 +570,8 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, _pick_missing_version_cumulativ
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
+ TabletSharedPtr _tablet(
+ new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
static_cast<void>(_tablet->init());
// has miss version
diff --git a/be/test/olap/ordered_data_compaction_test.cpp b/be/test/olap/ordered_data_compaction_test.cpp
index 77a7ec59807..922a10d155e 100644
--- a/be/test/olap/ordered_data_compaction_test.cpp
+++ b/be/test/olap/ordered_data_compaction_test.cpp
@@ -347,7 +347,7 @@ protected:
UniqueId(1, 2), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F, 0, enable_unique_key_merge_on_write));
- TabletSharedPtr tablet(new Tablet(tablet_meta, _data_dir.get()));
+ TabletSharedPtr tablet(new Tablet(*k_engine, tablet_meta, _data_dir.get()));
static_cast<void>(tablet->init());
if (has_delete_handler) {
// delete data with key < 1000
diff --git a/be/test/olap/path_gc_test.cpp b/be/test/olap/path_gc_test.cpp
index 985ac63257a..7a52b28d82a 100644
--- a/be/test/olap/path_gc_test.cpp
+++ b/be/test/olap/path_gc_test.cpp
@@ -58,7 +58,7 @@ TEST(PathGcTest, GcTabletAndRowset) {
tablet_meta->set_tablet_uid({tablet_id, 0});
tablet_meta->set_shard_id(tablet_id % 4);
tablet_meta->_schema_hash = tablet_id;
- auto tablet = std::make_shared<Tablet>(std::move(tablet_meta), &data_dir);
+ auto tablet = std::make_shared<Tablet>(engine, std::move(tablet_meta), &data_dir);
auto& tablet_map = engine.tablet_manager()->_get_tablet_map(tablet_id);
tablet_map[tablet_id] = tablet;
return tablet;
diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp
index b77fe432997..d28e9f7dfe9 100644
--- a/be/test/olap/rowid_conversion_test.cpp
+++ b/be/test/olap/rowid_conversion_test.cpp
@@ -280,7 +280,7 @@ protected:
UniqueId(1, 2), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F, 0, enable_unique_key_merge_on_write));
- TabletSharedPtr tablet(new Tablet(tablet_meta, nullptr));
+ TabletSharedPtr tablet(new Tablet(*k_engine, tablet_meta, nullptr));
static_cast<void>(tablet->init());
return tablet;
}
diff --git a/be/test/olap/segcompaction_test.cpp b/be/test/olap/segcompaction_test.cpp
index f76afef10b0..f612694e7e3 100644
--- a/be/test/olap/segcompaction_test.cpp
+++ b/be/test/olap/segcompaction_test.cpp
@@ -197,7 +197,7 @@ protected:
tablet_meta->_tablet_id = 1;
tablet_meta->set_partition_id(10000);
tablet_meta->_schema = tablet_schema;
- auto tablet = std::make_shared<Tablet>(tablet_meta, data_dir.get(), "test_str");
+ auto tablet = std::make_shared<Tablet>(*l_engine, tablet_meta, data_dir.get(), "test_str");
char* tmp_str = (char*)malloc(20);
strncpy(tmp_str, "test_tablet_name", 20);
diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp
index ee773c6242c..f5778d47982 100644
--- a/be/test/olap/tablet_test.cpp
+++ b/be/test/olap/tablet_test.cpp
@@ -243,7 +243,7 @@ TEST_F(TestTablet, delete_expired_stale_rowset) {
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr));
+ TabletSharedPtr _tablet(new Tablet(*k_engine, _tablet_meta, nullptr));
static_cast<void>(_tablet->init());
for (auto ptr : expired_rs_metas) {
@@ -280,7 +280,7 @@ TEST_F(TestTablet, pad_rowset) {
}
static_cast<void>(_data_dir->init());
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, _data_dir.get()));
+ TabletSharedPtr _tablet(new Tablet(*k_engine, _tablet_meta, _data_dir.get()));
static_cast<void>(_tablet->init());
Version version(5, 5);
@@ -324,7 +324,7 @@ TEST_F(TestTablet, cooldown_policy) {
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
}
- TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr));
+ TabletSharedPtr _tablet(new Tablet(*k_engine, _tablet_meta, nullptr));
static_cast<void>(_tablet->init());
constexpr int64_t storage_policy_id = 10000;
_tablet->set_storage_policy_id(storage_policy_id);
diff --git a/be/test/olap/txn_manager_test.cpp b/be/test/olap/txn_manager_test.cpp
index d8777159e6e..d33570e8a8d 100644
--- a/be/test/olap/txn_manager_test.cpp
+++ b/be/test/olap/txn_manager_test.cpp
@@ -103,14 +103,14 @@ public:
void create_tablet() {
auto tablet_meta = std::make_shared<TabletMeta>();
tablet_meta->set_tablet_uid(_tablet_uid);
- auto tablet = std::make_shared<Tablet>(std::move(tablet_meta), nullptr);
+ auto tablet = std::make_shared<Tablet>(*k_engine, std::move(tablet_meta), nullptr);
auto& tablet_map = k_engine->tablet_manager()->_get_tablet_map(tablet_id);
tablet_map[tablet_id] = std::move(tablet);
}
virtual void SetUp() {
config::max_runnings_transactions_per_txn_map = 500;
- _txn_mgr.reset(new TxnManager(64, 1024));
+ _txn_mgr.reset(new TxnManager(*k_engine, 64, 1024));
config::tablet_map_shard_size = 1;
config::txn_map_shard_size = 1;
@@ -344,7 +344,7 @@ TEST_F(TxnManagerTest, DeleteCommittedTxn) {
}
TEST_F(TxnManagerTest, TabletVersionCache) {
- std::unique_ptr<TxnManager> txn_mgr = std::make_unique<TxnManager>(64, 1024);
+ std::unique_ptr<TxnManager> txn_mgr = std::make_unique<TxnManager>(*k_engine, 64, 1024);
txn_mgr->update_tablet_version_txn(123, 100, 456);
txn_mgr->update_tablet_version_txn(124, 100, 567);
int64_t tx1 = txn_mgr->get_txn_by_tablet_version(123, 100);
diff --git a/be/test/vec/olap/vertical_compaction_test.cpp b/be/test/vec/olap/vertical_compaction_test.cpp
index acc28969581..1eb023a01ac 100644
--- a/be/test/vec/olap/vertical_compaction_test.cpp
+++ b/be/test/vec/olap/vertical_compaction_test.cpp
@@ -332,7 +332,7 @@ protected:
UniqueId(1, 2), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F, 0, enable_unique_key_merge_on_write));
- TabletSharedPtr tablet(new Tablet(tablet_meta, _data_dir));
+ TabletSharedPtr tablet(new Tablet(*k_engine, tablet_meta, _data_dir));
static_cast<void>(tablet->init());
bool exists = false;
auto res = io::global_local_filesystem()->exists(tablet->tablet_path(), &exists);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org