You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/01 15:29:23 UTC

[incubator-doris] branch master updated: [fix](storage) Disable compaction before schema change is actually executed(#9032) (#9065)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 47dfdd8e09 [fix](storage) Disable compaction before schema change is actually executed(#9032) (#9065)
47dfdd8e09 is described below

commit 47dfdd8e0959cb221b6a66b1a87161094e64d649
Author: Lijia Liu <li...@yeah.net>
AuthorDate: Wed Jun 1 23:29:18 2022 +0800

    [fix](storage) Disable compaction before schema change is actually executed(#9032) (#9065)
    
    As described in the issue, running compaction and schema change at the same time may lead to version intersection.
    Overview of changes:
    1. Do not do compaction before schema change is actually executed.
    2. Set tablet as bad when it has version intersection.
    3. Do not do schema change when it cannot find appropriate versions to delete in the new tablet.
    4. Do not change rowsets after compaction if the rowsets of the tablet have changed.
---
 be/src/olap/compaction.cpp                   |  8 ++---
 be/src/olap/compaction.h                     |  2 +-
 be/src/olap/cumulative_compaction_policy.cpp |  1 -
 be/src/olap/schema_change.cpp                | 41 ++++++++++++++++++-----
 be/src/olap/schema_change.h                  |  8 ++++-
 be/src/olap/tablet.cpp                       | 50 ++++++++++++++++++++++++----
 be/src/olap/tablet.h                         |  8 ++---
 7 files changed, 92 insertions(+), 26 deletions(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 14569881a7..ff98cece3f 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -131,7 +131,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
     TRACE("check correctness finished");
 
     // 4. modify rowsets in memory
-    modify_rowsets();
+    RETURN_NOT_OK(modify_rowsets());
     TRACE("modify rowsets finished");
 
     // 5. update last success compaction time
@@ -179,13 +179,13 @@ Status Compaction::construct_input_rowset_readers() {
     return Status::OK();
 }
 
-void Compaction::modify_rowsets() {
+Status Compaction::modify_rowsets() {
     std::vector<RowsetSharedPtr> output_rowsets;
     output_rowsets.push_back(_output_rowset);
-
     std::lock_guard<std::shared_mutex> wrlock(_tablet->get_header_lock());
-    _tablet->modify_rowsets(output_rowsets, _input_rowsets);
+    RETURN_NOT_OK(_tablet->modify_rowsets(output_rowsets, _input_rowsets, true));
     _tablet->save_meta();
+    return Status::OK();
 }
 
 void Compaction::gc_output_rowset() {
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 7306bbacea..b7a6bbf20b 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -63,7 +63,7 @@ protected:
     Status do_compaction(int64_t permits);
     Status do_compaction_impl(int64_t permits);
 
-    void modify_rowsets();
+    Status modify_rowsets();
     void gc_output_rowset();
 
     Status construct_output_rowset_writer();
diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp
index bc1dee3148..a77eb44f73 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -180,7 +180,6 @@ void SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(
         // check base rowset
         if (rs_meta->start_version() == 0) {
             base_rowset_exist = true;
-            // _calc_promotion_size(rs_meta, &promotion_size);
         }
         if (rs_meta->end_version() < point) {
             // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 08646bd01e..f7d86d8d86 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1459,15 +1459,15 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
         reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS;
 
         do {
+            RowsetSharedPtr max_rowset;
             // get history data to be converted and it will check if there is hold in base tablet
-            res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed);
+            res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed, &max_rowset);
             if (!res.ok()) {
                 LOG(WARNING) << "fail to get version to be changed. res=" << res;
                 break;
             }
 
             // should check the max_version >= request.alter_version, if not the convert is useless
-            RowsetSharedPtr max_rowset = base_tablet->rowset_with_max_version();
             if (max_rowset == nullptr || max_rowset->end_version() < request.alter_version) {
                 LOG(WARNING) << "base tablet's max version="
                              << (max_rowset == nullptr ? 0 : max_rowset->end_version())
@@ -1482,9 +1482,23 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
             std::vector<RowsetSharedPtr> rowsets_to_delete;
             std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets;
             new_tablet->acquire_version_and_rowsets(&version_rowsets);
+            std::sort(version_rowsets.begin(), version_rowsets.end(),
+                      [](const std::pair<Version, RowsetSharedPtr>& l,
+                         const std::pair<Version, RowsetSharedPtr>& r) {
+                          return l.first.first < r.first.first;
+                      });
             for (auto& pair : version_rowsets) {
                 if (pair.first.second <= max_rowset->end_version()) {
                     rowsets_to_delete.push_back(pair.second);
+                } else if (pair.first.first <= max_rowset->end_version()) {
+                    // If max version is [X-10] and new tablet has version [7-9][10-12],
+                    // we only can remove [7-9] from new tablet. If we add [X-10] to new tablet, it will has version
+                    // cross: [X-10] [10-12].
+                    // So, we should return OLAP_ERR_VERSION_ALREADY_MERGED for fast fail.
+                    LOG(WARNING) << "New tablet has a version " << pair.first
+                                 << " crossing base tablet's max_version="
+                                 << max_rowset->end_version();
+                    Status::OLAPInternalError(OLAP_ERR_VERSION_ALREADY_MERGED);
                 }
             }
             std::vector<RowsetSharedPtr> empty_vec;
@@ -1579,8 +1593,15 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
                         std::make_pair(item.column_name, mv_param));
             }
         }
-
+        {
+            std::lock_guard<std::shared_mutex> wrlock(_mutex);
+            _tablet_ids_in_converting.insert(new_tablet->tablet_id());
+        }
         res = _convert_historical_rowsets(sc_params);
+        {
+            std::lock_guard<std::shared_mutex> wrlock(_mutex);
+            _tablet_ids_in_converting.erase(new_tablet->tablet_id());
+        }
         if (!res.ok()) {
             break;
         }
@@ -1609,6 +1630,11 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
     return res;
 }
 
+bool SchemaChangeHandler::tablet_in_converting(int64_t tablet_id) {
+    std::shared_lock rdlock(_mutex);
+    return _tablet_ids_in_converting.find(tablet_id) != _tablet_ids_in_converting.end();
+}
+
 Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,
                                                    TabletSharedPtr new_tablet,
                                                    RowsetSharedPtr* base_rowset,
@@ -1726,18 +1752,17 @@ SCHEMA_VERSION_CONVERT_ERR:
 }
 
 Status SchemaChangeHandler::_get_versions_to_be_changed(
-        TabletSharedPtr base_tablet, std::vector<Version>* versions_to_be_changed) {
+        TabletSharedPtr base_tablet, std::vector<Version>* versions_to_be_changed,
+        RowsetSharedPtr* max_rowset) {
     RowsetSharedPtr rowset = base_tablet->rowset_with_max_version();
     if (rowset == nullptr) {
         LOG(WARNING) << "Tablet has no version. base_tablet=" << base_tablet->full_name();
         return Status::OLAPInternalError(OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS);
     }
+    *max_rowset = rowset;
 
-    std::vector<Version> span_versions;
     RETURN_NOT_OK(base_tablet->capture_consistent_versions(Version(0, rowset->version().second),
-                                                           &span_versions));
-    versions_to_be_changed->insert(versions_to_be_changed->end(), span_versions.begin(),
-                                   span_versions.end());
+                                                           versions_to_be_changed));
 
     return Status::OK();
 }
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index dc382ececd..5c8757613d 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -188,6 +188,8 @@ public:
     // schema change v2, it will not set alter task in base tablet
     Status process_alter_tablet_v2(const TAlterTabletReqV2& request);
 
