You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/07/19 14:04:11 UTC

[incubator-doris] branch master updated: [Compaction] Add delayed deletion of rowsets function, fix -230 error. (#4039)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 03cf9b2  [Compaction] Add delayed deletion of rowsets function, fix -230 error. (#4039)
03cf9b2 is described below

commit 03cf9b2a2449b64e5373d2472d8cc7d811d31525
Author: ZhangYu0123 <67...@users.noreply.github.com>
AuthorDate: Sun Jul 19 22:03:59 2020 +0800

    [Compaction] Add delayed deletion of rowsets function, fix -230 error. (#4039)
    
    Related issue #4017, main changes as follows:
    1. Add expired_snapshot_rs_version_map and _expired_snapshot_rs_metas.
    2. Add VersionedRowsetTracker to record compacted path versions.
    3. Record the path version when rowsets are compacted.
    4. In the gc process, add expired snapshot rowsets to the unused set to be removed.
---
 be/src/common/config.h                             |   6 +
 be/src/olap/CMakeLists.txt                         |   2 +-
 be/src/olap/base_compaction.cpp                    |   4 -
 be/src/olap/cumulative_compaction.cpp              |   4 -
 be/src/olap/rowset_graph.cpp                       | 230 ------
 be/src/olap/rowset_graph.h                         |  52 --
 be/src/olap/tablet.cpp                             | 190 ++++-
 be/src/olap/tablet.h                               |  19 +-
 be/src/olap/tablet_manager.cpp                     |   1 +
 be/src/olap/tablet_meta.cpp                        |  24 +
 be/src/olap/tablet_meta.h                          |  13 +
 be/src/olap/txn_manager.cpp                        |   3 +-
 be/src/olap/version_graph.cpp                      | 462 ++++++++++++
 be/src/olap/version_graph.h                        | 200 +++++
 be/test/olap/CMakeLists.txt                        |   1 +
 be/test/olap/timestamped_version_tracker_test.cpp  | 831 +++++++++++++++++++++
 docs/en/administrator-guide/config/be_config.md    |  19 +-
 .../http-actions/compaction-action.md              |  13 +
 docs/zh-CN/administrator-guide/config/be_config.md | 118 +--
 .../http-actions/compaction-action.md              |  15 +-
 run-ut.sh                                          |   1 +
 21 files changed, 1844 insertions(+), 364 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4915732..d0858aa 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -232,6 +232,8 @@ namespace config {
     CONF_mInt32(pending_data_expire_time_sec, "1800");
     // inc_rowset expired interval
     CONF_mInt32(inc_rowset_expired_sec, "1800");
+    // stale rowset sweep interval in seconds: stale (compacted) rowsets older than this are swept
+    CONF_mInt32(tablet_rowset_stale_sweep_time_sec, "1800");
     // garbage sweep policy
     CONF_Int32(max_garbage_sweep_interval, "3600");
     CONF_Int32(min_garbage_sweep_interval, "180");
@@ -538,6 +540,10 @@ namespace config {
 
     // Whether to continue to start be when load tablet from header failed.
     CONF_Bool(ignore_load_tablet_failure, "false");
+
+    // Whether to only log a warning (instead of logging FATAL) when the version consistency check still fails while deleting stale rowsets.
+    CONF_Bool(ignore_rowset_stale_unconsistent_delete, "false");
+
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index 3ade4c0..884c045 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -63,7 +63,7 @@ add_library(Olap STATIC
     row_block.cpp
     row_block2.cpp
     row_cursor.cpp
-    rowset_graph.cpp
+    version_graph.cpp
     schema.cpp
     schema_change.cpp
     serialize.cpp
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index 24e90bf..a9305d3 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -56,10 +56,6 @@ OLAPStatus BaseCompaction::compact() {
     DorisMetrics::instance()->base_compaction_bytes_total.increment(_input_rowsets_size);
     TRACE("save base compaction metrics");
 
-    // 5. garbage collect input rowsets after base compaction 
-    RETURN_NOT_OK(gc_unused_rowsets());
-    TRACE("unused rowsets have been moved to GC queue");
-
     return OLAP_SUCCESS;
 }
 
diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp
index 5ab51ba..c4b7e86 100755
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -65,10 +65,6 @@ OLAPStatus CumulativeCompaction::compact() {
     DorisMetrics::instance()->cumulative_compaction_bytes_total.increment(_input_rowsets_size);
     TRACE("save cumulative compaction metrics");
 
-    // 7. garbage collect input rowsets after cumulative compaction 
-    RETURN_NOT_OK(gc_unused_rowsets());
-    TRACE("unused rowsets have been moved to GC queue");
-
     return OLAP_SUCCESS;
 }
 
diff --git a/be/src/olap/rowset_graph.cpp b/be/src/olap/rowset_graph.cpp
deleted file mode 100644
index 2224630..0000000
--- a/be/src/olap/rowset_graph.cpp
+++ /dev/null
@@ -1,230 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "olap/rowset_graph.h"
-
-#include <queue>
-#include <memory>
-
-#include "common/logging.h"
-
-namespace doris {
-
-void RowsetGraph::construct_rowset_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas) {
-    if (rs_metas.empty()) {
-        VLOG(3) << "there is no version in the header.";
-        return;
-    }
-
-    // Distill vertex values from versions in TabletMeta.
-    std::vector<int64_t> vertex_values;
-    vertex_values.reserve(2 * rs_metas.size());
-
-    for (size_t i = 0; i < rs_metas.size(); ++i) {
-        vertex_values.push_back(rs_metas[i]->start_version());
-        vertex_values.push_back(rs_metas[i]->end_version() + 1);
-    }
-
-    std::sort(vertex_values.begin(), vertex_values.end());
-
-    // Items in vertex_values are sorted, but not unique.
-    // we choose unique items in vertex_values to create vertexes.
-    int64_t last_vertex_value = -1;
-    for (size_t i = 0; i < vertex_values.size(); ++i) {
-        if (i != 0 && vertex_values[i] == last_vertex_value) {
-            continue;
-        }
-
-        // Add vertex to graph.
-        _add_vertex_to_graph(vertex_values[i]);
-        last_vertex_value = vertex_values[i];
-    }
-
-    // Create edges for version graph according to TabletMeta's versions.
-    for (size_t i = 0; i < rs_metas.size(); ++i) {
-        // Versions in header are unique.
-        // We ensure _vertex_index_map has its start_version.
-        int64_t start_vertex_index = _vertex_index_map[rs_metas[i]->start_version()];
-        int64_t end_vertex_index = _vertex_index_map[rs_metas[i]->end_version() + 1];
-        // Add one edge from start_version to end_version.
-        _version_graph[start_vertex_index].edges.push_front(end_vertex_index);
-        // Add reverse edge from end_version to start_version.
-        _version_graph[end_vertex_index].edges.push_front(start_vertex_index);
-    }
-}
-
-void RowsetGraph::reconstruct_rowset_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas) {
-    _version_graph.clear();
-    _vertex_index_map.clear();
-    construct_rowset_graph(rs_metas);
-}
-
-void RowsetGraph::add_version_to_graph(const Version& version) {
-    // Add version.first as new vertex of version graph if not exist.
-    int64_t start_vertex_value = version.first;
-    int64_t end_vertex_value = version.second + 1;
-
-    // Add vertex to graph.
-    _add_vertex_to_graph(start_vertex_value);
-    _add_vertex_to_graph(end_vertex_value);
-
-    int64_t start_vertex_index = _vertex_index_map[start_vertex_value];
-    int64_t end_vertex_index = _vertex_index_map[end_vertex_value];
-
-    // We assume this version is new version, so we just add two edges
-    // into version graph. add one edge from start_version to end_version
-    _version_graph[start_vertex_index].edges.push_front(end_vertex_index);
-
-    // We add reverse edge(from end_version to start_version) to graph
-    _version_graph[end_vertex_index].edges.push_front(start_vertex_index);
-}
-
-OLAPStatus RowsetGraph::delete_version_from_graph(const Version& version) {
-    int64_t start_vertex_value = version.first;
-    int64_t end_vertex_value = version.second + 1;
-
-    if (_vertex_index_map.find(start_vertex_value) == _vertex_index_map.end()
-          || _vertex_index_map.find(end_vertex_value) == _vertex_index_map.end()) {
-        LOG(WARNING) << "vertex for version does not exists. "
-                     << "version=" << version.first << "-" << version.second;
-        return OLAP_ERR_HEADER_DELETE_VERSION;
-    }
-
-    int64_t start_vertex_index = _vertex_index_map[start_vertex_value];
-    int64_t end_vertex_index = _vertex_index_map[end_vertex_value];
-    // Remove edge and its reverse edge.
-    _version_graph[start_vertex_index].edges.remove(end_vertex_index);
-    _version_graph[end_vertex_index].edges.remove(start_vertex_index);
-
-    return OLAP_SUCCESS;
-}
-
-void RowsetGraph::_add_vertex_to_graph(int64_t vertex_value) {
-    // Vertex with vertex_value already exists.
-    if (_vertex_index_map.find(vertex_value) != _vertex_index_map.end()) {
-        VLOG(3) << "vertex with vertex value already exists. value=" << vertex_value;
-        return;
-    }
-
-    _version_graph.emplace_back(Vertex(vertex_value));
-    _vertex_index_map[vertex_value] = _version_graph.size() - 1;
-}
-
-OLAPStatus RowsetGraph::capture_consistent_versions(const Version& spec_version,
-                                                    std::vector<Version>* version_path) const {
-    if (spec_version.first > spec_version.second) {
-        LOG(WARNING) << "invalid specfied version. "
-                     << "spec_version=" << spec_version.first << "-" << spec_version.second;
-        return OLAP_ERR_INPUT_PARAMETER_ERROR;
-    }
-
-    // bfs_queue's element is vertex_index.
-    std::queue<int64_t> bfs_queue;
-    // predecessor[i] means the predecessor of vertex_index 'i'.
-    std::vector<int64_t> predecessor(_version_graph.size());
-    // visited[int64_t]==true means it had entered bfs_queue.
-    std::vector<bool> visited(_version_graph.size());
-    // [start_vertex_value, end_vertex_value)
-    int64_t start_vertex_value = spec_version.first;
-    int64_t end_vertex_value = spec_version.second + 1;
-    // -1 is invalid vertex index.
-    int64_t start_vertex_index = -1;
-    // -1 is valid vertex index.
-    int64_t end_vertex_index = -1;
-
-    for (size_t i = 0; i < _version_graph.size(); ++i) {
-        if (_version_graph[i].value == start_vertex_value) {
-            start_vertex_index = i;
-        }
-        if (_version_graph[i].value == end_vertex_value) {
-            end_vertex_index = i;
-        }
-    }
-
-    if (start_vertex_index < 0 || end_vertex_index < 0) {
-        LOG(WARNING) << "fail to find path in version_graph. "
-                     << "spec_version: " << spec_version.first << "-" << spec_version.second;
-        return OLAP_ERR_VERSION_NOT_EXIST;
-    }
-
-    for (size_t i = 0; i < _version_graph.size(); ++i) {
-        visited[i] = false;
-    }
-
-    bfs_queue.push(start_vertex_index);
-    visited[start_vertex_index] = true;
-    // The predecessor of root is itself.
-    predecessor[start_vertex_index] = start_vertex_index;
-
-    while (bfs_queue.empty() == false && visited[end_vertex_index] == false) {
-        int64_t top_vertex_index = bfs_queue.front();
-        bfs_queue.pop();
-        for (const auto& it : _version_graph[top_vertex_index].edges) {
-            if (visited[it] == false) {
-                // If we don't support reverse version in the path, and start vertex
-                // value is larger than the end vertex value, we skip this edge.
-                if (_version_graph[top_vertex_index].value > _version_graph[it].value) {
-                    continue;
-                }
-
-                visited[it] = true;
-                predecessor[it] = top_vertex_index;
-                bfs_queue.push(it);
-            }
-        }
-    }
-
-    if (!visited[end_vertex_index]) {
-        LOG(WARNING) << "fail to find path in version_graph. "
-                     << "spec_version: " << spec_version.first << "-" << spec_version.second;
-        return OLAP_ERR_VERSION_NOT_EXIST;
-    }
-
-    std::vector<int64_t> reversed_path;
-    int64_t tmp_vertex_index = end_vertex_index;
-    reversed_path.push_back(tmp_vertex_index);
-
-    // For start_vertex_index, its predecessor must be itself.
-    while (predecessor[tmp_vertex_index] != tmp_vertex_index) {
-        tmp_vertex_index = predecessor[tmp_vertex_index];
-        reversed_path.push_back(tmp_vertex_index);
-    }
-
-    if (version_path != nullptr) {
-        // Make version_path from reversed_path.
-        std::stringstream shortest_path_for_debug;
-        for (size_t path_id = reversed_path.size() - 1; path_id > 0; --path_id) {
-            int64_t tmp_start_vertex_value = _version_graph[reversed_path[path_id]].value;
-            int64_t tmp_end_vertex_value = _version_graph[reversed_path[path_id - 1]].value;
-
-            // tmp_start_vertex_value mustn't be equal to tmp_end_vertex_value
-            if (tmp_start_vertex_value <= tmp_end_vertex_value) {
-                version_path->emplace_back(tmp_start_vertex_value, tmp_end_vertex_value - 1);
-            } else {
-                version_path->emplace_back(tmp_end_vertex_value, tmp_start_vertex_value - 1);
-            }
-
-            shortest_path_for_debug << (*version_path)[version_path->size() - 1] << ' ';
-        }
-        VLOG(10) << "success to find path for spec_version. spec_version=" << spec_version
-                 << ", path=" << shortest_path_for_debug.str();
-    }
-
-    return OLAP_SUCCESS;
-}
-
-}  // namespace doris
diff --git a/be/src/olap/rowset_graph.h b/be/src/olap/rowset_graph.h
deleted file mode 100644
index a579b24..0000000
--- a/be/src/olap/rowset_graph.h
+++ /dev/null
@@ -1,52 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef DORIS_BE_SRC_OLAP_ROWSET_GRAPH_H
-#define DORIS_BE_SRC_OLAP_ROWSET_GRAPH_H
-
-#include "olap/olap_common.h"
-#include "olap/olap_define.h"
-#include "olap/rowset/rowset_meta.h"
-
-namespace doris {
-
-class RowsetGraph {
-public:
-    void construct_rowset_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas);
-    void reconstruct_rowset_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas);
-    void add_version_to_graph(const Version& version);
-    OLAPStatus delete_version_from_graph(const Version& version);
-    OLAPStatus capture_consistent_versions(const Version& spec_version,
-                                           std::vector<Version>* version_path) const;
-private:
-    void _add_vertex_to_graph(int64_t vertex_value);
-
-    // OLAP version contains two parts, [start_version, end_version]. In order
-    // to construct graph, the OLAP version has two corresponding vertex, one
-    // vertex's value is version.start_version, the other is
-    // version.end_version + 1.
-    // Use adjacency list to describe version graph.
-    std::vector<Vertex> _version_graph;
-
-    // vertex value --> vertex_index of _version_graph
-    // It is easy to find vertex index according to vertex value.
-    std::unordered_map<int64_t, int64_t> _vertex_index_map;
-};
-
-}  // namespace doris
-
-#endif // DORIS_BE_SRC_OLAP_OLAP_ROWSET_GRAPH_H
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 2c3b655..63f3e33 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -61,7 +61,8 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) :
         _last_cumu_compaction_success_millis(0),
         _last_base_compaction_success_millis(0),
         _cumulative_point(kInvalidCumulativePoint) {
-    _rs_graph.construct_rowset_graph(_tablet_meta->all_rs_metas());
+    // build the timestamped version tracker from all rowset metas in the tablet meta
+    _timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas());
 }
 
 OLAPStatus Tablet::_init_once_action() {
@@ -173,8 +174,9 @@ OLAPStatus Tablet::revise_tablet_meta(
         }
         _rs_version_map[version] = std::move(rowset);
     }
