You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/06/22 13:50:27 UTC

[doris] branch master updated: [enhancement](merge-on-write) add async publish task when version is discontinuous for merge on write table when clone (#21025)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 691a988c97 [enhancement](merge-on-write) add async publish task when version is discontinuous for merge on write table when clone (#21025)
691a988c97 is described below

commit 691a988c971f78ed3070d9f97f1a9f4cb521bee0
Author: Xin Liao <li...@126.com>
AuthorDate: Thu Jun 22 21:50:14 2023 +0800

    [enhancement](merge-on-write) add async publish task when version is discontinuous for merge on write table when clone (#21025)
    
    Version discontinuity may occur during a clone. To deal with this case, add an async publish task when the version is discontinuous.
---
 be/src/agent/task_worker_pool.cpp                |  9 ++-
 be/src/olap/data_dir.cpp                         | 15 ++++
 be/src/olap/olap_server.cpp                      | 98 ++++++++++++++++++++++++
 be/src/olap/rowset/rowset.h                      |  7 ++
 be/src/olap/storage_engine.cpp                   |  4 +
 be/src/olap/storage_engine.h                     | 12 +++
 be/src/olap/tablet_meta_manager.cpp              | 39 ++++++++++
 be/src/olap/tablet_meta_manager.h                | 12 +++
 be/src/olap/task/engine_clone_task.cpp           | 16 +++-
 be/src/olap/task/engine_publish_version_task.cpp | 63 ++++++++++++++-
 be/src/olap/task/engine_publish_version_task.h   | 30 +++++++-
 gensrc/proto/olap_file.proto                     |  5 ++
 12 files changed, 302 insertions(+), 8 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index a91540b445..463845d2c7 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1448,13 +1448,15 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
 
         std::vector<TTabletId> error_tablet_ids;
         std::vector<TTabletId> succ_tablet_ids;
+        // partition_id, tablet_id, publish_version
+        std::vector<std::tuple<int64_t, int64_t, int64_t>> discontinuous_version_tablets;
         uint32_t retry_time = 0;
         Status status;
         bool is_task_timeout = false;
         while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
             error_tablet_ids.clear();
             EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids,
-                                                 &succ_tablet_ids);
+                                                 &succ_tablet_ids, &discontinuous_version_tablets);
             status = _env->storage_engine()->execute_task(&engine_task);
             if (status.ok()) {
                 break;
@@ -1488,6 +1490,11 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
             continue;
         }
 
+        for (auto& item : discontinuous_version_tablets) {
+            StorageEngine::instance()->add_async_publish_task(
+                    std::get<0>(item), std::get<1>(item), std::get<2>(item),
+                    publish_version_req.transaction_id, false);
+        }
         TFinishTaskRequest finish_task_request;
         if (!status) {
             DorisMetrics::instance()->publish_task_failed_total->increment(1);
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index b83ce43635..970138cd0e 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -452,6 +452,30 @@ Status DataDir::load() {
         }
     }
 
+    // Re-queue the async publish tasks persisted as "ppi_" records in this dir's meta store.
+    // is_recovery=true keeps add_async_publish_task() from writing each record back again.
+    auto load_pending_publish_info_func = [](int64_t tablet_id, int64_t publish_version,
+                                             const string& info) {
+        PendingPublishInfoPB pending_publish_info_pb;
+        if (!pending_publish_info_pb.ParseFromString(info)) {
+            // An unparsable record would otherwise be queued with default (0) partition and
+            // transaction ids; skip it and go on with the next record.
+            LOG(WARNING) << "parse pending publish info failed, tablet_id: " << tablet_id
+                         << " publish_version: " << publish_version;
+            return true;
+        }
+        StorageEngine::instance()->add_async_publish_task(
+                pending_publish_info_pb.partition_id(), tablet_id, publish_version,
+                pending_publish_info_pb.transaction_id(), true);
+        return true;
+    };
+    Status pending_publish_status =
+            TabletMetaManager::traverse_pending_publish(_meta, load_pending_publish_info_func);
+    if (!pending_publish_status.ok()) {
+        // Best effort: a failed traversal is reported but does not abort DataDir::load().
+        LOG(WARNING) << "traverse pending publish info failed, res: " << pending_publish_status;
+    }
+
     // traverse rowset
     // 1. add committed rowset to txn map
     // 2. add visible rowset to tablet
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 2fab94a2bf..57bf40a147 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <gen_cpp/Types_types.h>
+#include <gen_cpp/olap_file.pb.h>
 #include <stdint.h>
 
 #include <algorithm>
