You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2023/06/16 03:25:17 UTC

[doris] branch master updated: [Feature](binlog) Add binlog gc && Auth master_token (#20854)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d41edd9eb [Feature](binlog) Add binlog gc && Auth master_token (#20854)
9d41edd9eb is described below

commit 9d41edd9eb713b3b502b4001204bd45e2718cdcb
Author: Jack Drogon <ja...@gmail.com>
AuthorDate: Fri Jun 16 11:25:11 2023 +0800

    [Feature](binlog) Add binlog gc && Auth master_token (#20854)
    
    
    Signed-off-by: Jack Drogon <ja...@gmail.com>
---
 be/src/agent/agent_server.cpp                      |   9 +
 be/src/agent/agent_server.h                        |   1 +
 be/src/agent/task_worker_pool.cpp                  |  39 ++++
 be/src/agent/task_worker_pool.h                    |   4 +
 be/src/olap/binlog.h                               |   8 +-
 be/src/olap/rowset/rowset_meta_manager.cpp         |   1 +
 be/src/olap/storage_engine.cpp                     | 144 +--------------
 be/src/olap/storage_engine.h                       |   5 +-
 be/src/olap/tablet.cpp                             |  83 +++++++++
 be/src/olap/tablet.h                               |   1 +
 .../java/org/apache/doris/binlog/BinlogGcer.java   | 179 ++++++++++++++++++
 .../org/apache/doris/binlog/BinlogManager.java     |  94 +++++++---
 .../org/apache/doris/binlog/BinlogTombstone.java   | 103 +++++++++++
 .../java/org/apache/doris/binlog/DBBinlog.java     | 204 ++++++++++++++++++++-
 .../java/org/apache/doris/binlog/TableBinlog.java  | 114 ++++++++++++
 .../java/org/apache/doris/binlog/UpsertRecord.java |  16 +-
 .../main/java/org/apache/doris/catalog/Env.java    |  12 ++
 .../org/apache/doris/journal/JournalEntity.java    |   6 +
 .../org/apache/doris/persist/BinlogGcInfo.java     |  67 +++++++
 .../java/org/apache/doris/persist/EditLog.java     |  10 +
 .../org/apache/doris/persist/OperationType.java    |   2 +
 .../apache/doris/service/FrontendServiceImpl.java  |  38 +++-
 .../java/org/apache/doris/task/AgentBatchTask.java |  10 +
 .../java/org/apache/doris/task/BinlogGcTask.java   |  46 +++++
 gensrc/proto/olap_file.proto                       |   5 +-
 gensrc/thrift/AgentService.thrift                  |  10 +
 gensrc/thrift/FrontendService.thrift               |  13 ++
 gensrc/thrift/Types.thrift                         |   3 +-
 28 files changed, 1055 insertions(+), 172 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index a7f2f9aa09..4660902333 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -128,6 +128,7 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
     CREATE_AND_START_THREAD(REPORT_OLAP_TABLE, _report_tablet_workers);
     CREATE_AND_START_POOL(SUBMIT_TABLE_COMPACTION, _submit_table_compaction_workers);
     CREATE_AND_START_POOL(PUSH_STORAGE_POLICY, _push_storage_policy_workers);
+    CREATE_AND_START_THREAD(GC_BINLOG, _gc_binlog_workers);
 #undef CREATE_AND_START_POOL
 #undef CREATE_AND_START_THREAD
 
@@ -237,6 +238,14 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
                         signature);
             }
             break;
+        case TTaskType::GC_BINLOG:
+            if (task.__isset.gc_binlog_req) {
+                _gc_binlog_workers->submit_task(task);
+            } else {
+                ret_st = Status::InvalidArgument(
+                        "task(signature={}) has wrong request member = gc_binlog_req", signature);
+            }
+            break;
         default:
             ret_st = Status::InvalidArgument("task(signature={}, type={}) has wrong task type",
                                              signature, task_type);
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index 3d98fd025d..daa1823b07 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -89,6 +89,7 @@ private:
 
     std::unique_ptr<TaskWorkerPool> _push_storage_policy_workers;
     std::unique_ptr<TopicSubscriber> _topic_subscriber;
+    std::unique_ptr<TaskWorkerPool> _gc_binlog_workers;
 };
 
 } // end namespace doris
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 7da010eca0..fd2c8a17dd 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -213,6 +213,10 @@ void TaskWorkerPool::start() {
         _worker_count = 1;
         _cb = std::bind<void>(&TaskWorkerPool::_push_cooldown_conf_worker_thread_callback, this);
         break;
+    case TaskWorkerType::GC_BINLOG:
+        _worker_count = 1;
+        _cb = std::bind<void>(&TaskWorkerPool::_gc_binlog_worker_thread_callback, this);
+        break;
     default:
         // pass
         break;
@@ -1705,6 +1709,41 @@ void AlterTableTaskPool::_alter_tablet(const TAgentTaskRequest& agent_task_req,
     finish_task_request->__set_task_status(status.to_thrift());
 }
 
+void TaskWorkerPool::_gc_binlog_worker_thread_callback() {
+    while (_is_work) {
+        TAgentTaskRequest agent_task_req;
+        {
+            std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
+            if (!_is_work) {
+                return;
+            }
+
+            agent_task_req = _tasks.front();
+            _tasks.pop_front();
+        }
+
+        std::unordered_map<int64_t, int64_t> gc_tablet_infos;
+        if (!agent_task_req.__isset.gc_binlog_req) {
+            LOG(WARNING) << "gc binlog task is not valid";
+            return;
+        }
+        if (!agent_task_req.gc_binlog_req.__isset.tablet_gc_binlog_infos) {
+            LOG(WARNING) << "gc binlog task tablet_gc_binlog_infos is not valid";
+            return;
+        }
+
+        auto& tablet_gc_binlog_infos = agent_task_req.gc_binlog_req.tablet_gc_binlog_infos;
+        for (auto& tablet_info : tablet_gc_binlog_infos) {
+            // gc_tablet_infos.emplace(tablet_info.tablet_id, tablet_info.schema_hash);
+            gc_tablet_infos.emplace(tablet_info.tablet_id, tablet_info.version);
+        }
+
+        StorageEngine::instance()->gc_binlogs(gc_tablet_infos);
+    }
+}
+
 CloneTaskPool::CloneTaskPool(ExecEnv* env, ThreadModel thread_model)
         : TaskWorkerPool(TaskWorkerType::CLONE, env, *env->master_info(), thread_model) {
     _worker_count = config::clone_worker_count;
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index a33704480d..598c77d3ef 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -80,6 +80,7 @@ public:
         PUSH_COOLDOWN_CONF,
         PUSH_STORAGE_POLICY,
         ALTER_INVERTED_INDEX,
+        GC_BINLOG,
     };
 
     enum ReportType { TASK, DISK, TABLET };
@@ -141,6 +142,8 @@ public:
             return "PUSH_STORAGE_POLICY";
         case ALTER_INVERTED_INDEX:
             return "ALTER_INVERTED_INDEX";
+        case GC_BINLOG:
+            return "GC_BINLOG";
         default:
             return "Unknown";
         }
@@ -197,6 +200,7 @@ protected:
     void _submit_table_compaction_worker_thread_callback();
     void _push_cooldown_conf_worker_thread_callback();
     void _push_storage_policy_worker_thread_callback();
+    void _gc_binlog_worker_thread_callback();
 
     void _alter_tablet(const TAgentTaskRequest& alter_tablet_request, int64_t signature,
                        const TTaskType::type task_type, TFinishTaskRequest* finish_task_request);
