You are viewing a plain-text version of this content; the link to its canonical version did not survive the plain-text conversion.
Posted to commits@doris.apache.org by da...@apache.org on 2022/07/27 08:26:48 UTC

[doris] branch master updated: [feature-wip](unique-key-merge-on-write) update delete bitmap while publish version (#11195)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 01e108cb7b [feature-wip](unique-key-merge-on-write) update delete bitmap while publish version (#11195)
01e108cb7b is described below

commit 01e108cb7b0f5085a63a535f8cf571eb293d6d2b
Author: yixiutt <10...@users.noreply.github.com>
AuthorDate: Wed Jul 27 16:26:42 2022 +0800

    [feature-wip](unique-key-merge-on-write) update delete bitmap while publish version (#11195)
    
    1. Make version publishing proceed in version order.
    2. Update the delete bitmap while publishing a version: load the primary
       keys of the current version's rowset and search for them in the
       previous rowsets.
    3. Speed up the publish version task by publishing tablets in parallel.
    
    Co-authored-by: yixiutt <yi...@selectdb.com>
---
 be/src/agent/task_worker_pool.cpp                  |  10 ++
 be/src/common/config.h                             |   2 +
 be/src/common/status.h                             |   3 +-
 be/src/olap/olap_server.cpp                        |   6 +
 be/src/olap/rowset/segment_v2/segment.cpp          |   6 +-
 be/src/olap/rowset/segment_v2/segment.h            |   5 +-
 be/src/olap/storage_engine.h                       |   6 +
 be/src/olap/tablet.h                               |   5 +
 be/src/olap/task/engine_publish_version_task.cpp   | 146 ++++++++++++++++-----
 be/src/olap/task/engine_publish_version_task.h     |  34 +++++
 be/src/olap/txn_manager.cpp                        | 102 ++++++++++++++
 be/src/olap/txn_manager.h                          |   5 +
 .../olap/engine_storage_migration_task_test.cpp    |   4 +-
 be/test/olap/tablet_clone_test.cpp                 |   4 +-
 be/test/olap/tablet_cooldown_test.cpp              |   4 +-
 15 files changed, 297 insertions(+), 45 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 1380331e7a..40681ce17d 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -711,6 +711,13 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
             res = _env->storage_engine()->execute_task(&engine_task);
             if (res.ok()) {
                 break;
+            } else if (res.precise_code() == OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS) {
+                // version not continuous, put to queue and wait pre version publish
+                // task execute
+                std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
+                _tasks.push_back(agent_task_req);
+                _worker_thread_condition_variable.notify_one();
+                break;
             } else {
                 LOG(WARNING) << "publish version error, retry. [transaction_id="
                              << publish_version_req.transaction_id
@@ -719,6 +726,9 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
                 std::this_thread::sleep_for(std::chrono::seconds(1));
             }
         }
+        if (res.precise_code() == OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS) {
+            continue;
+        }
 
         TFinishTaskRequest finish_task_request;
         if (!res) {
diff --git a/be/src/common/config.h b/be/src/common/config.h
index ee8698bcb5..9891df2f18 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -81,6 +81,8 @@ CONF_Int32(push_worker_count_normal_priority, "3");
 CONF_Int32(push_worker_count_high_priority, "3");
 // the count of thread to publish version
 CONF_Int32(publish_version_worker_count, "8");
+// the count of tablet thread to publish version
+CONF_Int32(tablet_publish_txn_max_thread, "32");
 // the count of thread to clear transaction task
 CONF_Int32(clear_transaction_task_worker_count, "1");
 // the count of thread to delete
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 830d417a7c..f441ccd759 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -231,7 +231,8 @@ namespace doris {
     M(OLAP_ERR_ROWSET_READ_FAILED, -3111, "", true)                      \
     M(OLAP_ERR_ROWSET_INVALID_STATE_TRANSITION, -3112, "", true)         \
     M(OLAP_ERR_STRING_OVERFLOW_IN_VEC_ENGINE, -3113, "", true)           \
-    M(OLAP_ERR_ROWSET_ADD_MIGRATION_V2, -3114, "", true)
+    M(OLAP_ERR_ROWSET_ADD_MIGRATION_V2, -3114, "", true)                 \
+    M(OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS, -3115, "", false)
 
 enum ErrorCode {
 #define M(NAME, ERRORCODE, DESC, STACKTRACEENABLED) NAME = ERRORCODE,
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index fbb1811705..28e9241985 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -137,6 +137,12 @@ Status StorageEngine::start_bg_threads() {
             &_cooldown_tasks_producer_thread));
     LOG(INFO) << "cooldown tasks producer thread started";
 
+    // add tablet publish version thread pool
+    ThreadPoolBuilder("TabletPublishTxnThreadPool")
+            .set_min_threads(config::tablet_publish_txn_max_thread)
+            .set_max_threads(config::tablet_publish_txn_max_thread)
+            .build(&_tablet_publish_txn_thread_pool);
+
     LOG(INFO) << "all storage engine's background threads are started.";
     return Status::OK();
 }
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index 09c29979bf..6b84cd9566 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -78,7 +78,7 @@ Status Segment::new_iterator(const Schema& schema, const StorageReadOptions& rea
         }
     }
 
-    RETURN_IF_ERROR(_load_index());
+    RETURN_IF_ERROR(load_index());
     iter->reset(new SegmentIterator(this->shared_from_this(), schema));
     iter->get()->init(read_options);
     return Status::OK();
@@ -134,7 +134,7 @@ Status Segment::_parse_footer() {
     return Status::OK();
 }
 
-Status Segment::_load_index() {
+Status Segment::load_index() {
     return _load_index_once.call([this] {
         // read and parse short key index page
         PageReadOptions opts;
@@ -225,7 +225,7 @@ Status Segment::new_bitmap_index_iterator(const TabletColumn& tablet_column,
 }
 
 Status Segment::lookup_row_key(const Slice& key, RowLocation* row_location) {
-    RETURN_IF_ERROR(_load_index());
+    RETURN_IF_ERROR(load_index());
     DCHECK(_pk_index_reader != nullptr);
     if (!_pk_index_reader->check_present(key)) {
         return Status::NotFound("Can't find key in the segment");
diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h
index e8cb0c3081..5323ace873 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -92,6 +92,8 @@ public:
     // only used by UT
     const SegmentFooterPB& footer() const { return _footer; }
 
+    Status load_index(); // load and decode the segment's index; runs once, later calls are no-ops (public so TxnManager can iterate primary keys)
+
 private:
     DISALLOW_COPY_AND_ASSIGN(Segment);
     Segment(uint32_t segment_id, const TabletSchema* tablet_schema);
@@ -99,9 +101,6 @@ private:
     Status _open();
     Status _parse_footer();
     Status _create_column_readers();
-    // Load and decode short key index.
-    // May be called multiple times, subsequent calls will no op.
-    Status _load_index();
 
 private:
     friend class SegmentIterator;
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 1b6b28b4c4..df018e27d3 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -192,6 +192,10 @@ public:
     Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type);
     Status submit_quick_compaction_task(TabletSharedPtr tablet);
 
+    std::unique_ptr<ThreadPool>& tablet_publish_txn_thread_pool() { // pool that runs per-tablet publish tasks
+        return _tablet_publish_txn_thread_pool; // presumably null until start_bg_threads() builds it
+    }
+
 private:
     // Instance should be inited from `static open()`
     // MUST NOT be called in other circumstances.
@@ -382,6 +386,8 @@ private:
     std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
     std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool;
 
+    std::unique_ptr<ThreadPool> _tablet_publish_txn_thread_pool;
+
     std::unique_ptr<ThreadPool> _tablet_meta_checkpoint_thread_pool;
 
     CompactionPermitLimiter _permit_limiter;
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 74aa3a8c8f..08e5fecd15 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -438,6 +438,11 @@ inline void Tablet::set_cumulative_layer_point(int64_t new_point) {
 }
 
 inline bool Tablet::enable_unique_key_merge_on_write() const {
+#ifdef BE_TEST // test builds only: tolerate tablets constructed without meta
+    if (_tablet_meta == nullptr) {
+        return false; // treat meta-less test tablets as not merge-on-write
+    }
+#endif
     return _tablet_meta->enable_unique_key_merge_on_write();
 }
 
diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index 5c7397c8ad..177f9f89d0 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -17,6 +17,8 @@
 
 #include "olap/task/engine_publish_version_task.h"
 
+#include <util/defer_op.h>
+
 #include <map>
 
 #include "olap/data_dir.h"
@@ -34,13 +36,38 @@ EnginePublishVersionTask::EnginePublishVersionTask(TPublishVersionRequest& publi
           _error_tablet_ids(error_tablet_ids),
           _succ_tablet_ids(succ_tablet_ids) {}
 
+void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) { // record a tablet whose publish failed
+    std::lock_guard<std::mutex> lck(_tablet_ids_mutex); // pool threads may call this concurrently
+    _error_tablet_ids->push_back(tablet_id);
+}
+
+void EnginePublishVersionTask::add_succ_tablet_id(int64_t tablet_id) { // record a successfully published tablet
+    std::lock_guard<std::mutex> lck(_tablet_ids_mutex); // pool threads may call this concurrently
+    _succ_tablet_ids->push_back(tablet_id); // NOTE(review): pre-patch code skipped this when _succ_tablet_ids was nullptr; restore that check if callers may pass null
+}
+
+void EnginePublishVersionTask::wait() { // block until notify() or a 10 ms timeout
+    std::unique_lock<std::mutex> lock(_tablet_finish_sleep_mutex);
+    _tablet_finish_sleep_cond.wait_for(lock, std::chrono::milliseconds(10)); // no predicate: the caller re-checks its counter after every wake-up
+}
+
+void EnginePublishVersionTask::notify() { // wake the thread blocked in wait()
+    std::unique_lock<std::mutex> lock(_tablet_finish_sleep_mutex);
+    _tablet_finish_sleep_cond.notify_one(); // invoked by the last tablet task to finish
+}
+
 Status EnginePublishVersionTask::finish() {
     Status res = Status::OK();
     int64_t transaction_id = _publish_version_req.transaction_id;
     VLOG_NOTICE << "begin to process publish version. transaction_id=" << transaction_id;
 
     // each partition
+    bool meet_version_not_continuous = false;
+    std::atomic<int64_t> total_task_num(0);
     for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
+        if (meet_version_not_continuous) {
+            break;
+        }
         int64_t partition_id = par_ver_info.partition_id;
         // get all partition related tablets and check whether the tablet have the related version
         std::set<TabletInfo> partition_related_tablet_infos;
@@ -60,7 +87,9 @@ Status EnginePublishVersionTask::finish() {
 
         // each tablet
         for (auto& tablet_rs : tablet_related_rs) {
-            Status publish_status = Status::OK();
+            if (meet_version_not_continuous) {
+                break;
+            }
             TabletInfo tablet_info = tablet_rs.first;
             RowsetSharedPtr rowset = tablet_rs.second;
             VLOG_CRITICAL << "begin to publish version on tablet. "
@@ -86,39 +115,45 @@ Status EnginePublishVersionTask::finish() {
                 res = Status::OLAPInternalError(OLAP_ERR_PUSH_TABLE_NOT_EXIST);
                 continue;
             }
-
-            publish_status = StorageEngine::instance()->txn_manager()->publish_txn(
-                    partition_id, tablet, transaction_id, version);
-            if (publish_status != Status::OK()) {
-                LOG(WARNING) << "failed to publish version. rowset_id=" << rowset->rowset_id()
-                             << ", tablet_id=" << tablet_info.tablet_id
-                             << ", txn_id=" << transaction_id;
-                _error_tablet_ids->push_back(tablet_info.tablet_id);
-                res = publish_status;
+            Version max_version = tablet->max_version();
+            // in uniq key model with merge-on-write, we should see all
+            // previous version when update delete bitmap, so add a check
+            // here and wait pre version publish or lock timeout
+            if (tablet->keys_type() == KeysType::UNIQUE_KEYS &&
+                tablet->enable_unique_key_merge_on_write() &&
+                version.first != max_version.second + 1) {
+                LOG(INFO) << "uniq key with merge-on-write version not continuous, current max "
+                             "version="
+                          << max_version.second << ", publish_version=" << version.first
+                          << " tablet_id=" << tablet->tablet_id();
+                meet_version_not_continuous = true;
+                res = Status::OLAPInternalError(OLAP_ERR_PUBLISH_VERSION_NOT_CONTINUOUS);
                 continue;
             }
-
-            // add visible rowset to tablet
-            publish_status = tablet->add_inc_rowset(rowset);
-            if (publish_status != Status::OK() &&
-                publish_status.precise_code() != OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) {
-                LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id="
-                             << rowset->rowset_id() << ", tablet_id=" << tablet_info.tablet_id
-                             << ", txn_id=" << transaction_id << ", res=" << publish_status;
-                _error_tablet_ids->push_back(tablet_info.tablet_id);
-                res = publish_status;
-                continue;
-            }
-            if (_succ_tablet_ids != nullptr) {
-                _succ_tablet_ids->push_back(tablet_info.tablet_id);
-            }
-            partition_related_tablet_infos.erase(tablet_info);
-            VLOG_NOTICE << "publish version successfully on tablet. tablet=" << tablet->full_name()
-                        << ", transaction_id=" << transaction_id << ", version=" << version.first
-                        << ", res=" << publish_status;
+            total_task_num.fetch_add(1);
+            auto tablet_publish_txn_ptr = std::make_shared<TabletPublishTxnTask>(
+                    this, tablet, rowset, partition_id, transaction_id, version, tablet_info,
+                    &total_task_num);
+            auto submit_st =
+                    StorageEngine::instance()->tablet_publish_txn_thread_pool()->submit_func(
+                            [=]() { tablet_publish_txn_ptr->handle(); });
+            CHECK(submit_st.ok());
         }
+    }
+    // wait for all publish txn finished
+    while (total_task_num.load() != 0) {
+        wait();
+    }
+
+    // check if the related tablet remained all have the version
+    for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
+        int64_t partition_id = par_ver_info.partition_id;
+        // get all partition related tablets and check whether the tablet have the related version
+        std::set<TabletInfo> partition_related_tablet_infos;
+        StorageEngine::instance()->tablet_manager()->get_partition_related_tablets(
+                partition_id, &partition_related_tablet_infos);
 
-        // check if the related tablet remained all have the version
+        Version version(par_ver_info.version, par_ver_info.version);
         for (auto& tablet_info : partition_related_tablet_infos) {
             // has to use strict mode to check if check all tablets
             if (!_publish_version_req.strict_mode) {
@@ -127,11 +162,11 @@ Status EnginePublishVersionTask::finish() {
             TabletSharedPtr tablet =
                     StorageEngine::instance()->tablet_manager()->get_tablet(tablet_info.tablet_id);
             if (tablet == nullptr) {
-                _error_tablet_ids->push_back(tablet_info.tablet_id);
+                add_error_tablet_id(tablet_info.tablet_id);
             } else {
                 // check if the version exist, if not exist, then set publish failed
                 if (!tablet->check_version_exist(version)) {
-                    _error_tablet_ids->push_back(tablet_info.tablet_id);
+                    add_error_tablet_id(tablet_info.tablet_id);
                 }
             }
         }
@@ -143,4 +178,51 @@ Status EnginePublishVersionTask::finish() {
     return res;
 }
 
+TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task, // parent task; stored as a non-owning pointer
+                                           TabletSharedPtr tablet, RowsetSharedPtr rowset,
+                                           int64_t partition_id, int64_t transaction_id,
+                                           Version version, const TabletInfo& tablet_info,
+                                           std::atomic<int64_t>* total_task_num) // caller's pending-task counter; non-owning
+        : _engine_publish_version_task(engine_task),
+          _tablet(tablet),
+          _rowset(rowset),
+          _partition_id(partition_id),
+          _transaction_id(transaction_id),
+          _version(version),
+          _tablet_info(tablet_info),
+          _total_task_num(total_task_num) {}
+
+void TabletPublishTxnTask::handle() { // runs on the publish pool: publish the txn, then add the rowset to the tablet
+    Defer defer {[&] { // presumably runs at scope exit, i.e. on every return path below
+        if (_total_task_num->fetch_sub(1) == 1) { // this was the last outstanding tablet task
+            _engine_publish_version_task->notify(); // NOTE(review): finish() polls the counter every 10 ms and may already have seen 0 and returned, so this pointer can dangle; confirm lifetime
+        }
+    }};
+    auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn( // for merge-on-write tablets this also builds the delete bitmap
+            _partition_id, _tablet, _transaction_id, _version);
+    if (publish_status != Status::OK()) {
+        LOG(WARNING) << "failed to publish version. rowset_id=" << _rowset->rowset_id()
+                     << ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id;
+        _engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
+        return; // NOTE(review): unlike the pre-patch code, this status is not folded into finish()'s return value
+    }
+
+    // add the rowset to the tablet as a visible version
+    publish_status = _tablet->add_inc_rowset(_rowset);
+    if (publish_status != Status::OK() &&
+        publish_status.precise_code() != OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) { // an already-existing version counts as success
+        LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" << _rowset->rowset_id()
+                     << ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id
+                     << ", res=" << publish_status;
+        _engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
+        return;
+    }
+    _engine_publish_version_task->add_succ_tablet_id(_tablet_info.tablet_id);
+    VLOG_NOTICE << "publish version successfully on tablet. tablet=" << _tablet->full_name()
+                << ", transaction_id=" << _transaction_id << ", version=" << _version.first
+                << ", res=" << publish_status;
+
+    return;
+}
+
 } // namespace doris
diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h
index 4086f466d3..959584d116 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -24,6 +24,30 @@
 
 namespace doris {
 
+class EnginePublishVersionTask; // forward declaration: the task below keeps a back-pointer to it
+class TabletPublishTxnTask { // publishes one tablet's rowset for a txn; submitted to the publish thread pool
+public:
+    TabletPublishTxnTask(EnginePublishVersionTask* engine_task, TabletSharedPtr tablet,
+                         RowsetSharedPtr rowset, int64_t partition_id, int64_t transaction_id,
+                         Version version, const TabletInfo& tablet_info,
+                         std::atomic<int64_t>* total_task_num);
+    ~TabletPublishTxnTask() {}
+
+    void handle(); // does the publish and decrements *_total_task_num when done
+
+private:
+    EnginePublishVersionTask* _engine_publish_version_task; // non-owning; dereferenced in handle()
+
+    TabletSharedPtr _tablet;
+    RowsetSharedPtr _rowset;
+    int64_t _partition_id;
+    int64_t _transaction_id;
+    Version _version;
+    TabletInfo _tablet_info;
+
+    std::atomic<int64_t>* _total_task_num; // non-owning; the caller's pending-task counter
+};
+
 class EnginePublishVersionTask : public EngineTask {
 public:
     EnginePublishVersionTask(TPublishVersionRequest& publish_version_req,
@@ -33,10 +57,20 @@ public:
 
     virtual Status finish() override;
 
+    void add_error_tablet_id(int64_t tablet_id);
+    void add_succ_tablet_id(int64_t tablet_id);
+
+    void notify();
+    void wait();
+
 private:
     const TPublishVersionRequest& _publish_version_req;
+    std::mutex _tablet_ids_mutex;
     vector<TTabletId>* _error_tablet_ids;
     vector<TTabletId>* _succ_tablet_ids;
+
+    std::mutex _tablet_finish_sleep_mutex;
+    std::condition_variable _tablet_finish_sleep_cond;
 };
 
 } // namespace doris
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 0cf09c0876..68d3c57a98 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -44,6 +44,7 @@
 #include "olap/tablet_meta.h"
 #include "olap/tablet_meta_manager.h"
 #include "olap/utils.h"
+#include "rowset/beta_rowset.h"
 #include "util/doris_metrics.h"
 #include "util/pretty_printer.h"
 #include "util/time.h"
@@ -308,8 +309,109 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
                 _clear_txn_partition_map_unlocked(transaction_id, partition_id);
             }
         }
+    }
+    auto tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
+#ifdef BE_TEST
+    if (tablet == nullptr) {
+        return Status::OK();
+    }
+#endif
+    // Check if have to build extra delete bitmap for table of UNIQUE_KEY model
+    if (!tablet->enable_unique_key_merge_on_write() ||
+        tablet->tablet_meta()->preferred_rowset_type() != RowsetTypePB::BETA_ROWSET ||
+        rowset_ptr->keys_type() != KeysType::UNIQUE_KEYS) {
         return Status::OK();
     }
+    CHECK(version.first == version.second) << "impossible: " << version;
+
+    // For each key in current set, check if it overwrites any previously
+    // written keys
+    OlapStopWatch watch;
+    std::vector<segment_v2::SegmentSharedPtr> segments;
+    std::vector<segment_v2::SegmentSharedPtr> pre_segments;
+    auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset_ptr.get());
+    Status st = beta_rowset->load_segments(&segments);
+    if (!st.ok()) return st;
+    // lock tablet meta to modify delete bitmap
+    std::lock_guard<std::shared_mutex> meta_wrlock(tablet->get_header_lock());
+    for (auto& seg : segments) {
+        seg->load_index(); // We need index blocks to iterate
+        auto pk_idx = seg->get_primary_key_index();
+        int cnt = 0;
+        int total = pk_idx->num_rows();
+        int32_t remaining = total;
+        bool exact_match = false;
+        std::string last_key;
+        int batch_size = 1024;
+        MemPool pool;
+        while (remaining > 0) {
+            std::unique_ptr<segment_v2::IndexedColumnIterator> iter;
+            RETURN_IF_ERROR(pk_idx->new_iterator(&iter));
+
+            size_t num_to_read = std::min(batch_size, remaining);
+            std::unique_ptr<ColumnVectorBatch> cvb;
+            RETURN_IF_ERROR(ColumnVectorBatch::create(num_to_read, false, pk_idx->type_info(),
+                                                      nullptr, &cvb));
+            ColumnBlock block(cvb.get(), &pool);
+            ColumnBlockView column_block_view(&block);
+            Slice last_key_slice(last_key);
+            RETURN_IF_ERROR(iter->seek_at_or_after(&last_key_slice, &exact_match));
+
+            size_t num_read = num_to_read;
+            RETURN_IF_ERROR(iter->next_batch(&num_read, &column_block_view));
+            DCHECK(num_to_read == num_read);
+            last_key = (reinterpret_cast<const Slice*>(cvb->cell_ptr(num_read - 1)))->to_string();
+
+            // exclude last_key, last_key will be read in next batch.
+            if (num_read == batch_size && num_read != remaining) {
+                num_read -= 1;
+            }
+            for (size_t i = 0; i < num_read; i++) {
+                const Slice* key = reinterpret_cast<const Slice*>(cvb->cell_ptr(i));
+                // first check if exist in pre segment
+                bool find = _check_pk_in_pre_segments(pre_segments, *key, tablet, version);
+                if (find) {
+                    cnt++;
+                    continue;
+                }
+                RowLocation loc;
+                st = tablet->lookup_row_key(*key, &loc, version.first - 1);
+                CHECK(st.ok() || st.is_not_found());
+                if (st.is_not_found()) continue;
+                ++cnt;
+                // TODO: we can just set a bitmap onece we are done while iteration
+                tablet->tablet_meta()->delete_bitmap().add(
+                        {loc.rowset_id, loc.segment_id, version.first}, loc.row_id);
+            }
+            remaining -= num_read;
+        }
+
+        LOG(INFO) << "construct delete bitmap tablet: " << tablet->tablet_id()
+                  << " rowset: " << beta_rowset->rowset_id() << " segment: " << seg->id()
+                  << " version: " << version << " delete: " << cnt << "/" << total;
+        pre_segments.emplace_back(seg);
+    }
+    tablet->save_meta();
+    LOG(INFO) << "finished to update delete bitmap, tablet: " << tablet->tablet_id()
+              << " version: " << version << ", elapse(us): " << watch.get_elapse_time_us();
+    return Status::OK();
+}
+
+bool TxnManager::_check_pk_in_pre_segments( // true if key is found in pre_segments; that row is then added to the delete bitmap
+        const std::vector<segment_v2::SegmentSharedPtr>& pre_segments, const Slice& key,
+        TabletSharedPtr tablet, const Version& version) {
+    for (auto it = pre_segments.rbegin(); it != pre_segments.rend(); ++it) { // newest segment first; stop at the first hit
+        RowLocation loc;
+        auto st = (*it)->lookup_row_key(key, &loc);
+        CHECK(st.ok() || st.is_not_found()); // NOTE(review): any other lookup error aborts the process
+        if (st.is_not_found()) {
+            continue;
+        }
+        tablet->tablet_meta()->delete_bitmap().add({loc.rowset_id, loc.segment_id, version.first},
+                                                   loc.row_id); // record the older row under the publishing version
+        return true;
+    }
+    return false;
 }
 
 // txn could be rollbacked if it does not have related rowset
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index 2dd6dfb753..a755f5dc79 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -42,6 +42,7 @@
 #include "olap/options.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/segment_v2/segment.h"
 #include "olap/tablet.h"
 #include "util/time.h"
 
@@ -172,6 +173,10 @@ private:
     void _insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id);
     void _clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id);
 
+    bool _check_pk_in_pre_segments(const std::vector<segment_v2::SegmentSharedPtr>& pre_segments, // segments already processed for the rowset being published
+                                   const Slice& key, TabletSharedPtr tablet,
+                                   const Version& version); // true if key was found there (and its old row added to tablet's delete bitmap)
+
 private:
     const int32_t _txn_map_shard_size;
 
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp
index 927b257d43..b5b3b4d988 100644
--- a/be/test/olap/engine_storage_migration_task_test.cpp
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -207,8 +207,8 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) {
     for (auto& tablet_rs : tablet_related_rs) {
         RowsetSharedPtr rowset = tablet_rs.second;
         res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
-                                                   write_req.tablet_id, write_req.schema_hash,
-                                                   tablet_rs.first.tablet_uid, version);
+                                                   tablet->tablet_id(), tablet->schema_hash(),
+                                                   tablet->tablet_uid(), version);
         EXPECT_EQ(Status::OK(), res);
         res = tablet->add_inc_rowset(rowset);
         EXPECT_EQ(Status::OK(), res);
diff --git a/be/test/olap/tablet_clone_test.cpp b/be/test/olap/tablet_clone_test.cpp
index 135ed47c77..51124f87cf 100644
--- a/be/test/olap/tablet_clone_test.cpp
+++ b/be/test/olap/tablet_clone_test.cpp
@@ -201,8 +201,8 @@ TEST_F(TabletCloneTest, convert_rowset_ids_has_file_in_s3) {
         RowsetSharedPtr rowset = tablet_rs.second;
         rowset->rowset_meta()->set_resource_id(kResourceId);
         st = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
-                                                  write_req.tablet_id, write_req.schema_hash,
-                                                  tablet_rs.first.tablet_uid, version);
+                                                  tablet->tablet_id(), tablet->schema_hash(),
+                                                  tablet->tablet_uid(), version);
         ASSERT_EQ(Status::OK(), st);
         st = tablet->add_inc_rowset(rowset);
         ASSERT_EQ(Status::OK(), st);
diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp
index a7308062dd..f20eccd376 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -200,8 +200,8 @@ TEST_F(TabletCooldownTest, normal) {
     for (auto& tablet_rs : tablet_related_rs) {
         RowsetSharedPtr rowset = tablet_rs.second;
         st = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
-                                                  write_req.tablet_id, write_req.schema_hash,
-                                                  tablet_rs.first.tablet_uid, version);
+                                                  tablet->tablet_id(), tablet->schema_hash(),
+                                                  tablet->tablet_uid(), version);
         ASSERT_EQ(Status::OK(), st);
         st = tablet->add_inc_rowset(rowset);
         ASSERT_EQ(Status::OK(), st);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org