-
-    _rs_graph.reconstruct_rowset_graph(_tablet_meta->all_rs_metas());
+    
+    // reconstruct from tablet meta
+    _timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas());
 
     LOG(INFO) << "finish to clone data to tablet. res=" << res << ", "
               << "table=" << full_name() << ", "
@@ -195,7 +197,7 @@ OLAPStatus Tablet::add_rowset(RowsetSharedPtr rowset, bool need_persist) {
 
     RETURN_NOT_OK(_tablet_meta->add_rs_meta(rowset->rowset_meta()));
     _rs_version_map[rowset->version()] = rowset;
-    _rs_graph.add_version_to_graph(rowset->version());
+    _timestamped_version_tracker.add_version(rowset->version());
 
     vector<RowsetSharedPtr> rowsets_to_delete;
     // yiguolei: temp code, should remove the rowset contains by this rowset
@@ -234,17 +236,25 @@ void Tablet::modify_rowsets(const vector<RowsetSharedPtr>& to_add,
     for (auto& rs : to_delete) {
         rs_metas_to_delete.push_back(rs->rowset_meta());
         _rs_version_map.erase(rs->version());
+
+        // put compaction rowsets in _stale_rs_version_map.
+        _stale_rs_version_map[rs->version()] = rs;
     }
 
     vector<RowsetMetaSharedPtr> rs_metas_to_add;
     for (auto& rs : to_add) {
         rs_metas_to_add.push_back(rs->rowset_meta());
         _rs_version_map[rs->version()] = rs;
+
+        _timestamped_version_tracker.add_version(rs->version());
         ++_newly_created_rowset_num;
     }
 
     _tablet_meta->modify_rs_metas(rs_metas_to_add, rs_metas_to_delete);
-    _rs_graph.reconstruct_rowset_graph(_tablet_meta->all_rs_metas());
+    
+    // add rs_metas_to_delete to tracker
+    _timestamped_version_tracker.add_stale_path_version(rs_metas_to_delete);
+
 }
 
 // snapshot manager may call this api to check if version exists, so that
@@ -310,7 +320,9 @@ OLAPStatus Tablet::add_inc_rowset(const RowsetSharedPtr& rowset) {
     RETURN_NOT_OK(_tablet_meta->add_rs_meta(rowset->rowset_meta()));
     _rs_version_map[rowset->version()] = rowset;
     _inc_rs_version_map[rowset->version()] = rowset;
-    _rs_graph.add_version_to_graph(rowset->version());
+
+    _timestamped_version_tracker.add_version(rowset->version());
+
     RETURN_NOT_OK(_tablet_meta->add_inc_rs_meta(rowset->rowset_meta()));
     ++_newly_created_rowset_num;
     return OLAP_SUCCESS;
@@ -329,6 +341,16 @@ void Tablet::_delete_inc_rowset_by_version(const Version& version,
     VLOG(3) << "delete incremental rowset. tablet=" << full_name() << ", version=" << version;
 }
 
+void Tablet::_delete_stale_rowset_by_version(const Version& version) {
+    // Drop the stale rowset meta recorded for `version` from the tablet meta; do nothing when none is recorded.
+    RowsetMetaSharedPtr rowset_meta = _tablet_meta->acquire_stale_rs_meta_by_version(version);
+    if (rowset_meta == nullptr) {
+        return; // no stale rowset meta for this version
+    }
+    _tablet_meta->delete_stale_rs_meta_by_version(version);
+    VLOG(3) << "delete stale rowset. tablet=" << full_name() << ", version=" << version;
+}
+
 void Tablet::delete_expired_inc_rowsets() {
     int64_t now = UnixSeconds();
     vector<pair<Version, VersionHash>> expired_versions;
@@ -358,9 +380,102 @@ void Tablet::delete_expired_inc_rowsets() {
     save_meta();
 }
 
+void Tablet::delete_expired_stale_rowset() {
+    // Sweep stale rowsets: expired stale version paths are taken out of the tracker and their rowsets are released.
+    int64_t now = UnixSeconds();
+    vector<pair<Version, VersionHash>> expired_versions; // NOTE(review): never used in this function
+    WriteLock wrlock(&_meta_lock);
+    // Compute the sweep deadline: an expired rowset whose create time is less than this time will be deleted.
+    double expired_stale_sweep_endtime = ::difftime(now, config::tablet_rowset_stale_sweep_time_sec);
+    // (::difftime(end, begin) yields end - begin as a double, so the value above is now minus the sweep interval.)
+    std::vector<int64_t> path_id_vec;
+    // capture the ids of the stale version paths that are expired with respect to the deadline
+    _timestamped_version_tracker.capture_expired_paths(static_cast<int64_t>(expired_stale_sweep_endtime), &path_id_vec);
+
+    if (path_id_vec.empty()) {
+        return;
+    }
+
+    // Consistency check: walk the expired paths in reverse order of path_id_vec, removing one path at a time.
+    auto path_id_iter = path_id_vec.rbegin();
+    // paths already taken out of the tracker, keyed by path id; used for recovery and for the deletion below
+    std::map<int64_t, PathVersionListSharedPtr> stale_version_path_map;
+    while (path_id_iter != path_id_vec.rend()) {
+
+        PathVersionListSharedPtr version_path = _timestamped_version_tracker.fetch_and_delete_path_by_id(*path_id_iter);
+        const std::vector<TimestampedVersionSharedPtr>& to_delete_version = version_path->timestamped_versions();
+        // version range spanned by this path: start of its first version to end of its last version
+        auto first_version = to_delete_version.front();
+        auto end_version = to_delete_version.back();
+        Version test_version = Version(first_version->version().first, end_version->version().second);
+
+        stale_version_path_map[*path_id_iter] = version_path;
+        // only the returned status is used here, so no output version path is requested
+        OLAPStatus status = capture_consistent_versions(test_version, nullptr);
+        // When there are no consistent versions, we must recover the tracker.
+        if (status != OLAP_SUCCESS) {
+            LOG(WARNING) << "The consistent version check fails, there are bugs. "
+                << "Reconstruct the tracker to recover versions in tablet=" << tablet_id();
+
+            _timestamped_version_tracker.recover_versioned_tracker(stale_version_path_map);
+
+            // double check the consistent versions
+            status = capture_consistent_versions(test_version, nullptr);
+
+            if (status != OLAP_SUCCESS) {
+                if (!config::ignore_rowset_stale_unconsistent_delete) {
+                    LOG(FATAL) << "rowset stale unconsistent delete. tablet= " << tablet_id();
+                } else {
+                    LOG(WARNING) << "rowset stale unconsistent delete. tablet= " << tablet_id();
+                }
+            }
+            return; // give up this sweep: the deletion loop below is never reached
+        }
+        path_id_iter++;
+    }
+
+    auto old_size = _stale_rs_version_map.size();
+    auto old_meta_size = _tablet_meta->all_stale_rs_metas().size();
+
+    // do delete operation
+    auto to_delete_iter = stale_version_path_map.begin();
+    while (to_delete_iter != stale_version_path_map.end()) {
+        
+        std::vector<TimestampedVersionSharedPtr>& to_delete_version = to_delete_iter->second->timestamped_versions();
+        for (auto& timestampedVersion : to_delete_version) {
+            auto it = _stale_rs_version_map.find(timestampedVersion->version());
+            if (it != _stale_rs_version_map.end()) {
+                // hand the rowset to StorageEngine as an unused rowset and drop it from the stale map
+                StorageEngine::instance()->add_unused_rowset(it->second);
+                _stale_rs_version_map.erase(it);
+                LOG(INFO) << "delete stale rowset tablet=" << full_name() 
+                    <<" version[" << timestampedVersion->version().first << "," << timestampedVersion->version().second 
+                    << "] move to unused_rowset success " << std::fixed << expired_stale_sweep_endtime;
+            } else {
+                LOG(WARNING) << "delete stale rowset tablet=" << full_name() 
+                    <<" version[" << timestampedVersion->version().first << "," << timestampedVersion->version().second 
+                    << "] not find in stale rs version map";
+            }
+            _delete_stale_rowset_by_version(timestampedVersion->version()); // also drop the stale rowset meta from the tablet meta
+        }
+        to_delete_iter++;
+    }
+    LOG(INFO) << "delete stale rowset _stale_rs_version_map tablet="
+        << full_name() << " current_size=" << _stale_rs_version_map.size()
+        << " old_size=" << old_size
+        << " current_meta_size="  <<  _tablet_meta->all_stale_rs_metas().size()
+        << " old_meta_size=" << old_meta_size
+        << " sweep endtime " << std::fixed << expired_stale_sweep_endtime;
+
+    save_meta();
+    
+}
+
 OLAPStatus Tablet::capture_consistent_versions(const Version& spec_version,
-                                               vector<Version>* version_path) const {
-    OLAPStatus status = _rs_graph.capture_consistent_versions(spec_version, version_path);
+                                               vector<Version>* version_path) const {    
+    // OLAPStatus status = _rs_graph.capture_consistent_versions(spec_version, version_path);
+    OLAPStatus status = _timestamped_version_tracker.capture_consistent_versions(spec_version, version_path);
+
     if (status != OLAP_SUCCESS) {
         std::vector<Version> missed_versions;
         calc_missed_versions_unlocked(spec_version.second, &missed_versions);
@@ -415,13 +530,29 @@ OLAPStatus Tablet::_capture_consistent_rowsets_unlocked(const vector<Version>& v
     DCHECK(rowsets != nullptr && rowsets->empty());
     rowsets->reserve(version_path.size());
     for (auto& version : version_path) {
-        auto it = _rs_version_map.find(version);
-        if (it == _rs_version_map.end()) {
+        bool is_find = false;
+        do {
+            auto it = _rs_version_map.find(version);
+            if (it != _rs_version_map.end()) {
+                is_find = true;
+                rowsets->push_back(it->second);
+                break;
+            }
+
+            auto it_expired = _stale_rs_version_map.find(version);
+            if (it_expired != _stale_rs_version_map.end()) {
+                is_find = true;
+                rowsets->push_back(it_expired->second);
+                break;
+            }
+        } while(0);
+
+        if (!is_find) {
             LOG(WARNING) << "fail to find Rowset for version. tablet=" << full_name()
                          << ", version='" << version;
             return OLAP_ERR_CAPTURE_ROWSET_ERROR;
-        }
-        rowsets->push_back(it->second);
+        } 
+
     }
     return OLAP_SUCCESS;
 }
@@ -440,9 +571,15 @@ OLAPStatus Tablet::capture_rs_readers(const vector<Version>& version_path,
     for (auto version : version_path) {
         auto it = _rs_version_map.find(version);
         if (it == _rs_version_map.end()) {
-            LOG(WARNING) << "fail to find Rowset for version. tablet=" << full_name()
+            VLOG(3) << "fail to find Rowset in rs_version for version. tablet=" << full_name()
                          << ", version='" << version.first << "-" << version.second;
-            return OLAP_ERR_CAPTURE_ROWSET_READER_ERROR;
+
+            it = _stale_rs_version_map.find(version); // fall back to the stale (compacted) rowsets
+            if (it == _stale_rs_version_map.end()) { // must be the end() of the map that was just searched
+                LOG(WARNING) << "fail to find Rowset in stale_rs_version for version. tablet=" << full_name()
+                             << ", version='" << version.first << "-" << version.second;
+                return OLAP_ERR_CAPTURE_ROWSET_READER_ERROR;
+            }
         }
         RowsetReaderSharedPtr rs_reader;
         auto res = it->second->create_reader(&rs_reader);
@@ -763,6 +900,7 @@ void Tablet::delete_all_files() {
         it.second->remove();
     }
     _inc_rs_version_map.clear();
+    _stale_rs_version_map.clear();
 }
 
 bool Tablet::check_path(const std::string& path_to_check) const {
@@ -877,6 +1015,9 @@ void Tablet::get_compaction_status(std::string* json_result) {
     rapidjson::Document root;
     root.SetObject();
 
+    rapidjson::Document path_arr;
+    path_arr.SetArray();
+
     std::vector<RowsetSharedPtr> rowsets;
     std::vector<bool> delete_flags;
     {
@@ -891,6 +1032,8 @@ void Tablet::get_compaction_status(std::string* json_result) {
         for (auto& rs : rowsets) {
             delete_flags.push_back(version_for_delete_predicate(rs->version()));
         }
+        // get the stale version paths as a json doc
+        _timestamped_version_tracker.get_stale_version_path_json_doc(path_arr);
     }
 
     root.AddMember("cumulative point", _cumulative_point.load(), root.GetAllocator());
@@ -925,6 +1068,9 @@ void Tablet::get_compaction_status(std::string* json_result) {
     }
     root.AddMember("rowsets", versions_arr, root.GetAllocator());
 
+    // add stale version rowsets 
+    root.AddMember("stale version path", path_arr, root.GetAllocator());
+
     // to json string
     rapidjson::StringBuffer strbuf;
     rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
@@ -967,6 +1113,22 @@ void Tablet::do_tablet_meta_checkpoint() {
         }
         rs_meta->set_remove_from_rowset_meta();
     }
+
+    // check the stale rowset metas in the tablet meta and remove them from the rowset meta store
+    for (auto& rs_meta : _tablet_meta->all_stale_rs_metas()) {
+        // If we delete it from rowset manager's meta explicitly in previous checkpoint, just skip.
+        if(rs_meta->is_remove_from_rowset_meta()) {
+            continue;
+        }
+        if (RowsetMetaManager::check_rowset_meta(
+                    _data_dir->get_meta(), tablet_uid(), rs_meta->rowset_id())) {
+            RowsetMetaManager::remove(_data_dir->get_meta(), tablet_uid(), rs_meta->rowset_id());
+            LOG(INFO) << "remove rowset id from meta store because it is already persistent with tablet meta"
+                       << ", rowset_id=" << rs_meta->rowset_id();
+        }
+        rs_meta->set_remove_from_rowset_meta();
+    }
+
     _newly_created_rowset_num = 0;
     _last_checkpoint_time = UnixMillis();
 }
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 9248158..3657615 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -30,7 +30,7 @@
 #include "olap/data_dir.h"
 #include "olap/olap_define.h"
 #include "olap/tuple.h"
-#include "olap/rowset_graph.h"
+#include "olap/version_graph.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_reader.h"
 #include "olap/tablet_meta.h"
@@ -101,6 +101,11 @@ public:
 
     OLAPStatus add_inc_rowset(const RowsetSharedPtr& rowset);
     void delete_expired_inc_rowsets();
+    /// Delete stale rowsets whose retention time has expired. The deadline is computed as now() minus
+    /// config::tablet_rowset_stale_sweep_time_sec. Rowsets of the expired stale version paths are
+    /// handed to StorageEngine as unused rowsets, and their stale rowset metas are removed from the
+    /// tablet meta.
+    void delete_expired_stale_rowset();
 
     OLAPStatus capture_consistent_versions(const Version& spec_version,
                                            vector<Version>* version_path) const;
@@ -237,14 +242,17 @@ private:
                                                         VersionHash* v_hash) const ;
     RowsetSharedPtr _rowset_with_largest_size();
     void _delete_inc_rowset_by_version(const Version& version, const VersionHash& version_hash);
+    /// Delete stale rowset by version. This method deletes the version not only from the expired
+    /// rowset map, but also from the rowset meta vector.
+    void _delete_stale_rowset_by_version(const Version& version);
     OLAPStatus _capture_consistent_rowsets_unlocked(const vector<Version>& version_path,
                                                     vector<RowsetSharedPtr>* rowsets) const;
 
 private:
     static const int64_t kInvalidCumulativePoint = -1;
 
-    RowsetGraph _rs_graph;
-
+    TimestampedVersionTracker _timestamped_version_tracker;
+    
     DorisCallOnce<OLAPStatus> _init_once;
     // meta store lock is used for prevent 2 threads do checkpoint concurrently
     // it will be used in econ-mode in the future
@@ -269,7 +277,10 @@ private:
     // _inc_rs_version_map may do not exist in _rs_version_map.
     std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _rs_version_map;
     std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _inc_rs_version_map;
-
+    // _stale_rs_version_map records the rowsets which have been compacted.
+    // These stale rowsets are removed when the rowsets' path version has expired;
+    // this policy is judged and computed by TimestampedVersionTracker.
+    std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _stale_rs_version_map;
     // if this tablet is broken, set to true. default is false
     std::atomic<bool> _is_bad;
     // timestamp of last cumu compaction failure
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 3c3fbe5..b6d7211 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -982,6 +982,7 @@ OLAPStatus TabletManager::start_trash_sweep() {
 
             for (const auto& tablet : all_tablets) {
                 tablet->delete_expired_inc_rowsets();
+                tablet->delete_expired_stale_rowset();
             }
             all_tablets.clear();
 
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 4b9a45f..7f75bc8 100755
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -497,6 +497,7 @@ void TabletMeta::delete_rs_meta_by_version(const Version& version,
 
 void TabletMeta::modify_rs_metas(const vector<RowsetMetaSharedPtr>& to_add,
                                  const vector<RowsetMetaSharedPtr>& to_delete) {
+    // Remove to_delete rowsets from _rs_metas                                 
     for (auto rs_to_del : to_delete) {
         auto it = _rs_metas.begin();
         while (it != _rs_metas.end()) {
@@ -512,6 +513,9 @@ void TabletMeta::modify_rs_metas(const vector<RowsetMetaSharedPtr>& to_add,
             }
         }
     }
+    // put to_delete rowsets in _stale_rs_metas.
+    _stale_rs_metas.insert(_stale_rs_metas.end(), to_delete.begin(), to_delete.end());
+    // put to_add rowsets in _rs_metas.
     _rs_metas.insert(_rs_metas.end(), to_add.begin(), to_add.end());
 }
 
@@ -544,6 +548,26 @@ OLAPStatus TabletMeta::add_inc_rs_meta(const RowsetMetaSharedPtr& rs_meta) {
     return OLAP_SUCCESS;
 }
 
+void TabletMeta::delete_stale_rs_meta_by_version(const Version& version) {
+    auto it = _stale_rs_metas.begin();
+    while (it != _stale_rs_metas.end()) {
+        if ((*it)->version() == version) {
+            it = _stale_rs_metas.erase(it);
+        } else {
+            it++;
+        }
+    }
+}
+
+RowsetMetaSharedPtr TabletMeta::acquire_stale_rs_meta_by_version(const Version& version) const {
+    for (auto it : _stale_rs_metas) {
+        if (it->version() == version) {
+            return it;
+        }
+    }
+    return nullptr;
+}
+
 void TabletMeta::delete_inc_rs_meta_by_version(const Version& version) {
     auto it = _inc_rs_metas.begin();
     while (it != _inc_rs_metas.end()) {
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index f2c51cb..8ae8ccd 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -174,9 +174,12 @@ public:
 
     void revise_inc_rs_metas(std::vector<RowsetMetaSharedPtr>&& rs_metas);
     inline const std::vector<RowsetMetaSharedPtr>& all_inc_rs_metas() const;
+    inline const std::vector<RowsetMetaSharedPtr>& all_stale_rs_metas() const;
     OLAPStatus add_inc_rs_meta(const RowsetMetaSharedPtr& rs_meta);
     void delete_inc_rs_meta_by_version(const Version& version);
     RowsetMetaSharedPtr acquire_inc_rs_meta_by_version(const Version& version) const;
+    void delete_stale_rs_meta_by_version(const Version& version);
+    RowsetMetaSharedPtr acquire_stale_rs_meta_by_version(const Version& version) const;
 
     void add_delete_predicate(const DeletePredicatePB& delete_predicate, int64_t version);
     void remove_delete_predicate_by_version(const Version& version);
@@ -219,8 +222,14 @@ private:
 
     TabletState _tablet_state = TABLET_NOTREADY;
     TabletSchema _schema;
+
     std::vector<RowsetMetaSharedPtr> _rs_metas;
     std::vector<RowsetMetaSharedPtr> _inc_rs_metas;
+    // _stale_rs_metas records the metas of the rowsets which have been compacted.
+    // These stale rowset metas are removed when the rowsets' path version has expired;
+    // this policy is judged and computed by TimestampedVersionTracker.
+    std::vector<RowsetMetaSharedPtr> _stale_rs_metas;
+
     DelPredicateArray _del_pred_array;
     AlterTabletTaskSharedPtr _alter_task;
     bool _in_restore_mode = false;
@@ -325,6 +334,10 @@ inline const std::vector<RowsetMetaSharedPtr>& TabletMeta::all_inc_rs_metas() co
     return _inc_rs_metas;
 }
 
+inline const std::vector<RowsetMetaSharedPtr>& TabletMeta::all_stale_rs_metas() const {
+    return _stale_rs_metas;
+}
+
 // Only for unit test now.
 bool operator==(const TabletMeta& a, const TabletMeta& b);
 bool operator!=(const TabletMeta& a, const TabletMeta& b);
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 35b3633..2d0baa4 100755
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -309,7 +309,8 @@ OLAPStatus TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, TT
                       << " partition_id: " << key.first
                       << ", txn_id: " << key.second
                       << ", tablet: " << tablet_info.to_string()
-                      << ", rowsetid: " << rowset_ptr->rowset_id();
+                      << ", rowsetid: " << rowset_ptr->rowset_id()
+                      << ", version: " << version.first <<"," << version.second;
             if (it->second.empty()) {
                 txn_tablet_map.erase(it);
                 _clear_txn_partition_map_unlocked(transaction_id, partition_id);
diff --git a/be/src/olap/version_graph.cpp b/be/src/olap/version_graph.cpp
new file mode 100644
index 0000000..47f0f59
--- /dev/null
+++ b/be/src/olap/version_graph.cpp
@@ -0,0 +1,462 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/version_graph.h"
+
+#include <memory>
+#include <queue>
+
+#include "common/logging.h"
+#include "util/time.h"
+
+namespace doris {
+
+void TimestampedVersionTracker::_construct_versioned_tracker(const std::vector<RowsetMetaSharedPtr>& rs_metas) {
+    int64_t max_version = 0;
+
+    // construct the rowset version graph
+    _version_graph.reconstruct_version_graph(rs_metas, &max_version);
+}
+
+void TimestampedVersionTracker::construct_versioned_tracker(const std::vector<RowsetMetaSharedPtr>& rs_metas) {
+    if (rs_metas.empty()) {
+        VLOG(3) << "there is no version in the header.";
+        return;
+    }
+    _stale_version_path_map.clear();
+    _next_path_id = 1;
+    _construct_versioned_tracker(rs_metas);
+}
+
+
+void TimestampedVersionTracker::get_stale_version_path_json_doc(rapidjson::Document& path_arr) {
+
+    auto path_arr_iter = _stale_version_path_map.begin();
+
+    // iterate over every stale version path
+    while (path_arr_iter != _stale_version_path_map.end()) {
+
+        auto path_id = path_arr_iter->first;
+        auto path_version_path = path_arr_iter->second;
+
+        rapidjson::Document item;
+        item.SetObject();
+        // add path_id to item
+        auto path_id_str = std::to_string(path_id);
+        rapidjson::Value path_id_value;
+        path_id_value.SetString(path_id_str.c_str(), path_id_str.length(), path_arr.GetAllocator());
+        item.AddMember("path id", path_id_value, path_arr.GetAllocator());
+
+        // add max create time to item
+        std::string create_time_str = ToStringFromUnix(path_version_path->max_create_time());
+        rapidjson::Value create_time_value;
+        create_time_value.SetString(create_time_str.c_str(), create_time_str.length(), path_arr.GetAllocator());
+        item.AddMember("last create time", create_time_value, path_arr.GetAllocator());
+
+        // add path list to item
+        std::stringstream path_list_stream;
+        path_list_stream << path_id_str;
+        auto path_list_ptr = path_version_path->timestamped_versions();
+        auto path_list_iter = path_list_ptr.begin();
+        while (path_list_iter != path_list_ptr.end()) {
+            path_list_stream << " -> ";
+            path_list_stream << "[";
+            path_list_stream << (*path_list_iter)->version().first;
+            path_list_stream << "-";
+            path_list_stream << (*path_list_iter)->version().second;
+            path_list_stream << "]";
+            path_list_iter++;
+        }
+        std::string path_list = path_list_stream.str();
+        rapidjson::Value path_list_value;
+        path_list_value.SetString(path_list.c_str(), path_list.length(), path_arr.GetAllocator());
+        item.AddMember("path list", path_list_value, path_arr.GetAllocator());
+        
+        // add item to path_arr
+        path_arr.PushBack(item, path_arr.GetAllocator());
+
+        path_arr_iter++;
+    }
+}
+
+void TimestampedVersionTracker::recover_versioned_tracker(const std::map<int64_t, PathVersionListSharedPtr>& stale_version_path_map) {
+    
+    auto _path_map_iter = stale_version_path_map.begin();
+    // recover stale_version_path_map
+    while (_path_map_iter != stale_version_path_map.end()) {
+        // add PathVersionListSharedPtr to map
+        _stale_version_path_map[_path_map_iter->first] = _path_map_iter->second;
+
+        std::vector<TimestampedVersionSharedPtr>& timestamped_versions = _path_map_iter->second->timestamped_versions();
+        std::vector<TimestampedVersionSharedPtr>::iterator version_path_iter = timestamped_versions.begin();
+        while (version_path_iter != timestamped_versions.end()) {
+            // add version to _version_graph
+            _version_graph.add_version_to_graph((*version_path_iter)->version());
+            version_path_iter++;
+        }
+        _path_map_iter++;
+    }
+    LOG(INFO) <<"recover_versioned_tracker current map info "<< _get_current_path_map_str();
+}
+
+void TimestampedVersionTracker::add_version(const Version& version) {
+    _version_graph.add_version_to_graph(version);
+}
+
+void TimestampedVersionTracker::add_stale_path_version(
+        const std::vector<RowsetMetaSharedPtr>& stale_rs_metas) {
+    if (stale_rs_metas.empty()) {
+        VLOG(3) << "there is no version in the stale_rs_metas.";
+        return;
+    }
+
+    PathVersionListSharedPtr ptr(new TimestampedVersionPathContainer());
+    for (auto rs : stale_rs_metas) {
+        TimestampedVersionSharedPtr vt_ptr(new TimestampedVersion(rs->version(), rs->creation_time()));
+        ptr->add_timestamped_version(vt_ptr);
+    }
+
+    std::vector<TimestampedVersionSharedPtr>& timestamped_versions = ptr->timestamped_versions();
+
+    struct TimestampedVersionPtrCompare {
+        bool operator () (const TimestampedVersionSharedPtr ptr1, const TimestampedVersionSharedPtr ptr2) {
+            return ptr1->version().first < ptr2->version().first;
+        }
+    };
+    sort(timestamped_versions.begin(), timestamped_versions.end(), TimestampedVersionPtrCompare());
+    _stale_version_path_map[_next_path_id] = ptr;
+    _next_path_id++;
+}
+
+// Capture consistent versions from graph
+OLAPStatus TimestampedVersionTracker::capture_consistent_versions(
+        const Version& spec_version, std::vector<Version>* version_path) const {
+    return _version_graph.capture_consistent_versions(spec_version, version_path);
+}
+
+void TimestampedVersionTracker::capture_expired_paths(
+        int64_t stale_sweep_endtime, std::vector<int64_t>* path_version_vec) const {
+
+    std::map<int64_t, PathVersionListSharedPtr>::const_iterator iter =
+            _stale_version_path_map.begin();
+
+    while (iter != _stale_version_path_map.end()) {
+        int64_t max_create_time = iter->second->max_create_time();
+        if (max_create_time <= stale_sweep_endtime) {
+            int64_t path_version = iter->first;
+            path_version_vec->push_back(path_version);
+        }
+        iter++;
+    }
+}
+
+PathVersionListSharedPtr TimestampedVersionTracker::fetch_path_version_by_id(int64_t path_id) {
+    if (_stale_version_path_map.count(path_id) == 0) {
+        VLOG(3) << "path version " << path_id << " does not exist!";
+        return nullptr;
+    }
+
+    return _stale_version_path_map[path_id];
+}
+
+PathVersionListSharedPtr TimestampedVersionTracker::fetch_and_delete_path_by_id(int64_t path_id) {
+    if (_stale_version_path_map.count(path_id) == 0) {
+        VLOG(3) << "path version " << path_id << " does not exist!";
+        return nullptr;
+    }
+
+    LOG(INFO) << _get_current_path_map_str();
+    PathVersionListSharedPtr ptr = fetch_path_version_by_id(path_id);
+
+    _stale_version_path_map.erase(path_id);
+
+    for (auto& version : ptr->timestamped_versions()) {
+        _version_graph.delete_version_from_graph(version->version());
+    }
+    return ptr;
+}
+
+std::string TimestampedVersionTracker::_get_current_path_map_str() {
+
+    std::stringstream tracker_info;
+    tracker_info << "current expired next_path_id " << _next_path_id << std::endl;
+
+    std::map<int64_t, PathVersionListSharedPtr>::const_iterator iter =
+            _stale_version_path_map.begin();
+    while (iter != _stale_version_path_map.end()) {
+        
+        tracker_info << "current expired path_version " << iter->first;
+        std::vector<TimestampedVersionSharedPtr>& timestamped_versions = iter->second->timestamped_versions();
+        std::vector<TimestampedVersionSharedPtr>::iterator version_path_iter = timestamped_versions.begin();
+        int64_t max_create_time = -1;
+        while (version_path_iter != timestamped_versions.end()) {
+            if (max_create_time < (*version_path_iter)->get_create_time()) {
+                max_create_time = (*version_path_iter)->get_create_time();
+            }
+            tracker_info << " -> [";
+            tracker_info << (*version_path_iter)->version().first;
+            tracker_info << ",";
+            tracker_info << (*version_path_iter)->version().second;
+            tracker_info << "]";
+
+            version_path_iter++;
+        }
+
+        tracker_info << std::endl;
+        iter++;
+    }
+    return tracker_info.str();
+}
+
+void TimestampedVersionPathContainer::add_timestamped_version(TimestampedVersionSharedPtr version) {
+    // compare and refresh _max_create_time
+    if (version->get_create_time() > _max_create_time) {
+        _max_create_time = version->get_create_time();
+    }
+    _timestamped_versions_container.push_back(version);
+}
+
+std::vector<TimestampedVersionSharedPtr>& TimestampedVersionPathContainer::timestamped_versions() {
+    return _timestamped_versions_container;
+}
+
+void VersionGraph::construct_version_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas,
+                                         int64_t* max_version) {
+    if (rs_metas.empty()) {
+        VLOG(3) << "there is no version in the header.";
+        return;
+    }
+
+    // Distill vertex values from versions in TabletMeta.
+    std::vector<int64_t> vertex_values;
+    vertex_values.reserve(2 * rs_metas.size());
+
+    for (size_t i = 0; i < rs_metas.size(); ++i) {
+        vertex_values.push_back(rs_metas[i]->start_version());
+        vertex_values.push_back(rs_metas[i]->end_version() + 1);
+        if ( max_version != nullptr and *max_version < rs_metas[i]->end_version()) {
+            *max_version = rs_metas[i]->end_version();
+        }
+    }
+    std::sort(vertex_values.begin(), vertex_values.end());
+
+    // Items in vertex_values are sorted, but not unique.
+    // we choose unique items in vertex_values to create vertexes.
+    int64_t last_vertex_value = -1;
+    for (size_t i = 0; i < vertex_values.size(); ++i) {
+        if (i != 0 && vertex_values[i] == last_vertex_value) {
+            continue;
+        }
+
+        // Add vertex to graph.
+        _add_vertex_to_graph(vertex_values[i]);
+        last_vertex_value = vertex_values[i];
+    }
+    // Create edges for version graph according to TabletMeta's versions.
+    for (size_t i = 0; i < rs_metas.size(); ++i) {
+        // Versions in header are unique.
+        // We ensure _vertex_index_map has its start_version.
+        int64_t start_vertex_index = _vertex_index_map[rs_metas[i]->start_version()];
+        int64_t end_vertex_index = _vertex_index_map[rs_metas[i]->end_version() + 1];
+        // Add one edge from start_version to end_version.
+        _version_graph[start_vertex_index].edges.push_front(end_vertex_index);
+        // Add reverse edge from end_version to start_version.
+        _version_graph[end_vertex_index].edges.push_front(start_vertex_index);
+
+    }
+
+}
+
+void VersionGraph::reconstruct_version_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas,
+                                           int64_t* max_version) {
+    _version_graph.clear();
+    _vertex_index_map.clear();
+
+    construct_version_graph(rs_metas, max_version);
+}
+
+void VersionGraph::add_version_to_graph(const Version& version) {
+    // Add version.first and version.second + 1 as new vertexes of version graph if they do not exist.
+    int64_t start_vertex_value = version.first;
+    int64_t end_vertex_value = version.second + 1;
+
+    // Add vertex to graph.
+    _add_vertex_to_graph(start_vertex_value);
+    _add_vertex_to_graph(end_vertex_value);
+
+    int64_t start_vertex_index = _vertex_index_map[start_vertex_value];
+    int64_t end_vertex_index = _vertex_index_map[end_vertex_value];
+
+    // We assume this version is new version, so we just add two edges
+    // into version graph. add one edge from start_version to end_version
+    _version_graph[start_vertex_index].edges.push_front(end_vertex_index);
+
+    // We add reverse edge(from end_version to start_version) to graph
+    _version_graph[end_vertex_index].edges.push_front(start_vertex_index);
+}
+
+OLAPStatus VersionGraph::delete_version_from_graph(const Version& version) {
+    int64_t start_vertex_value = version.first;
+    int64_t end_vertex_value = version.second + 1;
+
+    if (_vertex_index_map.find(start_vertex_value) == _vertex_index_map.end() ||
+        _vertex_index_map.find(end_vertex_value) == _vertex_index_map.end()) {
+        LOG(WARNING) << "vertex for version does not exists. "
+                     << "version=" << version.first << "-" << version.second;
+        return OLAP_ERR_HEADER_DELETE_VERSION;
+    }
+
+    int64_t start_vertex_index = _vertex_index_map[start_vertex_value];
+    int64_t end_vertex_index = _vertex_index_map[end_vertex_value];
+    // Remove edge and its reverse edge.
+    // When the same version appears more than once in edges, just remove the first occurrence.
+    auto start_edges_iter = _version_graph[start_vertex_index].edges.begin();
+    while (start_edges_iter != _version_graph[start_vertex_index].edges.end()) {
+        if (*start_edges_iter == end_vertex_index) {
+            _version_graph[start_vertex_index].edges.erase(start_edges_iter);
+            break;
+        }
+        start_edges_iter++;
+    }
+
+    auto end_edges_iter = _version_graph[end_vertex_index].edges.begin();
+    while (end_edges_iter != _version_graph[end_vertex_index].edges.end()) {
+        if (*end_edges_iter == start_vertex_index) {
+            _version_graph[end_vertex_index].edges.erase(end_edges_iter);
+            break;
+        }
+        end_edges_iter++;
+    }
+
+    return OLAP_SUCCESS;
+}
+
+void VersionGraph::_add_vertex_to_graph(int64_t vertex_value) {
+    // Vertex with vertex_value already exists.
+    if (_vertex_index_map.find(vertex_value) != _vertex_index_map.end()) {
+        VLOG(3) << "vertex with vertex value already exists. value=" << vertex_value;
+        return;
+    }
+
+    _version_graph.emplace_back(Vertex(vertex_value));
+    _vertex_index_map[vertex_value] = _version_graph.size() - 1;
+}
+
+OLAPStatus VersionGraph::capture_consistent_versions(const Version& spec_version,
+                                                    std::vector<Version>* version_path) const {
+    if (spec_version.first > spec_version.second) {
+        LOG(WARNING) << "invalid specfied version. "
+                     << "spec_version=" << spec_version.first << "-" << spec_version.second;
+        return OLAP_ERR_INPUT_PARAMETER_ERROR;
+    }
+
+    // bfs_queue's element is vertex_index.
+    std::queue<int64_t> bfs_queue;
+    // predecessor[i] means the predecessor of vertex_index 'i'.
+    std::vector<int64_t> predecessor(_version_graph.size());
+    // visited[int64_t]==true means it had entered bfs_queue.
+    std::vector<bool> visited(_version_graph.size());
+    // [start_vertex_value, end_vertex_value)
+    int64_t start_vertex_value = spec_version.first;
+    int64_t end_vertex_value = spec_version.second + 1;
+    // -1 is invalid vertex index.
+    int64_t start_vertex_index = -1;
+    // -1 is invalid vertex index.
+    int64_t end_vertex_index = -1;
+
+    for (size_t i = 0; i < _version_graph.size(); ++i) {
+        if (_version_graph[i].value == start_vertex_value) {
+            start_vertex_index = i;
+        }
+        if (_version_graph[i].value == end_vertex_value) {
+            end_vertex_index = i;
+        }
+    }
+
+    if (start_vertex_index < 0 || end_vertex_index < 0) {
+        LOG(WARNING) << "fail to find path in version_graph. "
+                     << "spec_version: " << spec_version.first << "-" << spec_version.second;
+        return OLAP_ERR_VERSION_NOT_EXIST;
+    }
+
+    for (size_t i = 0; i < _version_graph.size(); ++i) {
+        visited[i] = false;
+    }
+
+    bfs_queue.push(start_vertex_index);
+    visited[start_vertex_index] = true;
+    // The predecessor of root is itself.
+    predecessor[start_vertex_index] = start_vertex_index;
+
+    while (bfs_queue.empty() == false && visited[end_vertex_index] == false) {
+        int64_t top_vertex_index = bfs_queue.front();
+        bfs_queue.pop();
+        for (const auto& it : _version_graph[top_vertex_index].edges) {
+            if (visited[it] == false) {
+                // Reverse versions are not supported in the path: if the start vertex
+                // value is larger than the end vertex value, we skip this edge.
+                if (_version_graph[top_vertex_index].value > _version_graph[it].value) {
+                    continue;
+                }
+
+                visited[it] = true;
+                predecessor[it] = top_vertex_index;
+                bfs_queue.push(it);
+            }
+        }
+    }
+
+    if (!visited[end_vertex_index]) {
+        LOG(WARNING) << "fail to find path in version_graph. "
+                     << "spec_version: " << spec_version.first << "-" << spec_version.second;
+        return OLAP_ERR_VERSION_NOT_EXIST;
+    }
+
+    std::vector<int64_t> reversed_path;
+    int64_t tmp_vertex_index = end_vertex_index;
+    reversed_path.push_back(tmp_vertex_index);
+
+    // For start_vertex_index, its predecessor must be itself.
+    while (predecessor[tmp_vertex_index] != tmp_vertex_index) {
+        tmp_vertex_index = predecessor[tmp_vertex_index];
+        reversed_path.push_back(tmp_vertex_index);
+    }
+
+    if (version_path != nullptr) {
+        // Make version_path from reversed_path.
+        std::stringstream shortest_path_for_debug;
+        for (size_t path_id = reversed_path.size() - 1; path_id > 0; --path_id) {
+            int64_t tmp_start_vertex_value = _version_graph[reversed_path[path_id]].value;
+            int64_t tmp_end_vertex_value = _version_graph[reversed_path[path_id - 1]].value;
+
+            // tmp_start_vertex_value mustn't be equal to tmp_end_vertex_value
+            if (tmp_start_vertex_value <= tmp_end_vertex_value) {
+                version_path->emplace_back(tmp_start_vertex_value, tmp_end_vertex_value - 1);
+            } else {
+                version_path->emplace_back(tmp_end_vertex_value, tmp_start_vertex_value - 1);
+            }
+
+            shortest_path_for_debug << (*version_path)[version_path->size() - 1] << ' ';
+        }
+        VLOG(10) << "success to find path for spec_version. spec_version=" << spec_version
+                 << ", path=" << shortest_path_for_debug.str();
+    }
+
+    return OLAP_SUCCESS;
+}
+
+} // namespace doris
diff --git a/be/src/olap/version_graph.h b/be/src/olap/version_graph.h
new file mode 100644
index 0000000..b8cac9f
--- /dev/null
+++ b/be/src/olap/version_graph.h
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_OLAP_VERSION_GRAPH_H
+#define DORIS_BE_SRC_OLAP_VERSION_GRAPH_H
+
+#include <rapidjson/stringbuffer.h>
+#include <rapidjson/prettywriter.h>
+#include <rapidjson/document.h>
+
+#include "olap/olap_common.h"
+#include "olap/olap_define.h"
+#include "olap/rowset/rowset_meta.h"
+
+namespace doris {
+/// VersionGraph class is implemented to build and maintain all versions of rowsets.
+/// This class uses an adjacency list to represent rowset versions and links. A vertex is a version
+/// and a link is the _version object of a rowset (from start version to end version + 1).
+/// Using this class, when given a spec_version, we can get a version path which is the shortest
+/// path in the graph.
+class VersionGraph {
+public:
+    /// Use rs_metas to construct the graph including vertex and edges, and return the
+    /// max_version in metas.
+    void construct_version_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas,
+                                int64_t* max_version);
+    /// Reconstruct the graph. Before construction begins, the vertex vector and edge lists are cleared.
+    void reconstruct_version_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas,
+                                  int64_t* max_version);
+    /// Add a version to this graph; the graph will add the vertexes and the edge of this version.
+    void add_version_to_graph(const Version& version);
+    /// Delete a version from graph. Notice that this delete operation only removes the edges and
+    /// keeps the vertexes.
+    OLAPStatus delete_version_from_graph(const Version& version);
+    /// Given a spec_version, this method can find a version path which is the shortest path
+    /// in the graph. The version paths are added to version_path as return info.
+    OLAPStatus capture_consistent_versions(const Version& spec_version,
+                                           std::vector<Version>* version_path) const;
+
+private:
+    /// Private method that adds a vertex to the graph if it does not already exist.
+    void _add_vertex_to_graph(int64_t vertex_value);
+
+    // OLAP version contains two parts, [start_version, end_version]. In order
+    // to construct graph, the OLAP version has two corresponding vertex, one
+    // vertex's value is version.start_version, the other is
+    // version.end_version + 1.
+    // Use adjacency list to describe version graph.
+    std::vector<Vertex> _version_graph;
+
+    // vertex value --> vertex_index of _version_graph
+    // It is easy to find vertex index according to vertex value.
+    std::unordered_map<int64_t, int64_t> _vertex_index_map;
+};
+
+/// TimestampedVersion class is implemented to maintain the multi-version path of rowsets.
+/// The compaction info of a rowset includes the start version, the end version and the create time.
+class TimestampedVersion {
+public:
+    /// TimestampedVersion construction function. Use rowset version and create time to build a TimestampedVersion.
+    TimestampedVersion(const Version& version, int64_t create_time)
+            : _version(version.first, version.second), _create_time(create_time) {}
+
+    ~TimestampedVersion() {}
+
+    /// Return the rowset version of TimestampedVersion record.
+    Version version() const { return _version; }
+    /// Return the rowset create_time of TimestampedVersion record.
+    int64_t get_create_time() { return _create_time; }
+
+    /// Compare two timestamped versions for inequality; only the version is compared, not the create time.
+    bool operator!=(const TimestampedVersion& rhs) const {
+        return _version != rhs._version;
+    }
+
+    /// Compare two timestamped versions for equality; only the version is compared, not the create time.
+    bool operator==(const TimestampedVersion& rhs) const {
+        return _version == rhs._version;
+    }
+
+    /// Judge if the version of this timestamped version contains the version of the other.
+    bool contains(const TimestampedVersion& other) const {
+        return _version.contains(other._version);
+    }
+
+private:
+    Version _version;
+    int64_t _create_time;
+};
+
+using TimestampedVersionSharedPtr = std::shared_ptr<TimestampedVersion>;
+
+/// TimestampedVersionPathContainer class is used to maintain a path of timestamped versions
+/// and record the max create time in the path. Once a timestamped version is added, the max_create_time
+/// is compared with the version timestamp and refreshed.
+class TimestampedVersionPathContainer {
+
+public:
+    /// TimestampedVersionPathContainer constructor, max_create_time is assigned to 0.
+    TimestampedVersionPathContainer():_max_create_time(0) {
+    }
+
+    /// Return the max create time in a path version.
+    int64_t max_create_time() { return _max_create_time; }
+
+    /// Add a timestamped version to timestamped_versions_container. Once a timestamped version is added, 
+    /// the max_create_time will compare with the version timestamp and be refreshed.
+    void add_timestamped_version(TimestampedVersionSharedPtr version);
+
+    /// Return the timestamped_versions_container as const type.
+    std::vector<TimestampedVersionSharedPtr>& timestamped_versions();
+
+private:
+    std::vector<TimestampedVersionSharedPtr> _timestamped_versions_container;
+    int64_t _max_create_time;
+};
+
+using PathVersionListSharedPtr = std::shared_ptr<TimestampedVersionPathContainer>;
+
+/// TimestampedVersionTracker is responsible for tracking all rowset version links of a tablet.
+/// This class not only records the graph of all versions, but also records the stale version
+/// paths which will be removed after the path is expired.
+class TimestampedVersionTracker {
+public:
+    /// Construct the rowset version tracker from rs_metas.
+    void construct_versioned_tracker(const std::vector<RowsetMetaSharedPtr>& rs_metas);
+
+    /// Recover the rowset version tracker from a stale version path map. When a delete operation
+    /// fails, the tracker can be recovered from the deleted stale_version_path_map.
+    void recover_versioned_tracker(const std::map<int64_t, PathVersionListSharedPtr>& stale_version_path_map);
+
+    /// Add a version to the tracker. This version is a new rowset version, not a merged rowset.
+    void add_version(const Version& version);
+
+    /// Add a version path built from stale_rs_metas. The versions in this path belong to
+    /// merged rowsets. These rowsets are tracked and removed after they are expired.
+    /// TabletManager periodically sweeps these rowsets using the tracker.
+    void add_stale_path_version(
+            const std::vector<RowsetMetaSharedPtr>& stale_rs_metas);
+
+    /// Given a spec_version, find a version path which is the shortest path in the graph.
+    /// The versions on that path are appended to version_path as the return info.
+    /// If spec_version is not in the main version path, version_path may include expired rowsets.
+    OLAPStatus capture_consistent_versions(const Version& spec_version,
+                                           std::vector<Version>* version_path) const;
+
+    /// Capture all expired path versions.
+    /// A path whose last rowset create time is greater than the expiry time
+    /// (stale_sweep_endtime, which can be expressed as
+    /// "now() - tablet_rowset_stale_sweep_time_sec") is retained.
+    /// Otherwise, the path is added to path_version.
+    void capture_expired_paths(int64_t stale_sweep_endtime,
+                                      std::vector<int64_t>* path_version) const;
+
+    /// Fetch all versions of the path identified by path_id.
+    PathVersionListSharedPtr fetch_path_version_by_id(int64_t path_id);
+
+    /// Fetch all versions of the path identified by path_id and, at the same time, remove this
+    /// path from the tracker. The next fetch of this path will return empty.
+    PathVersionListSharedPtr fetch_and_delete_path_by_id(int64_t path_id);
+
+    /// Return a string listing all expired version paths in a tablet.
+    /// NOTE(review): this is public although it carries the leading underscore used for
+    /// private members elsewhere in this class - confirm whether that is intentional.
+    std::string _get_current_path_map_str();
+
+    /// Get the json document of _stale_version_path_map. Fills the path_id and version_path
+    /// list into the document. The parameter path_arr is used as the return variable.
+    void get_stale_version_path_json_doc(rapidjson::Document& path_arr);
+
+private:
+    /// Construct rowsets version tracker with stale rowsets.
+    void _construct_versioned_tracker(const std::vector<RowsetMetaSharedPtr>& rs_metas);
+
+private:
+    // The id that will be dispatched to the next path version added to the tracker.
+    // It is not persisted.
+    int64_t _next_path_id = 1;
+
+    // path_version -> list of path version.
+    // Maps each path version id to all of the versions on that path.
+    std::map<int64_t, PathVersionListSharedPtr> _stale_version_path_map;
+
+    VersionGraph _version_graph;
+};
+
+} // namespace doris
+
+#endif // DORIS_BE_SRC_OLAP_OLAP_VERSION_GRAPH_H
diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt
index 75db505..76e0569 100644
--- a/be/test/olap/CMakeLists.txt
+++ b/be/test/olap/CMakeLists.txt
@@ -21,6 +21,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/test/olap")
 # where to put generated binaries
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/test/olap")
 
+ADD_BE_TEST(timestamped_version_tracker_test)
 ADD_BE_TEST(row_block_test)
 ADD_BE_TEST(row_block_v2_test)
 ADD_BE_TEST(bit_field_test)
diff --git a/be/test/olap/timestamped_version_tracker_test.cpp b/be/test/olap/timestamped_version_tracker_test.cpp
new file mode 100644
index 0000000..ed39878
--- /dev/null
+++ b/be/test/olap/timestamped_version_tracker_test.cpp
@@ -0,0 +1,831 @@
+#include <gtest/gtest.h>
+#include <sstream>
+#include <fstream>
+#include "boost/filesystem.hpp"
+#include "json2pb/json_to_pb.h"
+
+#include "olap/version_graph.h"
+#include "olap/olap_meta.h"
+#include "olap/rowset/rowset_meta.h"
+
+namespace doris {
+
+using RowsetMetaSharedContainerPtr = std::shared_ptr<std::vector<RowsetMetaSharedPtr>>;
+
+/// Fixture for the VersionGraph / TimestampedVersionTracker tests. It owns a rowset meta JSON
+/// template and offers builders for the "main" rowset list and for several stale rowset lists.
+class TestTimestampedVersionTracker : public testing::Test {
+public:
+    TestTimestampedVersionTracker() {}
+
+    /// Load the JSON template every test rowset meta is initialized from.
+    void SetUp() override {
+        _json_rowset_meta = R"({
+            "rowset_id": 540081,
+            "tablet_id": 15673,
+            "txn_id": 4042,
+            "tablet_schema_hash": 567997577,
+            "rowset_type": "ALPHA_ROWSET",
+            "rowset_state": "VISIBLE",
+            "start_version": 2,
+            "end_version": 2,
+            "version_hash": 8391828013814912580,
+            "num_rows": 3929,
+            "total_disk_size": 84699,
+            "data_disk_size": 84464,
+            "index_disk_size": 235,
+            "empty": false,
+            "load_id": {
+                "hi": -5350970832824939812,
+                "lo": -6717994719194512122
+            },
+            "creation_time": 1553765670,
+            "alpha_rowset_extra_meta_pb": {
+                "segment_groups": [
+                {
+                    "segment_group_id": 0,
+                    "num_segments": 1,
+                    "index_size": 132,
+                    "data_size": 576,
+                    "num_rows": 5,
+                    "zone_maps": [
+                    {
+                        "min": "MQ==",
+                        "max": "NQ==",
+                        "null_flag": false
+                    },
+                    {
+                        "min": "MQ==",
+                        "max": "Mw==",
+                        "null_flag": false
+                    },
+                    {
+                        "min": "J2J1c2gn",
+                        "max": "J3RvbSc=",
+                        "null_flag": false
+                    }
+                    ],
+                    "empty": false
+                },
+                {
+                    "segment_group_id": 1,
+                    "num_segments": 1,
+                    "index_size": 132,
+                    "data_size": 576,
+                    "num_rows": 5,
+                    "zone_maps": [
+                    {
+                        "min": "MQ==",
+                        "max": "NQ==",
+                        "null_flag": false
+                    },
+                    {
+                        "min": "MQ==",
+                        "max": "Mw==",
+                        "null_flag": false
+                    },
+                    {
+                        "min": "J2J1c2gn",
+                        "max": "J3RvbSc=",
+                        "null_flag": false
+                    }
+                    ],
+                    "empty": false
+                }
+                ]
+            }
+        })";
+    }
+
+    void TearDown() override {}
+
+    /// Initialize pb1 from the JSON template, then overwrite its version range with
+    /// [start, end] and its creation time with 10000.
+    void init_rs_meta(RowsetMetaSharedPtr &pb1, int64_t start, int64_t end) {
+        pb1->init_from_json(_json_rowset_meta);
+        pb1->set_start_version(start);
+        pb1->set_end_version(end);
+        pb1->set_creation_time(10000);
+    }
+
+    /// Append the main rowsets: [0-0], [1-1], [2-5], [6-9], [10-11].
+    void init_all_rs_meta(std::vector<RowsetMetaSharedPtr>* rs_metas) {
+        append_rs_metas(rs_metas, {Version(0, 0), Version(1, 1), Version(2, 5),
+                                   Version(6, 9), Version(10, 11)});
+    }
+
+    /// Append the stale rowsets: [2-3], [4-5], [6-6], [7-8], [6-8], [9-9], [10-10].
+    void init_expried_row_rs_meta(std::vector<RowsetMetaSharedPtr>* rs_metas) {
+        append_rs_metas(rs_metas, {Version(2, 3), Version(4, 5), Version(6, 6), Version(7, 8),
+                                   Version(6, 8), Version(9, 9), Version(10, 10)});
+    }
+
+    /// Same as init_expried_row_rs_meta, preceded by [1-1], a version that is also present
+    /// in the main rowset list.
+    void init_expried_row_rs_meta_with_same_rowset(std::vector<RowsetMetaSharedPtr>* rs_metas) {
+        append_rs_metas(rs_metas, {Version(1, 1), Version(2, 3), Version(4, 5), Version(6, 6),
+                                   Version(7, 8), Version(6, 8), Version(9, 9),
+                                   Version(10, 10)});
+    }
+
+    /// Append the stale rowsets grouped into four paths:
+    /// {[2-3], [4-5]}, {[6-6], [7-8]}, {[6-8], [9-9]}, {[10-10]}.
+    void fetch_expried_row_rs_meta(std::vector<RowsetMetaSharedContainerPtr>* rs_metas) {
+        rs_metas->push_back(make_rs_meta_path({Version(2, 3), Version(4, 5)}));
+        rs_metas->push_back(make_rs_meta_path({Version(6, 6), Version(7, 8)}));
+        rs_metas->push_back(make_rs_meta_path({Version(6, 8), Version(9, 9)}));
+        rs_metas->push_back(make_rs_meta_path({Version(10, 10)}));
+    }
+
+    /// Same as fetch_expried_row_rs_meta, preceded by the path {[1-1]}, a version that is
+    /// also present in the main rowset list.
+    void fetch_expried_row_rs_meta_with_same_rowset(std::vector<RowsetMetaSharedContainerPtr>* rs_metas) {
+        rs_metas->push_back(make_rs_meta_path({Version(1, 1)}));
+        rs_metas->push_back(make_rs_meta_path({Version(2, 3), Version(4, 5)}));
+        rs_metas->push_back(make_rs_meta_path({Version(6, 6), Version(7, 8)}));
+        rs_metas->push_back(make_rs_meta_path({Version(6, 8), Version(9, 9)}));
+        rs_metas->push_back(make_rs_meta_path({Version(10, 10)}));
+    }
+
+private:
+    /// Create one rowset meta per entry of versions, in order, and append it to rs_metas.
+    void append_rs_metas(std::vector<RowsetMetaSharedPtr>* rs_metas,
+                         const std::vector<Version>& versions) {
+        for (const Version& version : versions) {
+            RowsetMetaSharedPtr rs_meta(new RowsetMeta());
+            init_rs_meta(rs_meta, version.first, version.second);
+            rs_metas->push_back(rs_meta);
+        }
+    }
+
+    /// Build a freshly allocated container holding one rowset meta per entry of versions.
+    RowsetMetaSharedContainerPtr make_rs_meta_path(const std::vector<Version>& versions) {
+        RowsetMetaSharedContainerPtr path(new std::vector<RowsetMetaSharedPtr>());
+        append_rs_metas(path.get(), versions);
+        return path;
+    }
+
+    // Not used by any visible test; initialized so it never holds an indeterminate value.
+    OlapMeta* _meta = nullptr;
+    std::string _json_rowset_meta;
+};
+
+TEST_F(TestTimestampedVersionTracker, construct_version_graph) {
+    // Build a graph directly from the main rowset metas.
+    std::vector<RowsetMetaSharedPtr> main_rs_metas;
+    init_all_rs_meta(&main_rs_metas);
+
+    VersionGraph graph;
+    int64_t max_version = 0;
+    graph.construct_version_graph(main_rs_metas, &max_version);
+
+    const int64_t expected_max_version = 11;
+    ASSERT_EQ(6, graph._version_graph.size());
+    ASSERT_EQ(expected_max_version, max_version);
+}
+
+TEST_F(TestTimestampedVersionTracker, construct_version_graph_with_same_version) {
+    // Feed every main rowset meta twice so that each version occurs two times in the input.
+    // (The second list used to be left empty, which made this test a copy of
+    // construct_version_graph.)
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    std::vector<RowsetMetaSharedPtr> duplicated_rs_metas;
+
+    VersionGraph version_graph;
+
+    init_all_rs_meta(&rs_metas);
+    init_all_rs_meta(&duplicated_rs_metas);
+
+    rs_metas.insert(rs_metas.end(), duplicated_rs_metas.begin(),
+                        duplicated_rs_metas.end());
+    int64_t max_version = 0;
+    version_graph.construct_version_graph(rs_metas, &max_version);
+
+    // Repeated versions are expected to leave the vertex count and the max version
+    // the same as for the non-duplicated input.
+    ASSERT_EQ(6, version_graph._version_graph.size());
+    int64_t exp = 11;
+    ASSERT_EQ(exp, max_version);
+}
+
+TEST_F(TestTimestampedVersionTracker, reconstruct_version_graph) {
+    // Exercise the reconstruct entry point with the main rowset metas.
+    std::vector<RowsetMetaSharedPtr> main_rs_metas;
+    init_all_rs_meta(&main_rs_metas);
+
+    VersionGraph graph;
+    int64_t max_version = 0;
+    graph.reconstruct_version_graph(main_rs_metas, &max_version);
+
+    const int64_t expected_max_version = 11;
+    ASSERT_EQ(6, graph._version_graph.size());
+    ASSERT_EQ(expected_max_version, max_version);
+}
+
+TEST_F(TestTimestampedVersionTracker, delete_version_from_graph) {
+    // Add a single version and take it out again.
+    VersionGraph graph;
+    Version only_version(0, 0);
+    graph.add_version_to_graph(only_version);
+    graph.delete_version_from_graph(only_version);
+
+    // Both vertices are still there, but the first one has no edges left.
+    ASSERT_EQ(2, graph._version_graph.size());
+    ASSERT_EQ(0, graph._version_graph[0].edges.size());
+}
+
+TEST_F(TestTimestampedVersionTracker, delete_version_from_graph_with_same_version) {
+    // Add the same version range twice, then delete it once.
+    VersionGraph graph;
+    Version first_copy(0, 0);
+    Version second_copy(0, 0);
+    graph.add_version_to_graph(first_copy);
+    graph.add_version_to_graph(second_copy);
+    graph.delete_version_from_graph(first_copy);
+
+    // One of the two edges on the first vertex must survive the single delete.
+    ASSERT_EQ(2, graph._version_graph.size());
+    ASSERT_EQ(1, graph._version_graph[0].edges.size());
+}
+
+TEST_F(TestTimestampedVersionTracker, add_version_to_graph) {
+    // Add two adjacent single-version ranges.
+    VersionGraph graph;
+    Version first(0, 0);
+    Version second(1, 1);
+    graph.add_version_to_graph(first);
+    graph.add_version_to_graph(second);
+
+    // Three vertices result, and vertex values 0 and 1 map to indexes 0 and 1.
+    ASSERT_EQ(3, graph._version_graph.size());
+    ASSERT_EQ(0, graph._vertex_index_map.find(0)->second);
+    ASSERT_EQ(1, graph._vertex_index_map.find(1)->second);
+}
+
+TEST_F(TestTimestampedVersionTracker, add_version_to_graph_with_same_version) {
+    // Add the same version range twice.
+    VersionGraph graph;
+    Version first_copy(0, 0);
+    Version second_copy(0, 0);
+    graph.add_version_to_graph(first_copy);
+    graph.add_version_to_graph(second_copy);
+
+    // No new vertices for the repeat, but the first vertex carries two edges.
+    ASSERT_EQ(2, graph._version_graph.size());
+    ASSERT_EQ(2, graph._version_graph[0].edges.size());
+}
+
+TEST_F(TestTimestampedVersionTracker, capture_consistent_versions) {
+    // Graph input: the main rowsets followed by the stale rowsets.
+    std::vector<RowsetMetaSharedPtr> all_rs_metas;
+    std::vector<RowsetMetaSharedPtr> stale_rs_metas;
+    init_all_rs_meta(&all_rs_metas);
+    init_expried_row_rs_meta(&stale_rs_metas);
+    all_rs_metas.insert(all_rs_metas.end(), stale_rs_metas.begin(), stale_rs_metas.end());
+
+    VersionGraph graph;
+    int64_t max_version = 0;
+    graph.construct_version_graph(all_rs_metas, &max_version);
+
+    // Ask for the path covering versions 0 through 8.
+    std::vector<Version> captured_path;
+    Version requested(0, 8);
+    graph.capture_consistent_versions(requested, &captured_path);
+
+    ASSERT_EQ(4, captured_path.size());
+    ASSERT_EQ(Version(0, 0), captured_path[0]);
+    ASSERT_EQ(Version(1, 1), captured_path[1]);
+    ASSERT_EQ(Version(2, 5), captured_path[2]);
+    ASSERT_EQ(Version(6, 8), captured_path[3]);
+}
+
+TEST_F(TestTimestampedVersionTracker, capture_consistent_versions_with_same_rowset) {
+    // Graph input: the main rowsets followed by the stale rowsets that repeat [1-1].
+    std::vector<RowsetMetaSharedPtr> all_rs_metas;
+    std::vector<RowsetMetaSharedPtr> stale_rs_metas;
+    init_all_rs_meta(&all_rs_metas);
+    init_expried_row_rs_meta_with_same_rowset(&stale_rs_metas);
+    all_rs_metas.insert(all_rs_metas.end(), stale_rs_metas.begin(), stale_rs_metas.end());
+
+    VersionGraph graph;
+    int64_t max_version = 0;
+    graph.construct_version_graph(all_rs_metas, &max_version);
+
+    // Ask for the path covering versions 0 through 8.
+    std::vector<Version> captured_path;
+    Version requested(0, 8);
+    graph.capture_consistent_versions(requested, &captured_path);
+
+    ASSERT_EQ(4, captured_path.size());
+    ASSERT_EQ(Version(0, 0), captured_path[0]);
+    ASSERT_EQ(Version(1, 1), captured_path[1]);
+    ASSERT_EQ(Version(2, 5), captured_path[2]);
+    ASSERT_EQ(Version(6, 8), captured_path[3]);
+}
+
+TEST_F(TestTimestampedVersionTracker, construct_versioned_tracker) {
+    // Tracker input: the main rowsets followed by the stale rowsets.
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    std::vector<RowsetMetaSharedPtr> expried_rs_metas;
+
+    init_all_rs_meta(&rs_metas);
+    init_expried_row_rs_meta(&expried_rs_metas);
+
+    rs_metas.insert(rs_metas.end(), expried_rs_metas.begin(),
+                        expried_rs_metas.end());
+    TimestampedVersionTracker tracker;
+    tracker.construct_versioned_tracker(rs_metas);
+
+    // Constructing the tracker registers no stale path and hands out no path id.
+    ASSERT_EQ(10, tracker._version_graph._version_graph.size());
+    ASSERT_EQ(0, tracker._stale_version_path_map.size());
+    ASSERT_EQ(1, tracker._next_path_id);
+}
+
+TEST_F(TestTimestampedVersionTracker, construct_versioned_tracker_with_same_rowset) {
+    // Tracker input: the main rowsets followed by the stale rowsets that repeat [1-1].
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    std::vector<RowsetMetaSharedPtr> expried_rs_metas;
+
+    init_all_rs_meta(&rs_metas);
+    init_expried_row_rs_meta_with_same_rowset(&expried_rs_metas);
+
+    rs_metas.insert(rs_metas.end(), expried_rs_metas.begin(),
+                        expried_rs_metas.end());
+    TimestampedVersionTracker tracker;
+    tracker.construct_versioned_tracker(rs_metas);
+
+    // The repeated [1-1] must not change the vertex count.
+    ASSERT_EQ(10, tracker._version_graph._version_graph.size());
+    ASSERT_EQ(0, tracker._stale_version_path_map.size());
+    ASSERT_EQ(1, tracker._next_path_id);
+}
+
+TEST_F(TestTimestampedVersionTracker, recover_versioned_tracker) {
+    // Tracker input: the main rowsets followed by the stale rowsets.
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    std::vector<RowsetMetaSharedPtr> expried_rs_metas;
+
+    init_all_rs_meta(&rs_metas);
+    init_expried_row_rs_meta(&expried_rs_metas);
+    rs_metas.insert(rs_metas.end(), expried_rs_metas.begin(),
+                        expried_rs_metas.end());
+
+    // Recovering from an empty stale path map must leave the tracker as constructed.
+    const std::map<int64_t, PathVersionListSharedPtr> stale_version_path_map;
+    TimestampedVersionTracker tracker;
+    tracker.construct_versioned_tracker(rs_metas);
+    tracker.recover_versioned_tracker(stale_version_path_map);
+
+    ASSERT_EQ(10, tracker._version_graph._version_graph.size());
+    ASSERT_EQ(0, tracker._stale_version_path_map.size());
+    ASSERT_EQ(1, tracker._next_path_id);
+}
+
+TEST_F(TestTimestampedVersionTracker, add_version) {
+    // Add two adjacent single-version ranges through the tracker.
+    TimestampedVersionTracker version_tracker;
+    Version first(0, 0);
+    Version second(1, 1);
+    version_tracker.add_version(first);
+    version_tracker.add_version(second);
+
+    // The tracker's graph gets three vertices; values 0 and 1 map to indexes 0 and 1.
+    ASSERT_EQ(3, version_tracker._version_graph._version_graph.size());
+    ASSERT_EQ(0, version_tracker._version_graph._vertex_index_map.find(0)->second);
+    ASSERT_EQ(1, version_tracker._version_graph._vertex_index_map.find(1)->second);
+}
+
+TEST_F(TestTimestampedVersionTracker, add_version_with_same_rowset) {
+    // Add the same version range twice through the tracker.
+    TimestampedVersionTracker version_tracker;
+    Version first_copy(0, 0);
+    Version second_copy(0, 0);
+    version_tracker.add_version(first_copy);
+    version_tracker.add_version(second_copy);
+
+    // No new vertices for the repeat, but the first vertex carries two edges.
+    ASSERT_EQ(2, version_tracker._version_graph._version_graph.size());
+    ASSERT_EQ(2, version_tracker._version_graph._version_graph[0].edges.size());
+}
+
+TEST_F(TestTimestampedVersionTracker, add_stale_path_version) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    std::vector<RowsetMetaSharedPtr> expried_rs_metas;
+
+    init_all_rs_meta(&rs_metas);
+    TimestampedVersionTracker tracker;
+    tracker.construct_versioned_tracker(rs_metas);
+
+    // Register all seven stale rowsets as one single stale path.
+    init_expried_row_rs_meta(&expried_rs_metas);
+    tracker.add_stale_path_version(expried_rs_metas);
+
+    ASSERT_EQ(1, tracker._stale_version_path_map.size());
+    ASSERT_EQ(7, tracker._stale_version_path_map.begin()->second->timestamped_versions().size());
+}
+
+TEST_F(TestTimestampedVersionTracker, add_stale_path_version_with_same_rowset) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    std::vector<RowsetMetaSharedContainerPtr> expried_rs_metas;
+
+    init_all_rs_meta(&rs_metas);
+    TimestampedVersionTracker tracker;
+    tracker.construct_versioned_tracker(rs_metas);
+
+    // Register each group of stale rowsets as its own stale path.
+    fetch_expried_row_rs_meta_with_same_rowset(&expried_rs_metas);
+    for (const auto& stale_path : expried_rs_metas) {
+        tracker.add_stale_path_version(*stale_path);
+    }
+
+    // Five paths were added; the first one holds only [1-1].
+    ASSERT_EQ(5, tracker._stale_version_path_map.size());
+    ASSERT_EQ(1, tracker._stale_version_path_map.begin()->second->timestamped_versions().size());
+}
+
+TEST_F(TestTimestampedVersionTracker, capture_consistent_versions_tracker) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    std::vector<RowsetMetaSharedContainerPtr> expried_rs_metas;
+    std::vector<Version> version_path;
+
+    init_all_rs_meta(&rs_metas);
+    fetch_expried_row_rs_meta(&expried_rs_metas);
+
+    TimestampedVersionTracker tracker;
+    tracker.construct_versioned_tracker(rs_metas);
+    // Add every stale rowset to the graph, then register its group as a stale path.
+    for (const auto& stale_path : expried_rs_metas) {
+        for (const auto& rs : *stale_path) {
+            tracker.add_version(rs->version());
+        }
+        tracker.add_stale_path_version(*stale_path);
+    }
+
+    Version spec_version(0, 8);
+    tracker.capture_consistent_versions(spec_version, &version_path);
+
+    ASSERT_EQ(4, version_path.size());
+    ASSERT_EQ(Version(0, 0), version_path[0]);
+    ASSERT_EQ(Version(1, 1), version_path[1]);
+    ASSERT_EQ(Version(2, 5), version_path[2]);
+    ASSERT_EQ(Version(6, 8), version_path[3]);
+}
+
+TEST_F(TestTimestampedVersionTracker, capture_consistent_versions_tracker_with_same_rowset) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    std::vector<RowsetMetaSharedContainerPtr> expried_rs_metas;
+    std::vector<Version> version_path;
+
+    init_all_rs_meta(&rs_metas);
+    fetch_expried_row_rs_meta_with_same_rowset(&expried_rs_metas);
+
+    TimestampedVersionTracker tracker;
+    tracker.construct_versioned_tracker(rs_metas);
+    // Add every stale rowset to the graph, then register its group as a stale path.
+    for (const auto& stale_path : expried_rs_metas) {
+        for (const auto& rs : *stale_path) {
+            tracker.add_version(rs->version());
+        }
+        tracker.add_stale_path_version(*stale_path);
+    }
+
+    Version spec_version(0, 8);
+    tracker.capture_consistent_versions(spec_version, &version_path);
+
+    ASSERT_EQ(4, version_path.size());
+    ASSERT_EQ(Version(0, 0), version_path[0]);
+    ASSERT_EQ(Version(1, 1), version_path[1]);
+    ASSERT_EQ(Version(2, 5), version_path[2]);
+    ASSERT_EQ(Version(6, 8), version_path[3]);
+}
+
+TEST_F(TestTimestampedVersionTracker, fetch_and_delete_path_version) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    std::vector<RowsetMetaSharedContainerPtr> expried_rs_metas;
+
+    init_all_rs_meta(&rs_metas);
+    fetch_expried_row_rs_meta(&expried_rs_metas);
+
+    TimestampedVersionTracker tracker;
+    tracker.construct_versioned_tracker(rs_metas);
+    // Add every stale rowset to the graph, then register its group as a stale path.
+    for (const auto& stale_path : expried_rs_metas) {
+        for (const auto& rs : *stale_path) {
+            tracker.add_version(rs->version());
+        }
+        tracker.add_stale_path_version(*stale_path);
+    }
+
+    ASSERT_EQ(4, tracker._stale_version_path_map.size());
+
+    // Each fetched path keeps its own owning pointer, so the references taken from it
+    // stay valid for the rest of the test.
+    PathVersionListSharedPtr path1 = tracker.fetch_and_delete_path_by_id(1);
+    std::vector<TimestampedVersionSharedPtr>& timestamped_versions = path1->timestamped_versions();
+    ASSERT_EQ(2, timestamped_versions.size());
+    ASSERT_EQ(Version(2, 3), timestamped_versions[0]->version());
+    ASSERT_EQ(Version(4, 5), timestamped_versions[1]->version());
+
+    PathVersionListSharedPtr path2 = tracker.fetch_and_delete_path_by_id(2);
+    std::vector<TimestampedVersionSharedPtr>& timestamped_versions2 = path2->timestamped_versions();
+    ASSERT_EQ(2, timestamped_versions2.size());
+    ASSERT_EQ(Version(6, 6), timestamped_versions2[0]->version());
+    ASSERT_EQ(Version(7, 8), timestamped_versions2[1]->version());
+
+    PathVersionListSharedPtr path3 = tracker.fetch_and_delete_path_by_id(3);
+    std::vector<TimestampedVersionSharedPtr>& timestamped_versions3 = path3->timestamped_versions();
+    ASSERT_EQ(2, timestamped_versions3.size());
+    ASSERT_EQ(Version(6, 8), timestamped_versions3[0]->version());
+    ASSERT_EQ(Version(9, 9), timestamped_versions3[1]->version());
+
+    PathVersionListSharedPtr path4 = tracker.fetch_and_delete_path_by_id(4);
+    std::vector<TimestampedVersionSharedPtr>& timestamped_versions4 = path4->timestamped_versions();
+    ASSERT_EQ(1, timestamped_versions4.size());
+    ASSERT_EQ(Version(10, 10), timestamped_versions4[0]->version());
+
+    // Every path has been fetched and deleted, so nothing is tracked any more.
+    ASSERT_EQ(0, tracker._stale_version_path_map.size());
+}
+
+TEST_F(TestTimestampedVersionTracker, fetch_and_delete_path_version_with_same_rowset) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    std::vector<RowsetMetaSharedContainerPtr> expried_rs_metas;
+
+    init_all_rs_meta(&rs_metas);
+    fetch_expried_row_rs_meta_with_same_rowset(&expried_rs_metas);
+
+    TimestampedVersionTracker tracker;
+    tracker.construct_versioned_tracker(rs_metas);
+    // Add every stale rowset to the graph, then register its group as a stale path.
+    for (const auto& stale_path : expried_rs_metas) {
+        for (const auto& rs : *stale_path) {
+            tracker.add_version(rs->version());
+        }
+        tracker.add_stale_path_version(*stale_path);
+    }
+
+    ASSERT_EQ(5, tracker._stale_version_path_map.size());
+
+    // Each fetched path keeps its own owning pointer, so the references taken from it
+    // stay valid for the rest of the test.
+    PathVersionListSharedPtr path1 = tracker.fetch_and_delete_path_by_id(1);
+    std::vector<TimestampedVersionSharedPtr>& timestamped_versions = path1->timestamped_versions();
+    ASSERT_EQ(1, timestamped_versions.size());
+    ASSERT_EQ(Version(1, 1), timestamped_versions[0]->version());
+
+    PathVersionListSharedPtr path2 = tracker.fetch_and_delete_path_by_id(2);
+    std::vector<TimestampedVersionSharedPtr>& timestamped_versions2 = path2->timestamped_versions();
+    ASSERT_EQ(2, timestamped_versions2.size());
+    ASSERT_EQ(Version(2, 3), timestamped_versions2[0]->version());
+    ASSERT_EQ(Version(4, 5), timestamped_versions2[1]->version());
+
+    PathVersionListSharedPtr path3 = tracker.fetch_and_delete_path_by_id(3);
+    std::vector<TimestampedVersionSharedPtr>& timestamped_versions3 = path3->timestamped_versions();
+    ASSERT_EQ(2, timestamped_versions3.size());
+    ASSERT_EQ(Version(6, 6), timestamped_versions3[0]->version());
+    ASSERT_EQ(Version(7, 8), timestamped_versions3[1]->version());
+
+    PathVersionListSharedPtr path4 = tracker.fetch_and_delete_path_by_id(4);
+    std::vector<TimestampedVersionSharedPtr>& timestamped_versions4 = path4->timestamped_versions();
+    ASSERT_EQ(2, timestamped_versions4.size());
+    ASSERT_EQ(Version(6, 8), timestamped_versions4[0]->version());
+    ASSERT_EQ(Version(9, 9), timestamped_versions4[1]->version());
+
+    PathVersionListSharedPtr path5 = tracker.fetch_and_delete_path_by_id(5);
+    std::vector<TimestampedVersionSharedPtr>& timestamped_versions5 = path5->timestamped_versions();
+    ASSERT_EQ(1, timestamped_versions5.size());
+    ASSERT_EQ(Version(10, 10), timestamped_versions5[0]->version());
+
+    // Every path has been fetched and deleted, so nothing is tracked any more.
+    ASSERT_EQ(0, tracker._stale_version_path_map.size());
+}
+
+TEST_F(TestTimestampedVersionTracker, capture_expired_path_version) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    std::vector<RowsetMetaSharedContainerPtr> expried_rs_metas;
+    std::vector<int64_t> path_version;
+
+    init_all_rs_meta(&rs_metas);
+    fetch_expried_row_rs_meta(&expried_rs_metas);
+
+    TimestampedVersionTracker tracker;
+    tracker.construct_versioned_tracker(rs_metas);
+    // Add every stale rowset to the graph, then register its group as a stale path.
+    for (const auto& stale_path : expried_rs_metas) {
+        for (const auto& rs : *stale_path) {
+            tracker.add_version(rs->version());
+        }
+        tracker.add_stale_path_version(*stale_path);
+    }
+
+    // All test rowsets carry creation time 10000: an end time just below it expires nothing.
+    tracker.capture_expired_paths(9999, &path_version);
+    ASSERT_EQ(0, path_version.size());
+
+    // An end time just above it expires all four stale paths.
+    tracker.capture_expired_paths(10001, &path_version);
+    ASSERT_EQ(4, path_version.size());
+}
+
+TEST_F(TestTimestampedVersionTracker, get_stale_version_path_json_doc) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    std::vector<RowsetMetaSharedContainerPtr> expried_rs_metas;
+
+    init_all_rs_meta(&rs_metas);
+    fetch_expried_row_rs_meta(&expried_rs_metas);
+
+    TimestampedVersionTracker tracker;
+    tracker.construct_versioned_tracker(rs_metas);
+    // Add every stale rowset to the graph, then register its group as a stale path.
+    for (const auto& stale_path : expried_rs_metas) {
+        for (const auto& rs : *stale_path) {
+            tracker.add_version(rs->version());
+        }
+        tracker.add_stale_path_version(*stale_path);
+    }
+    rapidjson::Document path_arr;
+    path_arr.SetArray();
+
+    tracker.get_stale_version_path_json_doc(path_arr);
+    rapidjson::StringBuffer strbuf;
+    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
+    path_arr.Accept(writer);
+    std::string json_result = std::string(strbuf.GetString());
+
+    // NOTE(review): creation time 10000 is 02:46:40 UTC; the expected "10:46:40" matches a
+    // UTC+8 local time, so this expectation presumably depends on the host timezone - confirm.
+    std::string expect_result = R"([
+    {
+        "path id": "1",
+        "last create time": "1970-01-01 10:46:40",
+        "path list": "1 -> [2-3] -> [4-5]"
+    },
+    {
+        "path id": "2",
+        "last create time": "1970-01-01 10:46:40",
+        "path list": "2 -> [6-6] -> [7-8]"
+    },
+    {
+        "path id": "3",
+        "last create time": "1970-01-01 10:46:40",
+        "path list": "3 -> [6-8] -> [9-9]"
+    },
+    {
+        "path id": "4",
+        "last create time": "1970-01-01 10:46:40",
+        "path list": "4 -> [10-10]"
+    }
+])";
+
+    ASSERT_EQ(expect_result, json_result);
+}
+
+TEST_F(TestTimestampedVersionTracker, get_stale_version_path_json_doc_empty) {
+    // Only the main rowsets are loaded; no stale path is ever registered.
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+
+    init_all_rs_meta(&rs_metas);
+
+    TimestampedVersionTracker tracker;
+    tracker.construct_versioned_tracker(rs_metas);
+
+    rapidjson::Document path_arr;
+    path_arr.SetArray();
+
+    tracker.get_stale_version_path_json_doc(path_arr);
+
+    rapidjson::StringBuffer strbuf;
+    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
+    path_arr.Accept(writer);
+    std::string json_result = std::string(strbuf.GetString());
+
+    // With no stale path the document must serialize to an empty JSON array.
+    std::string expect_result = R"([])";
+
+    ASSERT_EQ(expect_result, json_result);
+}
+}
+
+// @brief Test entry point: hands the command line to googletest and runs every test.
+int main(int argc, char** argv) {
+    testing::InitGoogleTest(&argc, argv);
+    const int test_result = RUN_ALL_TESTS();
+    return test_result;
+}
\ No newline at end of file
diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md
index 0486efe..196cd5e 100644
--- a/docs/en/administrator-guide/config/be_config.md
+++ b/docs/en/administrator-guide/config/be_config.md
@@ -245,6 +245,15 @@ When BE starts, it will check all the paths under the `storage_root_path` in  co
 
 The default value is `false`.
 