+    bool tablet_in_converting(int64_t tablet_id);
+
 private:
     // Check the status of schema change and clear information between "a pair" of Schema change tables
     // Since A->B's schema_change information for A will be overwritten in subsequent processing (no extra cleanup here)
@@ -198,7 +200,8 @@ private:
                                                const TAlterTabletReq& request);
 
     Status _get_versions_to_be_changed(TabletSharedPtr base_tablet,
-                                       std::vector<Version>* versions_to_be_changed);
+                                       std::vector<Version>* versions_to_be_changed,
+                                       RowsetSharedPtr* max_rowset);
 
     struct AlterMaterializedViewParam {
         std::string column_name;
@@ -235,6 +238,9 @@ private:
     virtual ~SchemaChangeHandler();
     SchemaChangeHandler(const SchemaChangeHandler&) = delete;
     SchemaChangeHandler& operator=(const SchemaChangeHandler&) = delete;
+
+    std::shared_mutex _mutex;
+    std::unordered_set<int64_t> _tablet_ids_in_converting;
 };
 
 using RowBlockDeleter = std::function<void(RowBlock*)>;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 668c9ce67f..a457e3eb42 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -37,6 +37,7 @@
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_factory.h"
 #include "olap/rowset/rowset_meta_manager.h"
+#include "olap/schema_change.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_meta_manager.h"
 #include "util/path_util.h"
@@ -239,8 +240,8 @@ Status Tablet::add_rowset(RowsetSharedPtr rowset) {
     return Status::OK();
 }
 
-void Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
-                            std::vector<RowsetSharedPtr>& to_delete) {
+Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
+                              std::vector<RowsetSharedPtr>& to_delete, bool check_delete) {
     // the compaction process allow to compact the single version, eg: version[4-4].
     // this kind of "single version compaction" has same "input version" and "output version".
     // which means "to_add->version()" equals to "to_delete->version()".
@@ -267,6 +268,22 @@ void Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
         same_version = false;
     }
 
