You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/08 09:24:33 UTC
[doris] branch branch-2.0-beta updated: [bug](cooldown) Fix async_write_cooldown_meta and snapshot cooldowned version not continuous bug (#20437)
This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0-beta by this push:
new dd9d8fdc7c [bug](cooldown) Fix async_write_cooldown_meta and snapshot cooldowned version not continuous bug (#20437)
dd9d8fdc7c is described below
commit dd9d8fdc7ceeea9468a5cba9861ded61f56a0e3d
Author: plat1ko <pl...@gmail.com>
AuthorDate: Thu Jun 8 15:35:35 2023 +0800
[bug](cooldown) Fix async_write_cooldown_meta and snapshot cooldowned version not continuous bug (#20437)
---
be/src/agent/task_worker_pool.cpp | 4 +-
be/src/common/status.h | 2 +
be/src/olap/rowset/rowset_meta.h | 4 --
be/src/olap/snapshot_manager.cpp | 45 +++++++++++++++++---
be/src/olap/tablet.cpp | 66 ++++++++++++++++++++++-------
be/src/olap/tablet.h | 21 ++-------
be/test/io/cache/remote_file_cache_test.cpp | 1 -
be/test/olap/rowset/beta_rowset_test.cpp | 3 --
8 files changed, 99 insertions(+), 47 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index d830d592e2..a30a62e9aa 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1180,7 +1180,9 @@ void TaskWorkerPool::_push_cooldown_conf_worker_thread_callback() {
continue;
}
if (tablet->update_cooldown_conf(cooldown_conf.cooldown_term,
- cooldown_conf.cooldown_replica_id)) {
+ cooldown_conf.cooldown_replica_id) &&
+ cooldown_conf.cooldown_replica_id == tablet->replica_id() &&
+ tablet->tablet_meta()->cooldown_meta_id().initialized()) {
Tablet::async_write_cooldown_meta(tablet);
}
}
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 569d542d25..538b7de24f 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -423,6 +423,8 @@ public:
return code == _code;
}
+ void set_code(int code) { _code = code; }
+
bool ok() const { return _code == ErrorCode::OK; }
bool is_io_error() const {
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index b132c40db2..6736c53cac 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -109,10 +109,6 @@ public:
const std::string& resource_id() const { return _rowset_meta_pb.resource_id(); }
- void set_resource_id(std::string resource_id) {
- _rowset_meta_pb.set_resource_id(std::move(resource_id));
- }
-
bool is_local() const { return !_rowset_meta_pb.has_resource_id(); }
RowsetId rowset_id() const { return _rowset_id; }
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index d41d5561c0..d19af18cca 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -347,6 +347,23 @@ Status SnapshotManager::_link_index_and_data_files(
return res;
}
+// `rowsets` MUST already be sorted by `Rowset::comparator`
+Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets) {
+ if (rowsets.size() < 2) {
+ return Status::OK();
+ }
+ auto prev = rowsets.begin();
+ for (auto it = rowsets.begin() + 1; it != rowsets.end(); ++it) {
+ if ((*prev)->end_version() + 1 != (*it)->start_version()) {
+ return Status::InternalError("versions are not continuity: prev={} cur={}",
+ (*prev)->version().to_string(),
+ (*it)->version().to_string());
+ }
+ prev = it;
+ }
+ return Status::OK();
+}
+
Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet,
const TSnapshotRequest& request,
string* snapshot_path,
@@ -493,11 +510,29 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
}
version = request.version;
}
- // get shortest version path
- // it very important!!!!
- // it means 0-version has to be a readable version graph
- res = ref_tablet->capture_consistent_rowsets(Version(0, version),
- &consistent_rowsets);
+ if (ref_tablet->tablet_meta()->cooldown_meta_id().initialized()) {
+ // Tablet has cooldowned data, MUST pick consistent rowsets with continuous cooldowned version
+ // Get max cooldowned version
+ int64_t max_cooldowned_version = -1;
+ for (auto& [v, rs] : ref_tablet->rowset_map()) {
+ if (rs->is_local()) continue;
+ consistent_rowsets.push_back(rs);
+ max_cooldowned_version = std::max(max_cooldowned_version, v.second);
+ }
+ DCHECK_GE(max_cooldowned_version, 1) << "tablet_id=" << ref_tablet->tablet_id();
+ std::sort(consistent_rowsets.begin(), consistent_rowsets.end(),
+ Rowset::comparator);
+ res = check_version_continuity(consistent_rowsets);
+ if (res.ok() && max_cooldowned_version < version) {
+ // Pick consistent rowsets of remaining required version
+ res = ref_tablet->capture_consistent_rowsets(
+ {max_cooldowned_version + 1, version}, &consistent_rowsets);
+ }
+ } else {
+ // get shortest version path
+ res = ref_tablet->capture_consistent_rowsets(Version(0, version),
+ &consistent_rowsets);
+ }
if (!res.ok()) {
LOG(WARNING) << "fail to select versions to span. res=" << res;
break;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 170041d268..1fc69ac677 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -148,15 +148,19 @@ bvar::Window<bvar::Adder<uint64_t>> exceed_version_limit_counter_minute(
struct WriteCooldownMetaExecutors {
WriteCooldownMetaExecutors(size_t executor_nums = 5);
- static WriteCooldownMetaExecutors* GetInstance() {
+ static WriteCooldownMetaExecutors* get_instance() {
static WriteCooldownMetaExecutors instance;
return &instance;
}
void submit(TabletSharedPtr tablet);
- size_t _get_executor_pos(int64_t tablet_id) const { return tablet_id % _executor_nums; };
+ size_t _get_executor_pos(int64_t tablet_id) const {
+ return std::hash<int64_t>()(tablet_id) % _executor_nums;
+ };
+ // Each executor is a mpsc to ensure uploads of the same tablet meta are not concurrent
+ // FIXME(AlexYue): Use mpsc instead of `ThreadPool` with 1 thread
std::vector<std::unique_ptr<ThreadPool>> _executors;
- std::unordered_set<int64_t> _pengding_tablets;
+ std::unordered_set<int64_t> _pending_tablets;
std::mutex _latch;
size_t _executor_nums;
};
@@ -165,7 +169,7 @@ WriteCooldownMetaExecutors::WriteCooldownMetaExecutors(size_t executor_nums)
: _executor_nums(executor_nums) {
for (size_t i = 0; i < _executor_nums; i++) {
std::unique_ptr<ThreadPool> pool;
- ThreadPoolBuilder("AsyncWriteCooldownMetaExecutor")
+ ThreadPoolBuilder("WriteCooldownMetaExecutor")
.set_min_threads(1)
.set_max_threads(1)
.set_max_queue_size(std::numeric_limits<int>::max())
@@ -187,16 +191,16 @@ void WriteCooldownMetaExecutors::WriteCooldownMetaExecutors::submit(TabletShared
{
// one tablet could at most have one cooldown task to be done
std::unique_lock<std::mutex> lck {_latch};
- if (_pengding_tablets.count(tablet_id) > 0) {
+ if (_pending_tablets.count(tablet_id) > 0) {
return;
}
- _pengding_tablets.insert(tablet_id);
+ _pending_tablets.insert(tablet_id);
}
auto async_write_task = [this, t = std::move(tablet)]() {
{
std::unique_lock<std::mutex> lck {_latch};
- _pengding_tablets.erase(t->tablet_id());
+ _pending_tablets.erase(t->tablet_id());
}
auto s = t->write_cooldown_meta();
if (s.ok()) {
@@ -898,7 +902,7 @@ Status Tablet::capture_consistent_rowsets(const Version& spec_version,
Status Tablet::_capture_consistent_rowsets_unlocked(const std::vector<Version>& version_path,
std::vector<RowsetSharedPtr>* rowsets) const {
- DCHECK(rowsets != nullptr && rowsets->empty());
+ DCHECK(rowsets != nullptr);
rowsets->reserve(version_path.size());
for (auto& version : version_path) {
bool is_find = false;
@@ -1964,6 +1968,15 @@ Status Tablet::_cooldown_data() {
if (!old_rowset) {
return Status::InternalError("cannot pick cooldown rowset in tablet {}", tablet_id());
}
+ if (old_rowset->num_segments() < 1) {
+ // Empty rowset, just reset rowset's resource_id
+ std::lock_guard meta_wlock(_meta_lock);
+ old_rowset->rowset_meta()->set_fs(dest_fs);
+ LOG(INFO) << "cooldown empty rowset " << old_rowset->version() << " "
+ << old_rowset->rowset_id().to_string() << " to " << dest_fs->root_path().native()
+ << ", tablet_id=" << tablet_id();
+ return Status::OK();
+ }
RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
add_pending_remote_rowset(new_rowset_id.to_string());
Status st;
@@ -1988,7 +2001,6 @@ Status Tablet::_cooldown_data() {
// gen a new rowset
auto new_rowset_meta = std::make_shared<RowsetMeta>(*old_rowset->rowset_meta());
new_rowset_meta->set_rowset_id(new_rowset_id);
- new_rowset_meta->set_resource_id(dest_fs->id());
new_rowset_meta->set_fs(dest_fs);
new_rowset_meta->set_creation_time(time(nullptr));
UniqueId cooldown_meta_id = UniqueId::gen_uid();
@@ -2006,7 +2018,7 @@ Status Tablet::_cooldown_data() {
}
erase_pending_remote_rowset(new_rowset_id.to_string());
{
- std::unique_lock meta_rlock(_meta_lock);
+ std::shared_lock meta_rlock(_meta_lock);
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
save_meta();
}
@@ -2053,12 +2065,30 @@ Status check_version_continuity(const std::vector<RowsetMetaSharedPtr>& rs_metas
// It's guaranteed the write cooldown meta task would be invoked at the end unless BE crashes
// one tablet would at most have one async task to be done
void Tablet::async_write_cooldown_meta(TabletSharedPtr tablet) {
- WriteCooldownMetaExecutors::GetInstance()->submit(std::move(tablet));
+ WriteCooldownMetaExecutors::get_instance()->submit(std::move(tablet));
+}
+
+bool Tablet::update_cooldown_conf(int64_t cooldown_term, int64_t cooldown_replica_id) {
+ std::unique_lock wlock(_cooldown_conf_lock, std::try_to_lock);
+ if (!wlock.owns_lock()) {
+ LOG(INFO) << "try cooldown_conf_lock failed, tablet_id=" << tablet_id();
+ return false;
+ }
+ if (cooldown_term <= _cooldown_term) return false;
+ LOG(INFO) << "update cooldown conf. tablet_id=" << tablet_id()
+ << " cooldown_replica_id: " << _cooldown_replica_id << " -> " << cooldown_replica_id
+ << ", cooldown_term: " << _cooldown_term << " -> " << cooldown_term;
+ _cooldown_replica_id = cooldown_replica_id;
+ _cooldown_term = cooldown_term;
+ return true;
}
-// hold SHARED `cooldown_conf_lock`
Status Tablet::write_cooldown_meta() {
- auto [cooldown_replica_id, cooldown_term] = cooldown_conf();
+ std::shared_lock rlock(_cooldown_conf_lock);
+ if (_cooldown_replica_id != _tablet_meta->replica_id()) {
+ return Status::Aborted("not cooldown replcia({} vs {}) tablet_id={}",
+ _tablet_meta->replica_id(), _cooldown_replica_id, tablet_id());
+ }
std::shared_ptr<io::RemoteFileSystem> fs;
RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &fs));
@@ -2080,7 +2110,12 @@ Status Tablet::write_cooldown_meta() {
}
std::sort(cooldowned_rs_metas.begin(), cooldowned_rs_metas.end(), RowsetMeta::comparator);
DCHECK(cooldowned_rs_metas.front()->start_version() == 0);
- RETURN_IF_ERROR(check_version_continuity(cooldowned_rs_metas));
+ // If version not continuous, it must be a bug
+ if (auto st = check_version_continuity(cooldowned_rs_metas); !st.ok()) {
+ DCHECK(st.ok()) << st << " tablet_id=" << tablet_id();
+ st.set_code(ABORTED);
+ return st;
+ }
TabletMetaPB tablet_meta_pb;
auto rs_metas = tablet_meta_pb.mutable_rs_metas();
@@ -2092,8 +2127,9 @@ Status Tablet::write_cooldown_meta() {
tablet_meta_pb.mutable_cooldown_meta_id()->set_lo(cooldown_meta_id.lo);
std::string remote_meta_path =
- remote_tablet_meta_path(tablet_id(), cooldown_replica_id, cooldown_term);
+ remote_tablet_meta_path(tablet_id(), _cooldown_replica_id, _cooldown_term);
io::FileWriterPtr tablet_meta_writer;
+ // FIXME(plat1ko): What if object store permanently unavailable?
RETURN_IF_ERROR(fs->create_file(remote_meta_path, &tablet_meta_writer));
auto val = tablet_meta_pb.SerializeAsString();
RETURN_IF_ERROR(tablet_meta_writer->append({val.data(), val.size()}));
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 6608f5864d..7ade2fa5a4 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -364,24 +364,8 @@ public:
return {_cooldown_replica_id, _cooldown_term};
}
- // return true if update success
- bool update_cooldown_conf(int64_t cooldown_term, int64_t cooldown_replica_id) {
- std::unique_lock wlock(_cooldown_conf_lock, std::try_to_lock);
- if (!wlock.owns_lock()) {
- LOG(INFO) << "try cooldown_conf_lock failed, tablet_id=" << tablet_id();
- return false;
- }
- if (cooldown_term > _cooldown_term) {
- LOG(INFO) << "update cooldown conf. tablet_id=" << tablet_id()
- << " cooldown_replica_id: " << _cooldown_replica_id << " -> "
- << cooldown_replica_id << ", cooldown_term: " << _cooldown_term << " -> "
- << cooldown_term;
- _cooldown_replica_id = cooldown_replica_id;
- _cooldown_term = cooldown_term;
- return true;
- }
- return false;
- }
+ // Return `true` if update success
+ bool update_cooldown_conf(int64_t cooldown_term, int64_t cooldown_replica_id);
Status remove_all_remote_rowsets();
@@ -403,6 +387,7 @@ public:
std::shared_mutex& get_cooldown_conf_lock() { return _cooldown_conf_lock; }
static void async_write_cooldown_meta(TabletSharedPtr tablet);
+ // Returns `ABORTED` if the write should not be retried
Status write_cooldown_meta();
////////////////////////////////////////////////////////////////////////////
// end cooldown functions
diff --git a/be/test/io/cache/remote_file_cache_test.cpp b/be/test/io/cache/remote_file_cache_test.cpp
index 5c1b932cc4..18fcb6de14 100644
--- a/be/test/io/cache/remote_file_cache_test.cpp
+++ b/be/test/io/cache/remote_file_cache_test.cpp
@@ -170,7 +170,6 @@ protected:
// io::S3FileSystem::create will call connect, which will fail because s3_conf is empty.
// but it does affect the following unit test
ASSERT_FALSE(st.ok()) << st;
- rowset.rowset_meta()->set_resource_id(resource_id);
rowset.rowset_meta()->set_num_segments(1);
rowset.rowset_meta()->set_fs(fs);
rowset.rowset_meta()->set_tablet_id(tablet_id);
diff --git a/be/test/olap/rowset/beta_rowset_test.cpp b/be/test/olap/rowset/beta_rowset_test.cpp
index 09c5019dfb..80e540f8ba 100644
--- a/be/test/olap/rowset/beta_rowset_test.cpp
+++ b/be/test/olap/rowset/beta_rowset_test.cpp
@@ -269,7 +269,6 @@ TEST_F(BetaRowsetTest, ReadTest) {
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, true));
rowset.rowset_meta()->set_num_segments(1);
- rowset.rowset_meta()->set_resource_id(resource_id);
rowset.rowset_meta()->set_fs(fs);
std::vector<segment_v2::SegmentSharedPtr> segments;
@@ -284,7 +283,6 @@ TEST_F(BetaRowsetTest, ReadTest) {
fs->_client.reset(new S3ClientMockGetError());
rowset.rowset_meta()->set_num_segments(1);
- rowset.rowset_meta()->set_resource_id(resource_id);
rowset.rowset_meta()->set_fs(fs);
std::vector<segment_v2::SegmentSharedPtr> segments;
@@ -299,7 +297,6 @@ TEST_F(BetaRowsetTest, ReadTest) {
fs->_client.reset(new S3ClientMockGetErrorData());
rowset.rowset_meta()->set_num_segments(1);
- rowset.rowset_meta()->set_resource_id(resource_id);
rowset.rowset_meta()->set_fs(fs);
std::vector<segment_v2::SegmentSharedPtr> segments;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org