+### `ignore_rowset_stale_unconsistent_delete`
+
+* Type: boolean
+* Description: Decides whether expired merged rowsets are still deleted when, after deleting them, a consistent version path can no longer be constructed.
+* Default: false
+
+The version paths of merged, expired rowsets are deleted after half an hour. In abnormal situations, deleting these versions can make it impossible to construct a consistent version path for queries. When the configuration is false, the check is strict and the program reports an error and exits.
+When the configuration is true, the program ignores this error and keeps running. In general, ignoring this error does not affect queries; a -230 error appears only when fe dispatches a version that has already been merged.
+
 ### inc_rowset_expired_sec
 
 * Type: boolean
@@ -262,8 +271,6 @@ Indicates how many tablets in this data directory failed to load. At the same ti
 1. If the tablet information is not repairable, you can delete the wrong tablet through the `meta_tool` tool under the condition that other copies are normal.
 2. Set `ignore_load_tablet_failure` to true, BE will ignore these wrong tablets and start normally.
 
-### `inc_rowset_expired_sec`
-
 ### `index_stream_cache_capacity`
 
 ### `load_data_reserve_hours`
@@ -454,6 +461,14 @@ Indicates how many tablets in this data directory failed to load. At the same ti
 
 ### `tablet_stat_cache_update_interval_second`
 
