You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/01 16:07:38 UTC

[incubator-doris] 01/05: [fix](storage) Disable compaction before schema change is actually executed(#9032) (#9065)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit c78cda0aef160dfd177fd52d87d81ca0d742ca9e
Author: Lijia Liu <li...@yeah.net>
AuthorDate: Wed Jun 1 23:29:18 2022 +0800

    [fix](storage) Disable compaction before schema change is actually executed(#9032) (#9065)
    
    As described in the issue, running compaction and schema change at the same time may lead to version intersection (overlapping rowset versions).
    Overview of the changes:
    1. Do not run compaction on a tablet before the schema change is actually executed on it.
    2. Mark a tablet as bad (report it as not used) when it has version intersection.
    3. Do not run the schema change when appropriate versions to delete cannot be found in the new tablet.
    4. Do not modify the tablet's rowsets after compaction if they have changed in the meantime.
---
 be/src/olap/compaction.cpp                   |  7 ++--
 be/src/olap/compaction.h                     |  2 +-
 be/src/olap/cumulative_compaction_policy.cpp |  1 -
 be/src/olap/schema_change.cpp                | 40 ++++++++++++++++++-----
 be/src/olap/schema_change.h                  |  6 +++-
 be/src/olap/tablet.cpp                       | 49 ++++++++++++++++++++++++----
 be/src/olap/tablet.h                         |  8 ++---
 7 files changed, 89 insertions(+), 24 deletions(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index a2947cc515..c484fd5703 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -122,7 +122,7 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) {
     TRACE("check correctness finished");
 
     // 4. modify rowsets in memory
-    modify_rowsets();
+    RETURN_NOT_OK(modify_rowsets());
     TRACE("modify rowsets finished");
 
     // 5. update last success compaction time
@@ -191,13 +191,14 @@ OLAPStatus Compaction::construct_input_rowset_readers() {
     return OLAP_SUCCESS;
 }
 
-void Compaction::modify_rowsets() {
+OLAPStatus Compaction::modify_rowsets() {
     std::vector<RowsetSharedPtr> output_rowsets;
     output_rowsets.push_back(_output_rowset);
 
     WriteLock wrlock(_tablet->get_header_lock());
-    _tablet->modify_rowsets(output_rowsets, _input_rowsets);
+    RETURN_NOT_OK(_tablet->modify_rowsets(output_rowsets, _input_rowsets, true));
     _tablet->save_meta();
+    return OLAP_SUCCESS;
 }
 
 void Compaction::gc_output_rowset() {
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 6e7985a4d1..e537997b04 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -63,7 +63,7 @@ protected:
     OLAPStatus do_compaction(int64_t permits);
     OLAPStatus do_compaction_impl(int64_t permits);
 
-    void modify_rowsets();
+    OLAPStatus modify_rowsets();
     void gc_output_rowset();
 
     OLAPStatus construct_output_rowset_writer();
diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp
index d1a712c274..2a18a1f1ac 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -179,7 +179,6 @@ void SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(Table
         // check base rowset
         if (rs_meta->start_version() == 0) {
             base_rowset_exist = true;
-            // _calc_promotion_size(rs_meta, &promotion_size);
         }
         if (rs_meta->end_version() < point) {
             // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index b27fea5f71..95f4944e45 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1511,15 +1511,15 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
                                                          + std::to_string(new_tablet->tablet_id()), _mem_tracker, true, false, MemTrackerLevel::TASK);
 
         do {
+            RowsetSharedPtr max_rowset;
             // get history data to be converted and it will check if there is hold in base tablet
-            res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed);
+            res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed, &max_rowset);
             if (res != OLAP_SUCCESS) {
                 LOG(WARNING) << "fail to get version to be changed. res=" << res;
                 break;
             }
 
             // should check the max_version >= request.alter_version, if not the convert is useless
-            RowsetSharedPtr max_rowset = base_tablet->rowset_with_max_version();
             if (max_rowset == nullptr || max_rowset->end_version() < request.alter_version) {
                 LOG(WARNING) << "base tablet's max version="
                              << (max_rowset == nullptr ? 0 : max_rowset->end_version())
@@ -1534,9 +1534,23 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
             std::vector<RowsetSharedPtr> rowsets_to_delete;
             std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets;
             new_tablet->acquire_version_and_rowsets(&version_rowsets);
+            std::sort(version_rowsets.begin(), version_rowsets.end(),
+                      [](const std::pair<Version, RowsetSharedPtr>& l,
+                         const std::pair<Version, RowsetSharedPtr>& r) {
+                          return l.first.first < r.first.first;
+                      });
             for (auto& pair : version_rowsets) {
                 if (pair.first.second <= max_rowset->end_version()) {
                     rowsets_to_delete.push_back(pair.second);
+                } else if (pair.first.first <= max_rowset->end_version()) {
+                    // If max version is [X-10] and new tablet has version [7-9][10-12],
+                    // we only can remove [7-9] from new tablet. If we add [X-10] to new tablet, it will has version
+                    // cross: [X-10] [10-12].
+                    // So, we should return OLAP_ERR_VERSION_ALREADY_MERGED for fast fail.
+                    LOG(WARNING) << "New tablet has a version " << pair.first
+                                 << " crossing base tablet's max_version="
+                                 << max_rowset->end_version();
+                    return OLAP_ERR_VERSION_ALREADY_MERGED;
                 }
             }
             std::vector<RowsetSharedPtr> empty_vec;
@@ -1633,8 +1647,15 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
                         std::make_pair(item.column_name, mv_param));
             }
         }
-
+        {
+            std::lock_guard<std::shared_mutex> wrlock(_mutex);
+            _tablet_ids_in_converting.insert(new_tablet->tablet_id());
+        }
         res = _convert_historical_rowsets(sc_params);
+        {
+            std::lock_guard<std::shared_mutex> wrlock(_mutex);
+            _tablet_ids_in_converting.erase(new_tablet->tablet_id());
+        }
         if (res != OLAP_SUCCESS) {
             break;
         }
@@ -1663,6 +1684,11 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
     return res;
 }
 
+bool SchemaChangeHandler::tablet_in_converting(int64_t tablet_id) {
+    std::shared_lock rdlock(_mutex);
+    return _tablet_ids_in_converting.find(tablet_id) != _tablet_ids_in_converting.end();
+}
+
 OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,
                                                        TabletSharedPtr new_tablet,
                                                        RowsetSharedPtr* base_rowset,
@@ -1797,18 +1823,16 @@ SCHEMA_VERSION_CONVERT_ERR:
 }
 
 OLAPStatus SchemaChangeHandler::_get_versions_to_be_changed(
-        TabletSharedPtr base_tablet, std::vector<Version>* versions_to_be_changed) {
+        TabletSharedPtr base_tablet, std::vector<Version>* versions_to_be_changed, RowsetSharedPtr* max_rowset) {
     RowsetSharedPtr rowset = base_tablet->rowset_with_max_version();
     if (rowset == nullptr) {
         LOG(WARNING) << "Tablet has no version. base_tablet=" << base_tablet->full_name();
         return OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS;
     }
+    *max_rowset = rowset;
 
-    std::vector<Version> span_versions;
     RETURN_NOT_OK(base_tablet->capture_consistent_versions(Version(0, rowset->version().second),
-                                                           &span_versions));
-    versions_to_be_changed->insert(versions_to_be_changed->end(), span_versions.begin(),
-                                   span_versions.end());
+                                                           versions_to_be_changed));
 
     return OLAP_SUCCESS;
 }
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 2c95a3485c..1916574f5d 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -200,6 +200,8 @@ public:
     // schema change v2, it will not set alter task in base tablet
     OLAPStatus process_alter_tablet_v2(const TAlterTabletReqV2& request);
 
