You are viewing a plain-text version of this content. The canonical link to it ("here") was a hyperlink that is not preserved in this plain-text version.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/08 09:24:33 UTC

[doris] branch branch-2.0-beta updated: [bug](cooldown) Fix async_write_cooldown_meta and snapshot cooldowned version not continuous bug (#20437)

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0-beta by this push:
     new dd9d8fdc7c [bug](cooldown) Fix async_write_cooldown_meta and snapshot cooldowned version not continuous bug (#20437)
dd9d8fdc7c is described below

commit dd9d8fdc7ceeea9468a5cba9861ded61f56a0e3d
Author: plat1ko <pl...@gmail.com>
AuthorDate: Thu Jun 8 15:35:35 2023 +0800

    [bug](cooldown) Fix async_write_cooldown_meta and snapshot cooldowned version not continuous bug (#20437)
---
 be/src/agent/task_worker_pool.cpp           |  4 +-
 be/src/common/status.h                      |  2 +
 be/src/olap/rowset/rowset_meta.h            |  4 --
 be/src/olap/snapshot_manager.cpp            | 45 +++++++++++++++++---
 be/src/olap/tablet.cpp                      | 66 ++++++++++++++++++++++-------
 be/src/olap/tablet.h                        | 21 ++-------
 be/test/io/cache/remote_file_cache_test.cpp |  1 -
 be/test/olap/rowset/beta_rowset_test.cpp    |  3 --
 8 files changed, 99 insertions(+), 47 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index d830d592e2..a30a62e9aa 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1180,7 +1180,9 @@ void TaskWorkerPool::_push_cooldown_conf_worker_thread_callback() {
                 continue;
             }
             if (tablet->update_cooldown_conf(cooldown_conf.cooldown_term,
-                                             cooldown_conf.cooldown_replica_id)) {
+                                             cooldown_conf.cooldown_replica_id) &&
+                cooldown_conf.cooldown_replica_id == tablet->replica_id() &&
+                tablet->tablet_meta()->cooldown_meta_id().initialized()) {
                 Tablet::async_write_cooldown_meta(tablet);
             }
         }
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 569d542d25..538b7de24f 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -423,6 +423,8 @@ public:
         return code == _code;
     }
 
+    void set_code(int code) { _code = code; } // overwrites only the error code (e.g. to ABORTED); every other field of the Status is left unchanged
+
     bool ok() const { return _code == ErrorCode::OK; }
 
     bool is_io_error() const {
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index b132c40db2..6736c53cac 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -109,10 +109,6 @@ public:
 
     const std::string& resource_id() const { return _rowset_meta_pb.resource_id(); }
 
-    void set_resource_id(std::string resource_id) {
-        _rowset_meta_pb.set_resource_id(std::move(resource_id));
-    }
-
     bool is_local() const { return !_rowset_meta_pb.has_resource_id(); }
 
     RowsetId rowset_id() const { return _rowset_id; }
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index d41d5561c0..d19af18cca 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -347,6 +347,23 @@ Status SnapshotManager::_link_index_and_data_files(
     return res;
 }
 
+// Returns OK iff every rowset in `rowsets` starts right after its predecessor ends, otherwise InternalError naming the first offending pair; `rowsets` MUST already be sorted by `Rowset::comparator`
+Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets) {
+    if (rowsets.size() < 2) { // zero or one rowset is trivially continuous
+        return Status::OK();
+    }
+    auto prev = rowsets.begin();
+    for (auto it = rowsets.begin() + 1; it != rowsets.end(); ++it) {
+        if ((*prev)->end_version() + 1 != (*it)->start_version()) { // gap or overlap between neighbours
+            return Status::InternalError("versions are not continuity: prev={} cur={}",
+                                         (*prev)->version().to_string(),
+                                         (*it)->version().to_string());
+        }
+        prev = it;
+    }
+    return Status::OK();
+}
+
 Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet,
                                                const TSnapshotRequest& request,
                                                string* snapshot_path,
@@ -493,11 +510,29 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
                     }
                     version = request.version;
                 }