+### `tablet_rowset_stale_sweep_time_sec`
+
+* Type: int64
+* Description: Controls the expiration time used to clean up merged rowset versions. When the current time now() minus the latest rowset create time in a version path is greater than tablet_rowset_stale_sweep_time_sec, the path is cleaned up and its merged rowsets are deleted. The unit is seconds.
+* Default: 1800
+
+When writes are too frequent and disk space is insufficient, you can set tablet_rowset_stale_sweep_time_sec to a smaller value. However, if it is less than 5 minutes, fe may query a version that has already been merged, causing a query -230 error.
+
 ### `tablet_writer_open_rpc_timeout_sec`
 
 ### `tc_free_memory_rate`
diff --git a/docs/en/administrator-guide/http-actions/compaction-action.md b/docs/en/administrator-guide/http-actions/compaction-action.md
index 1b3d4f2..9627f8f 100644
--- a/docs/en/administrator-guide/http-actions/compaction-action.md
+++ b/docs/en/administrator-guide/http-actions/compaction-action.md
@@ -63,6 +63,18 @@ If the tablet exists, the result is returned in JSON format:
         "[49-49] 2 DATA OVERLAPPING",
         "[50-50] 0 DELETE NONOVERLAPPING",
         "[51-51] 5 DATA OVERLAPPING"