@@ -60,7 +61,9 @@
 #include "olap/tablet.h"
 #include "olap/tablet_manager.h"
 #include "olap/tablet_meta.h"
+#include "olap/tablet_meta_manager.h"
 #include "olap/tablet_schema.h"
+#include "olap/task/engine_publish_version_task.h"
 #include "olap/task/index_builder.h"
 #include "runtime/client_cache.h"
 #include "service/brpc.h"
@@ -245,6 +248,11 @@ Status StorageEngine::start_bg_threads() {
             .set_max_threads(config::calc_delete_bitmap_max_thread)
             .build(&_calc_delete_bitmap_thread_pool);
 
+    RETURN_IF_ERROR(Thread::create(
+            "StorageEngine", "aync_publish_version_thread",
+            [this]() { this->_async_publish_callback(); }, &_async_publish_thread));
+    LOG(INFO) << "async publish thread started";
+
     LOG(INFO) << "all storage engine's background threads are started.";
     return Status::OK();
 }
@@ -1204,4 +1212,137 @@ void StorageEngine::_cache_file_cleaner_tasks_producer_callback() {
     } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
 }
 
+// Queues an async publish of `publish_version` on `tablet_id`.
+//
+// partition_id and transaction_id identify the txn to publish and are stored with the task.
+// is_recovery == false: the task is also persisted through
+// TabletMetaManager::save_pending_publish_info().
+// is_recovery == true: only the in-memory map is updated (DataDir::load() passes true when
+// replaying persisted records).
+// An existing entry for the same (tablet_id, publish_version) is overwritten.
+void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_id,
+                                           int64_t publish_version, int64_t transaction_id,
+                                           bool is_recovery) {
+    if (!is_recovery) {
+        TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id);
+        if (tablet == nullptr) {
+            // get_tablet() can return null (_async_publish_callback() checks for it too);
+            // without a tablet there is no data dir to persist the task into.
+            LOG(WARNING) << "tablet does not exist when add async publish task, tablet_id: "
+                         << tablet_id << " version: " << publish_version
+                         << " txn_id:" << transaction_id;
+            return;
+        }
+        PendingPublishInfoPB pending_publish_info_pb;
+        pending_publish_info_pb.set_partition_id(partition_id);
+        pending_publish_info_pb.set_transaction_id(transaction_id);
+        Status save_status = TabletMetaManager::save_pending_publish_info(
+                tablet->data_dir(), tablet->tablet_id(), publish_version,
+                pending_publish_info_pb.SerializeAsString());
+        if (!save_status.ok()) {
+            // The task is still queued in memory below; it is just not recorded on disk.
+            LOG(WARNING) << "save pending publish info failed, tablet_id: " << tablet_id
+                         << " version: " << publish_version << " res: " << save_status;
+        }
+    }
+    LOG(INFO) << "add pending publish task, tablet_id: " << tablet_id
+              << " version: " << publish_version << " txn_id:" << transaction_id
+              << " is_recovery: " << is_recovery;
+    std::lock_guard<std::mutex> lock(_async_publish_mutex);
+    _async_publish_tasks[tablet_id][publish_version] = {transaction_id, partition_id};
+}
+
+// Returns the smallest publish version still queued for `tablet_id`, or INT64_MAX when the
+// tablet has no queued async publish task.
+int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) {
+    std::lock_guard<std::mutex> lock(_async_publish_mutex);
+    auto iter = _async_publish_tasks.find(tablet_id);
+    if (iter == _async_publish_tasks.end() || iter->second.empty()) {
+        return INT64_MAX;
+    }
+    // The per-tablet std::map is keyed by version, so begin() holds the minimum.
+    return iter->second.begin()->first;
+}
+
+// Body of the async publish background thread. Each round (every 30ms, until
+// _stop_background_threads_latch.wait_for() returns true) it takes the smallest queued
+// version of every tablet and:
+//   - drops it when the tablet already has it (version <= tablet max version),
+//   - leaves it queued while there is still a gap (version != tablet max version + 1),
+//   - otherwise submits an AsyncTabletPublishTask to the tablet publish txn thread pool.
+// Persisted records of dropped/submitted tasks are deleted after the mutex is released.
+// NOTE(review): the record of a submitted task is deleted before handle() has run; confirm
+// that a publish which fails afterwards is recovered elsewhere.
+void StorageEngine::_async_publish_callback() {
+    while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(30))) {
+        // (tablet, publish_version) of the tasks whose persisted record must be deleted
+        std::vector<std::pair<TabletSharedPtr, int64_t>> need_removed_tasks;
+        {
+            std::lock_guard<std::mutex> lock(_async_publish_mutex);
+            for (auto tablet_iter = _async_publish_tasks.begin();
+                 tablet_iter != _async_publish_tasks.end();) {
+                if (tablet_iter->second.empty()) {
+                    tablet_iter = _async_publish_tasks.erase(tablet_iter);
+                    continue;
+                }
+                int64_t tablet_id = tablet_iter->first;
+                TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id);
+                if (!tablet) {
+                    LOG(WARNING) << "tablet does not exist when async publish, tablet_id: "
+                                 << tablet_id;
+                    // TODO(liaoxin) remove pending publish info from db
+                    tablet_iter = _async_publish_tasks.erase(tablet_iter);
+                    continue;
+                }
+
+                // The per-tablet map is keyed by version: begin() is the smallest one.
+                auto task_iter = tablet_iter->second.begin();
+                int64_t version = task_iter->first;
+                int64_t transaction_id = task_iter->second.first;
+                int64_t partition_id = task_iter->second.second;
+                int64_t max_version;
+                {
+                    std::shared_lock rdlock(tablet->get_header_lock());
+                    max_version = tablet->max_version().second;
+                }
+
+                if (version <= max_version) {
+                    // The tablet already has this version: only clean the task up.
+                    need_removed_tasks.emplace_back(tablet, version);
+                    tablet_iter->second.erase(task_iter);
+                    ++tablet_iter;
+                    continue;
+                }
+                if (version != max_version + 1) {
+                    // Still a gap before this version: retry in a later round.
+                    ++tablet_iter;
+                    continue;
+                }
+
+                auto async_publish_task = std::make_shared<AsyncTabletPublishTask>(
+                        tablet, partition_id, transaction_id, version);
+                Status submit_status =
+                        StorageEngine::instance()->tablet_publish_txn_thread_pool()->submit_func(
+                                [async_publish_task]() { async_publish_task->handle(); });
+                if (!submit_status.ok()) {
+                    // Nothing will run this task: keep it (and its persisted record) so the
+                    // next round can submit it again.
+                    LOG(WARNING) << "failed to submit async publish task, tablet_id: "
+                                 << tablet_id << " version: " << version
+                                 << " res: " << submit_status;
+                    ++tablet_iter;
+                    continue;
+                }
+                tablet_iter->second.erase(task_iter);
+                need_removed_tasks.emplace_back(tablet, version);
+                ++tablet_iter;
+            }
+        }
+        for (auto& [tablet, publish_version] : need_removed_tasks) {
+            TabletMetaManager::remove_pending_publish_info(tablet->data_dir(), tablet->tablet_id(),
+                                                           publish_version);
+        }
+    }
+}
+
 } // namespace doris
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 7160af3424..8829cc0770 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -299,6 +299,12 @@ public:
 
     bool check_rowset_segment();
 
