You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2023/06/14 09:56:11 UTC

[doris] branch master updated: [Fix](MOW) Fix load data publish timeout when enable unique key MOW (#20720)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b97537b04b [Fix](MOW) Fix load data publish timeout when enable unique key MOW (#20720)
b97537b04b is described below

commit b97537b04b59f69229323aea5beb8e410fccc3cc
Author: abmdocrt <Yu...@gmail.com>
AuthorDate: Wed Jun 14 17:56:02 2023 +0800

    [Fix](MOW) Fix load data publish timeout when enable unique key MOW (#20720)
---
 be/src/olap/delta_writer.cpp | 21 ++++++++++++++-------
 be/src/olap/tablet.cpp       | 31 +++++++++++++++++++++++++++----
 be/src/olap/tablet.h         | 10 +++++++++-
 be/src/olap/txn_manager.cpp  |  5 +++--
 4 files changed, 53 insertions(+), 14 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 919af5cdf1..07e24e2f3f 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -438,13 +438,7 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes,
         LOG(WARNING) << "fail to build rowset";
         return Status::Error<MEM_ALLOC_FAILED>();
     }
-    Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id,
-                                                            _req.load_id, _cur_rowset, false);
-    if (!res && !res.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
-        LOG(WARNING) << "Failed to commit txn: " << _req.txn_id
-                     << " for rowset: " << _cur_rowset->rowset_id();
-        return res;
-    }
+
     if (_tablet->enable_unique_key_merge_on_write()) {
         auto beta_rowset = reinterpret_cast<BetaRowset*>(_cur_rowset.get());
         std::vector<segment_v2::SegmentSharedPtr> segments;
@@ -458,7 +452,20 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes,
             // calculate delete bitmap between segments
             RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_cur_rowset, segments,
                                                                          _delete_bitmap));
+            RETURN_IF_ERROR(_tablet->commit_phase_update_delete_bitmap(
+                    _cur_rowset, _rowset_ids, _delete_bitmap, _tablet->max_version().second,
+                    segments, _rowset_writer.get()));
         }
+    }
+    Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id,
+                                                            _req.load_id, _cur_rowset, false);
+
+    if (!res && !res.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
+        LOG(WARNING) << "Failed to commit txn: " << _req.txn_id
+                     << " for rowset: " << _cur_rowset->rowset_id();
+        return res;
+    }
+    if (_tablet->enable_unique_key_merge_on_write()) {
         _storage_engine->txn_manager()->set_txn_related_delete_bitmap(
                 _req.partition_id, _req.txn_id, _tablet->tablet_id(), _tablet->schema_hash(),
                 _tablet->tablet_uid(), true, _delete_bitmap, _rowset_ids);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 011ea8e9b6..36f8e0076e 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3134,10 +3134,33 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset)
     return Status::OK();
 }
 
-Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, const TabletTxnInfo* load_info,
-                                    RowsetWriter* rowset_writer) {
-    DeleteBitmapPtr delete_bitmap = load_info->delete_bitmap;
-    const RowsetIdUnorderedSet& pre_rowset_ids = load_info->rowset_ids;
+Status Tablet::commit_phase_update_delete_bitmap( // Updates *delete_bitmap in place for `rowset` at a caller-supplied version.
+        const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet& pre_rowset_ids, // pre_rowset_ids: rowset-id set to diff against
+        DeleteBitmapPtr delete_bitmap, const int64_t& cur_version, // cur_version: DeltaWriter::close_wait passes _tablet->max_version().second
+        const std::vector<segment_v2::SegmentSharedPtr>& segments, RowsetWriter* rowset_writer) { // header declares rowset_writer = nullptr by default
+    RowsetIdUnorderedSet cur_rowset_ids; // filled from all_rs_id(cur_version) below
+    RowsetIdUnorderedSet rowset_ids_to_add; // out-param of _rowset_ids_difference; forwarded to calc_delete_bitmap
+    RowsetIdUnorderedSet rowset_ids_to_del; // out-param of _rowset_ids_difference; its entries are purged from the bitmap
+
+    std::shared_lock meta_rlock(_meta_lock); // shared lock on _meta_lock, held until this function returns
+    cur_rowset_ids = all_rs_id(cur_version); // presumably the ids of rowsets visible at cur_version -- confirm in all_rs_id
+    _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, &rowset_ids_to_del); // presumably to_add = cur minus pre, to_del = pre minus cur -- verify helper
+    if (!rowset_ids_to_add.empty() || !rowset_ids_to_del.empty()) { // log only when at least one difference set is non-empty
+        LOG(INFO) << "rowset_ids_to_add: " << rowset_ids_to_add.size()
+                  << ", rowset_ids_to_del: " << rowset_ids_to_del.size();
+    }
+    for (const auto& to_del : rowset_ids_to_del) { // one range removal per rowset id in the to_del set
+        delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX}); // range spans min..max of the last two key fields for this id (field meanings not shown here -- TODO confirm)
+    }
+
+    RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, &rowset_ids_to_add, delete_bitmap, // only the to_add set is passed on, not the full current set
+                                       cur_version, rowset_writer)); // RETURN_IF_ERROR presumably returns early on a non-OK Status
+    return Status::OK(); // reached only if the calc_delete_bitmap call did not return early
+}
+
+Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
+                                    const RowsetIdUnorderedSet& pre_rowset_ids,
+                                    DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer) {
     RowsetIdUnorderedSet cur_rowset_ids;
     RowsetIdUnorderedSet rowset_ids_to_add;
     RowsetIdUnorderedSet rowset_ids_to_del;
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index cd4e21fb79..55a3b676b9 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -447,7 +447,15 @@ public:
 
     Status update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset);
 
-    Status update_delete_bitmap(const RowsetSharedPtr& rowset, const TabletTxnInfo* load_info,
+    Status commit_phase_update_delete_bitmap(
+            const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet& pre_rowset_ids,
+            DeleteBitmapPtr delete_bitmap, const int64_t& cur_version,
+            const std::vector<segment_v2::SegmentSharedPtr>& segments,
+            RowsetWriter* rowset_writer = nullptr);
+
+    Status update_delete_bitmap(const RowsetSharedPtr& rowset,
+                                const RowsetIdUnorderedSet& pre_rowset_ids,
+                                DeleteBitmapPtr delete_bitmap,
                                 RowsetWriter* rowset_writer = nullptr);
     void calc_compaction_output_rowset_delete_bitmap(
             const std::vector<RowsetSharedPtr>& input_rowsets,
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 1d7d19fb1c..f7e56fd785 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -368,8 +368,9 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
         std::unique_ptr<RowsetWriter> rowset_writer;
         _create_transient_rowset_writer(tablet, rowset, &rowset_writer);
 
-        RETURN_IF_ERROR(
-                tablet->update_delete_bitmap(rowset, &tablet_txn_info, rowset_writer.get()));
+        RETURN_IF_ERROR(tablet->update_delete_bitmap(rowset, tablet_txn_info.rowset_ids,
+                                                     tablet_txn_info.delete_bitmap,
+                                                     rowset_writer.get()));
         if (rowset->tablet_schema()->is_partial_update()) {
             // build rowset writer and merge transient rowset
             RETURN_IF_ERROR(rowset_writer->flush());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org