+    bool tablet_in_converting(int64_t tablet_id);
+
 private:
 
     // Check the status of schema change and clear information between "a pair" of Schema change tables
@@ -211,7 +213,7 @@ private:
                                                    const TAlterTabletReq& request);
 
     OLAPStatus _get_versions_to_be_changed(TabletSharedPtr base_tablet,
-                                           std::vector<Version>* versions_to_be_changed);
+                                           std::vector<Version>* versions_to_be_changed, RowsetSharedPtr* max_rowset);
 
     struct AlterMaterializedViewParam {
         std::string column_name;
@@ -252,6 +254,8 @@ private:
     SchemaChangeHandler& operator=(const SchemaChangeHandler&) = delete;
 
     std::shared_ptr<MemTracker> _mem_tracker;
+    std::shared_mutex _mutex;
+    std::unordered_set<int64_t> _tablet_ids_in_converting;
 };
 
 using RowBlockDeleter = std::function<void(RowBlock*)>;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 90b70672d1..27c436c253 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -37,6 +37,7 @@
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_factory.h"
 #include "olap/rowset/rowset_meta_manager.h"
+#include "olap/schema_change.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_meta_manager.h"
 #include "util/path_util.h"
@@ -247,8 +248,8 @@ OLAPStatus Tablet::add_rowset(RowsetSharedPtr rowset, bool need_persist) {
     return OLAP_SUCCESS;
 }
 
-void Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
-                            std::vector<RowsetSharedPtr>& to_delete) {
+OLAPStatus Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
+                              std::vector<RowsetSharedPtr>& to_delete, bool check_delete) {
     // the compaction process allow to compact the single version, eg: version[4-4].
     // this kind of "single version compaction" has same "input version" and "output version".
     // which means "to_add->version()" equals to "to_delete->version()".
@@ -275,6 +276,22 @@ void Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
         same_version = false;
     }
 