+    bool start_publish() { // true only for the caller that flips the flag from false to true
+        bool expect = false;
+        return _is_publish_running.compare_exchange_strong(expect, true);
+    }
+    void finish_publish() { _is_publish_running.store(false); } // clears the publish flag
+
     [[nodiscard]] virtual Status add_to_binlog() { return Status::OK(); }
 
 protected:
@@ -337,6 +343,7 @@ protected:
     // rowset state machine
     RowsetStateMachine _rowset_state_machine;
     std::atomic<uint64_t> _delayed_expired_timestamp = 0;
+    std::atomic<bool> _is_publish_running {false};
 };
 
 } // namespace doris
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 94f1bb5a87..d10c11b1c2 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -162,6 +162,9 @@ StorageEngine::~StorageEngine() {
     if (_tablet_meta_checkpoint_thread_pool) {
         _tablet_meta_checkpoint_thread_pool->shutdown();
     }
+    if (_calc_delete_bitmap_thread_pool) {
+        _calc_delete_bitmap_thread_pool->shutdown();
+    }
     _clear();
     _s_instance = nullptr;
 }
@@ -556,6 +559,7 @@ void StorageEngine::stop() {
     THREAD_JOIN(_disk_stat_monitor_thread);
     THREAD_JOIN(_fd_cache_clean_thread);
     THREAD_JOIN(_tablet_checkpoint_tasks_producer_thread);
+    THREAD_JOIN(_async_publish_thread);
 #undef THREAD_JOIN
 
 #define THREADS_JOIN(threads)            \
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 15b1a98a78..0113101565 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -220,6 +220,10 @@ public:
 
     void gc_binlogs(const std::unordered_map<int64_t, int64_t>& gc_tablet_infos);
 
+    void add_async_publish_task(int64_t partition_id, int64_t tablet_id, int64_t publish_version,
+                                int64_t transaction_id, bool is_recover);
+    int64_t get_pending_publish_min_version(int64_t tablet_id);
+
 private:
     // Instance should be inited from `static open()`
     // MUST NOT be called in other circumstances.
@@ -318,6 +322,8 @@ private:
 
     void _gc_binlogs(int64_t tablet_id, int64_t version);
 
+    void _async_publish_callback();
+
 private:
     struct CompactionCandidate {
         CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_)
@@ -454,6 +460,12 @@ private:
     std::mutex _running_cooldown_mutex;
     std::unordered_set<int64_t> _running_cooldown_tablets;
 
+    // tablet_id -> publish_version -> (transaction_id, partition_id)
+    std::map<int64_t, std::map<int64_t, std::pair<int64_t, int64_t>>> _async_publish_tasks;
+    // async publish for discontinuous versions of merge_on_write table
+    scoped_refptr<Thread> _async_publish_thread;
+    std::mutex _async_publish_mutex;
+
     DISALLOW_COPY_AND_ASSIGN(StorageEngine);
 };
 