diff --git a/be/src/olap/binlog.h b/be/src/olap/binlog.h
index 9ae243d8bb..b6b95a9530 100644
--- a/be/src/olap/binlog.h
+++ b/be/src/olap/binlog.h
@@ -64,8 +64,12 @@ inline auto make_binlog_filename_key(const TabletUid& tablet_uid, std::string_vi
     return fmt::format("{}meta_{}_{:0>20}_", kBinlogPrefix, tablet_uid.to_string(), version);
 }
 
-inline auto make_binlog_meta_key_prefix(int64_t tablet_id) {
-    return fmt::format("{}meta_{}_", kBinlogPrefix, tablet_id);
+inline auto make_binlog_meta_key_prefix(const TabletUid& tablet_uid) {
+    return fmt::format("{}meta_{}_", kBinlogPrefix, tablet_uid.to_string());
+}
+
+inline auto make_binlog_meta_key_prefix(const TabletUid& tablet_uid, int64_t version) {
+    return fmt::format("{}meta_{}_{:020d}_", kBinlogPrefix, tablet_uid.to_string(), version);
 }
 
 inline bool starts_with_binlog_meta(std::string_view str) {
diff --git a/be/src/olap/rowset/rowset_meta_manager.cpp b/be/src/olap/rowset/rowset_meta_manager.cpp
index 29e5a4eae3..69f189aa78 100644
--- a/be/src/olap/rowset/rowset_meta_manager.cpp
+++ b/be/src/olap/rowset/rowset_meta_manager.cpp
@@ -146,6 +146,7 @@ Status RowsetMetaManager::_save_with_binlog(OlapMeta* meta, TabletUid tablet_uid
     binlog_meta_entry_pb.set_rowset_id(rowset_meta_pb.rowset_id());
     binlog_meta_entry_pb.set_num_segments(rowset_meta_pb.num_segments());
     binlog_meta_entry_pb.set_creation_time(rowset_meta_pb.creation_time());
+    binlog_meta_entry_pb.set_rowset_id_v2(rowset_meta_pb.rowset_id_v2());
     std::string binlog_meta_value;
     if (!binlog_meta_entry_pb.SerializeToString(&binlog_meta_value)) {
         LOG(WARNING) << "serialize binlog pb failed. rowset id:" << binlog_meta_key;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 1d65da4d59..94f1bb5a87 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -97,13 +97,6 @@ using std::vector;
 using strings::Substitute;
 
 namespace doris {
-namespace {
-inline int64_t now_ms() {
-    auto duration = std::chrono::steady_clock::now().time_since_epoch();
-    return static_cast<int64_t>(
-            std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
-}
-} // namespace
 using namespace ErrorCode;
 
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(unused_rowsets_count, MetricUnit::ROWSETS);
@@ -686,8 +679,6 @@ Status StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) {
         }
     }
 
-    // _gc_binlogs();
-
     if (usage != nullptr) {
         *usage = tmp_usage; // update usage
     }
@@ -776,136 +767,13 @@ void StorageEngine::_clean_unused_rowset_metas() {
     }
 }
 
-void StorageEngine::_gc_binlogs() {
-    LOG(INFO) << "start to gc binlogs";
-
-    auto data_dirs = get_stores();
-    struct tablet_info {
-        std::string tablet_path;
-        int64_t binlog_ttl_ms;
-    };
-    std::unordered_map<int64_t, tablet_info> tablets_info;
-
-    auto get_tablet_info = [&tablets_info, this](int64_t tablet_id) -> const tablet_info& {
-        if (auto iter = tablets_info.find(tablet_id); iter != tablets_info.end()) {
-            return iter->second;
-        }
-
-        auto tablet = tablet_manager()->get_tablet(tablet_id);
-        if (tablet == nullptr) {
-            LOG(WARNING) << "failed to find tablet " << tablet_id;
-            static tablet_info empty_tablet_info;
-            return empty_tablet_info;
-        }
-
-        auto tablet_path = tablet->tablet_path();
-        auto binlog_ttl_ms = tablet->binlog_ttl_ms();
-        tablets_info.emplace(tablet_id, tablet_info {tablet_path, binlog_ttl_ms});
-        return tablets_info[tablet_id];
-    };
-
-    for (auto data_dir : data_dirs) {
-        std::string prefix_key {kBinlogMetaPrefix};
-        OlapMeta* meta = data_dir->get_meta();
-        DCHECK(meta != nullptr);
-
-        auto now = now_ms();
-        int64_t last_tablet_id = 0;
-        std::vector<std::string> wait_for_deleted_binlog_keys;
-        std::vector<std::string> wait_for_deleted_binlog_files;
-        auto add_to_wait_for_deleted_binlog_keys =
-                [&wait_for_deleted_binlog_keys](std::string_view key) {
-                    wait_for_deleted_binlog_keys.emplace_back(key);
-                    wait_for_deleted_binlog_keys.push_back(get_binlog_data_key_from_meta_key(key));
-                };
-
-        auto add_to_wait_for_deleted = [&add_to_wait_for_deleted_binlog_keys,
-                                        &wait_for_deleted_binlog_files](
-                                               std::string_view key, std::string_view tablet_path,
-                                               int64_t rowset_id, int64_t num_segments) {
-            add_to_wait_for_deleted_binlog_keys(key);
-            for (int64_t i = 0; i < num_segments; ++i) {
-                auto segment_file = fmt::format("{}_{}.dat", rowset_id, i);
-                wait_for_deleted_binlog_files.emplace_back(
-                        fmt::format("{}/_binlog/{}", tablet_path, segment_file));
-            }
-        };
-
-        auto check_binlog_ttl = [now, &get_tablet_info, &last_tablet_id,
-                                 &add_to_wait_for_deleted_binlog_keys, &add_to_wait_for_deleted](
-                                        const std::string& key,
-                                        const std::string& value) mutable -> bool {
-            LOG(INFO) << fmt::format("check binlog ttl, key:{}, value:{}", key, value);
-            if (!starts_with_binlog_meta(key)) {
-                last_tablet_id = -1;
-                return false;
-            }
-
-            BinlogMetaEntryPB binlog_meta_entry_pb;
-            if (!binlog_meta_entry_pb.ParseFromString(value)) {
-                LOG(WARNING) << "failed to parse binlog meta entry, key:" << key;
-                return true;
-            }
-
-            auto tablet_id = binlog_meta_entry_pb.tablet_id();
-            last_tablet_id = tablet_id;
-            const auto& tablet_info = get_tablet_info(tablet_id);
-            std::string_view tablet_path = tablet_info.tablet_path;
-            // tablet has been removed, removed all these binlog meta
-            if (tablet_path.empty()) {
-                add_to_wait_for_deleted_binlog_keys(key);
-                return true;
-            }
-
-            // check by ttl
-            auto rowset_id = binlog_meta_entry_pb.rowset_id();
-            auto binlog_ttl_ms = tablet_info.binlog_ttl_ms;
-            auto num_segments = binlog_meta_entry_pb.num_segments();
-            // binlog has been disabled, remove all
-            if (binlog_ttl_ms <= 0) {
-                add_to_wait_for_deleted(key, tablet_path, rowset_id, num_segments);
-                return true;
-            }
-            auto binlog_creation_time_ms = binlog_meta_entry_pb.creation_time();
-            if (now - binlog_creation_time_ms > binlog_ttl_ms) {
-                add_to_wait_for_deleted(key, tablet_path, rowset_id, num_segments);
-                return true;
-            }
-
-            // binlog not stale, skip
-            return false;
-        };
-
-        while (last_tablet_id >= 0) {
-            // every loop iterate one tablet
-            // get binlog meta by prefix
-            auto status = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, check_binlog_ttl);
-            if (!status.ok()) {
-                LOG(WARNING) << "failed to iterate binlog meta, status:" << status;
-                break;
-            }
-
-            prefix_key = make_binlog_meta_key_prefix(last_tablet_id);
-        }
-
-        // first remove binlog files, if failed, just break, then retry next time
-        // this keep binlog meta in meta store, so that binlog can be removed next time
-        bool remove_binlog_files_failed = false;
-        for (auto& file : wait_for_deleted_binlog_files) {
-            if (unlink(file.c_str()) != 0) {
-                // file not exist, continue
-                if (errno == ENOENT) {
-                    continue;
-                }
+void StorageEngine::gc_binlogs(const std::unordered_map<int64_t, int64_t>& gc_tablet_infos) {
+    for (auto [tablet_id, version] : gc_tablet_infos) {
+        LOG(INFO) << fmt::format("start to gc binlogs for tablet_id: {}, version: {}", tablet_id,
+                                 version);
 
-                remove_binlog_files_failed = true;
-                LOG(WARNING) << "failed to remove binlog file:" << file << ", errno:" << errno;
-                break;
-            }
-        }
-        if (remove_binlog_files_failed) {
-            meta->remove(META_COLUMN_FAMILY_INDEX, wait_for_deleted_binlog_keys);
-        }
+        TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
+        tablet->gc_binlogs(version);
     }
 }
 
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index fdf49c3e14..15b1a98a78 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -218,6 +218,8 @@ public:
 
     Status process_index_change_task(const TAlterInvertedIndexReq& reqest);
 
+    void gc_binlogs(const std::unordered_map<int64_t, int64_t>& gc_tablet_infos);
+
 private:
     // Instance should be inited from `static open()`
     // MUST NOT be called in other circumstances.
@@ -313,7 +315,8 @@ private:
                                   SegCompactionCandidatesSharedPtr segments);
 
     Status _handle_index_change(IndexBuilderSharedPtr index_builder);
-    void _gc_binlogs();
+
+    void _gc_binlogs(int64_t tablet_id, int64_t version);
 
 private:
     struct CompactionCandidate {
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index e7216d9401..dea85713f0 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -74,6 +74,7 @@
 #include "io/fs/remote_file_system.h"
 #include "olap/base_compaction.h"
 #include "olap/base_tablet.h"
+#include "olap/binlog.h"
 #include "olap/cumulative_compaction.h"
 #include "olap/cumulative_compaction_policy.h"
 #include "olap/delete_bitmap_calculator.h"
@@ -3445,6 +3446,88 @@ void Tablet::set_binlog_config(BinlogConfig binlog_config) {
     tablet_meta()->set_binlog_config(std::move(binlog_config));
 }
 
+void Tablet::gc_binlogs(int64_t version) {
+    auto meta = _data_dir->get_meta();
+    DCHECK(meta != nullptr);
+
+    const auto& tablet_uid = this->tablet_uid();
+    const auto tablet_id = this->tablet_id();
+    const auto& tablet_path = this->tablet_path();
+    std::string begin_key = make_binlog_meta_key_prefix(tablet_uid);
+    std::string end_key = make_binlog_meta_key_prefix(tablet_uid, version + 1);
+    LOG(INFO) << fmt::format("gc binlog meta, tablet_id:{}, begin_key:{}, end_key:{}", tablet_id,
+                             begin_key, end_key);
+
+    std::vector<std::string> wait_for_deleted_binlog_keys;
+    std::vector<std::string> wait_for_deleted_binlog_files;
+    auto add_to_wait_for_deleted = [&](std::string_view key, std::string_view rowset_id,
+                                       int64_t num_segments) {
+        // add binlog meta key and binlog data key
+        wait_for_deleted_binlog_keys.emplace_back(key);
+        wait_for_deleted_binlog_keys.push_back(get_binlog_data_key_from_meta_key(key));
+
+        for (int64_t i = 0; i < num_segments; ++i) {
+            auto segment_file = fmt::format("{}_{}.dat", rowset_id, i);
+            wait_for_deleted_binlog_files.emplace_back(
+                    fmt::format("{}/_binlog/{}", tablet_path, segment_file));
+        }
+    };
+
+    auto check_binlog_ttl = [&](const std::string& key, const std::string& value) mutable -> bool {
+        if (key >= end_key) {
+            return false;
+        }
+
+        BinlogMetaEntryPB binlog_meta_entry_pb;
+        if (!binlog_meta_entry_pb.ParseFromString(value)) {
+            LOG(WARNING) << "failed to parse binlog meta entry, key:" << key;
+            return true;
+        }
+
+        auto num_segments = binlog_meta_entry_pb.num_segments();
+        std::string rowset_id;
+        if (binlog_meta_entry_pb.has_rowset_id_v2()) {
+            rowset_id = binlog_meta_entry_pb.rowset_id_v2();
+        } else {
+            // key is 'binglog_meta_6943f1585fe834b5-e542c2b83a21d0b7_00000000000000000069_020000000000000135449d7cd7eadfe672aa0f928fa99593', extract last part '020000000000000135449d7cd7eadfe672aa0f928fa99593'
+            auto pos = key.rfind("_");
+            if (pos == std::string::npos) {
+                LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key);
+                return false;
+            }
+            rowset_id = key.substr(pos + 1);
+        }
+        add_to_wait_for_deleted(key, rowset_id, num_segments);
+
+        return true;
+    };
+
+    auto status = meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, check_binlog_ttl);
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to iterate binlog meta, status:" << status;
+        return;
+    }
+
+    // first remove binlog files, if failed, just break, then retry next time
+    // this keep binlog meta in meta store, so that binlog can be removed next time
+    bool remove_binlog_files_failed = false;
+    for (auto& file : wait_for_deleted_binlog_files) {
+        if (unlink(file.c_str()) != 0) {
+            // file not exist, continue
+            if (errno == ENOENT) {
+                continue;
+            }
+
+            remove_binlog_files_failed = true;
+            LOG(WARNING) << "failed to remove binlog file:" << file << ", errno:" << errno;
+            break;
+        }
+    }
+    if (!remove_binlog_files_failed) {
+        meta->remove(META_COLUMN_FAMILY_INDEX, wait_for_deleted_binlog_keys);
+    }
+}
+
 Status Tablet::calc_delete_bitmap_between_segments(
         RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments,
         DeleteBitmapPtr delete_bitmap) {
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 7ef38e1514..50eb3469f1 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -504,6 +504,7 @@ public:
     std::string get_segment_filepath(std::string_view rowset_id,
                                      std::string_view segment_index) const;
     bool can_add_binlog(uint64_t total_binlog_size) const;
+    void gc_binlogs(int64_t version);
 
     inline void increase_io_error_times() { ++_io_error_times; }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java
new file mode 100644
index 0000000000..a2e6afd1fc
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.binlog;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.persist.BinlogGcInfo;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.BinlogGcTask;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+public class BinlogGcer extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(BinlogGcer.class);
+    private static final long GC_DURATION_MS = 313 * 1000L; // 313s
+
+    // TODO(Drogon): use this to control gc frequency by real gc time waste sample
+    private long lastGcTime = 0L;
+
+    public BinlogGcer() {
+        super("binlog-gcer", GC_DURATION_MS);
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        LOG.debug("start binlog syncer jobs.");
+        try {
+            List<BinlogTombstone> tombstones = Env.getCurrentEnv().getBinlogManager().gc();
+            if (tombstones != null && !tombstones.isEmpty()) {
+                LOG.info("tomebstones size: {}", tombstones.size());
+            } else {
+                LOG.info("no gc binlogg");
+                return;
+            }
+
+            try {
+                sendGcInfoToBe(tombstones);
+            } catch (Throwable e) {
+                // TODO(Drogon): retry
+                // if send gc info to be failed, next gc depend on gc duration
+                LOG.warn("Failed to send gc info to be", e);
+            }
+
+            for (BinlogTombstone tombstone : tombstones) {
+                tombstone.clearTableVersionMap();
+            }
+            BinlogGcInfo info = new BinlogGcInfo(tombstones);
+            Env.getCurrentEnv().getEditLog().logGcBinlog(info);
+        } catch (Throwable e) {
+            LOG.warn("Failed to process one round of BinlogGcer", e);
+        }
+    }
+
+    private void sendGcInfoToBe(List<BinlogTombstone> tombstones) {
+        if (tombstones == null || tombstones.isEmpty()) {
+            return;
+        }
+
+        Map<Long, BinlogGcTask> beBinlogGcTaskMap = Maps.newHashMap();
+        for (BinlogTombstone tombstone : tombstones) {
+            sendDbGcInfoToBe(beBinlogGcTaskMap, tombstone);
+        }
+
+        if (beBinlogGcTaskMap.isEmpty()) {
+            return;
+        }
+
+        AgentBatchTask batchTask = new AgentBatchTask();
+        for (BinlogGcTask task : beBinlogGcTaskMap.values()) {
+            batchTask.addTask(task);
+        }
+        AgentTaskExecutor.submit(batchTask);
+    }
+
+    private void sendDbGcInfoToBe(Map<Long, BinlogGcTask> beBinlogGcTaskMap, BinlogTombstone tombstone) {
+        long dbId = tombstone.getDbId();
+        Database db = Env.getCurrentEnv().getInternalCatalog().getDbNullable(dbId);
+        if (db == null) {
+            LOG.warn("db {} does not exist", dbId);
+            return;
+        }
+
+        Map<Long, UpsertRecord.TableRecord> tableVersionMap = tombstone.getTableVersionMap();
+        for (Map.Entry<Long, UpsertRecord.TableRecord> entry : tableVersionMap.entrySet()) {
+            long tableId = entry.getKey();
+
+            OlapTable table = null;
+            try {
+                Table tbl = db.getTableOrMetaException(tableId);
+                if (tbl == null) {
+                    LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
+                    continue;
+                }
+                if (!(tbl instanceof OlapTable)) {
+                    LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId);
+                    continue;
+                }
+                table = (OlapTable) tbl;
+            } catch (Exception e) {
+                LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
+                continue;
+            }
+
+            UpsertRecord.TableRecord record = entry.getValue();
+            sendTableGcInfoToBe(beBinlogGcTaskMap, table, record);
+        }
+    }
+
+    private void sendTableGcInfoToBe(Map<Long, BinlogGcTask> beBinlogGcTaskMap, OlapTable table,
+            UpsertRecord.TableRecord tableRecord) {
+        OlapTable olapTable = (OlapTable) table;
+
+        olapTable.readLock();
+        try {
+            for (UpsertRecord.TableRecord.PartitionRecord partitionRecord : tableRecord.getPartitionRecords()) {
+                long partitionId = partitionRecord.partitionId;
+                Partition partition = olapTable.getPartition(partitionId);
+                if (partition == null) {
+                    LOG.warn("fail to get partition. table: {}, partition id: {}", olapTable.getName(), partitionId);
+                    continue;
+                }
+
+                long version = partitionRecord.version;
+
+                List<MaterializedIndex> indexes = partition.getMaterializedIndices(IndexExtState.VISIBLE);
+                for (MaterializedIndex index : indexes) {
+                    List<Tablet> tablets = index.getTablets();
+                    for (Tablet tablet : tablets) {
+                        List<Replica> replicas = tablet.getReplicas();
+                        for (Replica replica : replicas) {
+                            long beId = replica.getBackendId();
+                            long signature = -1;
+                            BinlogGcTask binlogGcTask = null;
+                            if (beBinlogGcTaskMap.containsKey(beId)) {
+                                binlogGcTask = beBinlogGcTaskMap.get(beId);
+                            } else {
+                                binlogGcTask = new BinlogGcTask(beId, signature);
+                                beBinlogGcTaskMap.put(beId, binlogGcTask);
+                            }
+
+                            binlogGcTask.addTask(tablet.getId(), version);
+                        }
+                    }
+                }
+            }
+        } finally {
+            table.readUnlock();
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index c2e4800935..59ba596152 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -17,13 +17,17 @@
 
 package org.apache.doris.binlog;
 
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
+import org.apache.doris.persist.BinlogGcInfo;
 import org.apache.doris.thrift.TBinlog;
 import org.apache.doris.thrift.TBinlogType;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -39,25 +43,22 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.Iterator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class BinlogManager {
-    private static final Logger LOG = LogManager.getLogger(BinlogManager.class);
     private static final int BUFFER_SIZE = 16 * 1024;
 
+    private static final Logger LOG = LogManager.getLogger(BinlogManager.class);
+
     private ReentrantReadWriteLock lock;
     private Map<Long, DBBinlog> dbBinlogMap;
-    // Pair(commitSeq, timestamp), used for gc
-    // need UpsertRecord to add timestamps for gc
-    private List<Pair<Long, Long>> timestamps;
 
     public BinlogManager() {
         lock = new ReentrantReadWriteLock();
         dbBinlogMap = Maps.newHashMap();
-        timestamps = new ArrayList<Pair<Long, Long>>();
     }
 
     private void addBinlog(TBinlog binlog) {
@@ -65,7 +66,15 @@ public class BinlogManager {
             return;
         }
 
+        // find db BinlogConfig
         long dbId = binlog.getDbId();
+        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+        if (db == null) {
+            LOG.warn("db not found. dbId: {}", dbId);
+            return;
+        }
+        boolean dbBinlogEnable = db.getBinlogConfig().isEnable();
+
         DBBinlog dbBinlog;
         lock.writeLock().lock();
         try {
@@ -74,14 +83,11 @@ public class BinlogManager {
                 dbBinlog = new DBBinlog(dbId);
                 dbBinlogMap.put(dbId, dbBinlog);
             }
-            if (binlog.getTimestamp() > 0) {
-                timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp()));
-            }
         } finally {
             lock.writeLock().unlock();
         }
 
-        dbBinlog.addBinlog(binlog);
+        dbBinlog.addBinlog(binlog, dbBinlogEnable);
     }
 
     private void addBinlog(long dbId, List<Long> tableIds, long commitSeq, long timestamp, TBinlogType type,
@@ -140,35 +146,75 @@ public class BinlogManager {
         }
     }
 
-    // gc binlog, remove all binlog timestamp < minTimestamp
-    // TODO(Drogon): get minCommitSeq from timestamps
-    public void gc(long minTimestamp) {
+    public List<BinlogTombstone> gc() {
+        LOG.info("begin gc binlog");
+
         lock.writeLock().lock();
-        long minCommitSeq = -1;
+        Map<Long, DBBinlog> gcDbBinlogMap = null;
         try {
-            // user iterator to remove element in timestamps
-            for (Iterator<Pair<Long, Long>> iterator = timestamps.iterator(); iterator.hasNext();) {
-                Pair<Long, Long> pair = iterator.next();
-                // long commitSeq = pair.first;
-                long timestamp = pair.second;
+            gcDbBinlogMap = new HashMap<Long, DBBinlog>(dbBinlogMap);
+        } finally {
+            lock.writeLock().unlock();
+        }
 
-                if (timestamp >= minTimestamp) {
-                    break;
-                }
+        if (gcDbBinlogMap.isEmpty()) {
+            LOG.info("gc binlog, dbBinlogMap is null");
+            return null;
+        }
 
-                iterator.remove();
+        List<BinlogTombstone> tombstones = Lists.newArrayList();
+        for (DBBinlog dbBinlog : gcDbBinlogMap.values()) {
+            List<BinlogTombstone> dbTombstones = dbBinlog.gc();
+            if (dbTombstones != null) {
+                tombstones.addAll(dbTombstones);
             }
+        }
+        return tombstones;
+    }
+
+    public void replayGc(BinlogGcInfo binlogGcInfo) {
+        lock.writeLock().lock();
+        Map<Long, DBBinlog> gcDbBinlogMap = null;
+        try {
+            gcDbBinlogMap = new HashMap<Long, DBBinlog>(dbBinlogMap);
         } finally {
             lock.writeLock().unlock();
         }
 
-        if (minCommitSeq == -1) {
+        if (gcDbBinlogMap.isEmpty()) {
+            LOG.info("replay gc binlog, dbBinlogMap is null");
             return;
         }
 
+        for (BinlogTombstone tombstone : binlogGcInfo.getTombstones()) {
+            long dbId = tombstone.getDbId();
+            DBBinlog dbBinlog = gcDbBinlogMap.get(dbId);
+            dbBinlog.replayGc(tombstone);
+        }
+    }
+
+    public void removeDB(long dbId) {
         lock.writeLock().lock();
+        try {
+            dbBinlogMap.remove(dbId);
+        } finally {
+            lock.writeLock().unlock();
+        }
     }
 
+    public void removeTable(long dbId, long tableId) {
+        lock.writeLock().lock();
+        try {
+            DBBinlog dbBinlog = dbBinlogMap.get(dbId);
+            if (dbBinlog != null) {
+                dbBinlog.removeTable(tableId);
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+
     private static void writeTBinlogToStream(DataOutputStream dos, TBinlog binlog) throws TException, IOException {
         TMemoryBuffer buffer = new TMemoryBuffer(BUFFER_SIZE);
         TBinaryProtocol protocol = new TBinaryProtocol(buffer);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java
new file mode 100644
index 0000000000..e1c01a5cfa
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.binlog;
+
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+
+import java.util.List;
+import java.util.Map;
+
+public class BinlogTombstone {
+    @SerializedName(value = "dbBinlogTombstone")
+    private boolean dbBinlogTombstone;
+
+    @SerializedName(value = "dbId")
+    private long dbId;
+
+    @SerializedName(value = "commitSeq")
+    private long commitSeq;
+
+    @SerializedName(value = "tableIds")
+    private List<Long> tableIds;
+
+    @SerializedName(value = "tableVersionMap")
+    // this map keep last upsert record <tableId, UpsertRecord>
+    // only for master fe to send be gc task, not need persist
+    private Map<Long, UpsertRecord.TableRecord> tableVersionMap = Maps.newHashMap();
+
+    public BinlogTombstone(long dbId, List<Long> tableIds, long commitSeq) {
+        this.dbBinlogTombstone = true;
+        this.dbId = dbId;
+        this.tableIds = tableIds;
+        this.commitSeq = commitSeq;
+    }
+
+    public BinlogTombstone(long dbId, long commitSeq) {
+        this.dbBinlogTombstone = false;
+        this.dbId = dbId;
+        this.tableIds = null;
+        this.commitSeq = commitSeq;
+    }
+
+    public void addTableRecord(long tableId, UpsertRecord upsertRecord) {
+        Map<Long, UpsertRecord.TableRecord> tableRecords = upsertRecord.getTableRecords();
+        UpsertRecord.TableRecord tableRecord = tableRecords.get(tableId);
+        tableVersionMap.put(tableId, tableRecord);
+    }
+
+    public void addTableRecord(long tableId, UpsertRecord.TableRecord record) {
+        tableVersionMap.put(tableId, record);
+    }
+
+    public boolean isDbBinlogTomstone() {
+        return dbBinlogTombstone;
+    }
+
+    public long getDbId() {
+        return dbId;
+    }
+
+    public List<Long> getTableIds() {
+        return tableIds;
+    }
+
+    public long getCommitSeq() {
+        return commitSeq;
+    }
+
+    public Map<Long, UpsertRecord.TableRecord> getTableVersionMap() {
+        return tableVersionMap;
+    }
+
+    // only call when log editlog
+    public void clearTableVersionMap() {
+        tableVersionMap.clear();
+    }
+
+    public String toJson() {
+        return GsonUtils.GSON.toJson(this);
+    }
+
+    @Override
+    public String toString() {
+        return toJson();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index c4312d2134..d6408b3076 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -17,18 +17,29 @@
 
 package org.apache.doris.binlog;
 
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Pair;
 import org.apache.doris.thrift.TBinlog;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class DBBinlog {
+    private static final Logger LOG = LogManager.getLogger(BinlogManager.class);
+
     private long dbId;
     // guard for allBinlogs && tableBinlogMap
     private ReentrantReadWriteLock lock;
@@ -37,6 +48,10 @@ public class DBBinlog {
     // table binlogs
     private Map<Long, TableBinlog> tableBinlogMap;
 
+    // Pair(commitSeq, timestamp), used for gc
+    // need UpsertRecord to add timestamps for gc
+    private List<Pair<Long, Long>> timestamps;
+
     public DBBinlog(long dbId) {
         lock = new ReentrantReadWriteLock();
         this.dbId = dbId;
@@ -51,13 +66,19 @@ public class DBBinlog {
             }
         });
         tableBinlogMap = new HashMap<Long, TableBinlog>();
+        timestamps = new ArrayList<Pair<Long, Long>>();
     }
 
-    public void addBinlog(TBinlog binlog) {
+    public void addBinlog(TBinlog binlog, boolean dbBinlogEnable) {
         List<Long> tableIds = binlog.getTableIds();
         lock.writeLock().lock();
         try {
+            if (binlog.getTimestamp() > 0 && dbBinlogEnable) {
+                timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp()));
+            }
+
             allBinlogs.add(binlog);
+
             if (tableIds == null) {
                 return;
             }
@@ -98,8 +119,189 @@ public class DBBinlog {
         }
     }
 
+    public List<BinlogTombstone> gc() {
+        // check db
+        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+        if (db == null) {
+            LOG.error("db not found. dbId: {}", dbId);
+            return null;
+        }
+
+        boolean dbBinlogEnable = db.getBinlogConfig().isEnable();
+        if (dbBinlogEnable) {
+            // db binlog is enable, only one binlogTombstones
+            long ttlSeconds = db.getBinlogConfig().getTtlSeconds();
+            long currentSeconds = System.currentTimeMillis() / 1000;
+            long expireSeconds = currentSeconds - ttlSeconds;
+            long expireMs = expireSeconds * 1000;
+
+            BinlogTombstone tombstone = dbBinlogEnableGc(expireMs);
+            List<BinlogTombstone> tombstones = new ArrayList<BinlogTombstone>();
+            if (tombstone != null) {
+                tombstones.add(tombstone);
+            }
+            return tombstones;
+        } else {
+            return dbBinlogDisableGc(db);
+        }
+    }
+
+    private List<BinlogTombstone> dbBinlogDisableGc(Database db) {
+        List<BinlogTombstone> tombstones = new ArrayList<BinlogTombstone>();
+        List<TableBinlog> tableBinlogs = null;
+
+        lock.writeLock().lock();
+        try {
+            tableBinlogs = new ArrayList<TableBinlog>(tableBinlogMap.values());
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        for (TableBinlog tableBinlog : tableBinlogs) {
+            BinlogTombstone tombstone = tableBinlog.gc(db);
+            if (tombstone != null) {
+                tombstones.add(tombstone);
+            }
+        }
+        return tombstones;
+    }
+
+    private BinlogTombstone dbBinlogEnableGc(long expireMs) {
+        // find commitSeq from timestamps, if commitSeq's timestamp is less than expireSeconds, then remove it
+        long largestExpiredCommitSeq = -1;
+        TBinlog tombstoneBinlog = null;
+        List<Long> tableIds = null;
+        List<TableBinlog> tableBinlogs = null;
+
+        lock.writeLock().lock();
+        try {
+            Iterator<Pair<Long, Long>> iterator = timestamps.iterator();
+            while (iterator.hasNext()) {
+                Pair<Long, Long> pair = iterator.next();
+                if (pair.second < expireMs) {
+                    largestExpiredCommitSeq = pair.first;
+                    iterator.remove();
+                } else {
+                    break;
+                }
+            }
+
+            Iterator<TBinlog> binlogIterator = allBinlogs.iterator();
+            while (binlogIterator.hasNext()) {
+                TBinlog binlog = binlogIterator.next();
+                if (binlog.getCommitSeq() <= largestExpiredCommitSeq) {
+                    tombstoneBinlog = binlog;
+                    binlogIterator.remove();
+                } else {
+                    break;
+                }
+            }
+
+            tableIds = new ArrayList<Long>(tableBinlogMap.keySet());
+            tableBinlogs = new ArrayList<TableBinlog>(tableBinlogMap.values());
+        } finally {
+            lock.writeLock().unlock();
+        }
+        LOG.info("gc binlog. dbId: {}, expireMs: {}, largestExpiredCommitSeq: {}",
+                dbId, expireMs, largestExpiredCommitSeq);
+        if (tombstoneBinlog == null) {
+            return null;
+        }
+
+        BinlogTombstone tombstone = new BinlogTombstone(dbId, tableIds, tombstoneBinlog.getCommitSeq());
+        for (TableBinlog tableBinlog : tableBinlogs) {
+            BinlogTombstone binlogTombstone = tableBinlog.gc(largestExpiredCommitSeq);
+            if (binlogTombstone == null) {
+                continue;
+            }
+
+            Map<Long, UpsertRecord.TableRecord> tableVersionMap = binlogTombstone.getTableVersionMap();
+            if (tableVersionMap.size() > 1) {
+                LOG.warn("tableVersionMap size is greater than 1. tableVersionMap: {}", tableVersionMap);
+            }
+            for (Map.Entry<Long, UpsertRecord.TableRecord> entry : tableVersionMap.entrySet()) {
+                long tableId = entry.getKey();
+                UpsertRecord.TableRecord record = entry.getValue();
+                tombstone.addTableRecord(tableId, record);
+            }
+        }
+
+        return tombstone;
+    }
+
+    public void replayGc(BinlogTombstone tombstone) {
+        if (tombstone.isDbBinlogTomstone()) {
+            dbBinlogEnableReplayGc(tombstone);
+        } else {
+            dbBinlogDisableReplayGc(tombstone);
+        }
+    }
+
+    public void dbBinlogEnableReplayGc(BinlogTombstone tombstone) {
+        long largestExpiredCommitSeq = tombstone.getCommitSeq();
+
+        lock.writeLock().lock();
+        try {
+            Iterator<Pair<Long, Long>> iterator = timestamps.iterator();
+            while (iterator.hasNext()) {
+                Pair<Long, Long> pair = iterator.next();
+                if (pair.first <= largestExpiredCommitSeq) {
+                    iterator.remove();
+                } else {
+                    break;
+                }
+            }
+
+            Iterator<TBinlog> binlogIterator = allBinlogs.iterator();
+            while (binlogIterator.hasNext()) {
+                TBinlog binlog = binlogIterator.next();
+                if (binlog.getCommitSeq() <= largestExpiredCommitSeq) {
+                    binlogIterator.remove();
+                } else {
+                    break;
+                }
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        dbBinlogDisableReplayGc(tombstone);
+    }
+
+    public void dbBinlogDisableReplayGc(BinlogTombstone tombstone) {
+        List<TableBinlog> tableBinlogs = null;
+
+        lock.writeLock().lock();
+        try {
+            tableBinlogs = new ArrayList<TableBinlog>(tableBinlogMap.values());
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        if (tableBinlogs.isEmpty()) {
+            return;
+        }
+
+        Set<Long> tableIds = new HashSet<Long>(tombstone.getTableIds());
+        long largestExpiredCommitSeq = tombstone.getCommitSeq();
+        for (TableBinlog tableBinlog : tableBinlogs) {
+            if (tableIds.contains(tableBinlog.getTableId())) {
+                tableBinlog.replayGc(largestExpiredCommitSeq);
+            }
+        }
+    }
+
     // not thread safety, do this without lock
     public void getAllBinlogs(List<TBinlog> binlogs) {
         binlogs.addAll(allBinlogs);
     }
+
+    public void removeTable(long tableId) {
+        lock.writeLock().lock();
+        try {
+            tableBinlogMap.remove(tableId);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index 8a3391847b..659f32e6f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -17,14 +17,24 @@
 
 package org.apache.doris.binlog;
 
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.common.Pair;
 import org.apache.doris.thrift.TBinlog;
+import org.apache.doris.thrift.TBinlogType;
 import org.apache.doris.thrift.TStatus;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Iterator;
 import java.util.TreeSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class TableBinlog {
+    private static final Logger LOG = LogManager.getLogger(TableBinlog.class);
+
     private long tableId;
     private ReentrantReadWriteLock lock;
     private TreeSet<TBinlog> binlogs;
@@ -65,4 +75,108 @@ public class TableBinlog {
             lock.readLock().unlock();
         }
     }
+
+    // this method call when db binlog enable
+    public BinlogTombstone gc(long largestExpiredCommitSeq) {
+        TBinlog tombstoneUpsert = null;
+
+        lock.writeLock().lock();
+        try {
+            Iterator<TBinlog> iter = binlogs.iterator();
+            while (iter.hasNext()) {
+                TBinlog binlog = iter.next();
+                if (binlog.getCommitSeq() <= largestExpiredCommitSeq) {
+                    if (binlog.getType() == TBinlogType.UPSERT) {
+                        tombstoneUpsert = binlog;
+                    }
+                    iter.remove();
+                } else {
+                    break;
+                }
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        if (tombstoneUpsert == null) {
+            return null;
+        }
+
+        BinlogTombstone tombstone = new BinlogTombstone(-1, largestExpiredCommitSeq);
+        UpsertRecord upsertRecord = UpsertRecord.fromJson(tombstoneUpsert.getData());
+        tombstone.addTableRecord(tableId, upsertRecord);
+        return tombstone;
+    }
+
+    // this method call when db binlog disable
+    public BinlogTombstone gc(Database db) {
+        OlapTable table = null;
+        try {
+            Table tbl = db.getTableOrMetaException(tableId);
+            if (tbl == null) {
+                LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
+                return null;
+            }
+            if (!(tbl instanceof OlapTable)) {
+                LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId);
+                return null;
+            }
+            table = (OlapTable) tbl;
+        } catch (Exception e) {
+            LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
+            return null;
+        }
+
+        long dbId = db.getId();
+        long ttlSeconds = table.getBinlogConfig().getTtlSeconds();
+        long currentSeconds = System.currentTimeMillis() / 1000;
+        long expireSeconds = currentSeconds - ttlSeconds;
+        long expireMs = expireSeconds * 1000;
+
+        TBinlog tombstoneUpsert = null;
+        long largestExpiredCommitSeq = 0;
+        lock.writeLock().lock();
+        try {
+            Iterator<TBinlog> iter = binlogs.iterator();
+            while (iter.hasNext()) {
+                TBinlog binlog = iter.next();
+                if (binlog.getTimestamp() <= expireMs) {
+                    if (binlog.getType() == TBinlogType.UPSERT) {
+                        tombstoneUpsert = binlog;
+                    }
+                    largestExpiredCommitSeq = binlog.getCommitSeq();
+                    iter.remove();
+                } else {
+                    break;
+                }
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        BinlogTombstone tombstone = new BinlogTombstone(dbId, largestExpiredCommitSeq);
+        if (tombstoneUpsert != null) {
+            UpsertRecord upsertRecord = UpsertRecord.fromJson(tombstoneUpsert.getData());
+            tombstone.addTableRecord(tableId, upsertRecord);
+        }
+
+        return tombstone;
+    }
+
+    public void replayGc(long largestExpiredCommitSeq) {
+        lock.writeLock().lock();
+        try {
+            Iterator<TBinlog> iter = binlogs.iterator();
+            while (iter.hasNext()) {
+                TBinlog binlog = iter.next();
+                if (binlog.getCommitSeq() <= largestExpiredCommitSeq) {
+                    iter.remove();
+                } else {
+                    break;
+                }
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java
index e564250093..32052f798a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java
@@ -31,8 +31,8 @@ import java.util.List;
 import java.util.Map;
 
 public class UpsertRecord {
-    class TableRecord {
-        class PartitionRecord {
+    public static class TableRecord {
+        public static class PartitionRecord {
             @SerializedName(value = "partitionId")
             public long partitionId;
             @SerializedName(value = "version")
@@ -52,6 +52,10 @@ public class UpsertRecord {
             partitionRecord.version = partitionCommitInfo.getVersion();
             partitionRecords.add(partitionRecord);
         }
+
+        public List<PartitionRecord> getPartitionRecords() {
+            return partitionRecords;
+        }
     }
 
     @SerializedName(value = "commitSeq")
@@ -105,10 +109,18 @@ public class UpsertRecord {
         return new ArrayList<>(tableRecords.keySet());
     }
 
+    public Map<Long, TableRecord> getTableRecords() {
+        return tableRecords;
+    }
+
     public String toJson() {
         return GsonUtils.GSON.toJson(this);
     }
 
+    public static UpsertRecord fromJson(String json) {
+        return GsonUtils.GSON.fromJson(json, UpsertRecord.class);
+    }
+
     @Override
     public String toString() {
         return toJson();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index b853232f42..dc822b1470 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -79,6 +79,7 @@ import org.apache.doris.analysis.TableRenameClause;
 import org.apache.doris.analysis.TruncateTableStmt;
 import org.apache.doris.analysis.UninstallPluginStmt;
 import org.apache.doris.backup.BackupHandler;
+import org.apache.doris.binlog.BinlogGcer;
 import org.apache.doris.binlog.BinlogManager;
 import org.apache.doris.blockrule.SqlBlockRuleMgr;
 import org.apache.doris.catalog.ColocateTableIndex.GroupId;
@@ -172,6 +173,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.persist.AlterMultiMaterializedView;
 import org.apache.doris.persist.BackendReplicasInfo;
 import org.apache.doris.persist.BackendTabletsInfo;
+import org.apache.doris.persist.BinlogGcInfo;
 import org.apache.doris.persist.CleanQueryStatsInfo;
 import org.apache.doris.persist.DropPartitionInfo;
 import org.apache.doris.persist.EditLog;
@@ -448,6 +450,8 @@ public class Env {
 
     private BinlogManager binlogManager;
 
+    private BinlogGcer binlogGcer;
+
     /**
      * TODO(tsy): to be removed after load refactor
      */
@@ -666,6 +670,7 @@ public class Env {
         this.loadManagerAdapter = new LoadManagerAdapter();
         this.hiveTransactionMgr = new HiveTransactionMgr();
         this.binlogManager = new BinlogManager();
+        this.binlogGcer = new BinlogGcer();
     }
 
     public static void destroyCheckpoint() {
@@ -1480,6 +1485,9 @@ public class Env {
         // start mtmv jobManager
         mtmvJobManager.start();
         getRefreshManager().start();
+
+        // binlog gcer
+        binlogGcer.start();
     }
 
     // start threads that should running on all FE
@@ -2805,6 +2813,10 @@ public class Env {
         getInternalCatalog().replayRecoverPartition(info);
     }
 
+    public void replayGcBinlog(BinlogGcInfo binlogGcInfo) {
+        binlogManager.replayGc(binlogGcInfo);
+    }
+
     public static void getDdlStmt(TableIf table, List<String> createTableStmt, List<String> addPartitionStmt,
                                   List<String> createRollupStmt, boolean separatePartition, boolean hidePassword,
                                   long specificVersion) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 1626bedbf2..39b804eb40 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -73,6 +73,7 @@ import org.apache.doris.persist.BatchDropInfo;
 import org.apache.doris.persist.BatchModifyPartitionsInfo;
 import org.apache.doris.persist.BatchRemoveTransactionsOperation;
 import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
+import org.apache.doris.persist.BinlogGcInfo;
 import org.apache.doris.persist.CleanLabelOperationLog;
 import org.apache.doris.persist.CleanQueryStatsInfo;
 import org.apache.doris.persist.ColocatePersistInfo;
@@ -823,6 +824,11 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_GC_BINLOG: {
+                data = BinlogGcInfo.read(in);
+                isRead = true;
+                break;
+            }
             default: {
                 IOException e = new IOException();
                 LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BinlogGcInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BinlogGcInfo.java
new file mode 100644
index 0000000000..55b13ad5ce
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BinlogGcInfo.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+
+import org.apache.doris.binlog.BinlogTombstone;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+public class BinlogGcInfo implements Writable {
+    @SerializedName(value = "tombstones")
+    private List<BinlogTombstone> tombstones = null;
+
+    public BinlogGcInfo() {
+        // for persist
+        this.tombstones = null;
+    }
+
+    public BinlogGcInfo(List<BinlogTombstone> tombstones) {
+        this.tombstones = tombstones;
+    }
+
+    public List<BinlogTombstone> getTombstones() {
+        return tombstones;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    public static BinlogGcInfo read(DataInput in) throws IOException {
+        return GsonUtils.GSON.fromJson(Text.readString(in), BinlogGcInfo.class);
+    }
+
+    public String toJson() {
+        return GsonUtils.GSON.toJson(this);
+    }
+
+    @Override
+    public String toString() {
+        return toJson();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 541518148f..04c1b58d6e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -1031,6 +1031,12 @@ public class EditLog {
                             alterDatabasePropertyInfo.getProperties());
                     break;
                 }
+                case OperationType.OP_GC_BINLOG: {
+                    BinlogGcInfo binlogGcInfo = (BinlogGcInfo) journal.getData();
+                    LOG.info("replay gc binlog: {}", binlogGcInfo);
+                    env.replayGcBinlog(binlogGcInfo);
+                    break;
+                }
                 default: {
                     IOException e = new IOException();
                     LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1797,4 +1803,8 @@ public class EditLog {
     public void logAlterDatabaseProperty(AlterDatabasePropertyInfo log) {
         logEdit(OperationType.OP_ALTER_DATABASE_PROPERTY, log);
     }
+
+    public void logGcBinlog(BinlogGcInfo log) {
+        logEdit(OperationType.OP_GC_BINLOG, log);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 4893bbc1ec..674374f917 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -306,6 +306,8 @@ public class OperationType {
 
     public static final short OP_ALTER_DATABASE_PROPERTY = 434;
 
+    public static final short OP_GC_BINLOG = 435;
+
 
     /**
      * Get opcode name by op code.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 2815c9229e..d296663745 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -110,6 +110,8 @@ import org.apache.doris.thrift.TGetBinlogRequest;
 import org.apache.doris.thrift.TGetBinlogResult;
 import org.apache.doris.thrift.TGetDbsParams;
 import org.apache.doris.thrift.TGetDbsResult;
+import org.apache.doris.thrift.TGetMasterTokenRequest;
+import org.apache.doris.thrift.TGetMasterTokenResult;
 import org.apache.doris.thrift.TGetQueryStatsRequest;
 import org.apache.doris.thrift.TGetSnapshotRequest;
 import org.apache.doris.thrift.TGetSnapshotResult;
@@ -940,6 +942,17 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         }
     }
 
+    private void checkPassword(String cluster, String user, String passwd, String clientIp)
+            throws AuthenticationException {
+        if (Strings.isNullOrEmpty(cluster)) {
+            cluster = SystemInfoService.DEFAULT_CLUSTER;
+        }
+        final String fullUserName = ClusterNamespace.getFullName(cluster, user);
+        List<UserIdentity> currentUser = Lists.newArrayList();
+        Env.getCurrentEnv().getAuth().checkPlainPassword(fullUserName, clientIp, passwd, currentUser);
+        Preconditions.checkState(currentUser.size() == 1);
+    }
+
     @Override
     public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TException {
         String clientAddr = getClientAddrAsString();
@@ -2296,7 +2309,6 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         Env env = Env.getCurrentEnv();
         String fullDbName = ClusterNamespace.getFullName(cluster, request.getDb());
         Database db = env.getInternalCatalog().getDbNullable(fullDbName);
-        long dbId = db.getId();
         if (db == null) {
             String dbName = fullDbName;
             if (Strings.isNullOrEmpty(request.getCluster())) {
@@ -2318,6 +2330,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         }
 
         // step 6: get binlog
+        long dbId = db.getId();
         TGetBinlogResult result = new TGetBinlogResult();
         result.setStatus(new TStatus(TStatusCode.OK));
         long prevCommitSeq = request.getPrevCommitSeq();
@@ -2516,4 +2529,27 @@ public class FrontendServiceImpl implements FrontendService.Iface {
 
         return result;
     }
+
+    public TGetMasterTokenResult getMasterToken(TGetMasterTokenRequest request) throws TException {
+        String clientAddr = getClientAddrAsString();
+        LOG.debug("receive get master token request: {}", request);
+
+        TGetMasterTokenResult result = new TGetMasterTokenResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+        try {
+            checkPassword(request.getCluster(), request.getUser(), request.getPassword(), clientAddr);
+            result.setToken(Env.getCurrentEnv().getToken());
+        } catch (AuthenticationException e) {
+            LOG.warn("failed to get master token: {}", e.getMessage());
+            status.setStatusCode(TStatusCode.NOT_AUTHORIZED);
+            status.addToErrorMsgs(e.getMessage());
+        } catch (Throwable e) {
+            LOG.warn("catch unknown result.", e);
+            status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+            status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+        }
+
+        return result;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index 4f9d9ef2f6..3c12160610 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -34,6 +34,7 @@ import org.apache.doris.thrift.TCompactionReq;
 import org.apache.doris.thrift.TCreateTabletReq;
 import org.apache.doris.thrift.TDownloadReq;
 import org.apache.doris.thrift.TDropTabletReq;
+import org.apache.doris.thrift.TGcBinlogReq;
 import org.apache.doris.thrift.TMoveDirReq;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPublishVersionRequest;
@@ -375,6 +376,15 @@ public class AgentBatchTask implements Runnable {
                 tAgentTaskRequest.setPushCooldownConf(request);
                 return tAgentTaskRequest;
             }
+            case GC_BINLOG: {
+                BinlogGcTask binlogGcTask = (BinlogGcTask) task;
+                TGcBinlogReq request = binlogGcTask.toThrift();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(request.toString());
+                }
+                tAgentTaskRequest.setGcBinlogReq(request);
+                return tAgentTaskRequest;
+            }
             default:
                 LOG.debug("could not find task type for task [{}]", task);
                 return null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/BinlogGcTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/BinlogGcTask.java
new file mode 100644
index 0000000000..15bd81473e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/BinlogGcTask.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.thrift.TGcBinlogReq;
+import org.apache.doris.thrift.TTabletGcBinlogInfo;
+import org.apache.doris.thrift.TTaskType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class BinlogGcTask extends AgentTask {
+    private List<TTabletGcBinlogInfo> tabletGcBinlogInfos = new ArrayList<>();
+
+    public BinlogGcTask(long backendId, long signature) {
+        super(null, backendId, TTaskType.GC_BINLOG, -1, -1, -1, -1, -1, signature);
+    }
+
+    public void addTask(long tabletId, long version) {
+        TTabletGcBinlogInfo tabletGcBinlogInfo = new TTabletGcBinlogInfo();
+        tabletGcBinlogInfo.setTabletId(tabletId);
+        tabletGcBinlogInfo.setVersion(version);
+        tabletGcBinlogInfos.add(tabletGcBinlogInfo);
+    }
+
+    public TGcBinlogReq toThrift() {
+        TGcBinlogReq req = new TGcBinlogReq();
+        req.setTabletGcBinlogInfos(tabletGcBinlogInfos);
+        return req;
+    }
+}
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 9bf2abe5c8..6a982fb58c 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -59,7 +59,7 @@ message KeyBoundsPB {
 }
 
 message RowsetMetaPB {
-    required int64 rowset_id = 1;
+    required int64 rowset_id = 1; // Deprecated. Use rowset_id_v2 instead.
     optional int64 partition_id = 2;
     optional int64 tablet_id = 3;
     // only for pending rowset
@@ -328,7 +328,8 @@ message DeleteBitmapPB {
 message BinlogMetaEntryPB {
     optional int64 version = 1;
     optional int64 tablet_id = 2;
-    optional int64 rowset_id = 3;
+    optional int64 rowset_id = 3; // Deprecated use rowset_id_v2 instead
     optional int64 num_segments = 4;
     optional int64 creation_time = 5;
+    optional string rowset_id_v2 = 6;
 }
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index b30c7ed26a..f92f873fc4 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -198,6 +198,15 @@ struct TAlterInvertedIndexReq {
     10: optional i64 expiration
 }
 
+struct TTabletGcBinlogInfo {
+    1: optional Types.TTabletId tablet_id
+    2: optional i64 version
+}
+
+struct TGcBinlogReq {
+    1: optional list<TTabletGcBinlogInfo> tablet_gc_binlog_infos
+}
+
 struct TStorageMigrationReqV2 {
     1: optional Types.TTabletId base_tablet_id
     2: optional Types.TTabletId new_tablet_id
@@ -449,6 +458,7 @@ struct TAgentTaskRequest {
     30: optional TPushCooldownConfReq push_cooldown_conf
     31: optional TPushStoragePolicyReq push_storage_policy_req
     32: optional TAlterInvertedIndexReq alter_inverted_index_req
+    33: optional TGcBinlogReq gc_binlog_req
 }
 
 struct TAgentResult {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index e4bff3a4fc..28b77a3f20 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1029,6 +1029,17 @@ struct TRestoreSnapshotResult {
     1: optional Status.TStatus status
 }
 
+struct TGetMasterTokenRequest {
+    1: optional string cluster
+    2: optional string user
+    3: optional string password
+}
+
+struct TGetMasterTokenResult {
+    1: optional Status.TStatus status
+    2: optional string token
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1: TGetDbsParams params)
     TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1089,4 +1100,6 @@ service FrontendService {
     TQueryStatsResult getQueryStats(1: TGetQueryStatsRequest request)
 
     TGetTabletReplicaInfosResult getTabletReplicaInfos(1: TGetTabletReplicaInfosRequest request)
+
+    TGetMasterTokenResult getMasterToken(1: TGetMasterTokenRequest request)
 }
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 91074669e7..4949ca55f2 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -218,7 +218,8 @@ enum TTaskType {
     NOTIFY_UPDATE_STORAGE_POLICY, // deprecated
     PUSH_COOLDOWN_CONF,
     PUSH_STORAGE_POLICY,
-    ALTER_INVERTED_INDEX
+    ALTER_INVERTED_INDEX,
+    GC_BINLOG
 }
 
 enum TStmtType {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org