+    ],
+    "stale version path": [
+        {
+            "path id": "2",
+            "last create time": "2019-12-16 18:11:15.110",
+            "path list": "2-> [0-24] -> [25-48]"
+        }, 
+        {
+            "path id": "1",
+            "last create time": "2019-12-16 18:13:15.110",
+            "path list": "1-> [25-40] -> [40-48]"
+        }
     ]
 }
 ```
@@ -73,6 +85,7 @@ Explanation of results:
 * last cumulative failure time: The time when the last cumulative compaction failed. After 10 minutes by default, cumulative compaction is attempted on the this tablet again.
 * last base failure time: The time when the last base compaction failed. After 10 minutes by default, base compaction is attempted on the this tablet again.
 * rowsets: The current rowsets collection of this tablet. [0-48] means a rowset with version 0-48. The second number is the number of segments in a rowset. The `DELETE` indicates the delete version. `OVERLAPPING` and `NONOVERLAPPING` indicates whether data between segments is overlap.
+* stale version path: The version paths of the rowsets that have been merged in the tablet. It is an array and each element represents one merged path. Each element contains three attributes: path id is the id of the version path, last create time is the creation time of the most recent rowset on the path, and path list shows the path id followed by the version ranges on the path (for example `2-> [0-24] -> [25-48]`). By default, all rowsets on a path are deleted half an hour after its last create time.
 
 ### Examples
 
diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md
index fd75d88..658ce21 100644
--- a/docs/zh-CN/administrator-guide/config/be_config.md
+++ b/docs/zh-CN/administrator-guide/config/be_config.md
@@ -55,6 +55,31 @@ under the License.
 
 ### `base_cumulative_delta_ratio`
 
+### `base_compaction_trace_threshold`
+
+* 类型:int32
+* 描述:打印base compaction的trace信息的阈值,单位秒
+* 默认值:10
+
+base compaction是一个耗时较长的后台操作,为了跟踪其运行信息,可以调整这个阈值参数来控制trace日志的打印。打印信息如下:
+
+```
+W0610 11:26:33.804431 56452 storage_engine.cpp:552] Trace:
+0610 11:23:03.727535 (+     0us) storage_engine.cpp:554] start to perform base compaction
+0610 11:23:03.728961 (+  1426us) storage_engine.cpp:560] found best tablet 546859
+0610 11:23:03.728963 (+     2us) base_compaction.cpp:40] got base compaction lock
+0610 11:23:03.729029 (+    66us) base_compaction.cpp:44] rowsets picked
+0610 11:24:51.784439 (+108055410us) compaction.cpp:46] got concurrency lock and start to do compaction
+0610 11:24:51.784818 (+   379us) compaction.cpp:74] prepare finished
+0610 11:26:33.359265 (+101574447us) compaction.cpp:87] merge rowsets finished
+0610 11:26:33.484481 (+125216us) compaction.cpp:102] output rowset built
+0610 11:26:33.484482 (+     1us) compaction.cpp:106] check correctness finished
+0610 11:26:33.513197 (+ 28715us) compaction.cpp:110] modify rowsets finished
+0610 11:26:33.513300 (+   103us) base_compaction.cpp:49] compaction finished
+0610 11:26:33.513441 (+   141us) base_compaction.cpp:56] unused rowsets have been moved to GC queue
+Metrics: {"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"input_rowsets_data_size":1256413170,"input_segments_num":44,"merge_rowsets_latency_us":101574444,"merged_rows":0,"output_row_num":3346807,"output_rowset_data_size":1228439659,"output_segments_num":6}
+```
+
 ### `be_port`
 
 ### `be_service_threads`
@@ -113,6 +138,14 @@ under the License.
 
 ### `cumulative_compaction_skip_window_seconds`
 
+### `cumulative_compaction_trace_threshold`
+
+* 类型:int32
+* 描述:打印cumulative compaction的trace信息的阈值,单位秒
+* 默认值:2
+
+与base_compaction_trace_threshold类似。
+
 ### `default_num_rows_per_column_file_block`
 
 ### `default_query_options`
@@ -209,6 +242,32 @@ under the License.
 
 ​    默认为false
 
+### `ignore_load_tablet_failure`
+
+* 类型:布尔
+* 描述:用来决定在有tablet 加载失败的情况下是否忽略错误,继续启动be
+* 默认值:false
+
+BE启动时,会对每个数据目录单独启动一个线程进行 tablet header 元信息的加载。默认配置下,如果某个数据目录有 tablet 加载失败,则启动进程会终止。同时会在 `be.INFO` 日志中看到如下错误信息:
+
+```
+load tablets from header failed, failed tablets size: xxx, path=xxx
+```
+
+表示该数据目录共有多少 tablet 加载失败。同时,日志中也会有加载失败的 tablet 的具体信息。此时需要人工介入来对错误原因进行排查。排查后,通常有两种方式进行恢复:
+
+1. tablet 信息不可修复,在确保其他副本正常的情况下,可以通过 `meta_tool` 工具将错误的tablet删除。
+2. 将 `ignore_load_tablet_failure` 设置为 true,则 BE 会忽略这些错误的 tablet,正常启动。
+
+### `ignore_rowset_stale_unconsistent_delete`
+
+* 类型:布尔
+* 描述:用来决定当删除过期的合并过的rowset后无法构成一致的版本路径时,是否仍要删除。
+* 默认值:false
+
+合并的过期 rowset 版本路径会在半个小时后进行删除。在异常下,删除这些版本会出现构造不出查询一致路径的问题,当配置为false时,程序检查比较严格,程序会直接报错退出。
+当配置为true时,程序会正常运行,忽略这个错误。一般情况下,忽略这个错误不会对查询造成影响,仅会在fe下发了合并过的版本时出现-230错误。
+
 ### `inc_rowset_expired_sec`
 
 ### `index_stream_cache_capacity`
@@ -401,6 +460,14 @@ under the License.
 
 ### `tablet_stat_cache_update_interval_second`
 
+### `tablet_rowset_stale_sweep_time_sec`
+
+* 类型:int64
+* 描述:用来表示清理合并版本的过期时间,当当前时间 now() 减去一个合并的版本路径中 rowset 最近创建时间大于 tablet_rowset_stale_sweep_time_sec 时,对当前路径进行清理,删除这些合并过的 rowset,单位为秒。
+* 默认值:1800
+
+当写入过于频繁,磁盘空间不足时,可以调小这个时间。不过这个时间过短(小于5分钟)时,可能会导致 fe 查询不到已经合并过的版本,引发查询 -230 错误。
+
 ### `tablet_writer_open_rpc_timeout_sec`
 
 ### `tc_free_memory_rate`
@@ -448,54 +515,3 @@ under the License.
 ### `webserver_port`
 
 ### `write_buffer_size`
-
-### ignore_load_tablet_failure
-
-* 类型:布尔
-* 描述:用来决定在有tablet 加载失败的情况下是否忽略错误,继续启动be
-* 默认值:false
-
-BE启动时,会对每个数据目录单独启动一个线程进行 tablet header 元信息的加载。默认配置下,如果某个数据目录有 tablet 加载失败,则启动进程会终止。同时会在 `be.INFO` 日志中看到如下错误信息:
-
-```
-load tablets from header failed, failed tablets size: xxx, path=xxx
-```
-
-表示该数据目录共有多少 tablet 加载失败。同时,日志中也会有加载失败的 tablet 的具体信息。此时需要人工介入来对错误原因进行排查。排查后,通常有两种方式进行恢复:
-
-1. tablet 信息不可修复,在确保其他副本正常的情况下,可以通过 `meta_tool` 工具将错误的tablet删除。
-2. 将 `ignore_load_tablet_failure` 设置为 true,则 BE 会忽略这些错误的 tablet,正常启动。
-
-### base_compaction_trace_threshold
-
-* 类型:int32
-* 描述:打印base compaction的trace信息的阈值,单位秒
-* 默认值:10
-
-base compaction是一个耗时较长的后台操作,为了跟踪其运行信息,可以调整这个阈值参数来控制trace日志的打印。打印信息如下:
-
-```
-W0610 11:26:33.804431 56452 storage_engine.cpp:552] Trace:
-0610 11:23:03.727535 (+     0us) storage_engine.cpp:554] start to perform base compaction
-0610 11:23:03.728961 (+  1426us) storage_engine.cpp:560] found best tablet 546859
-0610 11:23:03.728963 (+     2us) base_compaction.cpp:40] got base compaction lock
-0610 11:23:03.729029 (+    66us) base_compaction.cpp:44] rowsets picked
-0610 11:24:51.784439 (+108055410us) compaction.cpp:46] got concurrency lock and start to do compaction
-0610 11:24:51.784818 (+   379us) compaction.cpp:74] prepare finished
-0610 11:26:33.359265 (+101574447us) compaction.cpp:87] merge rowsets finished
-0610 11:26:33.484481 (+125216us) compaction.cpp:102] output rowset built
-0610 11:26:33.484482 (+     1us) compaction.cpp:106] check correctness finished
-0610 11:26:33.513197 (+ 28715us) compaction.cpp:110] modify rowsets finished
-0610 11:26:33.513300 (+   103us) base_compaction.cpp:49] compaction finished
-0610 11:26:33.513441 (+   141us) base_compaction.cpp:56] unused rowsets have been moved to GC queue
-Metrics: {"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"input_rowsets_data_size":1256413170,"input_segments_num":44,"merge_rowsets_latency_us":101574444,"merged_rows":0,"output_row_num":3346807,"output_rowset_data_size":1228439659,"output_segments_num":6}
-```
-
-### cumulative_compaction_trace_threshold
-
-* 类型:int32
-* 描述:打印cumulative compaction的trace信息的阈值,单位秒
-* 默认值:2
-
-与base_compaction_trace_threshold类似。
-
diff --git a/docs/zh-CN/administrator-guide/http-actions/compaction-action.md b/docs/zh-CN/administrator-guide/http-actions/compaction-action.md
index ddf7d1f..c4a04e7 100644
--- a/docs/zh-CN/administrator-guide/http-actions/compaction-action.md
+++ b/docs/zh-CN/administrator-guide/http-actions/compaction-action.md
@@ -56,13 +56,25 @@ curl -X GET http://be_host:webserver_port/api/compaction/show?tablet_id=xxxx\&sc
     "cumulative point": 50,
     "last cumulative failure time": "2019-12-16 18:13:43.224",
     "last base failure time": "2019-12-16 18:13:23.320",
-    "last cumu success time": "2019-12-16 18:12:15.110",
+    "last cumu success time": "2019-12-16 18:12:15.110",
     "last base success time": "2019-12-16 18:11:50.780",
     "rowsets": [
         "[0-48] 10 DATA OVERLAPPING",
         "[49-49] 2 DATA OVERLAPPING",
         "[50-50] 0 DELETE NONOVERLAPPING",
         "[51-51] 5 DATA OVERLAPPING"
+    ],
+    "stale version path": [
+        {
+            "path id": "2",
+            "last create time": "2019-12-16 18:11:15.110",
+            "path list": "2-> [0-24] -> [25-48]"
+        }, 
+        {
+            "path id": "1",
+            "last create time": "2019-12-16 18:13:15.110",
+            "path list": "1-> [25-40] -> [40-48]"
+        }
     ]
 }
 ```
@@ -73,6 +85,7 @@ curl -X GET http://be_host:webserver_port/api/compaction/show?tablet_id=xxxx\&sc
 * last cumulative failure time:上一次尝试 cumulative compaction 失败的时间。默认 10min 后才会再次尝试对该 tablet 做 cumulative compaction。
 * last base failure time:上一次尝试 base compaction 失败的时间。默认 10min 后才会再次尝试对该 tablet 做 base compaction。
 * rowsets:该 tablet 当前的 rowset 集合。如 [0-48] 表示 0-48 版本。第二位数字表示该版本中 segment 的数量。`DELETE` 表示 delete 版本。`DATA` 表示数据版本。`OVERLAPPING` 和 `NONOVERLAPPING` 表示segment数据是否重叠。
+* stale version path:该 tablet 当前被合并 rowset 集合的合并版本路径,该结构是一个数组结构,每个元素表示一个合并路径。每个元素中包含了三个属性:path id 表示版本路径 id,last create time 表示当前路径上最近的 rowset 创建时间,path list 表示路径 id 及该路径上依次包含的版本区间(例如 `2-> [0-24] -> [25-48]`)。默认在 last create time 半个小时之后,这条路径上的所有 rowset 会被过期删除。
 
 ### 示例
 
diff --git a/run-ut.sh b/run-ut.sh
index bf81772..f657058 100755
--- a/run-ut.sh
+++ b/run-ut.sh
@@ -269,6 +269,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/skiplist_test
 ${DORIS_TEST_BINARY_DIR}/olap/serialize_test
 # ${DORIS_TEST_BINARY_DIR}/olap/memtable_flush_executor_test
 ${DORIS_TEST_BINARY_DIR}/olap/options_test
+${DORIS_TEST_BINARY_DIR}/olap/timestamped_version_tracker_test
 
 # Running memory engine Unittest
 ${DORIS_TEST_BINARY_DIR}/olap/memory/hash_index_test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org