Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/07/07 08:52:04 UTC

[GitHub] [incubator-doris] ZhangYu0123 opened a new pull request #4039: Add delayed deletion of rowsets function, fix -230 error.

ZhangYu0123 opened a new pull request #4039:
URL: https://github.com/apache/incubator-doris/pull/4039


   Related issue #4017. Main changes are as follows:
   (1) Add _expired_snapshot_rs_version_map and _expired_snapshot_rs_metas.
   (2) Add VersionedRowsetTracker to record the version paths of compacted rowsets.
   (3) Record the path version when rowsets are compacted.
   (4) In the gc process, add expired snapshot rowsets to the unused set so they are removed.
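
A rough sketch of the delayed deletion idea in this description, not the code in the PR. It assumes each compacted path keeps the create time of its rowsets and that a path is swept only once its newest rowset is older than "now - sweep interval". `StalePath` and `capture_expired_path_ids` are hypothetical names used only for illustration.

```cpp
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical, simplified stand-in for one compacted (stale) version path.
struct StalePath {
    std::vector<int64_t> create_times;  // create time of each rowset on the path, in seconds
};

// Return the ids of all paths whose newest rowset was created before sweep_endtime.
// Paths with a newer rowset are kept, so readers that still hold them can finish.
std::vector<int64_t> capture_expired_path_ids(const std::map<int64_t, StalePath>& paths,
                                              int64_t sweep_endtime) {
    std::vector<int64_t> expired;
    for (const auto& entry : paths) {
        int64_t newest = 0;
        for (int64_t t : entry.second.create_times) {
            if (t > newest) newest = t;
        }
        if (newest < sweep_endtime) {
            expired.push_back(entry.first);
        }
    }
    return expired;
}
```

With this shape, compaction only records a path, and the gc pass decides later which paths are old enough to hand to the unused set.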


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on a change in pull request #4039: Add delayed deletion of rowsets function, fix -230 error.

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #4039:
URL: https://github.com/apache/incubator-doris/pull/4039#discussion_r452279720



##########
File path: be/src/olap/tablet_meta.h
##########
@@ -325,6 +334,10 @@ inline const std::vector<RowsetMetaSharedPtr>& TabletMeta::all_inc_rs_metas() co
     return _inc_rs_metas;
 }
 
+inline const std::vector<RowsetMetaSharedPtr>& TabletMeta::all_expired_snapshot_rs_metas() const {
+    return _expired_snapshot_rs_metas;

Review comment:
       How about `_stale_rs_metas`?

##########
File path: be/src/olap/tablet.cpp
##########
@@ -358,9 +384,89 @@ void Tablet::delete_expired_inc_rowsets() {
     save_meta();
 }
 
+void Tablet::delete_expired_snapshot_rowset() {
+
+    int64_t now = UnixSeconds();
+    vector<pair<Version, VersionHash>> expired_versions;
+    WriteLock wrlock(&_meta_lock);
+    // Compute the end time to delete rowsets, when a expired rowset createtime less then this time, it will be deleted.
+    double expired_snapshot_sweep_endtime = ::difftime(now, config::tablet_rowset_expired_snapshot_sweep_time);
+    
+    std::vector<int64_t> path_version_vec;
+    // capture the path version to delete
+    _timestamped_version_tracker.capture_expired_paths(static_cast<int64_t>(expired_snapshot_sweep_endtime), &path_version_vec);
+
+    auto old_size = _expired_snapshot_rs_version_map.size();
+    auto old_meta_size = _tablet_meta->all_expired_snapshot_rs_metas().size();
+
+    std::vector<Version> to_delete_version;
+    // fetch all versions of rowsets to delete
+    for (auto& path_version : path_version_vec) {
+        std::vector<Version> version_path;
+        // fetch the path versions in the version path and delete the path version in tracker
+        _timestamped_version_tracker.fetch_and_delete_path_by_id(path_version, &version_path);
+        to_delete_version.insert(to_delete_version.end(), version_path.begin(), version_path.end());
+    }
+
+    if (to_delete_version.empty()) {
+        return;
+    }
+
+    // check consistent versions
+    const RowsetSharedPtr lastest_delta = rowset_with_max_version();
+    DCHECK(lastest_delta != nullptr);
+
+    Version test_version = Version(0, lastest_delta->end_version());
+    OLAPStatus status = capture_consistent_versions(test_version, nullptr);
+
+    // When there is no consistent versions, we must reconstruct the tracker.
+    if (status != OLAP_SUCCESS) {
+        LOG(WARNING) << "The consistent version check fails, there are bugs. "
+                        << "Reconstruct the tracker to recover versions.";

Review comment:
       Print the tablet id too.






[GitHub] [incubator-doris] wutiangan commented on a change in pull request #4039: Add delayed deletion of rowsets function, fix -230 error.

Posted by GitBox <gi...@apache.org>.
wutiangan commented on a change in pull request #4039:
URL: https://github.com/apache/incubator-doris/pull/4039#discussion_r454061303



##########
File path: be/src/olap/tablet.cpp
##########
@@ -310,7 +321,10 @@ OLAPStatus Tablet::add_inc_rowset(const RowsetSharedPtr& rowset) {
     RETURN_NOT_OK(_tablet_meta->add_rs_meta(rowset->rowset_meta()));
     _rs_version_map[rowset->version()] = rowset;
     _inc_rs_version_map[rowset->version()] = rowset;
-    _rs_graph.add_version_to_graph(rowset->version());
+
+    // _rs_graph.add_version_to_graph(rowset->version());

Review comment:
       Remove this line?






[GitHub] [incubator-doris] morningman merged pull request #4039: Add delayed deletion of rowsets function, fix -230 error.

Posted by GitBox <gi...@apache.org>.
morningman merged pull request #4039:
URL: https://github.com/apache/incubator-doris/pull/4039


   




[GitHub] [incubator-doris] wuyunfeng commented on a change in pull request #4039: Add delayed deletion of rowsets function, fix -230 error.

Posted by GitBox <gi...@apache.org>.
wuyunfeng commented on a change in pull request #4039:
URL: https://github.com/apache/incubator-doris/pull/4039#discussion_r450711406



##########
File path: be/src/olap/rowset_graph.cpp
##########
@@ -17,14 +17,247 @@
 
 #include "olap/rowset_graph.h"
 
-#include <queue>
 #include <memory>
+#include <queue>
 
 #include "common/logging.h"
 
 namespace doris {
 
-void RowsetGraph::construct_rowset_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas) {
+void VersionedRowsetTracker::_construct_versioned_tracker(
+        const std::vector<RowsetMetaSharedPtr>& rs_metas,
+        const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas) {
+    int64_t max_version = 0;
+
+    std::vector<RowsetMetaSharedPtr> all_rs_metas(rs_metas);
+    all_rs_metas.insert(all_rs_metas.end(), expired_snapshot_rs_metas.begin(),
+                        expired_snapshot_rs_metas.end());
+    // construct the roset graph
+    _rowsetGraph.reconstruct_rowset_graph(all_rs_metas, max_version);
+
+    // fill main_path
+    std::unordered_map<Version, RowsetMetaSharedPtr, HashOfVersion> main_path;
+    for (auto& rs_meta : rs_metas) {
+        main_path[rs_meta->version()] = rs_meta;
+    }
+    // fill other_path
+    typedef std::shared_ptr<std::map<int64_t, RowsetMetaSharedPtr>> Edges;
+    std::map<int64_t, Edges> other_path;
+
+    for (auto& rs_meta : expired_snapshot_rs_metas) {
+        if (other_path.find(rs_meta->start_version()) == other_path.end()) {
+            other_path[rs_meta->start_version()] =
+                    Edges(new std::map<int64_t, RowsetMetaSharedPtr>());
+        }
+        other_path[rs_meta->start_version()]->insert(
+                std::pair<int64_t, RowsetMetaSharedPtr>(rs_meta->end_version(), rs_meta));
+    }
+
+    auto iter = other_path.begin();
+    for (; iter != other_path.end(); iter++) {
+        Edges edges = iter->second;
+        auto min_begin_version = iter->first;
+
+        while (!edges->empty()) {
+            PathVersionListSharedPtr path_version_ptr(new std::vector<VersionTrackerSharedPtr>());
+            // 1. find a path
+            //   begin from min_begin_version

Review comment:
       It may be better to merge these two comments into one line.

##########
File path: be/src/common/config.h
##########
@@ -230,6 +230,8 @@ namespace config {
     CONF_mInt32(pending_data_expire_time_sec, "1800");
     // inc_rowset expired interval
     CONF_mInt32(inc_rowset_expired_sec, "1800");
+    // inc_rowset snapshot rs sweep time interval
+    CONF_mInt32(tablet_rowset_expired_snapshot_sweep_time, "1800");

Review comment:
       It is better to add the unit to the name of your config item, as the neighbouring `_sec` items do.
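
A minimal sketch of what this comment asks for, assuming the interval is in seconds like the neighbouring items. The constant name `tablet_rowset_stale_sweep_time_sec` and the helper `sweep_endtime` are hypothetical; a plain constant stands in for the `CONF_mInt32` macro so the example compiles on its own.

```cpp
#include <cstdint>

// Hypothetical config value. The `_sec` suffix states the unit in the name.
constexpr int64_t tablet_rowset_stale_sweep_time_sec = 1800;

// Rowsets created before the returned timestamp are old enough to be swept.
// Both arguments are in seconds, so plain integer subtraction is enough.
int64_t sweep_endtime(int64_t now_sec, int64_t sweep_interval_sec) {
    return now_sec - sweep_interval_sec;
}
```

Keeping both values as integer seconds also avoids the round trip through `double` that `::difftime` introduces in the diff under review.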

##########
File path: be/src/olap/tablet.cpp
##########
@@ -61,7 +61,10 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) :
         _last_cumu_compaction_success_millis(0),
         _last_base_compaction_success_millis(0),
         _cumulative_point(kInvalidCumulativePoint) {
-    _rs_graph.construct_rowset_graph(_tablet_meta->all_rs_metas());
+    // change _rs_graph to _versioned_rs_tracker
+    // _rs_graph.construct_rowset_graph(_tablet_meta->all_rs_metas());
+    _versioned_rs_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas(),

Review comment:
       Lines 66-67 have nonstandard formatting.
   

##########
File path: be/src/olap/rowset_graph.cpp
##########
@@ -17,14 +17,247 @@
 
 #include "olap/rowset_graph.h"
 
-#include <queue>
 #include <memory>
+#include <queue>
 
 #include "common/logging.h"
 
 namespace doris {
 
-void RowsetGraph::construct_rowset_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas) {
+void VersionedRowsetTracker::_construct_versioned_tracker(
+        const std::vector<RowsetMetaSharedPtr>& rs_metas,
+        const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas) {
+    int64_t max_version = 0;
+
+    std::vector<RowsetMetaSharedPtr> all_rs_metas(rs_metas);
+    all_rs_metas.insert(all_rs_metas.end(), expired_snapshot_rs_metas.begin(),
+                        expired_snapshot_rs_metas.end());
+    // construct the roset graph
+    _rowsetGraph.reconstruct_rowset_graph(all_rs_metas, max_version);
+
+    // fill main_path
+    std::unordered_map<Version, RowsetMetaSharedPtr, HashOfVersion> main_path;
+    for (auto& rs_meta : rs_metas) {
+        main_path[rs_meta->version()] = rs_meta;
+    }
+    // fill other_path
+    typedef std::shared_ptr<std::map<int64_t, RowsetMetaSharedPtr>> Edges;
+    std::map<int64_t, Edges> other_path;
+
+    for (auto& rs_meta : expired_snapshot_rs_metas) {
+        if (other_path.find(rs_meta->start_version()) == other_path.end()) {
+            other_path[rs_meta->start_version()] =
+                    Edges(new std::map<int64_t, RowsetMetaSharedPtr>());
+        }
+        other_path[rs_meta->start_version()]->insert(
+                std::pair<int64_t, RowsetMetaSharedPtr>(rs_meta->end_version(), rs_meta));
+    }
+
+    auto iter = other_path.begin();
+    for (; iter != other_path.end(); iter++) {
+        Edges edges = iter->second;
+        auto min_begin_version = iter->first;
+
+        while (!edges->empty()) {
+            PathVersionListSharedPtr path_version_ptr(new std::vector<VersionTrackerSharedPtr>());
+            // 1. find a path
+            //   begin from min_begin_version
+            auto min_end_version = edges->begin()->first;
+            auto min_rs_meta = edges->begin()->second;
+            //   tracker the first

Review comment:
       ```suggestion
               // tracker the first
   ```






[GitHub] [incubator-doris] morningman commented on a change in pull request #4039: Add delayed deletion of rowsets function, fix -230 error.

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #4039:
URL: https://github.com/apache/incubator-doris/pull/4039#discussion_r455059053



##########
File path: be/src/olap/tablet.cpp
##########
@@ -358,9 +383,102 @@ void Tablet::delete_expired_inc_rowsets() {
     save_meta();
 }
 
+void Tablet::delete_expired_snapshot_rowset() {
+
+    int64_t now = UnixSeconds();
+    vector<pair<Version, VersionHash>> expired_versions;
+    WriteLock wrlock(&_meta_lock);
+    // Compute the end time to delete rowsets, when a expired rowset createtime less then this time, it will be deleted.
+    double expired_snapshot_sweep_endtime = ::difftime(now, config::tablet_rowset_expired_snapshot_sweep_time);
+    
+    std::vector<int64_t> path_version_vec;
+    // capture the path version to delete
+    _timestamped_version_tracker.capture_expired_paths(static_cast<int64_t>(expired_snapshot_sweep_endtime), &path_version_vec);
+
+    if (path_version_vec.empty()) {
+        return;
+    }
+
+    // do check consistent operation
+    auto path_id_iter = path_version_vec.rbegin();
+
+    std::map<int64_t, PathVersionListSharedPtr> expired_snapshot_version_path_map;
+    while (path_id_iter != path_version_vec.rend()) {
+
+        PathVersionListSharedPtr version_path = _timestamped_version_tracker.fetch_and_delete_path_by_id(*path_id_iter);
+        const std::vector<TimestampedVersionSharedPtr>& to_delete_version = version_path->timestamped_versions();
+
+        auto first_version = to_delete_version.front();
+        auto end_version = to_delete_version.back();
+        Version test_version = Version(first_version->version().first, end_version->version().second);
+
+        OLAPStatus status = capture_consistent_versions(test_version, nullptr);
+        // When there is no consistent versions, we must reconstruct the tracker.
+        if (status != OLAP_SUCCESS) {
+            LOG(WARNING) << "The consistent version check fails, there are bugs. "
+                << "Reconstruct the tracker to recover versions in tablet=" << tablet_id();
+
+            _timestamped_version_tracker.reconstruct_versioned_tracker(
+                    _tablet_meta->all_rs_metas(), expired_snapshot_version_path_map);
+
+            // double check the consistent versions
+            status = capture_consistent_versions(test_version, nullptr);
+
+            if (status != OLAP_SUCCESS) {
+                if (!config::ignore_rowset_expired_snapshot_unconsistent_delete) {
+                    LOG(FATAL) << "rowset expired snapshot unconsistent delete. tablet= " << tablet_id();
+                } else {
+                    LOG(WARNING) << "rowset expired snapshot unconsistent delete. tablet= " << tablet_id();
+                }
+            }
+            return;
+        }
+        expired_snapshot_version_path_map[*path_id_iter] = version_path;
+        path_id_iter++;
+    }
+
+    auto old_size = _expired_snapshot_rs_version_map.size();
+    auto old_meta_size = _tablet_meta->all_expired_snapshot_rs_metas().size();
+
+    // do delete operation
+    auto to_delete_iter = expired_snapshot_version_path_map.begin();
+    while (to_delete_iter != expired_snapshot_version_path_map.end()) {
+        
+        std::vector<TimestampedVersionSharedPtr>& to_delete_version = to_delete_iter->second->timestamped_versions();
+        for (auto& timestampedVersion : to_delete_version) {
+            auto it = _expired_snapshot_rs_version_map.find(timestampedVersion->version());
+            if (it != _expired_snapshot_rs_version_map.end()) {
+                // delete rowset
+                StorageEngine::instance()->add_unused_rowset(it->second);
+                _expired_snapshot_rs_version_map.erase(it);
+                LOG(INFO) << "delete expired snapshot rowset tablet=" << full_name() 
+                    <<" version[" << timestampedVersion->version().first << "," << timestampedVersion->version().second 
+                    << "] move to unused_rowset success " << std::fixed << expired_snapshot_sweep_endtime;
+            } else {
+                LOG(WARNING) << "delete expired snapshot rowset tablet=" << full_name() 
+                    <<" version[" << timestampedVersion->version().first << "," << timestampedVersion->version().second 
+                    << "] not find in expired snapshot rs version ";

Review comment:
       ```suggestion
                       << "] not find in expired snapshot rs version map";
   ```

##########
File path: be/src/olap/tablet.cpp
##########
@@ -358,9 +383,102 @@ void Tablet::delete_expired_inc_rowsets() {
     save_meta();
 }
 
+void Tablet::delete_expired_snapshot_rowset() {
+
+    int64_t now = UnixSeconds();
+    vector<pair<Version, VersionHash>> expired_versions;
+    WriteLock wrlock(&_meta_lock);
+    // Compute the end time to delete rowsets, when a expired rowset createtime less then this time, it will be deleted.
+    double expired_snapshot_sweep_endtime = ::difftime(now, config::tablet_rowset_expired_snapshot_sweep_time);
+    
+    std::vector<int64_t> path_version_vec;
+    // capture the path version to delete
+    _timestamped_version_tracker.capture_expired_paths(static_cast<int64_t>(expired_snapshot_sweep_endtime), &path_version_vec);
+
+    if (path_version_vec.empty()) {
+        return;
+    }
+
+    // do check consistent operation
+    auto path_id_iter = path_version_vec.rbegin();
+
+    std::map<int64_t, PathVersionListSharedPtr> expired_snapshot_version_path_map;
+    while (path_id_iter != path_version_vec.rend()) {
+
+        PathVersionListSharedPtr version_path = _timestamped_version_tracker.fetch_and_delete_path_by_id(*path_id_iter);
+        const std::vector<TimestampedVersionSharedPtr>& to_delete_version = version_path->timestamped_versions();
+
+        auto first_version = to_delete_version.front();
+        auto end_version = to_delete_version.back();
+        Version test_version = Version(first_version->version().first, end_version->version().second);
+
+        OLAPStatus status = capture_consistent_versions(test_version, nullptr);
+        // When there is no consistent versions, we must reconstruct the tracker.
+        if (status != OLAP_SUCCESS) {
+            LOG(WARNING) << "The consistent version check fails, there are bugs. "
+                << "Reconstruct the tracker to recover versions in tablet=" << tablet_id();
+
+            _timestamped_version_tracker.reconstruct_versioned_tracker(
+                    _tablet_meta->all_rs_metas(), expired_snapshot_version_path_map);

Review comment:
       `expired_snapshot_version_path_map` does not contain the expired path you just deleted.

##########
File path: be/src/olap/tablet.cpp
##########
@@ -358,9 +383,102 @@ void Tablet::delete_expired_inc_rowsets() {
     save_meta();
 }
 
+void Tablet::delete_expired_snapshot_rowset() {
+
+    int64_t now = UnixSeconds();
+    vector<pair<Version, VersionHash>> expired_versions;
+    WriteLock wrlock(&_meta_lock);
+    // Compute the end time to delete rowsets, when a expired rowset createtime less then this time, it will be deleted.
+    double expired_snapshot_sweep_endtime = ::difftime(now, config::tablet_rowset_expired_snapshot_sweep_time);
+    
+    std::vector<int64_t> path_version_vec;
+    // capture the path version to delete
+    _timestamped_version_tracker.capture_expired_paths(static_cast<int64_t>(expired_snapshot_sweep_endtime), &path_version_vec);
+
+    if (path_version_vec.empty()) {
+        return;
+    }
+
+    // do check consistent operation
+    auto path_id_iter = path_version_vec.rbegin();
+
+    std::map<int64_t, PathVersionListSharedPtr> expired_snapshot_version_path_map;
+    while (path_id_iter != path_version_vec.rend()) {
+
+        PathVersionListSharedPtr version_path = _timestamped_version_tracker.fetch_and_delete_path_by_id(*path_id_iter);
+        const std::vector<TimestampedVersionSharedPtr>& to_delete_version = version_path->timestamped_versions();
+
+        auto first_version = to_delete_version.front();
+        auto end_version = to_delete_version.back();
+        Version test_version = Version(first_version->version().first, end_version->version().second);
+
+        OLAPStatus status = capture_consistent_versions(test_version, nullptr);
+        // When there is no consistent versions, we must reconstruct the tracker.
+        if (status != OLAP_SUCCESS) {
+            LOG(WARNING) << "The consistent version check fails, there are bugs. "
+                << "Reconstruct the tracker to recover versions in tablet=" << tablet_id();
+
+            _timestamped_version_tracker.reconstruct_versioned_tracker(
+                    _tablet_meta->all_rs_metas(), expired_snapshot_version_path_map);
+
+            // double check the consistent versions
+            status = capture_consistent_versions(test_version, nullptr);
+
+            if (status != OLAP_SUCCESS) {
+                if (!config::ignore_rowset_expired_snapshot_unconsistent_delete) {
+                    LOG(FATAL) << "rowset expired snapshot unconsistent delete. tablet= " << tablet_id();
+                } else {
+                    LOG(WARNING) << "rowset expired snapshot unconsistent delete. tablet= " << tablet_id();
+                }
+            }
+            return;
+        }
+        expired_snapshot_version_path_map[*path_id_iter] = version_path;
+        path_id_iter++;
+    }
+
+    auto old_size = _expired_snapshot_rs_version_map.size();
+    auto old_meta_size = _tablet_meta->all_expired_snapshot_rs_metas().size();
+
+    // do delete operation
+    auto to_delete_iter = expired_snapshot_version_path_map.begin();
+    while (to_delete_iter != expired_snapshot_version_path_map.end()) {
+        
+        std::vector<TimestampedVersionSharedPtr>& to_delete_version = to_delete_iter->second->timestamped_versions();
+        for (auto& timestampedVersion : to_delete_version) {
+            auto it = _expired_snapshot_rs_version_map.find(timestampedVersion->version());
+            if (it != _expired_snapshot_rs_version_map.end()) {
+                // delete rowset
+                StorageEngine::instance()->add_unused_rowset(it->second);
+                _expired_snapshot_rs_version_map.erase(it);
+                LOG(INFO) << "delete expired snapshot rowset tablet=" << full_name() 
+                    <<" version[" << timestampedVersion->version().first << "," << timestampedVersion->version().second 
+                    << "] move to unused_rowset success " << std::fixed << expired_snapshot_sweep_endtime;
+            } else {
+                LOG(WARNING) << "delete expired snapshot rowset tablet=" << full_name() 
+                    <<" version[" << timestampedVersion->version().first << "," << timestampedVersion->version().second 
+                    << "] not find in expired snapshot rs version ";
+            }
+            _delete_expired_snapshot_rowset_by_version(timestampedVersion->version());

Review comment:
       In `_delete_expired_snapshot_rowset_by_version`, we call `_expired_snapshot_rs_version_map.erase()` again, which duplicates the erase above.

##########
File path: be/src/olap/tablet.cpp
##########
@@ -329,6 +342,18 @@ void Tablet::_delete_inc_rowset_by_version(const Version& version,
     VLOG(3) << "delete incremental rowset. tablet=" << full_name() << ", version=" << version;
 }
 
+void Tablet::_delete_expired_snapshot_rowset_by_version(const Version& version) {
+    // delete expired snapshot rowset from map
+    _expired_snapshot_rs_version_map.erase(version);
+
+    RowsetMetaSharedPtr rowset_meta = _tablet_meta->acquire_expired_snapshot_rs_meta_by_version(version);

Review comment:
       This check is meaningless. I think you can just call
   `_tablet_meta->delete_expired_snapshot_rs_meta_by_version(version);`

##########
File path: be/src/olap/tablet_meta.cpp
##########
@@ -544,6 +548,27 @@ OLAPStatus TabletMeta::add_inc_rs_meta(const RowsetMetaSharedPtr& rs_meta) {
     return OLAP_SUCCESS;
 }
 
+void TabletMeta::delete_expired_snapshot_rs_meta_by_version(const Version& version) {
+    auto it = _expired_snapshot_rs_metas.begin();
+    while (it != _expired_snapshot_rs_metas.end()) {
+        if ((*it)->version() == version) {
+            _expired_snapshot_rs_metas.erase(it);
+            break;

Review comment:
       Are you sure only one entry will have a version equal to the specified version?
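
If duplicates are possible, one option is to remove every matching entry instead of stopping at the first. The sketch below shows that with the erase-remove idiom. It assumes a simplified `Version` (a pair of integers) in place of the rowset meta pointers, and `erase_all_with_version` is a hypothetical helper, not a function in the PR.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <utility>
#include <vector>

using Version = std::pair<int64_t, int64_t>;  // simplified stand-in for doris::Version

// Remove every entry equal to `version` and return how many were removed.
// `version` must not refer to an element of `*metas`.
std::size_t erase_all_with_version(std::vector<Version>* metas, const Version& version) {
    auto new_end = std::remove(metas->begin(), metas->end(), version);
    std::size_t removed = static_cast<std::size_t>(std::distance(new_end, metas->end()));
    metas->erase(new_end, metas->end());
    return removed;
}
```

Returning the count lets the caller log or assert when more than one entry matched, which would answer the question above at runtime.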






[GitHub] [incubator-doris] xy720 commented on a change in pull request #4039: Add delayed deletion of rowsets function, fix -230 error.

Posted by GitBox <gi...@apache.org>.
xy720 commented on a change in pull request #4039:
URL: https://github.com/apache/incubator-doris/pull/4039#discussion_r450727522



##########
File path: be/src/olap/rowset_graph.h
##########
@@ -47,6 +50,86 @@ class RowsetGraph {
     std::unordered_map<int64_t, int64_t> _vertex_index_map;
 };
 
-}  // namespace doris
+class VersionTracker {
+public:
+    VersionTracker(Version version, int64_t create_time)
+            : _version(version.first, version.second), _create_time(create_time) {}
+
+    ~VersionTracker() {}
+
+    Version version() const { return _version; }
+
+    int64_t get_create_time() { return _create_time; }
+
+    bool operator!=(const VersionTracker& rhs) const {
+        return _version.first != rhs._version.first || _version.second != rhs._version.second;
+    }
+
+    bool operator==(const VersionTracker& rhs) const {
+        return _version.first == rhs._version.first && _version.second == rhs._version.second;
+    }
+
+    bool contains(const VersionTracker& other) const {
+        return _version.first <= other._version.first && _version.second >= other._version.second;
+    }
+
+private:
+    Version _version;
+    int64_t _create_time;
+};
+
+using VersionTrackerSharedPtr = std::shared_ptr<VersionTracker>;
+using PathVersionListSharedPtr = std::shared_ptr<std::vector<VersionTrackerSharedPtr>>;
+
+class VersionedRowsetTracker {
+public:
+    // Construct rowsets version tracker with expired snapshot rowsets
+    void construct_versioned_tracker(
+            const std::vector<RowsetMetaSharedPtr>& rs_metas,
+            const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas);
+
+    // Reconstruct rowsets version tracker with expired snapshot rowsets
+    void reconstruct_versioned_tracker(
+            const std::vector<RowsetMetaSharedPtr>& rs_metas,
+            const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas);
+
+    // Add a version
+    void add_version(const Version& version);
+
+    // Add a version path with expired_snapshot_rs_metas
+    void add_expired_path_version(
+            const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas);
+
+    // Capture consistent versions from graph
+    OLAPStatus capture_consistent_versions(const Version& spec_version,
+                                           std::vector<Version>* version_path) const;
+
+    // Capture all expired path version.
+    // When the last rowset createtime of a path greater than expired time  which can be expressed
+    // "now() - tablet_rowset_expired_snapshot_sweep_time" , this path will be remained.
+    // Otherwise, this path will be added to versions.
+    void capture_expired_path_version(int64_t expired_snapshot_sweep_endtime,
+                                      std::vector<int64_t>* path_version) const;
+
+    void fetch_path_version(int64_t path_version, std::vector<Version>& version_path);
+
+    void fetch_and_delete_path_version(int64_t path_version, std::vector<Version>& version_path);
+
+    void _print_current_state();
+
+private:
+    // construct rowsets version tracker with expired snapshot rowsets
+    void _construct_versioned_tracker(
+            const std::vector<RowsetMetaSharedPtr>& rs_metas,
+            const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas);
+
+private:
+    int64_t next_path_version = 1;

Review comment:
       Rename this to `_next_path_version`.






[GitHub] [incubator-doris] morningman commented on a change in pull request #4039: Add delayed deletion of rowsets function, fix -230 error.

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #4039:
URL: https://github.com/apache/incubator-doris/pull/4039#discussion_r450879095



##########
File path: be/src/olap/rowset_graph.h
##########
@@ -23,16 +23,32 @@
 #include "olap/rowset/rowset_meta.h"
 
 namespace doris {
-
+/// RowsetGraph class which is implemented to build and maintain total versions of rowsets. 
+/// This class use adjacency-matrix represent rowsets version and links. A vertex is a version 
+/// and a link is the _version object of a rowset (from start version to end version).

Review comment:
       ```suggestion
   /// and a link is the _version object of a rowset (from start version to end version + 1).
   ```
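
To illustrate why the link ends at "end version + 1": if a rowset [s, e] is an edge from vertex s to vertex e + 1, then adjacent rowsets such as [0, 5] and [6, 7] share the vertex 6 and chain together. The sketch below is a simplified stand-in, not the RowsetGraph implementation. It uses a plain breadth-first search, and `find_consistent_path` is a hypothetical name.

```cpp
#include <cstdint>
#include <map>
#include <queue>
#include <utility>
#include <vector>

using Version = std::pair<int64_t, int64_t>;  // [start, end], both inclusive

// Find a chain of rowset versions that covers `spec` exactly, or return false.
// Each rowset [s, e] is treated as a directed edge from vertex s to vertex e + 1.
bool find_consistent_path(const std::vector<Version>& rowsets, const Version& spec,
                          std::vector<Version>* path) {
    std::multimap<int64_t, int64_t> edges;  // vertex s -> vertex e + 1
    for (const auto& v : rowsets) edges.emplace(v.first, v.second + 1);

    const int64_t source = spec.first;
    const int64_t target = spec.second + 1;
    std::map<int64_t, int64_t> parent;  // reached vertex -> vertex it was reached from
    std::queue<int64_t> todo;
    parent[source] = source;
    todo.push(source);
    while (!todo.empty() && parent.count(target) == 0) {
        int64_t u = todo.front();
        todo.pop();
        auto range = edges.equal_range(u);
        for (auto it = range.first; it != range.second; ++it) {
            if (it->second <= target && parent.count(it->second) == 0) {
                parent[it->second] = u;
                todo.push(it->second);
            }
        }
    }
    if (parent.count(target) == 0) return false;

    // Walk back from target to source; the edges come out in reverse order.
    std::vector<Version> reversed;
    for (int64_t v = target; v != source; v = parent[v]) {
        reversed.emplace_back(parent[v], v - 1);
    }
    if (path != nullptr) path->assign(reversed.rbegin(), reversed.rend());
    return true;
}
```

A gap in the versions, for example [0, 5] followed by [7, 7], leaves vertex 6 without an outgoing edge, so the search fails. That is the situation the delayed deletion in this PR is meant to avoid for in-flight readers.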

##########
File path: be/src/olap/rowset_graph.h
##########
@@ -47,6 +63,109 @@ class RowsetGraph {
     std::unordered_map<int64_t, int64_t> _vertex_index_map;
 };
 
-}  // namespace doris
+/// VersionTracker class which is implemented to maintain compacted version path of rowsets. 
+/// This compaction info of a rowset includes start version, end version and the create time.
+class VersionTracker {

Review comment:
       How about naming it `TimestampedVersion`?
   `Tracker` sounds like it needs to have a certain function, and this class actually just adds a timestamp to the version.
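
A sketch of how the renamed class might look, assuming a simplified `Version` (a pair of integers). It also takes the version by const reference and marks the accessors `const`. The accessor name `create_time` is an assumption made for this example.

```cpp
#include <cstdint>
#include <utility>

using Version = std::pair<int64_t, int64_t>;  // simplified stand-in for doris::Version

// A version range plus the time at which its rowset was created.
class TimestampedVersion {
public:
    TimestampedVersion(const Version& version, int64_t create_time)
            : _version(version), _create_time(create_time) {}

    const Version& version() const { return _version; }
    int64_t create_time() const { return _create_time; }

    // True if this version range fully covers the other one.
    bool contains(const TimestampedVersion& other) const {
        return _version.first <= other._version.first &&
               _version.second >= other._version.second;
    }

private:
    Version _version;
    int64_t _create_time;
};
```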

##########
File path: be/src/olap/cumulative_compaction.cpp
##########
@@ -65,9 +65,10 @@ OLAPStatus CumulativeCompaction::compact() {
     DorisMetrics::instance()->cumulative_compaction_bytes_total.increment(_input_rowsets_size);
     TRACE("save cumulative compaction metrics");
 
+    // delay garbage operation
     // 7. garbage collect input rowsets after cumulative compaction 
-    RETURN_NOT_OK(gc_unused_rowsets());
-    TRACE("unused rowsets have been moved to GC queue");
+    // RETURN_NOT_OK(gc_unused_rowsets());
+    // TRACE("unused rowsets have been moved to GC queue");

Review comment:
       Remove it

##########
File path: be/src/olap/rowset_graph.h
##########
@@ -47,6 +63,109 @@ class RowsetGraph {
     std::unordered_map<int64_t, int64_t> _vertex_index_map;
 };
 
-}  // namespace doris
+/// VersionTracker class which is implemented to maintain compacted version path of rowsets. 

Review comment:
       ```suggestion
   /// VersionTracker class which is implemented to maintain multi-version path of rowsets. 
   ```

##########
File path: be/src/olap/rowset_graph.h
##########
@@ -47,6 +63,109 @@ class RowsetGraph {
     std::unordered_map<int64_t, int64_t> _vertex_index_map;
 };
 
-}  // namespace doris
+/// VersionTracker class which is implemented to maintain compacted version path of rowsets. 
+/// This compaction info of a rowset includes start version, end version and the create time.
+class VersionTracker {
+public:
+    /// VersionTracker construction function. Use rowset version and create time to build a VersionTracker.
+    VersionTracker(Version version, int64_t create_time)

Review comment:
       ```suggestion
       VersionTracker(const Version& version, int64_t create_time)
   ```

##########
File path: be/src/olap/rowset_graph.h
##########
@@ -47,6 +63,109 @@ class RowsetGraph {
     std::unordered_map<int64_t, int64_t> _vertex_index_map;
 };
 
-}  // namespace doris
+/// VersionTracker class which is implemented to maintain compacted version path of rowsets. 
+/// This compaction info of a rowset includes start version, end version and the create time.
+class VersionTracker {
+public:
+    /// VersionTracker construction function. Use rowset version and create time to build a VersionTracker.
+    VersionTracker(Version version, int64_t create_time)
+            : _version(version.first, version.second), _create_time(create_time) {}
+
+    ~VersionTracker() {}
+
+    /// Return the rowset version of VersionTracker record.
+    Version version() const { return _version; }
+    /// Return the rowset create_time of VersionTracker record.
+    int64_t get_create_time() { return _create_time; }
+
+    /// Compare two version trackers.
+    bool operator!=(const VersionTracker& rhs) const {
+        return _version.first != rhs._version.first || _version.second != rhs._version.second;

Review comment:
       ```suggestion
           return _version != rhs._version;
   ```

##########
File path: be/src/olap/rowset_graph.h
##########
@@ -47,6 +63,109 @@ class RowsetGraph {
     std::unordered_map<int64_t, int64_t> _vertex_index_map;
 };
 
-}  // namespace doris
+/// VersionTracker class which is implemented to maintain compacted version path of rowsets. 
+/// This compaction info of a rowset includes start version, end version and the create time.
+class VersionTracker {
+public:
+    /// VersionTracker construction function. Use rowset version and create time to build a VersionTracker.
+    VersionTracker(Version version, int64_t create_time)
+            : _version(version.first, version.second), _create_time(create_time) {}
+
+    ~VersionTracker() {}
+
+    /// Return the rowset version of VersionTracker record.
+    Version version() const { return _version; }
+    /// Return the rowset create_time of VersionTracker record.
+    int64_t get_create_time() { return _create_time; }
+
+    /// Compare two version trackers.
+    bool operator!=(const VersionTracker& rhs) const {
+        return _version.first != rhs._version.first || _version.second != rhs._version.second;
+    }
+
+    /// Compare two version trackers.
+    bool operator==(const VersionTracker& rhs) const {
+        return _version.first == rhs._version.first && _version.second == rhs._version.second;

Review comment:
       ```suggestion
           return _version == rhs._version;
   ```

##########
File path: be/src/olap/rowset_graph.h
##########
@@ -47,6 +63,109 @@ class RowsetGraph {
     std::unordered_map<int64_t, int64_t> _vertex_index_map;
 };
 
-}  // namespace doris
+/// VersionTracker class which is implemented to maintain compacted version path of rowsets. 
+/// This compaction info of a rowset includes start version, end version and the create time.
+class VersionTracker {
+public:
+    /// VersionTracker construction function. Use rowset version and create time to build a VersionTracker.
+    VersionTracker(Version version, int64_t create_time)
+            : _version(version.first, version.second), _create_time(create_time) {}
+
+    ~VersionTracker() {}
+
+    /// Return the rowset version of VersionTracker record.
+    Version version() const { return _version; }
+    /// Return the rowset create_time of VersionTracker record.
+    int64_t get_create_time() { return _create_time; }
+
+    /// Compare two version trackers.
+    bool operator!=(const VersionTracker& rhs) const {
+        return _version.first != rhs._version.first || _version.second != rhs._version.second;
+    }
+
+    /// Compare two version trackers.
+    bool operator==(const VersionTracker& rhs) const {
+        return _version.first == rhs._version.first && _version.second == rhs._version.second;
+    }
+
+    /// Judge if a tracker contains the other.
+    bool contains(const VersionTracker& other) const {
+        return _version.first <= other._version.first && _version.second >= other._version.second;

Review comment:
       ```suggestion
           return _version.contains(other._version);
   ```
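Taken together, the comparison suggestions above delegate to `Version` instead of comparing `first` and `second` by hand. A minimal sketch follows, assuming `Version` provides `operator==`, `operator!=` and `contains`. The `Version` struct below is a simplified stand-in written for this example, and the real one may differ.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical minimal stand-in for doris::Version (inclusive range).
struct Version {
    int64_t first;
    int64_t second;
    bool operator==(const Version& rhs) const {
        return first == rhs.first && second == rhs.second;
    }
    bool operator!=(const Version& rhs) const { return !(*this == rhs); }
    bool contains(const Version& other) const {
        return first <= other.first && second >= other.second;
    }
};

class VersionTracker {
public:
    // Take the version by const reference, as suggested in the review.
    VersionTracker(const Version& version, int64_t create_time)
            : _version(version), _create_time(create_time) {}

    int64_t get_create_time() const { return _create_time; }

    // All comparisons delegate to Version; create_time is ignored on purpose.
    bool operator==(const VersionTracker& rhs) const { return _version == rhs._version; }
    bool operator!=(const VersionTracker& rhs) const { return _version != rhs._version; }
    bool contains(const VersionTracker& other) const {
        return _version.contains(other._version);
    }

private:
    Version _version;
    int64_t _create_time;
};
```

Note that the parameter of `contains` is named `other`, so the delegated call has to use `other._version`.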

##########
File path: be/src/olap/rowset_graph.cpp
##########
@@ -17,14 +17,243 @@
 
 #include "olap/rowset_graph.h"
 
-#include <queue>
 #include <memory>
+#include <queue>
 
 #include "common/logging.h"
 
 namespace doris {
 
-void RowsetGraph::construct_rowset_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas) {
+void VersionedRowsetTracker::_construct_versioned_tracker(
+        const std::vector<RowsetMetaSharedPtr>& rs_metas,
+        const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas) {
+    int64_t max_version = 0;
+
+    std::vector<RowsetMetaSharedPtr> all_rs_metas(rs_metas);
+    all_rs_metas.insert(all_rs_metas.end(), expired_snapshot_rs_metas.begin(),
+                        expired_snapshot_rs_metas.end());
+    // construct the roset graph
+    _rowset_graph.reconstruct_rowset_graph(all_rs_metas, max_version);
+
+    // version -> rowsetmeta
+    // fill main_path which are included not merged rowsets
+    std::unordered_map<Version, RowsetMetaSharedPtr, HashOfVersion> main_path;
+    for (auto& rs_meta : rs_metas) {
+        main_path[rs_meta->version()] = rs_meta;
+    }
+    // fill other_path which are included merged rowsets
+    typedef std::shared_ptr<std::map<int64_t, RowsetMetaSharedPtr>> Edges;
+    std::map<int64_t, Edges> other_path;
+
+    for (auto& rs_meta : expired_snapshot_rs_metas) {
+        if (other_path.find(rs_meta->start_version()) == other_path.end()) {
+            other_path[rs_meta->start_version()] =
+                    Edges(new std::map<int64_t, RowsetMetaSharedPtr>());
+        }
+        other_path[rs_meta->start_version()]->insert(
+                std::pair<int64_t, RowsetMetaSharedPtr>(rs_meta->end_version(), rs_meta));
+    }
+
+    auto iter = other_path.begin();
+    for (; iter != other_path.end(); iter++) {
+        Edges edges = iter->second;
+        auto min_begin_version = iter->first;
+
+        while (!edges->empty()) {
+            PathVersionListSharedPtr path_version_ptr(new std::vector<VersionTrackerSharedPtr>());
+            // 1. find a path, begin from min_begin_version
+            auto min_end_version = edges->begin()->first;
+            auto min_rs_meta = edges->begin()->second;
+            // tracker the first
+            VersionTrackerSharedPtr tracker_ptr(new VersionTracker(
+                    Version(min_begin_version, min_end_version), min_rs_meta->creation_time()));
+            path_version_ptr->push_back(tracker_ptr);
+            // 1.1 do loop, find next start to make a path
+            auto tmp_start_version = min_end_version + 1;
+            do {
+                int64_t tmp_end_version = -1;
+                int64_t create_time = -1;
+                // 1.2 the next must in other_path, find from other_path
+                auto tmp_edge_iter = other_path.find(tmp_start_version);
+                if (tmp_edge_iter == other_path.end() || tmp_edge_iter->second->empty()) {
+                    break;
+                }
+                // 1.3 record this version to make a tracker, put into path_version
+                auto next_rs_meta_iter = tmp_edge_iter->second->begin();
+                tmp_end_version = next_rs_meta_iter->first;
+                create_time = next_rs_meta_iter->second->creation_time();
+                VersionTrackerSharedPtr tracker_ptr(new VersionTracker(
+                        Version(tmp_start_version, tmp_end_version), create_time));
+                path_version_ptr->push_back(tracker_ptr);
+                // 1.4 judge if this path finish
+                auto max_end_version = tmp_end_version;
+                // find from other_path
+                if (edges->find(max_end_version) != edges->end()) {
+                    break;
+                }
+                // find from main_path
+                if (main_path.find(Version(min_begin_version, max_end_version)) !=
+                    main_path.end()) {
+                    break;
+                }
+                // 1.5 do next
+                tmp_start_version = tmp_end_version + 1;
+
+            } while (true);
+            // 1.6 add path_version to map
+            _expired_snapshot_rs_path_map[_next_path_version++] = path_version_ptr;
+
+            // 2 remove this path from other_path
+            auto path_iter = path_version_ptr->begin();
+            for (; path_iter != path_version_ptr->end(); path_iter++) {
+                Version version = (*path_iter)->version();
+                other_path[version.first]->erase(version.second);
+            }
+        }
+    }
+    other_path.clear();
+    main_path.clear();
+    _print_current_state();
+}
+
+void VersionedRowsetTracker::construct_versioned_tracker(
+        const std::vector<RowsetMetaSharedPtr>& rs_metas,
+        const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas) {
+    if (rs_metas.empty()) {
+        VLOG(3) << "there is no version in the header.";
+        return;
+    }
+
+    _construct_versioned_tracker(rs_metas, expired_snapshot_rs_metas);
+}
+
+void VersionedRowsetTracker::reconstruct_versioned_tracker(

Review comment:
       Looks the same as `construct_versioned_tracker()`
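One possible way to remove the duplication, sketched under simplifying assumptions: the reconstruct variant only resets state and then reuses the construct path. `PathTracker` and its members are hypothetical and much simpler than the tracker in the PR. Each call registers its input as a single path.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical, simplified tracker: a path is just a list of version ids.
class PathTracker {
public:
    void construct(const std::vector<int64_t>& versions) {
        if (versions.empty()) {
            return;  // nothing to track
        }
        _paths[_next_path_version++] = versions;
    }

    // Reconstruct = reset the bookkeeping, then reuse construct().
    void reconstruct(const std::vector<int64_t>& versions) {
        if (versions.empty()) {
            return;  // same early return as the original: state is left untouched
        }
        _paths.clear();
        _next_path_version = 1;
        construct(versions);
    }

    std::size_t path_count() const { return _paths.size(); }
    int64_t next_path_version() const { return _next_path_version; }

private:
    std::map<int64_t, std::vector<int64_t>> _paths;
    int64_t _next_path_version = 1;
};
```

The empty-input check stays in both functions so that a reconstruct call with no rowsets does not clear existing paths, which matches the behavior of the code under review.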

##########
File path: be/src/olap/base_compaction.cpp
##########
@@ -56,9 +56,10 @@ OLAPStatus BaseCompaction::compact() {
     DorisMetrics::instance()->base_compaction_bytes_total.increment(_input_rowsets_size);
     TRACE("save base compaction metrics");
 
+    // delay garbage operation 
     // 5. garbage collect input rowsets after base compaction 
-    RETURN_NOT_OK(gc_unused_rowsets());
-    TRACE("unused rowsets have been moved to GC queue");
+    // RETURN_NOT_OK(gc_unused_rowsets());

Review comment:
       Remove the code if it is not used.

##########
File path: be/src/olap/rowset_graph.cpp
##########
@@ -17,14 +17,243 @@
 
 #include "olap/rowset_graph.h"
 
-#include <queue>
 #include <memory>
+#include <queue>
 
 #include "common/logging.h"
 
 namespace doris {
 
-void RowsetGraph::construct_rowset_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas) {
+void VersionedRowsetTracker::_construct_versioned_tracker(
+        const std::vector<RowsetMetaSharedPtr>& rs_metas,
+        const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas) {
+    int64_t max_version = 0;
+
+    std::vector<RowsetMetaSharedPtr> all_rs_metas(rs_metas);
+    all_rs_metas.insert(all_rs_metas.end(), expired_snapshot_rs_metas.begin(),
+                        expired_snapshot_rs_metas.end());
+    // construct the roset graph
+    _rowset_graph.reconstruct_rowset_graph(all_rs_metas, max_version);
+
+    // version -> rowsetmeta
+    // fill main_path which are included not merged rowsets
+    std::unordered_map<Version, RowsetMetaSharedPtr, HashOfVersion> main_path;
+    for (auto& rs_meta : rs_metas) {
+        main_path[rs_meta->version()] = rs_meta;
+    }
+    // fill other_path which are included merged rowsets
+    typedef std::shared_ptr<std::map<int64_t, RowsetMetaSharedPtr>> Edges;
+    std::map<int64_t, Edges> other_path;
+
+    for (auto& rs_meta : expired_snapshot_rs_metas) {
+        if (other_path.find(rs_meta->start_version()) == other_path.end()) {
+            other_path[rs_meta->start_version()] =
+                    Edges(new std::map<int64_t, RowsetMetaSharedPtr>());
+        }
+        other_path[rs_meta->start_version()]->insert(
+                std::pair<int64_t, RowsetMetaSharedPtr>(rs_meta->end_version(), rs_meta));
+    }
+
+    auto iter = other_path.begin();
+    for (; iter != other_path.end(); iter++) {
+        Edges edges = iter->second;
+        auto min_begin_version = iter->first;
+
+        while (!edges->empty()) {
+            PathVersionListSharedPtr path_version_ptr(new std::vector<VersionTrackerSharedPtr>());
+            // 1. find a path, begin from min_begin_version
+            auto min_end_version = edges->begin()->first;
+            auto min_rs_meta = edges->begin()->second;
+            // tracker the first
+            VersionTrackerSharedPtr tracker_ptr(new VersionTracker(
+                    Version(min_begin_version, min_end_version), min_rs_meta->creation_time()));
+            path_version_ptr->push_back(tracker_ptr);
+            // 1.1 do loop, find next start to make a path
+            auto tmp_start_version = min_end_version + 1;
+            do {
+                int64_t tmp_end_version = -1;
+                int64_t create_time = -1;
+                // 1.2 the next must in other_path, find from other_path
+                auto tmp_edge_iter = other_path.find(tmp_start_version);
+                if (tmp_edge_iter == other_path.end() || tmp_edge_iter->second->empty()) {
+                    break;
+                }
+                // 1.3 record this version to make a tracker, put into path_version
+                auto next_rs_meta_iter = tmp_edge_iter->second->begin();
+                tmp_end_version = next_rs_meta_iter->first;
+                create_time = next_rs_meta_iter->second->creation_time();
+                VersionTrackerSharedPtr tracker_ptr(new VersionTracker(
+                        Version(tmp_start_version, tmp_end_version), create_time));
+                path_version_ptr->push_back(tracker_ptr);
+                // 1.4 judge if this path finish
+                auto max_end_version = tmp_end_version;
+                // find from other_path
+                if (edges->find(max_end_version) != edges->end()) {
+                    break;
+                }
+                // find from main_path
+                if (main_path.find(Version(min_begin_version, max_end_version)) !=
+                    main_path.end()) {
+                    break;
+                }
+                // 1.5 do next
+                tmp_start_version = tmp_end_version + 1;
+
+            } while (true);
+            // 1.6 add path_version to map
+            _expired_snapshot_rs_path_map[_next_path_version++] = path_version_ptr;
+
+            // 2 remove this path from other_path
+            auto path_iter = path_version_ptr->begin();
+            for (; path_iter != path_version_ptr->end(); path_iter++) {
+                Version version = (*path_iter)->version();
+                other_path[version.first]->erase(version.second);
+            }
+        }
+    }
+    other_path.clear();
+    main_path.clear();
+    _print_current_state();
+}
+
+void VersionedRowsetTracker::construct_versioned_tracker(
+        const std::vector<RowsetMetaSharedPtr>& rs_metas,
+        const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas) {
+    if (rs_metas.empty()) {
+        VLOG(3) << "there is no version in the header.";
+        return;
+    }
+
+    _construct_versioned_tracker(rs_metas, expired_snapshot_rs_metas);
+}
+
+void VersionedRowsetTracker::reconstruct_versioned_tracker(
+        const std::vector<RowsetMetaSharedPtr>& rs_metas,
+        const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas) {
+    if (rs_metas.empty()) {
+        VLOG(3) << "there is no version in the header.";
+        return;
+    }
+    _expired_snapshot_rs_path_map.clear();
+    _next_path_version = 1;
+
+    _construct_versioned_tracker(rs_metas, expired_snapshot_rs_metas);
+}
+
+void VersionedRowsetTracker::add_version(const Version& version) {
+    _rowset_graph.add_version_to_graph(version);
+}
+
+void VersionedRowsetTracker::add_expired_path_version(
+        const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas) {
+    if (expired_snapshot_rs_metas.empty()) {
+        VLOG(3) << "there is no version in the expired_snapshot_rs_metas.";
+        return;
+    }
+
+    PathVersionListSharedPtr ptr(new std::vector<VersionTrackerSharedPtr>());
+    for (auto rs : expired_snapshot_rs_metas) {
+        VersionTrackerSharedPtr vt_ptr(new VersionTracker(rs->version(), rs->creation_time()));
+        ptr->push_back(vt_ptr);
+    }
+    sort(ptr->begin(), ptr->end());
+    _expired_snapshot_rs_path_map[_next_path_version] = ptr;
+    _next_path_version++;
+}
+
+// Capture consistent versions from graph
+OLAPStatus VersionedRowsetTracker::capture_consistent_versions(
+        const Version& spec_version, std::vector<Version>* version_path) const {
+    return _rowset_graph.capture_consistent_versions(spec_version, version_path);
+}
+
+void VersionedRowsetTracker::capture_expired_path_version(
+        int64_t expired_snapshot_sweep_endtime, std::vector<int64_t>* path_version_vec) const {
+    std::unordered_map<int64_t, PathVersionListSharedPtr>::const_iterator iter =
+            _expired_snapshot_rs_path_map.begin();
+
+    while (iter != _expired_snapshot_rs_path_map.end()) {
+        std::vector<VersionTrackerSharedPtr>::iterator version_path_iter = iter->second->begin();
+        int64_t max_create_time = -1;
+        while (version_path_iter != iter->second->end()) {
+            if (max_create_time < (*version_path_iter)->get_create_time()) {
+                max_create_time = (*version_path_iter)->get_create_time();
+            }
+
+            version_path_iter++;
+        }
+        if (max_create_time <= expired_snapshot_sweep_endtime) {
+            int64_t path_version = iter->first;
+            path_version_vec->push_back(path_version);
+        }
+        iter++;
+    }
+}
+
+void VersionedRowsetTracker::fetch_path_version(int64_t path_version,
+                                                std::vector<Version>& version_path) {
+    if (_expired_snapshot_rs_path_map.count(path_version) == 0) {
+        VLOG(3) << "path version " << path_version << " does not exist!";
+        return;
+    }
+
+    PathVersionListSharedPtr ptr = _expired_snapshot_rs_path_map[path_version];
+
+    std::vector<VersionTrackerSharedPtr>::iterator iter = ptr->begin();
+    while (iter != ptr->end()) {
+        version_path.push_back((*iter)->version());
+        iter++;
+    }
+}
+
+void VersionedRowsetTracker::fetch_and_delete_path_version(int64_t path_version,
+                                                           std::vector<Version>& version_path) {
+    if (_expired_snapshot_rs_path_map.count(path_version) == 0) {
+        VLOG(3) << "path version " << path_version << " does not exist!";
+        return;
+    }
+
+    _print_current_state();
+    fetch_path_version(path_version, version_path);
+
+    _expired_snapshot_rs_path_map.erase(path_version);
+
+    for (auto& version : version_path) {
+        _rowset_graph.delete_version_from_graph(version);
+    }
+}
+
+void VersionedRowsetTracker::_print_current_state() {
+    LOG(INFO) << "current expired next_path_version " << _next_path_version;
+
+    std::unordered_map<int64_t, PathVersionListSharedPtr>::const_iterator iter =
+            _expired_snapshot_rs_path_map.begin();
+    while (iter != _expired_snapshot_rs_path_map.end()) {
+        std::stringstream tracker_info;
+        tracker_info << "current expired path_version " << iter->first;
+
+        std::vector<VersionTrackerSharedPtr>::iterator version_path_iter = iter->second->begin();
+        int64_t max_create_time = -1;
+        while (version_path_iter != iter->second->end()) {
+            if (max_create_time < (*version_path_iter)->get_create_time()) {
+                max_create_time = (*version_path_iter)->get_create_time();
+            }
+            tracker_info << " -> [";
+            tracker_info << (*version_path_iter)->version().first;
+            tracker_info << ",";
+            tracker_info << (*version_path_iter)->version().second;
+            tracker_info << "]";
+
+            version_path_iter++;
+        }
+        LOG(INFO) << tracker_info.str();
+
+        iter++;
+    }
+}
+
+void RowsetGraph::construct_rowset_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas,
+                                         int64_t& max_version) {

Review comment:
       ```suggestion
                                            int64_t* max_version) {
   ```
   
   Better to use a pointer to indicate that it is an output parameter.
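A small sketch of the pointer-as-output-parameter convention, assuming a simplified `Version` pair. `find_max_version` is a hypothetical function used only to show the call-site difference. It is not part of `RowsetGraph`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical stand-in for doris::Version: an inclusive [first, second] range.
using Version = std::pair<int64_t, int64_t>;

// Inputs are passed by const reference, the output by pointer.
// Writes -1 to *max_version when the input is empty.
void find_max_version(const std::vector<Version>& versions, int64_t* max_version) {
    *max_version = -1;
    for (const auto& v : versions) {
        *max_version = std::max(*max_version, v.second);
    }
}
```

At the call site this reads `find_max_version(versions, &max_version)`, and the `&` makes it visible that the second argument is written to.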

##########
File path: be/src/olap/rowset_graph.cpp
##########
@@ -17,14 +17,243 @@
 
 #include "olap/rowset_graph.h"
 
-#include <queue>
 #include <memory>
+#include <queue>
 
 #include "common/logging.h"
 
 namespace doris {
 
-void RowsetGraph::construct_rowset_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas) {
+void VersionedRowsetTracker::_construct_versioned_tracker(
+        const std::vector<RowsetMetaSharedPtr>& rs_metas,
+        const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas) {
+    int64_t max_version = 0;
+
+    std::vector<RowsetMetaSharedPtr> all_rs_metas(rs_metas);
+    all_rs_metas.insert(all_rs_metas.end(), expired_snapshot_rs_metas.begin(),
+                        expired_snapshot_rs_metas.end());
+    // construct the roset graph
+    _rowset_graph.reconstruct_rowset_graph(all_rs_metas, max_version);
+
+    // version -> rowsetmeta
+    // fill main_path which are included not merged rowsets
+    std::unordered_map<Version, RowsetMetaSharedPtr, HashOfVersion> main_path;
+    for (auto& rs_meta : rs_metas) {
+        main_path[rs_meta->version()] = rs_meta;
+    }
+    // fill other_path which are included merged rowsets
+    typedef std::shared_ptr<std::map<int64_t, RowsetMetaSharedPtr>> Edges;
+    std::map<int64_t, Edges> other_path;
+
+    for (auto& rs_meta : expired_snapshot_rs_metas) {
+        if (other_path.find(rs_meta->start_version()) == other_path.end()) {
+            other_path[rs_meta->start_version()] =
+                    Edges(new std::map<int64_t, RowsetMetaSharedPtr>());
+        }
+        other_path[rs_meta->start_version()]->insert(
+                std::pair<int64_t, RowsetMetaSharedPtr>(rs_meta->end_version(), rs_meta));
+    }
+
+    auto iter = other_path.begin();
+    for (; iter != other_path.end(); iter++) {
+        Edges edges = iter->second;
+        auto min_begin_version = iter->first;
+
+        while (!edges->empty()) {
+            PathVersionListSharedPtr path_version_ptr(new std::vector<VersionTrackerSharedPtr>());
+            // 1. find a path, begin from min_begin_version
+            auto min_end_version = edges->begin()->first;
+            auto min_rs_meta = edges->begin()->second;
+            // tracker the first
+            VersionTrackerSharedPtr tracker_ptr(new VersionTracker(
+                    Version(min_begin_version, min_end_version), min_rs_meta->creation_time()));
+            path_version_ptr->push_back(tracker_ptr);
+            // 1.1 do loop, find next start to make a path
+            auto tmp_start_version = min_end_version + 1;
+            do {
+                int64_t tmp_end_version = -1;
+                int64_t create_time = -1;
+                // 1.2 the next must in other_path, find from other_path
+                auto tmp_edge_iter = other_path.find(tmp_start_version);
+                if (tmp_edge_iter == other_path.end() || tmp_edge_iter->second->empty()) {
+                    break;
+                }
+                // 1.3 record this version to make a tracker, put into path_version
+                auto next_rs_meta_iter = tmp_edge_iter->second->begin();
+                tmp_end_version = next_rs_meta_iter->first;
+                create_time = next_rs_meta_iter->second->creation_time();
+                VersionTrackerSharedPtr tracker_ptr(new VersionTracker(
+                        Version(tmp_start_version, tmp_end_version), create_time));
+                path_version_ptr->push_back(tracker_ptr);
+                // 1.4 judge if this path finish
+                auto max_end_version = tmp_end_version;
+                // find from other_path
+                if (edges->find(max_end_version) != edges->end()) {
+                    break;
+                }
+                // find from main_path
+                if (main_path.find(Version(min_begin_version, max_end_version)) !=
+                    main_path.end()) {
+                    break;
+                }
+                // 1.5 do next
+                tmp_start_version = tmp_end_version + 1;
+
+            } while (true);
+            // 1.6 add path_version to map
+            _expired_snapshot_rs_path_map[_next_path_version++] = path_version_ptr;
+
+            // 2 remove this path from other_path
+            auto path_iter = path_version_ptr->begin();
+            for (; path_iter != path_version_ptr->end(); path_iter++) {
+                Version version = (*path_iter)->version();
+                other_path[version.first]->erase(version.second);
+            }
+        }
+    }
+    other_path.clear();
+    main_path.clear();
+    _print_current_state();
+}
+
+void VersionedRowsetTracker::construct_versioned_tracker(
+        const std::vector<RowsetMetaSharedPtr>& rs_metas,
+        const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas) {
+    if (rs_metas.empty()) {
+        VLOG(3) << "there is no version in the header.";
+        return;
+    }
+
+    _construct_versioned_tracker(rs_metas, expired_snapshot_rs_metas);
+}
+
+void VersionedRowsetTracker::reconstruct_versioned_tracker(
+        const std::vector<RowsetMetaSharedPtr>& rs_metas,
+        const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas) {
+    if (rs_metas.empty()) {
+        VLOG(3) << "there is no version in the header.";
+        return;
+    }
+    _expired_snapshot_rs_path_map.clear();
+    _next_path_version = 1;
+
+    _construct_versioned_tracker(rs_metas, expired_snapshot_rs_metas);
+}
+
+void VersionedRowsetTracker::add_version(const Version& version) {
+    _rowset_graph.add_version_to_graph(version);
+}
+
+void VersionedRowsetTracker::add_expired_path_version(
+        const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas) {
+    if (expired_snapshot_rs_metas.empty()) {
+        VLOG(3) << "there is no version in the expired_snapshot_rs_metas.";
+        return;
+    }
+
+    PathVersionListSharedPtr ptr(new std::vector<VersionTrackerSharedPtr>());
+    for (auto rs : expired_snapshot_rs_metas) {
+        VersionTrackerSharedPtr vt_ptr(new VersionTracker(rs->version(), rs->creation_time()));
+        ptr->push_back(vt_ptr);
+    }
+    sort(ptr->begin(), ptr->end());
+    _expired_snapshot_rs_path_map[_next_path_version] = ptr;
+    _next_path_version++;
+}
+
+// Capture consistent versions from graph
+OLAPStatus VersionedRowsetTracker::capture_consistent_versions(
+        const Version& spec_version, std::vector<Version>* version_path) const {
+    return _rowset_graph.capture_consistent_versions(spec_version, version_path);
+}
+
+void VersionedRowsetTracker::capture_expired_path_version(
+        int64_t expired_snapshot_sweep_endtime, std::vector<int64_t>* path_version_vec) const {
+    std::unordered_map<int64_t, PathVersionListSharedPtr>::const_iterator iter =
+            _expired_snapshot_rs_path_map.begin();
+
+    while (iter != _expired_snapshot_rs_path_map.end()) {
+        std::vector<VersionTrackerSharedPtr>::iterator version_path_iter = iter->second->begin();
+        int64_t max_create_time = -1;

Review comment:
       I think we can save the `max_create_time` of a version path in `_expired_snapshot_rs_path_map`
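A rough sketch of that idea, assuming the maximum is updated whenever a version is appended to a path, so the sweep no longer rescans every tracker. `VersionPath` and `capture_expired` are hypothetical names for this example, and only creation times are stored to keep it short.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Hypothetical path record: per-version creation times plus a cached maximum.
struct VersionPath {
    std::vector<int64_t> create_times;
    int64_t max_create_time = -1;

    void add(int64_t create_time) {
        create_times.push_back(create_time);
        max_create_time = std::max(max_create_time, create_time);
    }
};

// Collect the ids of paths whose newest rowset is not newer than sweep_endtime.
std::vector<int64_t> capture_expired(const std::unordered_map<int64_t, VersionPath>& paths,
                                     int64_t sweep_endtime) {
    std::vector<int64_t> expired;
    for (const auto& entry : paths) {
        if (entry.second.max_create_time <= sweep_endtime) {
            expired.push_back(entry.first);
        }
    }
    std::sort(expired.begin(), expired.end());  // deterministic order for callers
    return expired;
}
```

The expiry check becomes one comparison per path instead of a scan over all versions in the path.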

##########
File path: be/src/olap/rowset_graph.h
##########
@@ -47,6 +63,109 @@ class RowsetGraph {
     std::unordered_map<int64_t, int64_t> _vertex_index_map;
 };
 
-}  // namespace doris
+/// VersionTracker class which is implemented to maintain compacted version path of rowsets. 
+/// This compaction info of a rowset includes start version, end version and the create time.
+class VersionTracker {
+public:
+    /// VersionTracker construction function. Use rowset version and create time to build a VersionTracker.
+    VersionTracker(Version version, int64_t create_time)
+            : _version(version.first, version.second), _create_time(create_time) {}
+
+    ~VersionTracker() {}
+
+    /// Return the rowset version of VersionTracker record.
+    Version version() const { return _version; }
+    /// Return the rowset create_time of VersionTracker record.
+    int64_t get_create_time() { return _create_time; }
+
+    /// Compare two version trackers.
+    bool operator!=(const VersionTracker& rhs) const {
+        return _version.first != rhs._version.first || _version.second != rhs._version.second;
+    }
+
+    /// Compare two version trackers.
+    bool operator==(const VersionTracker& rhs) const {
+        return _version.first == rhs._version.first && _version.second == rhs._version.second;
+    }
+
+    /// Judge if a tracker contains the other.
+    bool contains(const VersionTracker& other) const {
+        return _version.first <= other._version.first && _version.second >= other._version.second;
+    }
+
+private:
+    Version _version;
+    int64_t _create_time;
+};
+
+using VersionTrackerSharedPtr = std::shared_ptr<VersionTracker>;
+using PathVersionListSharedPtr = std::shared_ptr<std::vector<VersionTrackerSharedPtr>>;
+
+/// VersionedRowsetTracker class is responsible to track all rowsets version links of a tablet.
+/// This class not only records the graph of all versions, but also records the paths which will be removed
+/// after the path is expired.
+class VersionedRowsetTracker {

Review comment:
       ```suggestion
   class TimestampedVersionTracker {
   ```

##########
File path: be/src/olap/rowset_graph.h
##########
@@ -47,6 +63,109 @@ class RowsetGraph {
     std::unordered_map<int64_t, int64_t> _vertex_index_map;
 };
 
-}  // namespace doris
+/// VersionTracker class which is implemented to maintain compacted version path of rowsets. 
+/// This compaction info of a rowset includes start version, end version and the create time.
+class VersionTracker {
+public:
+    /// VersionTracker construction function. Use rowset version and create time to build a VersionTracker.
+    VersionTracker(Version version, int64_t create_time)
+            : _version(version.first, version.second), _create_time(create_time) {}
+
+    ~VersionTracker() {}
+
+    /// Return the rowset version of VersionTracker record.
+    Version version() const { return _version; }
+    /// Return the rowset create_time of VersionTracker record.
+    int64_t get_create_time() { return _create_time; }
+
+    /// Compare two version trackers.
+    bool operator!=(const VersionTracker& rhs) const {
+        return _version.first != rhs._version.first || _version.second != rhs._version.second;
+    }
+
+    /// Compare two version trackers.
+    bool operator==(const VersionTracker& rhs) const {
+        return _version.first == rhs._version.first && _version.second == rhs._version.second;
+    }
+
+    /// Judge if a tracker contains the other.
+    bool contains(const VersionTracker& other) const {
+        return _version.first <= other._version.first && _version.second >= other._version.second;
+    }
+
+private:
+    Version _version;
+    int64_t _create_time;
+};
+
+using VersionTrackerSharedPtr = std::shared_ptr<VersionTracker>;
+using PathVersionListSharedPtr = std::shared_ptr<std::vector<VersionTrackerSharedPtr>>;
+
+/// VersionedRowsetTracker class is responsible to track all rowsets version links of a tablet.
+/// This class not only records the graph of all versions, but also records the paths which will be removed
+/// after the path is expired.
+class VersionedRowsetTracker {
+public:
+    /// Construct rowsets version tracker by rs_metas and expired snapshot rowsets.
+    void construct_versioned_tracker(
+            const std::vector<RowsetMetaSharedPtr>& rs_metas,
+            const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas);
+
+    /// Reconstruct rowsets version tracker by rs_metas and expired snapshot rowsets.
+    void reconstruct_versioned_tracker(
+            const std::vector<RowsetMetaSharedPtr>& rs_metas,
+            const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas);
+
+    /// Add a version to tracker, this version is a new version rowset, not merged rowset.
+    void add_version(const Version& version);
+
+    /// Add a version path with expired_snapshot_rs_metas, this versions in version path
+    /// are merged rowsets.  These rowsets are tracked and removed after they are expired.
+    /// TabletManager sweep these rowsets using tracker by timing.
+    void add_expired_path_version(
+            const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas);
+
+    /// Given a spec_version, this method can find a version path which is the shortest path
+    /// in the graph. The version paths are added to version_path as return info.
+    /// If this version not in main version, version_path can be included expired rowset.
+    OLAPStatus capture_consistent_versions(const Version& spec_version,
+                                           std::vector<Version>* version_path) const;
+
+    /// Capture all expired path version.
+    /// When the last rowset createtime of a path greater than expired time  which can be expressed
+    /// "now() - tablet_rowset_expired_snapshot_sweep_time" , this path will be remained.
+    /// Otherwise, this path will be added to versions.

Review comment:
       ```suggestion
       /// Otherwise, this path will be added to path_version.
   ```
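
The expiry rule described in this doc comment can be sketched as a small predicate. This is only an illustration of the comparison that `capture_expired_path_version` performs in this patch: `path_is_expired`, `now`, and `sweep_time` are hypothetical names, and all times are assumed to be seconds in the same clock as the rowset creation time.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper, not part of the patch. create_times holds the creation
// time of every rowset on one stale path; sweep_time plays the role of
// tablet_rowset_expired_snapshot_sweep_time.
inline bool path_is_expired(const std::vector<int64_t>& create_times,
                            int64_t now, int64_t sweep_time) {
    assert(sweep_time >= 0);
    // Same starting value as max_create_time in the patch.
    int64_t newest = -1;
    for (int64_t t : create_times) {
        newest = std::max(newest, t);
    }
    // The path is kept while its newest rowset is younger than the cutoff,
    // and swept once the newest rowset is at or before "now - sweep_time".
    return newest <= now - sweep_time;
}
```

With `now = 1000` and `sweep_time = 500`, a path whose newest rowset was created at 600 is kept, while a path whose newest rowset was created at 500 or earlier is reported for sweeping.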

##########
File path: be/src/olap/rowset_graph.cpp
##########
@@ -17,14 +17,243 @@
 
 #include "olap/rowset_graph.h"
 
-#include <queue>
 #include <memory>
+#include <queue>
 
 #include "common/logging.h"
 
 namespace doris {
 
-void RowsetGraph::construct_rowset_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas) {
+void VersionedRowsetTracker::_construct_versioned_tracker(
+        const std::vector<RowsetMetaSharedPtr>& rs_metas,
+        const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas) {
+    int64_t max_version = 0;
+
+    std::vector<RowsetMetaSharedPtr> all_rs_metas(rs_metas);
+    all_rs_metas.insert(all_rs_metas.end(), expired_snapshot_rs_metas.begin(),
+                        expired_snapshot_rs_metas.end());
+    // construct the roset graph
+    _rowset_graph.reconstruct_rowset_graph(all_rs_metas, max_version);
+
+    // version -> rowsetmeta
+    // fill main_path which are included not merged rowsets
+    std::unordered_map<Version, RowsetMetaSharedPtr, HashOfVersion> main_path;
+    for (auto& rs_meta : rs_metas) {
+        main_path[rs_meta->version()] = rs_meta;
+    }
+    // fill other_path which are included merged rowsets
+    typedef std::shared_ptr<std::map<int64_t, RowsetMetaSharedPtr>> Edges;
+    std::map<int64_t, Edges> other_path;
+
+    for (auto& rs_meta : expired_snapshot_rs_metas) {
+        if (other_path.find(rs_meta->start_version()) == other_path.end()) {
+            other_path[rs_meta->start_version()] =
+                    Edges(new std::map<int64_t, RowsetMetaSharedPtr>());
+        }
+        other_path[rs_meta->start_version()]->insert(
+                std::pair<int64_t, RowsetMetaSharedPtr>(rs_meta->end_version(), rs_meta));
+    }
+
+    auto iter = other_path.begin();
+    for (; iter != other_path.end(); iter++) {
+        Edges edges = iter->second;
+        auto min_begin_version = iter->first;
+
+        while (!edges->empty()) {
+            PathVersionListSharedPtr path_version_ptr(new std::vector<VersionTrackerSharedPtr>());
+            // 1. find a path, begin from min_begin_version
+            auto min_end_version = edges->begin()->first;
+            auto min_rs_meta = edges->begin()->second;
+            // tracker the first
+            VersionTrackerSharedPtr tracker_ptr(new VersionTracker(
+                    Version(min_begin_version, min_end_version), min_rs_meta->creation_time()));
+            path_version_ptr->push_back(tracker_ptr);
+            // 1.1 do loop, find next start to make a path
+            auto tmp_start_version = min_end_version + 1;
+            do {
+                int64_t tmp_end_version = -1;
+                int64_t create_time = -1;
+                // 1.2 the next must in other_path, find from other_path
+                auto tmp_edge_iter = other_path.find(tmp_start_version);
+                if (tmp_edge_iter == other_path.end() || tmp_edge_iter->second->empty()) {
+                    break;
+                }
+                // 1.3 record this version to make a tracker, put into path_version
+                auto next_rs_meta_iter = tmp_edge_iter->second->begin();
+                tmp_end_version = next_rs_meta_iter->first;
+                create_time = next_rs_meta_iter->second->creation_time();
+                VersionTrackerSharedPtr tracker_ptr(new VersionTracker(
+                        Version(tmp_start_version, tmp_end_version), create_time));
+                path_version_ptr->push_back(tracker_ptr);
+                // 1.4 judge if this path finish
+                auto max_end_version = tmp_end_version;
+                // find from other_path
+                if (edges->find(max_end_version) != edges->end()) {
+                    break;
+                }
+                // find from main_path
+                if (main_path.find(Version(min_begin_version, max_end_version)) !=
+                    main_path.end()) {
+                    break;
+                }
+                // 1.5 do next
+                tmp_start_version = tmp_end_version + 1;
+
+            } while (true);
+            // 1.6 add path_version to map
+            _expired_snapshot_rs_path_map[_next_path_version++] = path_version_ptr;
+
+            // 2 remove this path from other_path
+            auto path_iter = path_version_ptr->begin();
+            for (; path_iter != path_version_ptr->end(); path_iter++) {
+                Version version = (*path_iter)->version();

Review comment:
       ```suggestion
                   const Version& version = (*path_iter)->version();
   ```
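
One note on this suggestion: in the header quoted in this thread, `VersionTracker::version()` returns `Version` by value, so the `const Version&` binds to a temporary copy whose lifetime is extended to the reference's scope. That is safe, but it does not avoid the copy unless the accessor itself returns a reference. A minimal sketch of the difference, where `TrackerSketch`, `version_by_ref`, and the `std::pair` alias are stand-ins and not the real Doris types:

```cpp
#include <cassert>
#include <cstdint>
#include <utility>

// Assumption: a simplified stand-in for the Version type used in the patch.
using Version = std::pair<int64_t, int64_t>;

class TrackerSketch {
public:
    explicit TrackerSketch(Version v) : _version(v) {}

    // Returns a copy, like version() in the quoted header.
    Version version_by_value() const { return _version; }

    // Hypothetical alternative: hand out a reference to the member.
    const Version& version_by_ref() const { return _version; }

private:
    Version _version;
};

inline void demo() {
    TrackerSketch t(Version(2, 5));
    // Binds to a temporary copy; the temporary lives as long as `copy`.
    const Version& copy = t.version_by_value();
    // Binds directly to the member; no copy is made.
    const Version& ref = t.version_by_ref();
    assert(copy == ref);    // same value
    assert(&copy != &ref);  // different objects
}
```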

##########
File path: be/src/olap/rowset_graph.cpp
##########
@@ -17,14 +17,243 @@
 
 #include "olap/rowset_graph.h"
 
-#include <queue>
 #include <memory>
+#include <queue>
 
 #include "common/logging.h"
 
 namespace doris {
 
-void RowsetGraph::construct_rowset_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas) {
+void VersionedRowsetTracker::_construct_versioned_tracker(
+        const std::vector<RowsetMetaSharedPtr>& rs_metas,
+        const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas) {
+    int64_t max_version = 0;
+
+    std::vector<RowsetMetaSharedPtr> all_rs_metas(rs_metas);
+    all_rs_metas.insert(all_rs_metas.end(), expired_snapshot_rs_metas.begin(),
+                        expired_snapshot_rs_metas.end());
+    // construct the roset graph
+    _rowset_graph.reconstruct_rowset_graph(all_rs_metas, max_version);
+
+    // version -> rowsetmeta
+    // fill main_path which are included not merged rowsets
+    std::unordered_map<Version, RowsetMetaSharedPtr, HashOfVersion> main_path;
+    for (auto& rs_meta : rs_metas) {
+        main_path[rs_meta->version()] = rs_meta;
+    }
+    // fill other_path which are included merged rowsets
+    typedef std::shared_ptr<std::map<int64_t, RowsetMetaSharedPtr>> Edges;
+    std::map<int64_t, Edges> other_path;
+
+    for (auto& rs_meta : expired_snapshot_rs_metas) {
+        if (other_path.find(rs_meta->start_version()) == other_path.end()) {
+            other_path[rs_meta->start_version()] =
+                    Edges(new std::map<int64_t, RowsetMetaSharedPtr>());
+        }
+        other_path[rs_meta->start_version()]->insert(
+                std::pair<int64_t, RowsetMetaSharedPtr>(rs_meta->end_version(), rs_meta));
+    }
+
+    auto iter = other_path.begin();
+    for (; iter != other_path.end(); iter++) {
+        Edges edges = iter->second;
+        auto min_begin_version = iter->first;
+
+        while (!edges->empty()) {
+            PathVersionListSharedPtr path_version_ptr(new std::vector<VersionTrackerSharedPtr>());
+            // 1. find a path, begin from min_begin_version
+            auto min_end_version = edges->begin()->first;
+            auto min_rs_meta = edges->begin()->second;
+            // tracker the first
+            VersionTrackerSharedPtr tracker_ptr(new VersionTracker(
+                    Version(min_begin_version, min_end_version), min_rs_meta->creation_time()));
+            path_version_ptr->push_back(tracker_ptr);
+            // 1.1 do loop, find next start to make a path
+            auto tmp_start_version = min_end_version + 1;
+            do {
+                int64_t tmp_end_version = -1;
+                int64_t create_time = -1;
+                // 1.2 the next must in other_path, find from other_path
+                auto tmp_edge_iter = other_path.find(tmp_start_version);
+                if (tmp_edge_iter == other_path.end() || tmp_edge_iter->second->empty()) {
+                    break;
+                }
+                // 1.3 record this version to make a tracker, put into path_version
+                auto next_rs_meta_iter = tmp_edge_iter->second->begin();
+                tmp_end_version = next_rs_meta_iter->first;
+                create_time = next_rs_meta_iter->second->creation_time();
+                VersionTrackerSharedPtr tracker_ptr(new VersionTracker(
+                        Version(tmp_start_version, tmp_end_version), create_time));
+                path_version_ptr->push_back(tracker_ptr);
+                // 1.4 judge if this path finish
+                auto max_end_version = tmp_end_version;
+                // find from other_path
+                if (edges->find(max_end_version) != edges->end()) {
+                    break;
+                }
+                // find from main_path
+                if (main_path.find(Version(min_begin_version, max_end_version)) !=
+                    main_path.end()) {
+                    break;
+                }
+                // 1.5 do next
+                tmp_start_version = tmp_end_version + 1;
+
+            } while (true);
+            // 1.6 add path_version to map
+            _expired_snapshot_rs_path_map[_next_path_version++] = path_version_ptr;
+
+            // 2 remove this path from other_path
+            auto path_iter = path_version_ptr->begin();
+            for (; path_iter != path_version_ptr->end(); path_iter++) {
+                Version version = (*path_iter)->version();
+                other_path[version.first]->erase(version.second);
+            }
+        }
+    }
+    other_path.clear();
+    main_path.clear();
+    _print_current_state();
+}
+
+void VersionedRowsetTracker::construct_versioned_tracker(
+        const std::vector<RowsetMetaSharedPtr>& rs_metas,
+        const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas) {
+    if (rs_metas.empty()) {
+        VLOG(3) << "there is no version in the header.";
+        return;
+    }
+
+    _construct_versioned_tracker(rs_metas, expired_snapshot_rs_metas);
+}
+
+void VersionedRowsetTracker::reconstruct_versioned_tracker(
+        const std::vector<RowsetMetaSharedPtr>& rs_metas,
+        const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas) {
+    if (rs_metas.empty()) {
+        VLOG(3) << "there is no version in the header.";
+        return;
+    }
+    _expired_snapshot_rs_path_map.clear();
+    _next_path_version = 1;
+
+    _construct_versioned_tracker(rs_metas, expired_snapshot_rs_metas);
+}
+
+void VersionedRowsetTracker::add_version(const Version& version) {
+    _rowset_graph.add_version_to_graph(version);
+}
+
+void VersionedRowsetTracker::add_expired_path_version(
+        const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas) {
+    if (expired_snapshot_rs_metas.empty()) {
+        VLOG(3) << "there is no version in the expired_snapshot_rs_metas.";
+        return;
+    }
+
+    PathVersionListSharedPtr ptr(new std::vector<VersionTrackerSharedPtr>());
+    for (auto rs : expired_snapshot_rs_metas) {
+        VersionTrackerSharedPtr vt_ptr(new VersionTracker(rs->version(), rs->creation_time()));
+        ptr->push_back(vt_ptr);
+    }
+    sort(ptr->begin(), ptr->end());
+    _expired_snapshot_rs_path_map[_next_path_version] = ptr;
+    _next_path_version++;
+}
+
+// Capture consistent versions from graph
+OLAPStatus VersionedRowsetTracker::capture_consistent_versions(
+        const Version& spec_version, std::vector<Version>* version_path) const {
+    return _rowset_graph.capture_consistent_versions(spec_version, version_path);
+}
+
+void VersionedRowsetTracker::capture_expired_path_version(
+        int64_t expired_snapshot_sweep_endtime, std::vector<int64_t>* path_version_vec) const {
+    std::unordered_map<int64_t, PathVersionListSharedPtr>::const_iterator iter =
+            _expired_snapshot_rs_path_map.begin();
+
+    while (iter != _expired_snapshot_rs_path_map.end()) {
+        std::vector<VersionTrackerSharedPtr>::iterator version_path_iter = iter->second->begin();
+        int64_t max_create_time = -1;
+        while (version_path_iter != iter->second->end()) {
+            if (max_create_time < (*version_path_iter)->get_create_time()) {
+                max_create_time = (*version_path_iter)->get_create_time();
+            }
+
+            version_path_iter++;
+        }
+        if (max_create_time <= expired_snapshot_sweep_endtime) {
+            int64_t path_version = iter->first;
+            path_version_vec->push_back(path_version);
+        }
+        iter++;
+    }
+}
+
+void VersionedRowsetTracker::fetch_path_version(int64_t path_version,
+                                                std::vector<Version>& version_path) {
+    if (_expired_snapshot_rs_path_map.count(path_version) == 0) {
+        VLOG(3) << "path version " << path_version << " does not exist!";
+        return;
+    }
+
+    PathVersionListSharedPtr ptr = _expired_snapshot_rs_path_map[path_version];
+
+    std::vector<VersionTrackerSharedPtr>::iterator iter = ptr->begin();
+    while (iter != ptr->end()) {
+        version_path.push_back((*iter)->version());
+        iter++;
+    }
+}
+
+void VersionedRowsetTracker::fetch_and_delete_path_version(int64_t path_version,
+                                                           std::vector<Version>& version_path) {
+    if (_expired_snapshot_rs_path_map.count(path_version) == 0) {
+        VLOG(3) << "path version " << path_version << " does not exist!";
+        return;
+    }
+
+    _print_current_state();
+    fetch_path_version(path_version, version_path);
+
+    _expired_snapshot_rs_path_map.erase(path_version);
+
+    for (auto& version : version_path) {
+        _rowset_graph.delete_version_from_graph(version);
+    }
+}
+
+void VersionedRowsetTracker::_print_current_state() {

Review comment:
       Better to return a string, and let the caller decide how to print it.
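
A rough sketch of what that could look like, assuming a simplified map from path version to the versions on that path. `PathMap`, `current_state_string`, and the output format are illustrative only and do not come from the patch:

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Assumptions: simplified stand-ins for Version and for the tracker's
// _expired_snapshot_rs_path_map member.
using Version = std::pair<int64_t, int64_t>;
using PathMap = std::map<int64_t, std::vector<Version>>;

// Hypothetical replacement for _print_current_state(): build the text and
// return it, so the caller chooses the log level or prints nothing at all.
inline std::string current_state_string(const PathMap& paths) {
    std::ostringstream out;
    out << "current expired paths size " << paths.size();
    for (const auto& entry : paths) {
        out << "\npath_version " << entry.first << ":";
        for (const Version& v : entry.second) {
            out << " [" << v.first << "-" << v.second << "]";
        }
    }
    return out.str();
}
```

The caller could then write something like `VLOG(3) << current_state_string(paths);`, which also makes the formatting easy to check in a unit test.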

##########
File path: be/src/olap/rowset_graph.h
##########
@@ -47,6 +63,109 @@ class RowsetGraph {
     std::unordered_map<int64_t, int64_t> _vertex_index_map;
 };
 
-}  // namespace doris
+/// VersionTracker class which is implemented to maintain compacted version path of rowsets. 
+/// This compaction info of a rowset includes start version, end version and the create time.
+class VersionTracker {
+public:
+    /// VersionTracker construction function. Use rowset version and create time to build a VersionTracker.
+    VersionTracker(Version version, int64_t create_time)
+            : _version(version.first, version.second), _create_time(create_time) {}
+
+    ~VersionTracker() {}
+
+    /// Return the rowset version of VersionTracker record.
+    Version version() const { return _version; }
+    /// Return the rowset create_time of VersionTracker record.
+    int64_t get_create_time() { return _create_time; }
+
+    /// Compare two version trackers.
+    bool operator!=(const VersionTracker& rhs) const {
+        return _version.first != rhs._version.first || _version.second != rhs._version.second;
+    }
+
+    /// Compare two version trackers.
+    bool operator==(const VersionTracker& rhs) const {
+        return _version.first == rhs._version.first && _version.second == rhs._version.second;
+    }
+
+    /// Judge if a tracker contains the other.
+    bool contains(const VersionTracker& other) const {
+        return _version.first <= other._version.first && _version.second >= other._version.second;
+    }
+
+private:
+    Version _version;
+    int64_t _create_time;
+};
+
+using VersionTrackerSharedPtr = std::shared_ptr<VersionTracker>;
+using PathVersionListSharedPtr = std::shared_ptr<std::vector<VersionTrackerSharedPtr>>;
+
+/// VersionedRowsetTracker class is responsible to track all rowsets version links of a tablet.
+/// This class not only records the graph of all versions, but also records the paths which will be removed
+/// after the path is expired.
+class VersionedRowsetTracker {
+public:
+    /// Construct rowsets version tracker by rs_metas and expired snapshot rowsets.
+    void construct_versioned_tracker(
+            const std::vector<RowsetMetaSharedPtr>& rs_metas,
+            const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas);
+
+    /// Reconstruct rowsets version tracker by rs_metas and expired snapshot rowsets.
+    void reconstruct_versioned_tracker(
+            const std::vector<RowsetMetaSharedPtr>& rs_metas,
+            const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas);
+
+    /// Add a version to tracker, this version is a new version rowset, not merged rowset.
+    void add_version(const Version& version);
+
+    /// Add a version path with expired_snapshot_rs_metas, this versions in version path
+    /// are merged rowsets.  These rowsets are tracked and removed after they are expired.
+    /// TabletManager sweep these rowsets using tracker by timing.
+    void add_expired_path_version(
+            const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas);
+
+    /// Given a spec_version, this method can find a version path which is the shortest path
+    /// in the graph. The version paths are added to version_path as return info.
+    /// If this version not in main version, version_path can be included expired rowset.
+    OLAPStatus capture_consistent_versions(const Version& spec_version,
+                                           std::vector<Version>* version_path) const;
+
+    /// Capture all expired path version.
+    /// When the last rowset createtime of a path greater than expired time  which can be expressed
+    /// "now() - tablet_rowset_expired_snapshot_sweep_time" , this path will be remained.
+    /// Otherwise, this path will be added to versions.
+    void capture_expired_path_version(int64_t expired_snapshot_sweep_endtime,
+                                      std::vector<int64_t>* path_version) const;
+
+    /// Fetch all versions in a path_version.

Review comment:
       ```suggestion
       /// Fetch all versions with a path_version.
   ```

##########
File path: be/src/olap/rowset_graph.h
##########
@@ -47,6 +63,109 @@ class RowsetGraph {
     std::unordered_map<int64_t, int64_t> _vertex_index_map;
 };
 
-}  // namespace doris
+/// VersionTracker class which is implemented to maintain compacted version path of rowsets. 
+/// This compaction info of a rowset includes start version, end version and the create time.
+class VersionTracker {
+public:
+    /// VersionTracker construction function. Use rowset version and create time to build a VersionTracker.
+    VersionTracker(Version version, int64_t create_time)
+            : _version(version.first, version.second), _create_time(create_time) {}
+
+    ~VersionTracker() {}
+
+    /// Return the rowset version of VersionTracker record.
+    Version version() const { return _version; }
+    /// Return the rowset create_time of VersionTracker record.
+    int64_t get_create_time() { return _create_time; }
+
+    /// Compare two version trackers.
+    bool operator!=(const VersionTracker& rhs) const {
+        return _version.first != rhs._version.first || _version.second != rhs._version.second;
+    }
+
+    /// Compare two version trackers.
+    bool operator==(const VersionTracker& rhs) const {
+        return _version.first == rhs._version.first && _version.second == rhs._version.second;
+    }
+
+    /// Judge if a tracker contains the other.
+    bool contains(const VersionTracker& other) const {
+        return _version.first <= other._version.first && _version.second >= other._version.second;
+    }
+
+private:
+    Version _version;
+    int64_t _create_time;
+};
+
+using VersionTrackerSharedPtr = std::shared_ptr<VersionTracker>;
+using PathVersionListSharedPtr = std::shared_ptr<std::vector<VersionTrackerSharedPtr>>;
+
+/// VersionedRowsetTracker class is responsible to track all rowsets version links of a tablet.
+/// This class not only records the graph of all versions, but also records the paths which will be removed
+/// after the path is expired.
+class VersionedRowsetTracker {
+public:
+    /// Construct rowsets version tracker by rs_metas and expired snapshot rowsets.
+    void construct_versioned_tracker(
+            const std::vector<RowsetMetaSharedPtr>& rs_metas,
+            const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas);
+
+    /// Reconstruct rowsets version tracker by rs_metas and expired snapshot rowsets.
+    void reconstruct_versioned_tracker(
+            const std::vector<RowsetMetaSharedPtr>& rs_metas,
+            const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas);
+
+    /// Add a version to tracker, this version is a new version rowset, not merged rowset.
+    void add_version(const Version& version);
+
+    /// Add a version path with expired_snapshot_rs_metas, this versions in version path
+    /// are merged rowsets.  These rowsets are tracked and removed after they are expired.
+    /// TabletManager sweep these rowsets using tracker by timing.
+    void add_expired_path_version(
+            const std::vector<RowsetMetaSharedPtr>& expired_snapshot_rs_metas);
+
+    /// Given a spec_version, this method can find a version path which is the shortest path
+    /// in the graph. The version paths are added to version_path as return info.
+    /// If this version not in main version, version_path can be included expired rowset.
+    OLAPStatus capture_consistent_versions(const Version& spec_version,
+                                           std::vector<Version>* version_path) const;
+
+    /// Capture all expired path version.
+    /// When the last rowset createtime of a path greater than expired time  which can be expressed
+    /// "now() - tablet_rowset_expired_snapshot_sweep_time" , this path will be remained.
+    /// Otherwise, this path will be added to versions.
+    void capture_expired_path_version(int64_t expired_snapshot_sweep_endtime,
+                                      std::vector<int64_t>* path_version) const;
+
+    /// Fetch all versions in a path_version.
+    void fetch_path_version(int64_t path_version, std::vector<Version>& version_path);
+
+    /// Fetch all versions in a path_version, as the same time remove this path from the tracker.

Review comment:
       ```suggestion
       /// Fetch all versions with a path_version, at the same time remove this path from the tracker.
   ```