+    if (check_delete) {
+        for (auto& rs : to_delete) {
+            auto find_rs = _rs_version_map.find(rs->version());
+            if (find_rs == _rs_version_map.end()) {
+                LOG(WARNING) << "try to delete not exist version " << rs->version() << " from "
+                             << full_name();
+                return Status::OLAPInternalError(OLAP_ERR_DELETE_VERSION_ERROR);
+            } else if (find_rs->second->rowset_id() != rs->rowset_id()) {
+                LOG(WARNING) << "try to delete version " << rs->version() << " from " << full_name()
+                             << ", but rowset id changed, delete rowset id is " << rs->rowset_id()
+                             << ", exists rowsetid is" << find_rs->second->rowset_id();
+                return Status::OLAPInternalError(OLAP_ERR_DELETE_VERSION_ERROR);
+            }
+        }
+    }
+
     std::vector<RowsetMetaSharedPtr> rs_metas_to_delete;
     for (auto& rs : to_delete) {
         rs_metas_to_delete.push_back(rs->rowset_meta());
@@ -303,6 +320,7 @@ void Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
             StorageEngine::instance()->add_unused_rowset(rs);
         }
     }
+    return Status::OK();
 }
 
 // snapshot manager may call this api to check if version exists, so that
@@ -707,6 +725,15 @@ bool Tablet::can_do_compaction(size_t path_hash, CompactionType compaction_type)
             return false;
         }
     }
+
+    if (tablet_state() == TABLET_NOTREADY) {
+        // Before doing schema change, tablet's rowsets that versions smaller than max converting version will be
+        // removed. So, we only need to do the compaction when it is being converted.
+        // After being converted, tablet's state will be changed to TABLET_RUNNING.
+        auto schema_change_handler = SchemaChangeHandler::instance();
+        return schema_change_handler->tablet_in_converting(tablet_id());
+    }
+
     return true;
 }
 
@@ -798,13 +825,15 @@ void Tablet::calc_missed_versions_unlocked(int64_t spec_version,
 }
 
 void Tablet::max_continuous_version_from_beginning(Version* version, Version* max_version) {
+    bool has_version_cross;
     std::shared_lock rdlock(_meta_lock);
-    _max_continuous_version_from_beginning_unlocked(version, max_version);
+    _max_continuous_version_from_beginning_unlocked(version, max_version, &has_version_cross);
 }
 
-void Tablet::_max_continuous_version_from_beginning_unlocked(Version* version,
-                                                             Version* max_version) const {
+void Tablet::_max_continuous_version_from_beginning_unlocked(Version* version, Version* max_version,
+                                                             bool* has_version_cross) const {
     std::vector<Version> existing_versions;
+    *has_version_cross = false;
     for (auto& rs : _tablet_meta->all_rs_metas()) {
         existing_versions.emplace_back(rs->version());
     }
@@ -816,10 +845,12 @@ void Tablet::_max_continuous_version_from_beginning_unlocked(Version* version,
                   return left.first < right.first;
               });
 
-    Version max_continuous_version = {-1, 0};
+    Version max_continuous_version = {-1, -1};
     for (int i = 0; i < existing_versions.size(); ++i) {
         if (existing_versions[i].first > max_continuous_version.second + 1) {
             break;
+        } else if (existing_versions[i].first <= max_continuous_version.second) {
+            *has_version_cross = true;
         }
         max_continuous_version = existing_versions[i];
     }
@@ -1248,7 +1279,8 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) {
     // We start from the initial version and traverse backwards until we meet a discontinuous version.
     Version cversion;
     Version max_version;
-    _max_continuous_version_from_beginning_unlocked(&cversion, &max_version);
+    bool has_version_cross;
+    _max_continuous_version_from_beginning_unlocked(&cversion, &max_version, &has_version_cross);
     tablet_info->__set_version_miss(cversion.second < max_version.second);
     // find rowset with max version
     auto iter = _rs_version_map.find(max_version);
@@ -1265,6 +1297,10 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) {
         // and perform state modification operations.
     }
 
+    if (has_version_cross && tablet_state() == TABLET_RUNNING) {
+        tablet_info->__set_used(false);
+    }
+
     // the report version is the largest continuous version, same logic as in FE side
     tablet_info->version = cversion.second;
     // Useless but it is a required filed in TTabletInfo
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 3fb3144af1..8f08833c33 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -100,8 +100,8 @@ public:
     // operation in rowsets
     Status add_rowset(RowsetSharedPtr rowset);
     Status create_initial_rowset(const int64_t version);
-    void modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
-                        std::vector<RowsetSharedPtr>& to_delete);
+    Status modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
+                          std::vector<RowsetSharedPtr>& to_delete, bool check_delete = false);
 
     // _rs_version_map and _stale_rs_version_map should be protected by _meta_lock
     // The caller must call hold _meta_lock when call this two function.
@@ -279,8 +279,8 @@ private:
     // Returns:
     // version: the max continuous version from beginning
     // max_version: the max version of this tablet
-    void _max_continuous_version_from_beginning_unlocked(Version* version,
-                                                         Version* max_version) const;
+    void _max_continuous_version_from_beginning_unlocked(Version* version, Version* max_version,
+                                                         bool* has_version_cross) const;
     RowsetSharedPtr _rowset_with_largest_size();
     /// Delete stale rowset by version. This method not only delete the version in expired rowset map,
     /// but also delete the version in rowset meta vector.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org