+    if (check_delete) {
+        for (auto& rs : to_delete) {
+            auto find_rs = _rs_version_map.find(rs->version());
+            if (find_rs == _rs_version_map.end()) {
+                LOG(WARNING) << "try to delete not exist version " << rs->version() << " from "
+                             << full_name();
+                return OLAP_ERR_DELETE_VERSION_ERROR;
+            } else if (find_rs->second->rowset_id() != rs->rowset_id()) {
+                LOG(WARNING) << "try to delete version " << rs->version() << " from " << full_name()
+                             << ", but rowset id changed, delete rowset id is " << rs->rowset_id()
+                             << ", exists rowsetid is" << find_rs->second->rowset_id();
+                return OLAP_ERR_DELETE_VERSION_ERROR;
+            }
+        }
+    }
+
     std::vector<RowsetMetaSharedPtr> rs_metas_to_delete;
     for (auto& rs : to_delete) {
         rs_metas_to_delete.push_back(rs->rowset_meta());
@@ -311,6 +328,7 @@ void Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
             StorageEngine::instance()->add_unused_rowset(rs);
         }
     }
+    return OLAP_SUCCESS;
 }
 
 // snapshot manager may call this api to check if version exists, so that
@@ -717,6 +735,15 @@ bool Tablet::can_do_compaction(size_t path_hash, CompactionType compaction_type)
             return false;
         }
     }
+
+    if (tablet_state() == TABLET_NOTREADY) {
+        // Before doing schema change, tablet's rowsets that versions smaller than max converting version will be
+        // removed. So, we only need to do the compaction when it is being converted.
+        // After being converted, tablet's state will be changed to TABLET_RUNNING.
+        auto schema_change_handler = SchemaChangeHandler::instance();
+        return schema_change_handler->tablet_in_converting(tablet_id());
+    }
+
     return true;
 }
 
@@ -808,12 +835,15 @@ void Tablet::calc_missed_versions_unlocked(int64_t spec_version,
 }
 
 void Tablet::max_continuous_version_from_beginning(Version* version, Version* max_version) {
+    bool has_version_cross;
     ReadLock rdlock(_meta_lock);
-    _max_continuous_version_from_beginning_unlocked(version, max_version);
+    _max_continuous_version_from_beginning_unlocked(version, max_version, &has_version_cross);
 }
 
-void Tablet::_max_continuous_version_from_beginning_unlocked(Version* version, Version* max_version) const {
+void Tablet::_max_continuous_version_from_beginning_unlocked(Version* version, Version* max_version,
+                                                             bool* has_version_cross) const {
     std::vector<Version> existing_versions;
+    *has_version_cross = false;
     for (auto& rs : _tablet_meta->all_rs_metas()) {
         existing_versions.emplace_back(rs->version());
     }
@@ -825,10 +855,12 @@ void Tablet::_max_continuous_version_from_beginning_unlocked(Version* version, V
                   return left.first < right.first;
               });
 
-    Version max_continuous_version = {-1, 0};
+    Version max_continuous_version = {-1, -1};
     for (int i = 0; i < existing_versions.size(); ++i) {
         if (existing_versions[i].first > max_continuous_version.second + 1) {
             break;
+        } else if (existing_versions[i].first <= max_continuous_version.second) {
+            *has_version_cross = true;
         }
         max_continuous_version = existing_versions[i];
     }
@@ -1255,7 +1287,8 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) {
     // We start from the initial version and traverse backwards until we meet a discontinuous version.
     Version cversion;
     Version max_version;
-    _max_continuous_version_from_beginning_unlocked(&cversion, &max_version);
+    bool has_version_cross;
+    _max_continuous_version_from_beginning_unlocked(&cversion, &max_version, &has_version_cross);
     tablet_info->__set_version_miss(cversion.second < max_version.second);
     // find rowset with max version
     auto iter = _rs_version_map.find(max_version);
@@ -1272,6 +1305,10 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) {
         // and perform state modification operations.
     }
 
+    if (has_version_cross && tablet_state() == TABLET_RUNNING) {
+        tablet_info->__set_used(false);
+    }
+
     // the report version is the largest continuous version, same logic as in FE side
     tablet_info->version = cversion.second;
     // Useless but it is a required filed in TTabletInfo
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 70a0c39441..09a31ae459 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -97,8 +97,8 @@ public:
 
     // operation in rowsets
     OLAPStatus add_rowset(RowsetSharedPtr rowset, bool need_persist = true);
-    void modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
-                        std::vector<RowsetSharedPtr>& to_delete);
+    OLAPStatus modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
+                        std::vector<RowsetSharedPtr>& to_delete, bool check_delete = false);
 
     // _rs_version_map and _stale_rs_version_map should be protected by _meta_lock
     // The caller must call hold _meta_lock when call this two function.
@@ -276,8 +276,8 @@ private:
     // Returns:
     // version: the max continuous version from beginning
     // max_version: the max version of this tablet
-    void _max_continuous_version_from_beginning_unlocked(Version* version,
-                                                         Version* max_version) const;
+    void _max_continuous_version_from_beginning_unlocked(Version* version, Version* max_version,
+                                                         bool* has_version_cross) const;
     RowsetSharedPtr _rowset_with_largest_size();
     /// Delete stale rowset by version. This method not only delete the version in expired rowset map,
     /// but also delete the version in rowset meta vector.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org