diff --git a/be/src/olap/tablet_meta_manager.cpp b/be/src/olap/tablet_meta_manager.cpp
index ac4deacd14..d3bc41fa19 100644
--- a/be/src/olap/tablet_meta_manager.cpp
+++ b/be/src/olap/tablet_meta_manager.cpp
@@ -175,4 +175,68 @@ Status TabletMetaManager::load_json_meta(DataDir* store, const std::string& meta
     return save(store, tablet_id, schema_hash, meta_binary);
 }
 
+// Writes `meta_binary` to the meta store of `store` (column family
+// META_COLUMN_FAMILY_INDEX) under the key "<PENDING_PUBLISH_INFO><tablet_id>_<publish_version>"
+// and returns the status of the put().
+Status TabletMetaManager::save_pending_publish_info(DataDir* store, TTabletId tablet_id,
+                                                    int64_t publish_version,
+                                                    const std::string& meta_binary) {
+    std::string key = fmt::format("{}{}_{}", PENDING_PUBLISH_INFO, tablet_id, publish_version);
+    OlapMeta* meta = store->get_meta();
+    LOG(INFO) << "save pending publish rowset, key:" << key
+              << " meta_size=" << meta_binary.length();
+    return meta->put(META_COLUMN_FAMILY_INDEX, key, meta_binary);
+}
+
+// Removes the record stored under the key that save_pending_publish_info() builds for the
+// same (tablet_id, publish_version); logs and returns the status of the remove().
+Status TabletMetaManager::remove_pending_publish_info(DataDir* store, TTabletId tablet_id,
+                                                      int64_t publish_version) {
+    std::string key = fmt::format("{}{}_{}", PENDING_PUBLISH_INFO, tablet_id, publish_version);
+    OlapMeta* meta = store->get_meta();
+    Status res = meta->remove(META_COLUMN_FAMILY_INDEX, key);
+    LOG(INFO) << "remove pending publish_info, key:" << key << ", res:" << res;
+    return res;
+}
+
+// Runs meta->iterate() with PENDING_PUBLISH_INFO (presumably used as the key prefix) and
+// calls func(tablet_id, publish_version, value) for each well-formed key. Keys that do not
+// split into exactly three '_' separated parts, or whose id/version part is not a plain
+// decimal int64, are logged and skipped. Returns the status of meta->iterate().
+Status TabletMetaManager::traverse_pending_publish(
+        OlapMeta* meta, std::function<bool(int64_t, int64_t, const std::string&)> const& func) {
+    auto traverse_header_func = [&func](const std::string& key, const std::string& value) -> bool {
+        std::vector<std::string> parts;
+        // key format: "ppi_" + tablet_id + "_" + publish_version
+        split_string<char>(key, '_', &parts);
+        if (parts.size() != 3) {
+            LOG(WARNING) << "invalid pending publish info key:" << key
+                         << ", split size:" << parts.size();
+            return true;
+        }
+        int64_t tablet_id = 0;
+        int64_t version = 0;
+        try {
+            // std::stoll throws std::invalid_argument / std::out_of_range on bad input, and
+            // stops silently at the first non-digit; `parsed_len` detects the latter.
+            size_t parsed_len = 0;
+            tablet_id = std::stoll(parts[1], &parsed_len, 10);
+            bool well_formed = parsed_len == parts[1].size();
+            version = std::stoll(parts[2], &parsed_len, 10);
+            well_formed = well_formed && parsed_len == parts[2].size();
+            if (!well_formed) {
+                LOG(WARNING) << "invalid pending publish info key:" << key;
+                return true;
+            }
+        } catch (const std::exception& e) {
+            LOG(WARNING) << "invalid pending publish info key:" << key << ", error:" << e.what();
+            return true;
+        }
+        return func(tablet_id, version, value);
+    };
+    Status status =
+            meta->iterate(META_COLUMN_FAMILY_INDEX, PENDING_PUBLISH_INFO, traverse_header_func);
+    return status;
+}
+
 } // namespace doris