-                // get shortest version path
-                // it very important!!!!
-                // it means 0-version has to be a readable version graph
-                res = ref_tablet->capture_consistent_rowsets(Version(0, version),
-                                                             &consistent_rowsets);
+                if (ref_tablet->tablet_meta()->cooldown_meta_id().initialized()) {
+                    // Tablet has cooldowned data, MUST pick consistent rowsets with continuous cooldowned version
+                    // Get max cooldowned version
+                    int64_t max_cooldowned_version = -1;
+                    for (auto& [v, rs] : ref_tablet->rowset_map()) {
+                        if (rs->is_local()) continue;
+                        consistent_rowsets.push_back(rs);
+                        max_cooldowned_version = std::max(max_cooldowned_version, v.second);
+                    }
+                    DCHECK_GE(max_cooldowned_version, 1) << "tablet_id=" << ref_tablet->tablet_id();
+                    std::sort(consistent_rowsets.begin(), consistent_rowsets.end(),
+                              Rowset::comparator);
+                    res = check_version_continuity(consistent_rowsets);
+                    if (res.ok() && max_cooldowned_version < version) {
+                        // Pick consistent rowsets of remaining required version
+                        res = ref_tablet->capture_consistent_rowsets(
+                                {max_cooldowned_version + 1, version}, &consistent_rowsets);
+                    }
+                } else {
+                    // get shortest version path
+                    res = ref_tablet->capture_consistent_rowsets(Version(0, version),
+                                                                 &consistent_rowsets);
+                }
                 if (!res.ok()) {
                     LOG(WARNING) << "fail to select versions to span. res=" << res;
                     break;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 170041d268..1fc69ac677 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -148,15 +148,19 @@ bvar::Window<bvar::Adder<uint64_t>> exceed_version_limit_counter_minute(
 struct WriteCooldownMetaExecutors {
     WriteCooldownMetaExecutors(size_t executor_nums = 5);
 
-    static WriteCooldownMetaExecutors* GetInstance() {
+    static WriteCooldownMetaExecutors* get_instance() { // process-wide instance held in a function-local static
         static WriteCooldownMetaExecutors instance;
         return &instance;
     }
 
     void submit(TabletSharedPtr tablet);
-    size_t _get_executor_pos(int64_t tablet_id) const { return tablet_id % _executor_nums; };
+    size_t _get_executor_pos(int64_t tablet_id) const { // the same tablet always maps to the same executor
+        return std::hash<int64_t>()(tablet_id) % _executor_nums; // NOTE(review): std::hash<int64_t> is typically the identity, so this usually equals tablet_id % _executor_nums
+    }
+    // Each executor is single-threaded (an MPSC queue) so uploads of the same tablet meta never run concurrently
+    // FIXME(AlexYue): Use mpsc instead of `ThreadPool` with 1 thread
     std::vector<std::unique_ptr<ThreadPool>> _executors;
-    std::unordered_set<int64_t> _pengding_tablets;
+    std::unordered_set<int64_t> _pending_tablets; // tablets with a queued write task that has not started yet; guarded by `_latch`
     std::mutex _latch;
     size_t _executor_nums;
 };
@@ -165,7 +169,7 @@ WriteCooldownMetaExecutors::WriteCooldownMetaExecutors(size_t executor_nums)
         : _executor_nums(executor_nums) {
     for (size_t i = 0; i < _executor_nums; i++) {
         std::unique_ptr<ThreadPool> pool;
-        ThreadPoolBuilder("AsyncWriteCooldownMetaExecutor")
+        ThreadPoolBuilder("WriteCooldownMetaExecutor")
                 .set_min_threads(1)
                 .set_max_threads(1)
                 .set_max_queue_size(std::numeric_limits<int>::max())
@@ -187,16 +191,16 @@ void WriteCooldownMetaExecutors::WriteCooldownMetaExecutors::submit(TabletShared
     {
         // one tablet could at most have one cooldown task to be done
         std::unique_lock<std::mutex> lck {_latch};
-        if (_pengding_tablets.count(tablet_id) > 0) {
+        if (_pending_tablets.count(tablet_id) > 0) {
             return;
         }
-        _pengding_tablets.insert(tablet_id);
+        _pending_tablets.insert(tablet_id);
     }
 
     auto async_write_task = [this, t = std::move(tablet)]() {
         {
             std::unique_lock<std::mutex> lck {_latch};
-            _pengding_tablets.erase(t->tablet_id());
+            _pending_tablets.erase(t->tablet_id());
         }
         auto s = t->write_cooldown_meta();
         if (s.ok()) {
@@ -898,7 +902,7 @@ Status Tablet::capture_consistent_rowsets(const Version& spec_version,
 
 Status Tablet::_capture_consistent_rowsets_unlocked(const std::vector<Version>& version_path,
                                                     std::vector<RowsetSharedPtr>* rowsets) const {
-    DCHECK(rowsets != nullptr && rowsets->empty());
+    DCHECK(rowsets != nullptr);
     rowsets->reserve(version_path.size());
     for (auto& version : version_path) {
         bool is_find = false;
@@ -1964,6 +1968,15 @@ Status Tablet::_cooldown_data() {
     if (!old_rowset) {
         return Status::InternalError("cannot pick cooldown rowset in tablet {}", tablet_id());
     }
+    if (old_rowset->num_segments() < 1) {
+        // Empty rowset, just reset rowset's resource_id
+        std::lock_guard meta_wlock(_meta_lock);
+        old_rowset->rowset_meta()->set_fs(dest_fs);
+        LOG(INFO) << "cooldown empty rowset " << old_rowset->version() << " "
+                  << old_rowset->rowset_id().to_string() << " to " << dest_fs->root_path().native()
+                  << ", tablet_id=" << tablet_id();
+        return Status::OK();
+    }
     RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
     add_pending_remote_rowset(new_rowset_id.to_string());
     Status st;
@@ -1988,7 +2001,6 @@ Status Tablet::_cooldown_data() {
     // gen a new rowset
     auto new_rowset_meta = std::make_shared<RowsetMeta>(*old_rowset->rowset_meta());
     new_rowset_meta->set_rowset_id(new_rowset_id);
-    new_rowset_meta->set_resource_id(dest_fs->id());
     new_rowset_meta->set_fs(dest_fs);
     new_rowset_meta->set_creation_time(time(nullptr));
     UniqueId cooldown_meta_id = UniqueId::gen_uid();
@@ -2006,7 +2018,7 @@ Status Tablet::_cooldown_data() {
     }
     erase_pending_remote_rowset(new_rowset_id.to_string());
     {
-        std::unique_lock meta_rlock(_meta_lock);
+        std::shared_lock meta_rlock(_meta_lock);
         SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
         save_meta();
     }
@@ -2053,12 +2065,30 @@ Status check_version_continuity(const std::vector<RowsetMetaSharedPtr>& rs_metas
 // It's guaranteed the write cooldown meta task would be invoked at the end unless BE crashes
 // one tablet would at most have one async task to be done
 void Tablet::async_write_cooldown_meta(TabletSharedPtr tablet) {
-    WriteCooldownMetaExecutors::GetInstance()->submit(std::move(tablet));
+    WriteCooldownMetaExecutors::get_instance()->submit(std::move(tablet)); // dropped if a write for this tablet is already queued but not yet started
+}
+
+bool Tablet::update_cooldown_conf(int64_t cooldown_term, int64_t cooldown_replica_id) { // true iff the conf was replaced; false if the lock is busy or the term is not newer
+    std::unique_lock wlock(_cooldown_conf_lock, std::try_to_lock); // non-blocking: give up instead of waiting on shared holders such as write_cooldown_meta
+    if (!wlock.owns_lock()) {
+        LOG(INFO) << "try cooldown_conf_lock failed, tablet_id=" << tablet_id();
+        return false;
+    }
+    if (cooldown_term <= _cooldown_term) return false; // stale or repeated term: keep the current conf
+    LOG(INFO) << "update cooldown conf. tablet_id=" << tablet_id()
+              << " cooldown_replica_id: " << _cooldown_replica_id << " -> " << cooldown_replica_id
+              << ", cooldown_term: " << _cooldown_term << " -> " << cooldown_term;
+    _cooldown_replica_id = cooldown_replica_id;
+    _cooldown_term = cooldown_term;
+    return true;
 }
 
-// hold SHARED `cooldown_conf_lock`
 Status Tablet::write_cooldown_meta() {
-    auto [cooldown_replica_id, cooldown_term] = cooldown_conf();
+    std::shared_lock rlock(_cooldown_conf_lock);
+    if (_cooldown_replica_id != _tablet_meta->replica_id()) {
+        return Status::Aborted("not cooldown replcia({} vs {}) tablet_id={}",
+                               _tablet_meta->replica_id(), _cooldown_replica_id, tablet_id());
+    }
 
     std::shared_ptr<io::RemoteFileSystem> fs;
     RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &fs));
@@ -2080,7 +2110,12 @@ Status Tablet::write_cooldown_meta() {
     }
     std::sort(cooldowned_rs_metas.begin(), cooldowned_rs_metas.end(), RowsetMeta::comparator);
     DCHECK(cooldowned_rs_metas.front()->start_version() == 0);
-    RETURN_IF_ERROR(check_version_continuity(cooldowned_rs_metas));
+    // If version not continuous, it must be a bug
+    if (auto st = check_version_continuity(cooldowned_rs_metas); !st.ok()) {
+        DCHECK(st.ok()) << st << " tablet_id=" << tablet_id();
+        st.set_code(ABORTED);
+        return st;
+    }
 
     TabletMetaPB tablet_meta_pb;
     auto rs_metas = tablet_meta_pb.mutable_rs_metas();
@@ -2092,8 +2127,9 @@ Status Tablet::write_cooldown_meta() {
     tablet_meta_pb.mutable_cooldown_meta_id()->set_lo(cooldown_meta_id.lo);
 
     std::string remote_meta_path =
-            remote_tablet_meta_path(tablet_id(), cooldown_replica_id, cooldown_term);
+            remote_tablet_meta_path(tablet_id(), _cooldown_replica_id, _cooldown_term);
     io::FileWriterPtr tablet_meta_writer;
+    // FIXME(plat1ko): What if object store permanently unavailable?
     RETURN_IF_ERROR(fs->create_file(remote_meta_path, &tablet_meta_writer));
     auto val = tablet_meta_pb.SerializeAsString();
     RETURN_IF_ERROR(tablet_meta_writer->append({val.data(), val.size()}));
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 6608f5864d..7ade2fa5a4 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -364,24 +364,8 @@ public:
         return {_cooldown_replica_id, _cooldown_term};
     }
 
-    // return true if update success
-    bool update_cooldown_conf(int64_t cooldown_term, int64_t cooldown_replica_id) {
-        std::unique_lock wlock(_cooldown_conf_lock, std::try_to_lock);
-        if (!wlock.owns_lock()) {
-            LOG(INFO) << "try cooldown_conf_lock failed, tablet_id=" << tablet_id();
-            return false;
-        }
-        if (cooldown_term > _cooldown_term) {
-            LOG(INFO) << "update cooldown conf. tablet_id=" << tablet_id()
-                      << " cooldown_replica_id: " << _cooldown_replica_id << " -> "
-                      << cooldown_replica_id << ", cooldown_term: " << _cooldown_term << " -> "
-                      << cooldown_term;
-            _cooldown_replica_id = cooldown_replica_id;
-            _cooldown_term = cooldown_term;
-            return true;
-        }
-        return false;
-    }
+    // Returns `true` iff the cooldown conf was updated; `false` if the conf lock is busy or `cooldown_term` is not newer
+    bool update_cooldown_conf(int64_t cooldown_term, int64_t cooldown_replica_id);
 
     Status remove_all_remote_rowsets();
 
@@ -403,6 +387,7 @@ public:
     std::shared_mutex& get_cooldown_conf_lock() { return _cooldown_conf_lock; }
 
     static void async_write_cooldown_meta(TabletSharedPtr tablet);
+    // Returns `ABORTED` when the write should not be retried (e.g. this replica is not the cooldown replica)
     Status write_cooldown_meta();
     ////////////////////////////////////////////////////////////////////////////
     // end cooldown functions
diff --git a/be/test/io/cache/remote_file_cache_test.cpp b/be/test/io/cache/remote_file_cache_test.cpp
index 5c1b932cc4..18fcb6de14 100644
--- a/be/test/io/cache/remote_file_cache_test.cpp
+++ b/be/test/io/cache/remote_file_cache_test.cpp
@@ -170,7 +170,6 @@ protected:
         // io::S3FileSystem::create will call connect, which will fail because s3_conf is empty.
         // but it does affect the following unit test
         ASSERT_FALSE(st.ok()) << st;
-        rowset.rowset_meta()->set_resource_id(resource_id);
         rowset.rowset_meta()->set_num_segments(1);
         rowset.rowset_meta()->set_fs(fs);
         rowset.rowset_meta()->set_tablet_id(tablet_id);
diff --git a/be/test/olap/rowset/beta_rowset_test.cpp b/be/test/olap/rowset/beta_rowset_test.cpp
index 09c5019dfb..80e540f8ba 100644
--- a/be/test/olap/rowset/beta_rowset_test.cpp
+++ b/be/test/olap/rowset/beta_rowset_test.cpp
@@ -269,7 +269,6 @@ TEST_F(BetaRowsetTest, ReadTest) {
                                  Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, true));
 
         rowset.rowset_meta()->set_num_segments(1);
-        rowset.rowset_meta()->set_resource_id(resource_id);
         rowset.rowset_meta()->set_fs(fs);
 
         std::vector<segment_v2::SegmentSharedPtr> segments;
@@ -284,7 +283,6 @@ TEST_F(BetaRowsetTest, ReadTest) {
         fs->_client.reset(new S3ClientMockGetError());
 
         rowset.rowset_meta()->set_num_segments(1);
-        rowset.rowset_meta()->set_resource_id(resource_id);
         rowset.rowset_meta()->set_fs(fs);
 
         std::vector<segment_v2::SegmentSharedPtr> segments;
@@ -299,7 +297,6 @@ TEST_F(BetaRowsetTest, ReadTest) {
         fs->_client.reset(new S3ClientMockGetErrorData());
 
         rowset.rowset_meta()->set_num_segments(1);
-        rowset.rowset_meta()->set_resource_id(resource_id);
         rowset.rowset_meta()->set_fs(fs);
 
         std::vector<segment_v2::SegmentSharedPtr> segments;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org