diff --git a/be/src/olap/tablet_meta_manager.h b/be/src/olap/tablet_meta_manager.h
index 15d6763477..6ba1d76757 100644
--- a/be/src/olap/tablet_meta_manager.h
+++ b/be/src/olap/tablet_meta_manager.h
@@ -34,6 +34,8 @@ const std::string OLD_HEADER_PREFIX = "hdr_";
 
 const std::string HEADER_PREFIX = "tabletmeta_";
 
+const std::string PENDING_PUBLISH_INFO = "ppi_";
+
 // Helper Class for managing tablet headers of one root path.
 class TabletMetaManager {
 public:
@@ -57,6 +59,16 @@ public:
                                    const string& header_prefix = "tabletmeta_");
 
     static Status load_json_meta(DataDir* store, const std::string& meta_path);
+
+    static Status save_pending_publish_info(DataDir* store, TTabletId tablet_id,
+                                            int64_t publish_version,
+                                            const std::string& meta_binary);
+
+    static Status remove_pending_publish_info(DataDir* store, TTabletId tablet_id,
+                                              int64_t publish_version);
+
+    static Status traverse_pending_publish(
+            OlapMeta* meta, std::function<bool(int64_t, int64_t, const std::string&)> const& func);
 };
 
 } // namespace doris
diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp
index 912b3aa4e5..845cbe1117 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -130,7 +130,19 @@ Status EngineCloneTask::_do_clone() {
         auto local_data_path = fmt::format("{}/{}", tablet->tablet_path(), CLONE_PREFIX);
         bool allow_incremental_clone = false;
 
-        tablet->calc_missed_versions(_clone_req.committed_version, &missed_versions);
+        int64_t specified_version = _clone_req.committed_version;
+        if (tablet->enable_unique_key_merge_on_write()) {
+            int64_t min_pending_ver =
+                    StorageEngine::instance()->get_pending_publish_min_version(tablet->tablet_id());
+            if (min_pending_ver - 1 < specified_version) {
+                LOG(INFO) << "use min pending publish version for clone, min_pending_ver: "
+                          << min_pending_ver
+                          << " committed_version: " << _clone_req.committed_version;
+                specified_version = min_pending_ver - 1;
+            }
+        }
+
+        tablet->calc_missed_versions(specified_version, &missed_versions);
 
         // if missed version size is 0, then it is useless to clone from remote be, it means local data is
         // completed. Or remote be will just return header not the rowset files. clone will failed.
@@ -153,7 +165,7 @@ Status EngineCloneTask::_do_clone() {
         RETURN_IF_ERROR(_make_and_download_snapshots(*(tablet->data_dir()), local_data_path,
                                                      &src_host, &src_file_path, missed_versions,
                                                      &allow_incremental_clone));
-        RETURN_IF_ERROR(_finish_clone(tablet.get(), local_data_path, _clone_req.committed_version,
+        RETURN_IF_ERROR(_finish_clone(tablet.get(), local_data_path, specified_version,
                                       allow_incremental_clone));
     } else {
         LOG(INFO) << "clone tablet not exist, begin clone a new tablet from remote be. "
diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index 748706e421..02af6bf674 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -70,11 +70,13 @@ void TabletPublishStatistics::record_in_bvar() {
 
 EnginePublishVersionTask::EnginePublishVersionTask(
         const TPublishVersionRequest& publish_version_req, std::vector<TTabletId>* error_tablet_ids,
-        std::vector<TTabletId>* succ_tablet_ids)
+        std::vector<TTabletId>* succ_tablet_ids,
+        std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinuous_version_tablets)
         : _total_task_num(0),
           _publish_version_req(publish_version_req),
           _error_tablet_ids(error_tablet_ids),
-          _succ_tablet_ids(succ_tablet_ids) {}
+          _succ_tablet_ids(succ_tablet_ids),
+          _discontinuous_version_tablets(discontinuous_version_tablets) {}
 
 void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) {
     std::lock_guard<std::mutex> lck(_tablet_ids_mutex);
@@ -178,6 +180,8 @@ Status EnginePublishVersionTask::finish() {
                     // publish failed
                     if (!tablet->check_version_exist(version)) {
                         add_error_tablet_id(tablet_info.tablet_id);
+                        _discontinuous_version_tablets->emplace_back(
+                                partition_id, tablet_info.tablet_id, version.first);
                         res = Status::Error<PUBLISH_VERSION_NOT_CONTINUOUS>();
                     }
                     continue;
@@ -250,7 +254,14 @@ TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task
 
 void TabletPublishTxnTask::handle() {
     _stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
+    if (!_rowset->start_publish()) {
+        LOG(WARNING) << "publish is running. rowset_id=" << _rowset->rowset_id()
+                     << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id;
+        _engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
+        return;
+    }
     Defer defer {[&] {
+        _rowset->finish_publish();
         if (_engine_publish_version_task->finish_task() == 1) {
             _engine_publish_version_task->notify();
         }
@@ -289,4 +300,59 @@ void TabletPublishTxnTask::handle() {
               << (cost_us > 500 * 1000 ? _stats.to_string() : "");
 }
 
+// Publishes the rowset that transaction `_transaction_id` holds for `_tablet` as version
+// [_version, _version], then adds it to the tablet as a visible rowset.
+// Failures are only logged: nothing is returned and the task is not re-queued here.
+void AsyncTabletPublishTask::handle() {
+    _stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
+    std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
+    StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
+            _transaction_id, _partition_id, &tablet_related_rs);
+    auto iter = tablet_related_rs.find(
+            TabletInfo(_tablet->tablet_id(), _tablet->schema_hash(), _tablet->tablet_uid()));
+    if (iter == tablet_related_rs.end()) {
+        LOG(WARNING) << "no rowset found for async publish. tablet_id=" << _tablet->tablet_id()
+                     << ", txn_id=" << _transaction_id << ", version=" << _version;
+        return;
+    }
+    RowsetSharedPtr rowset = iter->second;
+    // start_publish() returns false while another publish of this rowset is running.
+    if (!rowset->start_publish()) {
+        LOG(WARNING) << "publish is running. rowset_id=" << rowset->rowset_id()
+                     << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id;
+        return;
+    }
+    Defer defer {[&] { rowset->finish_publish(); }};
+    Version version(_version, _version);
+    auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn(
+            _partition_id, _tablet, _transaction_id, version, &_stats);
+    if (!publish_status.ok()) {
+        LOG(WARNING) << "failed to publish version. rowset_id=" << rowset->rowset_id()
+                     << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id
+                     << ", res=" << publish_status;
+        return;
+    }
+
+    // add visible rowset to tablet
+    int64_t t1 = MonotonicMicros();
+    publish_status = _tablet->add_inc_rowset(rowset);
+    _stats.add_inc_rowset_us = MonotonicMicros() - t1;
+    // PUSH_VERSION_ALREADY_EXIST is not treated as a failure.
+    if (!publish_status.ok() && !publish_status.is<PUSH_VERSION_ALREADY_EXIST>()) {
+        LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" << rowset->rowset_id()
+                     << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id
+                     << ", res=" << publish_status;
+        return;
+    }
+    int64_t cost_us = MonotonicMicros() - _stats.submit_time_us;
+    g_tablet_publish_latency << cost_us;
+    _stats.record_in_bvar();
+    // the stats string is only appended when the publish took more than 500ms
+    LOG(INFO) << "async publish version successfully on tablet, table_id=" << _tablet->table_id()
+              << ", tablet=" << _tablet->full_name() << ", transaction_id=" << _transaction_id
+              << ", version=" << _version << ", num_rows=" << rowset->num_rows()
+              << ", res=" << publish_status << ", cost: " << cost_us << "(us) "
+              << (cost_us > 500 * 1000 ? _stats.to_string() : "");
+}
+
 } // namespace doris
diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h
index bd6907b913..c8a68dedea 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -82,9 +82,10 @@ private:
 
 class EnginePublishVersionTask : public EngineTask {
 public:
-    EnginePublishVersionTask(const TPublishVersionRequest& publish_version_req,
-                             vector<TTabletId>* error_tablet_ids,
-                             std::vector<TTabletId>* succ_tablet_ids = nullptr);
+    EnginePublishVersionTask(
+            const TPublishVersionRequest& publish_version_req, vector<TTabletId>* error_tablet_ids,
+            std::vector<TTabletId>* succ_tablet_ids,
+            std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinous_version_tablets);
     ~EnginePublishVersionTask() {}
 
     virtual Status finish() override;
@@ -103,11 +104,34 @@ private:
     std::mutex _tablet_ids_mutex;
     vector<TTabletId>* _error_tablet_ids;
     vector<TTabletId>* _succ_tablet_ids;
+    std::vector<std::tuple<int64_t, int64_t, int64_t>>* _discontinuous_version_tablets;
 
     std::mutex _tablet_finish_mutex;
     std::condition_variable _tablet_finish_cond;
 };
 
+class AsyncTabletPublishTask { // publishes one (tablet, txn, version); see handle()
+public:
+    AsyncTabletPublishTask(TabletSharedPtr tablet, int64_t partition_id, int64_t transaction_id,
+                           int64_t version)
+            : _tablet(tablet),
+              _partition_id(partition_id),
+              _transaction_id(transaction_id),
+              _version(version) {
+        _stats.submit_time_us = MonotonicMicros(); // handle() measures its latency from here
+    }
+    ~AsyncTabletPublishTask() = default;
+
+    void handle(); // publishes the txn's rowset for _tablet; failures are only logged
+
+private:
+    TabletSharedPtr _tablet;
+    int64_t _partition_id;
+    int64_t _transaction_id;
+    int64_t _version; // single version to publish; handle() uses Version(_version, _version)
+    TabletPublishStatistics _stats;
+};
+
 } // namespace doris
 
 #endif // DORIS_BE_SRC_OLAP_TASK_ENGINE_PUBLISH_VERSION_TASK_H
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 6a982fb58c..fe894dba9d 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -333,3 +333,8 @@ message BinlogMetaEntryPB {
     optional int64 creation_time = 5;
     optional string rowset_id_v2 = 6;
 }
+
+message PendingPublishInfoPB { // value persisted for a pending async publish task
+    optional int64 partition_id = 1;
+    optional int64 transaction_id = 2;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org