You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2023/06/16 03:25:17 UTC
[doris] branch master updated: [Feature](binlog) Add binlog gc && Auth master_token (#20854)
This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9d41edd9eb [Feature](binlog) Add binlog gc && Auth master_token (#20854)
9d41edd9eb is described below
commit 9d41edd9eb713b3b502b4001204bd45e2718cdcb
Author: Jack Drogon <ja...@gmail.com>
AuthorDate: Fri Jun 16 11:25:11 2023 +0800
[Feature](binlog) Add binlog gc && Auth master_token (#20854)
Signed-off-by: Jack Drogon <ja...@gmail.com>
---
be/src/agent/agent_server.cpp | 9 +
be/src/agent/agent_server.h | 1 +
be/src/agent/task_worker_pool.cpp | 39 ++++
be/src/agent/task_worker_pool.h | 4 +
be/src/olap/binlog.h | 8 +-
be/src/olap/rowset/rowset_meta_manager.cpp | 1 +
be/src/olap/storage_engine.cpp | 144 +--------------
be/src/olap/storage_engine.h | 5 +-
be/src/olap/tablet.cpp | 83 +++++++++
be/src/olap/tablet.h | 1 +
.../java/org/apache/doris/binlog/BinlogGcer.java | 179 ++++++++++++++++++
.../org/apache/doris/binlog/BinlogManager.java | 94 +++++++---
.../org/apache/doris/binlog/BinlogTombstone.java | 103 +++++++++++
.../java/org/apache/doris/binlog/DBBinlog.java | 204 ++++++++++++++++++++-
.../java/org/apache/doris/binlog/TableBinlog.java | 114 ++++++++++++
.../java/org/apache/doris/binlog/UpsertRecord.java | 16 +-
.../main/java/org/apache/doris/catalog/Env.java | 12 ++
.../org/apache/doris/journal/JournalEntity.java | 6 +
.../org/apache/doris/persist/BinlogGcInfo.java | 67 +++++++
.../java/org/apache/doris/persist/EditLog.java | 10 +
.../org/apache/doris/persist/OperationType.java | 2 +
.../apache/doris/service/FrontendServiceImpl.java | 38 +++-
.../java/org/apache/doris/task/AgentBatchTask.java | 10 +
.../java/org/apache/doris/task/BinlogGcTask.java | 46 +++++
gensrc/proto/olap_file.proto | 5 +-
gensrc/thrift/AgentService.thrift | 10 +
gensrc/thrift/FrontendService.thrift | 13 ++
gensrc/thrift/Types.thrift | 3 +-
28 files changed, 1055 insertions(+), 172 deletions(-)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index a7f2f9aa09..4660902333 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -128,6 +128,7 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
CREATE_AND_START_THREAD(REPORT_OLAP_TABLE, _report_tablet_workers);
CREATE_AND_START_POOL(SUBMIT_TABLE_COMPACTION, _submit_table_compaction_workers);
CREATE_AND_START_POOL(PUSH_STORAGE_POLICY, _push_storage_policy_workers);
+ CREATE_AND_START_THREAD(GC_BINLOG, _gc_binlog_workers);
#undef CREATE_AND_START_POOL
#undef CREATE_AND_START_THREAD
@@ -237,6 +238,14 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
signature);
}
break;
+ case TTaskType::GC_BINLOG:
+ if (task.__isset.gc_binlog_req) {
+ _gc_binlog_workers->submit_task(task);
+ } else {
+ ret_st = Status::InvalidArgument(
+ "task(signature={}) has wrong request member = gc_binlog_req", signature);
+ }
+ break;
default:
ret_st = Status::InvalidArgument("task(signature={}, type={}) has wrong task type",
signature, task_type);
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index 3d98fd025d..daa1823b07 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -89,6 +89,7 @@ private:
std::unique_ptr<TaskWorkerPool> _push_storage_policy_workers;
std::unique_ptr<TopicSubscriber> _topic_subscriber;
+ std::unique_ptr<TaskWorkerPool> _gc_binlog_workers;
};
} // end namespace doris
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 7da010eca0..fd2c8a17dd 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -213,6 +213,10 @@ void TaskWorkerPool::start() {
_worker_count = 1;
_cb = std::bind<void>(&TaskWorkerPool::_push_cooldown_conf_worker_thread_callback, this);
break;
+ case TaskWorkerType::GC_BINLOG:
+ _worker_count = 1;
+ _cb = std::bind<void>(&TaskWorkerPool::_gc_binlog_worker_thread_callback, this);
+ break;
default:
// pass
break;
@@ -1705,6 +1709,41 @@ void AlterTableTaskPool::_alter_tablet(const TAgentTaskRequest& agent_task_req,
finish_task_request->__set_task_status(status.to_thrift());
}
+void TaskWorkerPool::_gc_binlog_worker_thread_callback() {
+    // Worker loop for GC_BINLOG tasks: blocks until a task is queued (or the pool is stopped),
+    // converts the request into a tablet_id -> version map and hands it to the storage engine.
+    while (_is_work) {
+        TAgentTaskRequest agent_task_req;
+        {
+            std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
+            if (!_is_work) {
+                // Pool is shutting down: this is the only legitimate exit from the loop.
+                return;
+            }
+
+            agent_task_req = _tasks.front();
+            _tasks.pop_front();
+        }
+
+        // A malformed request must only drop this one task. Returning here would end the
+        // worker loop, and every GC task queued afterwards would never be processed.
+        if (!agent_task_req.__isset.gc_binlog_req) {
+            LOG(WARNING) << "gc binlog task is not valid";
+            continue;
+        }
+        if (!agent_task_req.gc_binlog_req.__isset.tablet_gc_binlog_infos) {
+            LOG(WARNING) << "gc binlog task tablet_gc_binlog_infos is not valid";
+            continue;
+        }
+
+        // tablet_id -> version carried by the request for that tablet.
+        std::unordered_map<int64_t, int64_t> gc_tablet_infos;
+        const auto& tablet_gc_binlog_infos = agent_task_req.gc_binlog_req.tablet_gc_binlog_infos;
+        for (const auto& tablet_info : tablet_gc_binlog_infos) {
+            gc_tablet_infos.emplace(tablet_info.tablet_id, tablet_info.version);
+        }
+
+        StorageEngine::instance()->gc_binlogs(gc_tablet_infos);
+    }
+}
+
CloneTaskPool::CloneTaskPool(ExecEnv* env, ThreadModel thread_model)
: TaskWorkerPool(TaskWorkerType::CLONE, env, *env->master_info(), thread_model) {
_worker_count = config::clone_worker_count;
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index a33704480d..598c77d3ef 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -80,6 +80,7 @@ public:
PUSH_COOLDOWN_CONF,
PUSH_STORAGE_POLICY,
ALTER_INVERTED_INDEX,
+ GC_BINLOG,
};
enum ReportType { TASK, DISK, TABLET };
@@ -141,6 +142,8 @@ public:
return "PUSH_STORAGE_POLICY";
case ALTER_INVERTED_INDEX:
return "ALTER_INVERTED_INDEX";
+ case GC_BINLOG:
+ return "GC_BINLOG";
default:
return "Unknown";
}
@@ -197,6 +200,7 @@ protected:
void _submit_table_compaction_worker_thread_callback();
void _push_cooldown_conf_worker_thread_callback();
void _push_storage_policy_worker_thread_callback();
+ void _gc_binlog_worker_thread_callback();
void _alter_tablet(const TAgentTaskRequest& alter_tablet_request, int64_t signature,
const TTaskType::type task_type, TFinishTaskRequest* finish_task_request);
diff --git a/be/src/olap/binlog.h b/be/src/olap/binlog.h
index 9ae243d8bb..b6b95a9530 100644
--- a/be/src/olap/binlog.h
+++ b/be/src/olap/binlog.h
@@ -64,8 +64,12 @@ inline auto make_binlog_filename_key(const TabletUid& tablet_uid, std::string_vi
return fmt::format("{}meta_{}_{:0>20}_", kBinlogPrefix, tablet_uid.to_string(), version);
}
-inline auto make_binlog_meta_key_prefix(int64_t tablet_id) {
- return fmt::format("{}meta_{}_", kBinlogPrefix, tablet_id);
+inline auto make_binlog_meta_key_prefix(const TabletUid& tablet_uid) {
+    // Prefix "<kBinlogPrefix>meta_<tablet uid>_" for the given tablet uid.
+    const auto uid_text = tablet_uid.to_string();
+    return fmt::format("{}meta_{}_", kBinlogPrefix, uid_text);
+}
+
+inline auto make_binlog_meta_key_prefix(const TabletUid& tablet_uid, int64_t version) {
+    // Prefix "<kBinlogPrefix>meta_<tablet uid>_<version>_" with the version zero-padded to a
+    // width of 20, so that keys of one tablet compare in version order as plain strings.
+    const auto uid_text = tablet_uid.to_string();
+    return fmt::format("{}meta_{}_{:020d}_", kBinlogPrefix, uid_text, version);
}
inline bool starts_with_binlog_meta(std::string_view str) {
diff --git a/be/src/olap/rowset/rowset_meta_manager.cpp b/be/src/olap/rowset/rowset_meta_manager.cpp
index 29e5a4eae3..69f189aa78 100644
--- a/be/src/olap/rowset/rowset_meta_manager.cpp
+++ b/be/src/olap/rowset/rowset_meta_manager.cpp
@@ -146,6 +146,7 @@ Status RowsetMetaManager::_save_with_binlog(OlapMeta* meta, TabletUid tablet_uid
binlog_meta_entry_pb.set_rowset_id(rowset_meta_pb.rowset_id());
binlog_meta_entry_pb.set_num_segments(rowset_meta_pb.num_segments());
binlog_meta_entry_pb.set_creation_time(rowset_meta_pb.creation_time());
+ binlog_meta_entry_pb.set_rowset_id_v2(rowset_meta_pb.rowset_id_v2());
std::string binlog_meta_value;
if (!binlog_meta_entry_pb.SerializeToString(&binlog_meta_value)) {
LOG(WARNING) << "serialize binlog pb failed. rowset id:" << binlog_meta_key;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 1d65da4d59..94f1bb5a87 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -97,13 +97,6 @@ using std::vector;
using strings::Substitute;
namespace doris {
-namespace {
-inline int64_t now_ms() {
- auto duration = std::chrono::steady_clock::now().time_since_epoch();
- return static_cast<int64_t>(
- std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
-}
-} // namespace
using namespace ErrorCode;
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(unused_rowsets_count, MetricUnit::ROWSETS);
@@ -686,8 +679,6 @@ Status StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) {
}
}
- // _gc_binlogs();
-
if (usage != nullptr) {
*usage = tmp_usage; // update usage
}
@@ -776,136 +767,13 @@ void StorageEngine::_clean_unused_rowset_metas() {
}
}
-void StorageEngine::_gc_binlogs() {
- LOG(INFO) << "start to gc binlogs";
-
- auto data_dirs = get_stores();
- struct tablet_info {
- std::string tablet_path;
- int64_t binlog_ttl_ms;
- };
- std::unordered_map<int64_t, tablet_info> tablets_info;
-
- auto get_tablet_info = [&tablets_info, this](int64_t tablet_id) -> const tablet_info& {
- if (auto iter = tablets_info.find(tablet_id); iter != tablets_info.end()) {
- return iter->second;
- }
-
- auto tablet = tablet_manager()->get_tablet(tablet_id);
- if (tablet == nullptr) {
- LOG(WARNING) << "failed to find tablet " << tablet_id;
- static tablet_info empty_tablet_info;
- return empty_tablet_info;
- }
-
- auto tablet_path = tablet->tablet_path();
- auto binlog_ttl_ms = tablet->binlog_ttl_ms();
- tablets_info.emplace(tablet_id, tablet_info {tablet_path, binlog_ttl_ms});
- return tablets_info[tablet_id];
- };
-
- for (auto data_dir : data_dirs) {
- std::string prefix_key {kBinlogMetaPrefix};
- OlapMeta* meta = data_dir->get_meta();
- DCHECK(meta != nullptr);
-
- auto now = now_ms();
- int64_t last_tablet_id = 0;
- std::vector<std::string> wait_for_deleted_binlog_keys;
- std::vector<std::string> wait_for_deleted_binlog_files;
- auto add_to_wait_for_deleted_binlog_keys =
- [&wait_for_deleted_binlog_keys](std::string_view key) {
- wait_for_deleted_binlog_keys.emplace_back(key);
- wait_for_deleted_binlog_keys.push_back(get_binlog_data_key_from_meta_key(key));
- };
-
- auto add_to_wait_for_deleted = [&add_to_wait_for_deleted_binlog_keys,
- &wait_for_deleted_binlog_files](
- std::string_view key, std::string_view tablet_path,
- int64_t rowset_id, int64_t num_segments) {
- add_to_wait_for_deleted_binlog_keys(key);
- for (int64_t i = 0; i < num_segments; ++i) {
- auto segment_file = fmt::format("{}_{}.dat", rowset_id, i);
- wait_for_deleted_binlog_files.emplace_back(
- fmt::format("{}/_binlog/{}", tablet_path, segment_file));
- }
- };
-
- auto check_binlog_ttl = [now, &get_tablet_info, &last_tablet_id,
- &add_to_wait_for_deleted_binlog_keys, &add_to_wait_for_deleted](
- const std::string& key,
- const std::string& value) mutable -> bool {
- LOG(INFO) << fmt::format("check binlog ttl, key:{}, value:{}", key, value);
- if (!starts_with_binlog_meta(key)) {
- last_tablet_id = -1;
- return false;
- }
-
- BinlogMetaEntryPB binlog_meta_entry_pb;
- if (!binlog_meta_entry_pb.ParseFromString(value)) {
- LOG(WARNING) << "failed to parse binlog meta entry, key:" << key;
- return true;
- }
-
- auto tablet_id = binlog_meta_entry_pb.tablet_id();
- last_tablet_id = tablet_id;
- const auto& tablet_info = get_tablet_info(tablet_id);
- std::string_view tablet_path = tablet_info.tablet_path;
- // tablet has been removed, removed all these binlog meta
- if (tablet_path.empty()) {
- add_to_wait_for_deleted_binlog_keys(key);
- return true;
- }
-
- // check by ttl
- auto rowset_id = binlog_meta_entry_pb.rowset_id();
- auto binlog_ttl_ms = tablet_info.binlog_ttl_ms;
- auto num_segments = binlog_meta_entry_pb.num_segments();
- // binlog has been disabled, remove all
- if (binlog_ttl_ms <= 0) {
- add_to_wait_for_deleted(key, tablet_path, rowset_id, num_segments);
- return true;
- }
- auto binlog_creation_time_ms = binlog_meta_entry_pb.creation_time();
- if (now - binlog_creation_time_ms > binlog_ttl_ms) {
- add_to_wait_for_deleted(key, tablet_path, rowset_id, num_segments);
- return true;
- }
-
- // binlog not stale, skip
- return false;
- };
-
- while (last_tablet_id >= 0) {
- // every loop iterate one tablet
- // get binlog meta by prefix
- auto status = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, check_binlog_ttl);
- if (!status.ok()) {
- LOG(WARNING) << "failed to iterate binlog meta, status:" << status;
- break;
- }
-
- prefix_key = make_binlog_meta_key_prefix(last_tablet_id);
- }
-
- // first remove binlog files, if failed, just break, then retry next time
- // this keep binlog meta in meta store, so that binlog can be removed next time
- bool remove_binlog_files_failed = false;
- for (auto& file : wait_for_deleted_binlog_files) {
- if (unlink(file.c_str()) != 0) {
- // file not exist, continue
- if (errno == ENOENT) {
- continue;
- }
+void StorageEngine::gc_binlogs(const std::unordered_map<int64_t, int64_t>& gc_tablet_infos) {
+ // For every (tablet_id, version) entry, run the tablet-level binlog gc with that version.
+ for (auto [tablet_id, version] : gc_tablet_infos) {
+ LOG(INFO) << fmt::format("start to gc binlogs for tablet_id: {}, version: {}", tablet_id,
+ version);
- remove_binlog_files_failed = true;
- LOG(WARNING) << "failed to remove binlog file:" << file << ", errno:" << errno;
- break;
- }
- }
- if (remove_binlog_files_failed) {
- meta->remove(META_COLUMN_FAMILY_INDEX, wait_for_deleted_binlog_keys);
- }
+ TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
+ // get_tablet() yields nullptr when the tablet is not found; skip it instead of
+ // dereferencing a null pointer, and keep processing the remaining tablets.
+ if (tablet == nullptr) {
+ LOG(WARNING) << fmt::format("failed to find tablet for gc binlogs, tablet_id: {}", tablet_id);
+ continue;
+ }
+ tablet->gc_binlogs(version);
}
}
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index fdf49c3e14..15b1a98a78 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -218,6 +218,8 @@ public:
Status process_index_change_task(const TAlterInvertedIndexReq& reqest);
+ void gc_binlogs(const std::unordered_map<int64_t, int64_t>& gc_tablet_infos);
+
private:
// Instance should be inited from `static open()`
// MUST NOT be called in other circumstances.
@@ -313,7 +315,8 @@ private:
SegCompactionCandidatesSharedPtr segments);
Status _handle_index_change(IndexBuilderSharedPtr index_builder);
- void _gc_binlogs();
+
+ void _gc_binlogs(int64_t tablet_id, int64_t version);
private:
struct CompactionCandidate {
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index e7216d9401..dea85713f0 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -74,6 +74,7 @@
#include "io/fs/remote_file_system.h"
#include "olap/base_compaction.h"
#include "olap/base_tablet.h"
+#include "olap/binlog.h"
#include "olap/cumulative_compaction.h"
#include "olap/cumulative_compaction_policy.h"
#include "olap/delete_bitmap_calculator.h"
@@ -3445,6 +3446,88 @@ void Tablet::set_binlog_config(BinlogConfig binlog_config) {
tablet_meta()->set_binlog_config(std::move(binlog_config));
}
+void Tablet::gc_binlogs(int64_t version) {
+    // Garbage-collects binlogs of this tablet: scans the binlog meta entries that start with
+    // this tablet's key prefix, collects every entry whose key sorts before the prefix built
+    // for `version + 1`, unlinks the matching segment files under "<tablet_path>/_binlog/",
+    // and finally removes the collected meta/data keys from the meta store.
+    auto meta = _data_dir->get_meta();
+    DCHECK(meta != nullptr);
+
+    const auto& tablet_uid = this->tablet_uid();
+    const auto tablet_id = this->tablet_id();
+    const auto& tablet_path = this->tablet_path();
+    std::string begin_key = make_binlog_meta_key_prefix(tablet_uid);
+    std::string end_key = make_binlog_meta_key_prefix(tablet_uid, version + 1);
+    LOG(INFO) << fmt::format("gc binlog meta, tablet_id:{}, begin_key:{}, end_key:{}", tablet_id,
+                             begin_key, end_key);
+
+    std::vector<std::string> wait_for_deleted_binlog_keys;
+    std::vector<std::string> wait_for_deleted_binlog_files;
+    auto add_to_wait_for_deleted = [&](std::string_view key, std::string_view rowset_id,
+                                       int64_t num_segments) {
+        // add binlog meta key and binlog data key
+        wait_for_deleted_binlog_keys.emplace_back(key);
+        wait_for_deleted_binlog_keys.push_back(get_binlog_data_key_from_meta_key(key));
+
+        // one "<rowset_id>_<segment index>.dat" file per segment
+        for (int64_t i = 0; i < num_segments; ++i) {
+            auto segment_file = fmt::format("{}_{}.dat", rowset_id, i);
+            wait_for_deleted_binlog_files.emplace_back(
+                    fmt::format("{}/_binlog/{}", tablet_path, segment_file));
+        }
+    };
+
+    // Visitor for meta->iterate(). NOTE(review): returning false presumably stops the scan and
+    // returning true moves on to the next entry -- confirm against OlapMeta::iterate.
+    auto check_binlog_ttl = [&](const std::string& key, const std::string& value) mutable -> bool {
+        // keys at or past end_key are outside the requested range
+        if (key >= end_key) {
+            return false;
+        }
+
+        BinlogMetaEntryPB binlog_meta_entry_pb;
+        if (!binlog_meta_entry_pb.ParseFromString(value)) {
+            LOG(WARNING) << "failed to parse binlog meta entry, key:" << key;
+            return true;
+        }
+
+        auto num_segments = binlog_meta_entry_pb.num_segments();
+        std::string rowset_id;
+        if (binlog_meta_entry_pb.has_rowset_id_v2()) {
+            rowset_id = binlog_meta_entry_pb.rowset_id_v2();
+        } else {
+            // key is 'binlog_meta_6943f1585fe834b5-e542c2b83a21d0b7_00000000000000000069_020000000000000135449d7cd7eadfe672aa0f928fa99593', extract last part '020000000000000135449d7cd7eadfe672aa0f928fa99593'
+            auto pos = key.rfind('_');
+            if (pos == std::string::npos) {
+                LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key);
+                return false;
+            }
+            rowset_id = key.substr(pos + 1);
+        }
+        add_to_wait_for_deleted(key, rowset_id, num_segments);
+
+        return true;
+    };
+
+    auto status = meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, check_binlog_ttl);
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to iterate binlog meta, status:" << status;
+        return;
+    }
+
+    // first remove binlog files, if failed, just break, then retry next time
+    // this keep binlog meta in meta store, so that binlog can be removed next time
+    bool remove_binlog_files_failed = false;
+    for (auto& file : wait_for_deleted_binlog_files) {
+        if (unlink(file.c_str()) != 0) {
+            // Snapshot errno immediately: any call made while building the log statement
+            // below may overwrite it before it is read.
+            const int unlink_errno = errno;
+
+            // file not exist, continue
+            if (unlink_errno == ENOENT) {
+                continue;
+            }
+
+            remove_binlog_files_failed = true;
+            LOG(WARNING) << "failed to remove binlog file:" << file << ", errno:" << unlink_errno;
+            break;
+        }
+    }
+    if (!remove_binlog_files_failed) {
+        meta->remove(META_COLUMN_FAMILY_INDEX, wait_for_deleted_binlog_keys);
+    }
+}
+
Status Tablet::calc_delete_bitmap_between_segments(
RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments,
DeleteBitmapPtr delete_bitmap) {
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 7ef38e1514..50eb3469f1 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -504,6 +504,7 @@ public:
std::string get_segment_filepath(std::string_view rowset_id,
std::string_view segment_index) const;
bool can_add_binlog(uint64_t total_binlog_size) const;
+ void gc_binlogs(int64_t version);
inline void increase_io_error_times() { ++_io_error_times; }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java
new file mode 100644
index 0000000000..a2e6afd1fc
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.binlog;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.persist.BinlogGcInfo;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.BinlogGcTask;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Master daemon that drives binlog garbage collection: it asks the BinlogManager for
+ * tombstones, sends one BinlogGcTask per backend describing which tablets to gc up to which
+ * version, and then writes the gc result to the edit log.
+ */
+public class BinlogGcer extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(BinlogGcer.class);
+    private static final long GC_DURATION_MS = 313 * 1000L; // 313s
+
+    // TODO(Drogon): use this to control gc frequency by real gc time waste sample
+    private long lastGcTime = 0L;
+
+    public BinlogGcer() {
+        super("binlog-gcer", GC_DURATION_MS);
+    }
+
+    /**
+     * One gc round: collect tombstones, notify the backends (best effort), then persist the
+     * tombstones (with their table version maps cleared) through the edit log.
+     */
+    @Override
+    protected void runAfterCatalogReady() {
+        LOG.debug("start binlog gc jobs.");
+        try {
+            List<BinlogTombstone> tombstones = Env.getCurrentEnv().getBinlogManager().gc();
+            if (tombstones == null || tombstones.isEmpty()) {
+                LOG.info("no gc binlog");
+                return;
+            }
+            LOG.info("tombstones size: {}", tombstones.size());
+
+            try {
+                sendGcInfoToBe(tombstones);
+            } catch (Throwable e) {
+                // TODO(Drogon): retry
+                // if send gc info to be failed, next gc depend on gc duration
+                LOG.warn("Failed to send gc info to be", e);
+            }
+
+            // The table version maps are only needed to build the backend tasks above,
+            // so they are dropped before the tombstones are written to the edit log.
+            for (BinlogTombstone tombstone : tombstones) {
+                tombstone.clearTableVersionMap();
+            }
+            BinlogGcInfo info = new BinlogGcInfo(tombstones);
+            Env.getCurrentEnv().getEditLog().logGcBinlog(info);
+        } catch (Throwable e) {
+            LOG.warn("Failed to process one round of BinlogGcer", e);
+        }
+    }
+
+    /**
+     * Groups the gc work of all tombstones by backend id and submits the resulting tasks as a
+     * single batch. Does nothing when there are no tombstones or no task was produced.
+     */
+    private void sendGcInfoToBe(List<BinlogTombstone> tombstones) {
+        if (tombstones == null || tombstones.isEmpty()) {
+            return;
+        }
+
+        // backend id -> gc task for that backend
+        Map<Long, BinlogGcTask> beBinlogGcTaskMap = Maps.newHashMap();
+        for (BinlogTombstone tombstone : tombstones) {
+            sendDbGcInfoToBe(beBinlogGcTaskMap, tombstone);
+        }
+
+        if (beBinlogGcTaskMap.isEmpty()) {
+            return;
+        }
+
+        AgentBatchTask batchTask = new AgentBatchTask();
+        for (BinlogGcTask task : beBinlogGcTaskMap.values()) {
+            batchTask.addTask(task);
+        }
+        AgentTaskExecutor.submit(batchTask);
+    }
+
+    /**
+     * Adds the gc work of every table recorded in the tombstone to the per-backend task map.
+     * Missing databases, missing tables and non-olap tables are logged and skipped.
+     */
+    private void sendDbGcInfoToBe(Map<Long, BinlogGcTask> beBinlogGcTaskMap, BinlogTombstone tombstone) {
+        long dbId = tombstone.getDbId();
+        Database db = Env.getCurrentEnv().getInternalCatalog().getDbNullable(dbId);
+        if (db == null) {
+            LOG.warn("db {} does not exist", dbId);
+            return;
+        }
+
+        Map<Long, UpsertRecord.TableRecord> tableVersionMap = tombstone.getTableVersionMap();
+        for (Map.Entry<Long, UpsertRecord.TableRecord> entry : tableVersionMap.entrySet()) {
+            long tableId = entry.getKey();
+
+            OlapTable table = null;
+            try {
+                Table tbl = db.getTableOrMetaException(tableId);
+                if (tbl == null) {
+                    LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
+                    continue;
+                }
+                if (!(tbl instanceof OlapTable)) {
+                    LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId);
+                    continue;
+                }
+                table = (OlapTable) tbl;
+            } catch (Exception e) {
+                // keep the cause in the log instead of silently dropping it
+                LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId, e);
+                continue;
+            }
+
+            UpsertRecord.TableRecord record = entry.getValue();
+            sendTableGcInfoToBe(beBinlogGcTaskMap, table, record);
+        }
+    }
+
+    /**
+     * For every partition record of the table, registers (tablet id, partition version) on the
+     * task of each backend that holds a replica of a tablet in a VISIBLE index of that
+     * partition. The table read lock is held while the partitions are walked.
+     */
+    private void sendTableGcInfoToBe(Map<Long, BinlogGcTask> beBinlogGcTaskMap, OlapTable table,
+            UpsertRecord.TableRecord tableRecord) {
+        table.readLock();
+        try {
+            for (UpsertRecord.TableRecord.PartitionRecord partitionRecord : tableRecord.getPartitionRecords()) {
+                long partitionId = partitionRecord.partitionId;
+                Partition partition = table.getPartition(partitionId);
+                if (partition == null) {
+                    LOG.warn("fail to get partition. table: {}, partition id: {}", table.getName(), partitionId);
+                    continue;
+                }
+
+                long version = partitionRecord.version;
+
+                List<MaterializedIndex> indexes = partition.getMaterializedIndices(IndexExtState.VISIBLE);
+                for (MaterializedIndex index : indexes) {
+                    List<Tablet> tablets = index.getTablets();
+                    for (Tablet tablet : tablets) {
+                        List<Replica> replicas = tablet.getReplicas();
+                        for (Replica replica : replicas) {
+                            long beId = replica.getBackendId();
+                            long signature = -1;
+                            // one task per backend, created on first use
+                            BinlogGcTask binlogGcTask = beBinlogGcTaskMap.get(beId);
+                            if (binlogGcTask == null) {
+                                binlogGcTask = new BinlogGcTask(beId, signature);
+                                beBinlogGcTaskMap.put(beId, binlogGcTask);
+                            }
+
+                            binlogGcTask.addTask(tablet.getId(), version);
+                        }
+                    }
+                }
+            }
+        } finally {
+            table.readUnlock();
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index c2e4800935..59ba596152 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -17,13 +17,17 @@
package org.apache.doris.binlog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
+import org.apache.doris.persist.BinlogGcInfo;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -39,25 +43,22 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
-import java.util.Iterator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class BinlogManager {
- private static final Logger LOG = LogManager.getLogger(BinlogManager.class);
private static final int BUFFER_SIZE = 16 * 1024;
+ private static final Logger LOG = LogManager.getLogger(BinlogManager.class);
+
private ReentrantReadWriteLock lock;
private Map<Long, DBBinlog> dbBinlogMap;
- // Pair(commitSeq, timestamp), used for gc
- // need UpsertRecord to add timestamps for gc
- private List<Pair<Long, Long>> timestamps;
public BinlogManager() {
lock = new ReentrantReadWriteLock();
dbBinlogMap = Maps.newHashMap();
- timestamps = new ArrayList<Pair<Long, Long>>();
}
private void addBinlog(TBinlog binlog) {
@@ -65,7 +66,15 @@ public class BinlogManager {
return;
}
+ // find db BinlogConfig
long dbId = binlog.getDbId();
+ Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+ if (db == null) {
+ LOG.warn("db not found. dbId: {}", dbId);
+ return;
+ }
+ boolean dbBinlogEnable = db.getBinlogConfig().isEnable();
+
DBBinlog dbBinlog;
lock.writeLock().lock();
try {
@@ -74,14 +83,11 @@ public class BinlogManager {
dbBinlog = new DBBinlog(dbId);
dbBinlogMap.put(dbId, dbBinlog);
}
- if (binlog.getTimestamp() > 0) {
- timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp()));
- }
} finally {
lock.writeLock().unlock();
}
- dbBinlog.addBinlog(binlog);
+ dbBinlog.addBinlog(binlog, dbBinlogEnable);
}
private void addBinlog(long dbId, List<Long> tableIds, long commitSeq, long timestamp, TBinlogType type,
@@ -140,35 +146,75 @@ public class BinlogManager {
}
}
- // gc binlog, remove all binlog timestamp < minTimestamp
- // TODO(Drogon): get minCommitSeq from timestamps
- public void gc(long minTimestamp) {
+ public List<BinlogTombstone> gc() {
+ LOG.info("begin gc binlog");
+
lock.writeLock().lock();
- long minCommitSeq = -1;
+ Map<Long, DBBinlog> gcDbBinlogMap = null;
try {
- // user iterator to remove element in timestamps
- for (Iterator<Pair<Long, Long>> iterator = timestamps.iterator(); iterator.hasNext();) {
- Pair<Long, Long> pair = iterator.next();
- // long commitSeq = pair.first;
- long timestamp = pair.second;
+ gcDbBinlogMap = new HashMap<Long, DBBinlog>(dbBinlogMap);
+ } finally {
+ lock.writeLock().unlock();
+ }
- if (timestamp >= minTimestamp) {
- break;
- }
+ if (gcDbBinlogMap.isEmpty()) {
+ LOG.info("gc binlog, dbBinlogMap is null");
+ return null;
+ }
- iterator.remove();
+ List<BinlogTombstone> tombstones = Lists.newArrayList();
+ for (DBBinlog dbBinlog : gcDbBinlogMap.values()) {
+ List<BinlogTombstone> dbTombstones = dbBinlog.gc();
+ if (dbTombstones != null) {
+ tombstones.addAll(dbTombstones);
}
+ }
+ return tombstones;
+ }
+
+ public void replayGc(BinlogGcInfo binlogGcInfo) {
+ // Replays a persisted gc record: every tombstone is applied to the DBBinlog of its db.
+ // The map is copied under the lock so the per-db replay below runs without holding it.
+ lock.writeLock().lock();
+ Map<Long, DBBinlog> gcDbBinlogMap = null;
+ try {
+ gcDbBinlogMap = new HashMap<Long, DBBinlog>(dbBinlogMap);
} finally {
lock.writeLock().unlock();
}
- if (minCommitSeq == -1) {
+ if (gcDbBinlogMap.isEmpty()) {
+ LOG.info("replay gc binlog, dbBinlogMap is null");
return;
}
+ for (BinlogTombstone tombstone : binlogGcInfo.getTombstones()) {
+ long dbId = tombstone.getDbId();
+ DBBinlog dbBinlog = gcDbBinlogMap.get(dbId);
+ // The db may have no DBBinlog here (e.g. it was dropped via removeDB); skip the
+ // tombstone instead of failing the replay with a NullPointerException.
+ if (dbBinlog == null) {
+ LOG.warn("replay gc binlog, dbBinlog not found. dbId: {}", dbId);
+ continue;
+ }
+ dbBinlog.replayGc(tombstone);
+ }
+ }
+
+ public void removeDB(long dbId) {
lock.writeLock().lock();
+ try {
+ dbBinlogMap.remove(dbId);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
+    // Forwards the removal of `tableId` to the DBBinlog registered for `dbId`, if there is one.
+    // The manager's write lock is held for the whole call.
+    public void removeTable(long dbId, long tableId) {
+        lock.writeLock().lock();
+        try {
+            DBBinlog target = dbBinlogMap.get(dbId);
+            if (target == null) {
+                return;
+            }
+            target.removeTable(tableId);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+
private static void writeTBinlogToStream(DataOutputStream dos, TBinlog binlog) throws TException, IOException {
TMemoryBuffer buffer = new TMemoryBuffer(BUFFER_SIZE);
TBinaryProtocol protocol = new TBinaryProtocol(buffer);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java
new file mode 100644
index 0000000000..e1c01a5cfa
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.binlog;
+
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Describes the outcome of one binlog gc step for a database: binlogs with a
+ * commit sequence up to and including {@code commitSeq} are considered removed.
+ *
+ * Two flavours exist, selected by the constructor:
+ * - db-level tombstone: carries the ids of the tables tracked at gc time;
+ * - table-level tombstone: {@code tableIds} is null.
+ */
+public class BinlogTombstone {
+ // true when created through the (dbId, tableIds, commitSeq) constructor
+ @SerializedName(value = "dbBinlogTombstone")
+ private boolean dbBinlogTombstone;
+
+ @SerializedName(value = "dbId")
+ private long dbId;
+
+ // largest commit sequence covered by this tombstone
+ @SerializedName(value = "commitSeq")
+ private long commitSeq;
+
+ // null for table-level tombstones
+ @SerializedName(value = "tableIds")
+ private List<Long> tableIds;
+
+ // Keeps the last upsert record per table: <tableId, UpsertRecord.TableRecord>.
+ // Only the master fe needs it, to send gc tasks to be. The field carries
+ // @SerializedName, so it IS written by toJson() unless clearTableVersionMap()
+ // has been called beforehand.
+ @SerializedName(value = "tableVersionMap")
+ private Map<Long, UpsertRecord.TableRecord> tableVersionMap = Maps.newHashMap();
+
+ /** Creates a db-level tombstone covering the given tables. */
+ public BinlogTombstone(long dbId, List<Long> tableIds, long commitSeq) {
+ this.dbBinlogTombstone = true;
+ this.dbId = dbId;
+ this.tableIds = tableIds;
+ this.commitSeq = commitSeq;
+ }
+
+ /** Creates a table-level tombstone; {@link #getTableIds()} returns null for it. */
+ public BinlogTombstone(long dbId, long commitSeq) {
+ this.dbBinlogTombstone = false;
+ this.dbId = dbId;
+ this.tableIds = null;
+ this.commitSeq = commitSeq;
+ }
+
+ /**
+ * Records the entry of {@code tableId} found in {@code upsertRecord}.
+ * When the upsert record has no entry for that table nothing is stored, so
+ * readers of the map never encounter a null value.
+ */
+ public void addTableRecord(long tableId, UpsertRecord upsertRecord) {
+ Map<Long, UpsertRecord.TableRecord> tableRecords = upsertRecord.getTableRecords();
+ UpsertRecord.TableRecord tableRecord = tableRecords.get(tableId);
+ if (tableRecord == null) {
+ return;
+ }
+ tableVersionMap.put(tableId, tableRecord);
+ }
+
+ /** Records {@code record} as the last upsert record of {@code tableId}. */
+ public void addTableRecord(long tableId, UpsertRecord.TableRecord record) {
+ tableVersionMap.put(tableId, record);
+ }
+
+ /** Correctly spelled accessor for the db-level flag. */
+ public boolean isDbBinlogTombstone() {
+ return dbBinlogTombstone;
+ }
+
+ /** Misspelled name kept for existing callers; same as {@link #isDbBinlogTombstone()}. */
+ public boolean isDbBinlogTomstone() {
+ return isDbBinlogTombstone();
+ }
+
+ public long getDbId() {
+ return dbId;
+ }
+
+ /** @return the table ids of a db-level tombstone, or null for a table-level one */
+ public List<Long> getTableIds() {
+ return tableIds;
+ }
+
+ public long getCommitSeq() {
+ return commitSeq;
+ }
+
+ public Map<Long, UpsertRecord.TableRecord> getTableVersionMap() {
+ return tableVersionMap;
+ }
+
+ // only call when log editlog
+ public void clearTableVersionMap() {
+ tableVersionMap.clear();
+ }
+
+ public String toJson() {
+ return GsonUtils.GSON.toJson(this);
+ }
+
+ @Override
+ public String toString() {
+ return toJson();
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index c4312d2134..d6408b3076 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -17,18 +17,29 @@
package org.apache.doris.binlog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
import org.apache.doris.common.Pair;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class DBBinlog {
+ // use this class as the logger name so messages are attributed to DBBinlog
+ private static final Logger LOG = LogManager.getLogger(DBBinlog.class);
+
private long dbId;
// guard for allBinlogs && tableBinlogMap
private ReentrantReadWriteLock lock;
@@ -37,6 +48,10 @@ public class DBBinlog {
// table binlogs
private Map<Long, TableBinlog> tableBinlogMap;
+ // Pair(commitSeq, timestamp), used for gc
+ // need UpsertRecord to add timestamps for gc
+ private List<Pair<Long, Long>> timestamps;
+
public DBBinlog(long dbId) {
lock = new ReentrantReadWriteLock();
this.dbId = dbId;
@@ -51,13 +66,19 @@ public class DBBinlog {
}
});
tableBinlogMap = new HashMap<Long, TableBinlog>();
+ timestamps = new ArrayList<Pair<Long, Long>>();
}
- public void addBinlog(TBinlog binlog) {
+ public void addBinlog(TBinlog binlog, boolean dbBinlogEnable) {
List<Long> tableIds = binlog.getTableIds();
lock.writeLock().lock();
try {
+ if (binlog.getTimestamp() > 0 && dbBinlogEnable) {
+ timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp()));
+ }
+
allBinlogs.add(binlog);
+
if (tableIds == null) {
return;
}
@@ -98,8 +119,189 @@ public class DBBinlog {
}
}
+ /**
+ * Runs one gc pass for this database.
+ *
+ * @return null when the database cannot be found in the internal catalog;
+ * otherwise the (possibly empty) list of tombstones produced by the pass
+ */
+ public List<BinlogTombstone> gc() {
+ // check db
+ final Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+ if (db == null) {
+ LOG.error("db not found. dbId: {}", dbId);
+ return null;
+ }
+
+ if (!db.getBinlogConfig().isEnable()) {
+ // db binlog is disabled: delegate to the per-table gc
+ return dbBinlogDisableGc(db);
+ }
+
+ // db binlog is enabled, at most one tombstone is produced
+ final long ttlSeconds = db.getBinlogConfig().getTtlSeconds();
+ final long nowSeconds = System.currentTimeMillis() / 1000;
+ final long expireMs = (nowSeconds - ttlSeconds) * 1000;
+
+ final BinlogTombstone dbTombstone = dbBinlogEnableGc(expireMs);
+ final List<BinlogTombstone> result = new ArrayList<BinlogTombstone>();
+ if (dbTombstone != null) {
+ result.add(dbTombstone);
+ }
+ return result;
+ }
+
+ private List<BinlogTombstone> dbBinlogDisableGc(Database db) {
+ List<BinlogTombstone> tombstones = new ArrayList<BinlogTombstone>();
+ List<TableBinlog> tableBinlogs = null;
+
+ lock.writeLock().lock();
+ try {
+ tableBinlogs = new ArrayList<TableBinlog>(tableBinlogMap.values());
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ for (TableBinlog tableBinlog : tableBinlogs) {
+ BinlogTombstone tombstone = tableBinlog.gc(db);
+ if (tombstone != null) {
+ tombstones.add(tombstone);
+ }
+ }
+ return tombstones;
+ }
+
+ /**
+ * Gc path used when the db-level binlog is enabled.
+ *
+ * Under the write lock it drops every (commitSeq, timestamp) pair older than
+ * {@code expireMs}, then every binlog whose commitSeq is not larger than the
+ * largest expired commitSeq, and snapshots the tracked tables. Outside the lock
+ * each TableBinlog is gc'ed up to the same commitSeq and its table record is
+ * merged into the returned db-level tombstone.
+ *
+ * @param expireMs expiry threshold in milliseconds; pairs with a smaller timestamp are expired
+ * @return the db-level tombstone, or null when no binlog was removed from allBinlogs
+ */
+ private BinlogTombstone dbBinlogEnableGc(long expireMs) {
+ // find commitSeq from timestamps, if commitSeq's timestamp is less than expireMs, then remove it
+ long largestExpiredCommitSeq = -1;
+ TBinlog tombstoneBinlog = null;
+ List<Long> tableIds = null;
+ List<TableBinlog> tableBinlogs = null;
+
+ lock.writeLock().lock();
+ try {
+ // the loop stops at the first pair that is not expired
+ Iterator<Pair<Long, Long>> iterator = timestamps.iterator();
+ while (iterator.hasNext()) {
+ Pair<Long, Long> pair = iterator.next();
+ if (pair.second < expireMs) {
+ largestExpiredCommitSeq = pair.first;
+ iterator.remove();
+ } else {
+ break;
+ }
+ }
+
+ // remove binlogs up to largestExpiredCommitSeq; tombstoneBinlog ends up as the last one removed
+ Iterator<TBinlog> binlogIterator = allBinlogs.iterator();
+ while (binlogIterator.hasNext()) {
+ TBinlog binlog = binlogIterator.next();
+ if (binlog.getCommitSeq() <= largestExpiredCommitSeq) {
+ tombstoneBinlog = binlog;
+ binlogIterator.remove();
+ } else {
+ break;
+ }
+ }
+
+ // copy the table view so the per-table gc below runs without holding this lock
+ tableIds = new ArrayList<Long>(tableBinlogMap.keySet());
+ tableBinlogs = new ArrayList<TableBinlog>(tableBinlogMap.values());
+ } finally {
+ lock.writeLock().unlock();
+ }
+ LOG.info("gc binlog. dbId: {}, expireMs: {}, largestExpiredCommitSeq: {}",
+ dbId, expireMs, largestExpiredCommitSeq);
+ if (tombstoneBinlog == null) {
+ return null;
+ }
+
+ BinlogTombstone tombstone = new BinlogTombstone(dbId, tableIds, tombstoneBinlog.getCommitSeq());
+ for (TableBinlog tableBinlog : tableBinlogs) {
+ BinlogTombstone binlogTombstone = tableBinlog.gc(largestExpiredCommitSeq);
+ if (binlogTombstone == null) {
+ continue;
+ }
+
+ // a table tombstone is expected to carry a single table record; more than one is only logged
+ Map<Long, UpsertRecord.TableRecord> tableVersionMap = binlogTombstone.getTableVersionMap();
+ if (tableVersionMap.size() > 1) {
+ LOG.warn("tableVersionMap size is greater than 1. tableVersionMap: {}", tableVersionMap);
+ }
+ for (Map.Entry<Long, UpsertRecord.TableRecord> entry : tableVersionMap.entrySet()) {
+ long tableId = entry.getKey();
+ UpsertRecord.TableRecord record = entry.getValue();
+ tombstone.addTableRecord(tableId, record);
+ }
+ }
+
+ return tombstone;
+ }
+
+ /**
+ * Replays {@code tombstone} on this database, choosing the db-level or the
+ * table-level replay path from the tombstone's flag.
+ */
+ public void replayGc(BinlogTombstone tombstone) {
+ if (!tombstone.isDbBinlogTomstone()) {
+ dbBinlogDisableReplayGc(tombstone);
+ return;
+ }
+ dbBinlogEnableReplayGc(tombstone);
+ }
+
+ /**
+ * Replays a db-level tombstone: removes, from the head of both timestamps and
+ * allBinlogs, every entry whose commitSeq is not larger than the tombstone's
+ * commitSeq, then replays the tombstone on the table binlogs.
+ */
+ public void dbBinlogEnableReplayGc(BinlogTombstone tombstone) {
+ final long expiredSeq = tombstone.getCommitSeq();
+
+ lock.writeLock().lock();
+ try {
+ for (Iterator<Pair<Long, Long>> it = timestamps.iterator(); it.hasNext(); ) {
+ if (it.next().first > expiredSeq) {
+ break;
+ }
+ it.remove();
+ }
+
+ for (Iterator<TBinlog> it = allBinlogs.iterator(); it.hasNext(); ) {
+ if (it.next().getCommitSeq() > expiredSeq) {
+ break;
+ }
+ it.remove();
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ dbBinlogDisableReplayGc(tombstone);
+ }
+
+ /**
+ * Replays {@code tombstone} on the table binlogs it names, dropping their
+ * entries up to the tombstone's commitSeq.
+ *
+ * Table ids are taken from the tombstone's tableIds. Table-level tombstones
+ * carry no tableIds (null), so the keys of the table version map are used as a
+ * fallback; when neither yields an id the tombstone is skipped with a warning
+ * rather than failing the replay with a NullPointerException.
+ */
+ public void dbBinlogDisableReplayGc(BinlogTombstone tombstone) {
+ List<TableBinlog> tableBinlogs = null;
+
+ lock.writeLock().lock();
+ try {
+ tableBinlogs = new ArrayList<TableBinlog>(tableBinlogMap.values());
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ if (tableBinlogs.isEmpty()) {
+ return;
+ }
+
+ long largestExpiredCommitSeq = tombstone.getCommitSeq();
+ Set<Long> tableIds = new HashSet<Long>();
+ if (tombstone.getTableIds() != null) {
+ tableIds.addAll(tombstone.getTableIds());
+ } else if (tombstone.getTableVersionMap() != null) {
+ tableIds.addAll(tombstone.getTableVersionMap().keySet());
+ }
+ if (tableIds.isEmpty()) {
+ LOG.warn("replay gc binlog, tombstone has no table ids. dbId: {}, commitSeq: {}",
+ dbId, largestExpiredCommitSeq);
+ return;
+ }
+
+ for (TableBinlog tableBinlog : tableBinlogs) {
+ if (tableIds.contains(tableBinlog.getTableId())) {
+ tableBinlog.replayGc(largestExpiredCommitSeq);
+ }
+ }
+ }
+
// Not thread-safe: appends every entry of allBinlogs to binlogs without taking the lock.
public void getAllBinlogs(List<TBinlog> binlogs) {
binlogs.addAll(allBinlogs);
}
+
+ /**
+ * Removes the TableBinlog registered for {@code tableId} from tableBinlogMap
+ * while holding the write lock. Entries already in allBinlogs are not touched here.
+ */
+ public void removeTable(long tableId) {
+ lock.writeLock().lock();
+ try {
+ tableBinlogMap.remove(tableId);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index 8a3391847b..659f32e6f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -17,14 +17,24 @@
package org.apache.doris.binlog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
import org.apache.doris.common.Pair;
import org.apache.doris.thrift.TBinlog;
+import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Iterator;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TableBinlog {
+ private static final Logger LOG = LogManager.getLogger(TableBinlog.class);
+
private long tableId;
private ReentrantReadWriteLock lock;
private TreeSet<TBinlog> binlogs;
@@ -65,4 +75,108 @@ public class TableBinlog {
lock.readLock().unlock();
}
}
+
+ /**
+ * Gc used when the db-level binlog is enabled: drops, from the head of the set,
+ * every binlog whose commitSeq is not larger than {@code largestExpiredCommitSeq}.
+ *
+ * @return a tombstone holding this table's record from the last removed UPSERT
+ * binlog, or null when no UPSERT binlog was removed
+ */
+ public BinlogTombstone gc(long largestExpiredCommitSeq) {
+ TBinlog lastExpiredUpsert = null;
+
+ lock.writeLock().lock();
+ try {
+ for (Iterator<TBinlog> it = binlogs.iterator(); it.hasNext(); ) {
+ final TBinlog current = it.next();
+ if (current.getCommitSeq() > largestExpiredCommitSeq) {
+ break;
+ }
+ if (current.getType() == TBinlogType.UPSERT) {
+ lastExpiredUpsert = current;
+ }
+ it.remove();
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ if (lastExpiredUpsert == null) {
+ return null;
+ }
+
+ final BinlogTombstone result = new BinlogTombstone(-1, largestExpiredCommitSeq);
+ result.addTableRecord(tableId, UpsertRecord.fromJson(lastExpiredUpsert.getData()));
+ return result;
+ }
+
+ /**
+ * Gc used when the db-level binlog is disabled: expires this table's binlogs
+ * according to the table's own binlog ttl.
+ *
+ * @param db database that owns the table
+ * @return null when the table cannot be resolved to an OlapTable or when no
+ * binlog expired in this pass; otherwise a table-level tombstone whose
+ * commitSeq is that of the last removed binlog
+ */
+ public BinlogTombstone gc(Database db) {
+ OlapTable table = null;
+ try {
+ Table tbl = db.getTableOrMetaException(tableId);
+ if (tbl == null) {
+ LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
+ return null;
+ }
+ if (!(tbl instanceof OlapTable)) {
+ LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId);
+ return null;
+ }
+ table = (OlapTable) tbl;
+ } catch (Exception e) {
+ // keep the cause in the log so a lookup failure can be diagnosed
+ LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId, e);
+ return null;
+ }
+
+ long dbId = db.getId();
+ long ttlSeconds = table.getBinlogConfig().getTtlSeconds();
+ long currentSeconds = System.currentTimeMillis() / 1000;
+ long expireSeconds = currentSeconds - ttlSeconds;
+ long expireMs = expireSeconds * 1000;
+
+ TBinlog tombstoneUpsert = null;
+ long largestExpiredCommitSeq = 0;
+ boolean removedAny = false;
+ lock.writeLock().lock();
+ try {
+ Iterator<TBinlog> iter = binlogs.iterator();
+ while (iter.hasNext()) {
+ TBinlog binlog = iter.next();
+ if (binlog.getTimestamp() <= expireMs) {
+ if (binlog.getType() == TBinlogType.UPSERT) {
+ tombstoneUpsert = binlog;
+ }
+ largestExpiredCommitSeq = binlog.getCommitSeq();
+ removedAny = true;
+ iter.remove();
+ } else {
+ break;
+ }
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ if (!removedAny) {
+ // nothing expired: do not emit an empty tombstone for this pass
+ return null;
+ }
+
+ BinlogTombstone tombstone = new BinlogTombstone(dbId, largestExpiredCommitSeq);
+ if (tombstoneUpsert != null) {
+ UpsertRecord upsertRecord = UpsertRecord.fromJson(tombstoneUpsert.getData());
+ tombstone.addTableRecord(tableId, upsertRecord);
+ }
+
+ return tombstone;
+ }
+
+ /**
+ * Replays a gc step: removes, from the head of the set, every binlog whose
+ * commitSeq is not larger than {@code largestExpiredCommitSeq}.
+ */
+ public void replayGc(long largestExpiredCommitSeq) {
+ lock.writeLock().lock();
+ try {
+ for (Iterator<TBinlog> it = binlogs.iterator(); it.hasNext(); ) {
+ if (it.next().getCommitSeq() > largestExpiredCommitSeq) {
+ break;
+ }
+ it.remove();
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java
index e564250093..32052f798a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java
@@ -31,8 +31,8 @@ import java.util.List;
import java.util.Map;
public class UpsertRecord {
- class TableRecord {
- class PartitionRecord {
+ public static class TableRecord {
+ public static class PartitionRecord {
@SerializedName(value = "partitionId")
public long partitionId;
@SerializedName(value = "version")
@@ -52,6 +52,10 @@ public class UpsertRecord {
partitionRecord.version = partitionCommitInfo.getVersion();
partitionRecords.add(partitionRecord);
}
+
+ /** Returns the partition records held by this table record (the internal list, not a copy). */
+ public List<PartitionRecord> getPartitionRecords() {
+ return partitionRecords;
+ }
}
@SerializedName(value = "commitSeq")
@@ -105,10 +109,18 @@ public class UpsertRecord {
return new ArrayList<>(tableRecords.keySet());
}
+ /** Returns the table records of this upsert keyed by table id (the internal map, not a copy). */
+ public Map<Long, TableRecord> getTableRecords() {
+ return tableRecords;
+ }
+
public String toJson() {
return GsonUtils.GSON.toJson(this);
}
+ /** Parses an UpsertRecord from {@code json} with the shared Gson instance; counterpart of toJson(). */
+ public static UpsertRecord fromJson(String json) {
+ return GsonUtils.GSON.fromJson(json, UpsertRecord.class);
+ }
+
@Override
public String toString() {
return toJson();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index b853232f42..dc822b1470 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -79,6 +79,7 @@ import org.apache.doris.analysis.TableRenameClause;
import org.apache.doris.analysis.TruncateTableStmt;
import org.apache.doris.analysis.UninstallPluginStmt;
import org.apache.doris.backup.BackupHandler;
+import org.apache.doris.binlog.BinlogGcer;
import org.apache.doris.binlog.BinlogManager;
import org.apache.doris.blockrule.SqlBlockRuleMgr;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
@@ -172,6 +173,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.AlterMultiMaterializedView;
import org.apache.doris.persist.BackendReplicasInfo;
import org.apache.doris.persist.BackendTabletsInfo;
+import org.apache.doris.persist.BinlogGcInfo;
import org.apache.doris.persist.CleanQueryStatsInfo;
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.EditLog;
@@ -448,6 +450,8 @@ public class Env {
private BinlogManager binlogManager;
+ private BinlogGcer binlogGcer;
+
/**
* TODO(tsy): to be removed after load refactor
*/
@@ -666,6 +670,7 @@ public class Env {
this.loadManagerAdapter = new LoadManagerAdapter();
this.hiveTransactionMgr = new HiveTransactionMgr();
this.binlogManager = new BinlogManager();
+ this.binlogGcer = new BinlogGcer();
}
public static void destroyCheckpoint() {
@@ -1480,6 +1485,9 @@ public class Env {
// start mtmv jobManager
mtmvJobManager.start();
getRefreshManager().start();
+
+ // binlog gcer
+ binlogGcer.start();
}
// start threads that should running on all FE
@@ -2805,6 +2813,10 @@ public class Env {
getInternalCatalog().replayRecoverPartition(info);
}
+ /** Forwards a binlog gc record to the binlog manager for replay. */
+ public void replayGcBinlog(BinlogGcInfo binlogGcInfo) {
+ binlogManager.replayGc(binlogGcInfo);
+ }
+
public static void getDdlStmt(TableIf table, List<String> createTableStmt, List<String> addPartitionStmt,
List<String> createRollupStmt, boolean separatePartition, boolean hidePassword,
long specificVersion) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 1626bedbf2..39b804eb40 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -73,6 +73,7 @@ import org.apache.doris.persist.BatchDropInfo;
import org.apache.doris.persist.BatchModifyPartitionsInfo;
import org.apache.doris.persist.BatchRemoveTransactionsOperation;
import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
+import org.apache.doris.persist.BinlogGcInfo;
import org.apache.doris.persist.CleanLabelOperationLog;
import org.apache.doris.persist.CleanQueryStatsInfo;
import org.apache.doris.persist.ColocatePersistInfo;
@@ -823,6 +824,11 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
+ case OperationType.OP_GC_BINLOG: {
+ data = BinlogGcInfo.read(in);
+ isRead = true;
+ break;
+ }
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BinlogGcInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BinlogGcInfo.java
new file mode 100644
index 0000000000..55b13ad5ce
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BinlogGcInfo.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+
+import org.apache.doris.binlog.BinlogTombstone;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Writable payload for binlog gc: wraps the list of tombstones and is written
+ * and read as a single JSON string.
+ */
+public class BinlogGcInfo implements Writable {
+ @SerializedName(value = "tombstones")
+ private List<BinlogTombstone> tombstones;
+
+ // for persist
+ public BinlogGcInfo() {
+ this(null);
+ }
+
+ public BinlogGcInfo(List<BinlogTombstone> tombstones) {
+ this.tombstones = tombstones;
+ }
+
+ /** @return the wrapped tombstones; null when created through the no-arg constructor */
+ public List<BinlogTombstone> getTombstones() {
+ return tombstones;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ final String json = GsonUtils.GSON.toJson(this);
+ Text.writeString(out, json);
+ }
+
+ public static BinlogGcInfo read(DataInput in) throws IOException {
+ final String json = Text.readString(in);
+ return GsonUtils.GSON.fromJson(json, BinlogGcInfo.class);
+ }
+
+ public String toJson() {
+ return GsonUtils.GSON.toJson(this);
+ }
+
+ @Override
+ public String toString() {
+ return toJson();
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 541518148f..04c1b58d6e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -1031,6 +1031,12 @@ public class EditLog {
alterDatabasePropertyInfo.getProperties());
break;
}
+ case OperationType.OP_GC_BINLOG: {
+ BinlogGcInfo binlogGcInfo = (BinlogGcInfo) journal.getData();
+ LOG.info("replay gc binlog: {}", binlogGcInfo);
+ env.replayGcBinlog(binlogGcInfo);
+ break;
+ }
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1797,4 +1803,8 @@ public class EditLog {
public void logAlterDatabaseProperty(AlterDatabasePropertyInfo log) {
logEdit(OperationType.OP_ALTER_DATABASE_PROPERTY, log);
}
+
+ /** Writes {@code log} to the edit log under the OP_GC_BINLOG operation type. */
+ public void logGcBinlog(BinlogGcInfo log) {
+ logEdit(OperationType.OP_GC_BINLOG, log);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 4893bbc1ec..674374f917 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -306,6 +306,8 @@ public class OperationType {
public static final short OP_ALTER_DATABASE_PROPERTY = 434;
+ public static final short OP_GC_BINLOG = 435;
+
/**
* Get opcode name by op code.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 2815c9229e..d296663745 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -110,6 +110,8 @@ import org.apache.doris.thrift.TGetBinlogRequest;
import org.apache.doris.thrift.TGetBinlogResult;
import org.apache.doris.thrift.TGetDbsParams;
import org.apache.doris.thrift.TGetDbsResult;
+import org.apache.doris.thrift.TGetMasterTokenRequest;
+import org.apache.doris.thrift.TGetMasterTokenResult;
import org.apache.doris.thrift.TGetQueryStatsRequest;
import org.apache.doris.thrift.TGetSnapshotRequest;
import org.apache.doris.thrift.TGetSnapshotResult;
@@ -940,6 +942,17 @@ public class FrontendServiceImpl implements FrontendService.Iface {
}
}
+ private void checkPassword(String cluster, String user, String passwd, String clientIp)
+ throws AuthenticationException {
+ if (Strings.isNullOrEmpty(cluster)) {
+ cluster = SystemInfoService.DEFAULT_CLUSTER;
+ }
+ final String fullUserName = ClusterNamespace.getFullName(cluster, user);
+ List<UserIdentity> currentUser = Lists.newArrayList();
+ Env.getCurrentEnv().getAuth().checkPlainPassword(fullUserName, clientIp, passwd, currentUser);
+ Preconditions.checkState(currentUser.size() == 1);
+ }
+
@Override
public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TException {
String clientAddr = getClientAddrAsString();
@@ -2296,7 +2309,6 @@ public class FrontendServiceImpl implements FrontendService.Iface {
Env env = Env.getCurrentEnv();
String fullDbName = ClusterNamespace.getFullName(cluster, request.getDb());
Database db = env.getInternalCatalog().getDbNullable(fullDbName);
- long dbId = db.getId();
if (db == null) {
String dbName = fullDbName;
if (Strings.isNullOrEmpty(request.getCluster())) {
@@ -2318,6 +2330,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
}
// step 6: get binlog
+ long dbId = db.getId();
TGetBinlogResult result = new TGetBinlogResult();
result.setStatus(new TStatus(TStatusCode.OK));
long prevCommitSeq = request.getPrevCommitSeq();
@@ -2516,4 +2529,27 @@ public class FrontendServiceImpl implements FrontendService.Iface {
return result;
}
+
+ /**
+ * RPC: returns the master token to a caller that authenticates with
+ * cluster/user/password.
+ *
+ * The result status is OK with the token set on success, NOT_AUTHORIZED when the
+ * password check fails, and INTERNAL_ERROR for any other failure.
+ */
+ @Override
+ public TGetMasterTokenResult getMasterToken(TGetMasterTokenRequest request) throws TException {
+ String clientAddr = getClientAddrAsString();
+ // do not log the request object itself: it carries the plain-text password
+ LOG.debug("receive get master token request. user: {}, client: {}", request.getUser(), clientAddr);
+
+ TGetMasterTokenResult result = new TGetMasterTokenResult();
+ TStatus status = new TStatus(TStatusCode.OK);
+ result.setStatus(status);
+ try {
+ checkPassword(request.getCluster(), request.getUser(), request.getPassword(), clientAddr);
+ result.setToken(Env.getCurrentEnv().getToken());
+ } catch (AuthenticationException e) {
+ LOG.warn("failed to get master token: {}", e.getMessage());
+ status.setStatusCode(TStatusCode.NOT_AUTHORIZED);
+ status.addToErrorMsgs(e.getMessage());
+ } catch (Throwable e) {
+ LOG.warn("catch unknown result.", e);
+ status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+ status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+ }
+
+ return result;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index 4f9d9ef2f6..3c12160610 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -34,6 +34,7 @@ import org.apache.doris.thrift.TCompactionReq;
import org.apache.doris.thrift.TCreateTabletReq;
import org.apache.doris.thrift.TDownloadReq;
import org.apache.doris.thrift.TDropTabletReq;
+import org.apache.doris.thrift.TGcBinlogReq;
import org.apache.doris.thrift.TMoveDirReq;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPublishVersionRequest;
@@ -375,6 +376,15 @@ public class AgentBatchTask implements Runnable {
tAgentTaskRequest.setPushCooldownConf(request);
return tAgentTaskRequest;
}
+ case GC_BINLOG: {
+ BinlogGcTask binlogGcTask = (BinlogGcTask) task;
+ TGcBinlogReq request = binlogGcTask.toThrift();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(request.toString());
+ }
+ tAgentTaskRequest.setGcBinlogReq(request);
+ return tAgentTaskRequest;
+ }
default:
LOG.debug("could not find task type for task [{}]", task);
return null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/BinlogGcTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/BinlogGcTask.java
new file mode 100644
index 0000000000..15bd81473e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/BinlogGcTask.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.thrift.TGcBinlogReq;
+import org.apache.doris.thrift.TTabletGcBinlogInfo;
+import org.apache.doris.thrift.TTaskType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Agent task of type GC_BINLOG. It accumulates (tabletId, version) entries for
+ * one backend and converts them into a single TGcBinlogReq.
+ */
+public class BinlogGcTask extends AgentTask {
+ private final List<TTabletGcBinlogInfo> tabletGcBinlogInfos = new ArrayList<>();
+
+ public BinlogGcTask(long backendId, long signature) {
+ super(null, backendId, TTaskType.GC_BINLOG, -1, -1, -1, -1, -1, signature);
+ }
+
+ /** Appends one tablet entry to the request being built. */
+ public void addTask(long tabletId, long version) {
+ final TTabletGcBinlogInfo entry = new TTabletGcBinlogInfo();
+ entry.setTabletId(tabletId);
+ entry.setVersion(version);
+ tabletGcBinlogInfos.add(entry);
+ }
+
+ /** Builds the thrift request; it references this task's entry list directly, without copying. */
+ public TGcBinlogReq toThrift() {
+ final TGcBinlogReq gcRequest = new TGcBinlogReq();
+ gcRequest.setTabletGcBinlogInfos(tabletGcBinlogInfos);
+ return gcRequest;
+ }
+}
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 9bf2abe5c8..6a982fb58c 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -59,7 +59,7 @@ message KeyBoundsPB {
}
message RowsetMetaPB {
- required int64 rowset_id = 1;
+ required int64 rowset_id = 1; // Deprecated. Use rowset_id_v2 instead.
optional int64 partition_id = 2;
optional int64 tablet_id = 3;
// only for pending rowset
@@ -328,7 +328,8 @@ message DeleteBitmapPB {
message BinlogMetaEntryPB {
optional int64 version = 1;
optional int64 tablet_id = 2;
- optional int64 rowset_id = 3;
+ optional int64 rowset_id = 3; // Deprecated. Use rowset_id_v2 instead.
optional int64 num_segments = 4;
optional int64 creation_time = 5;
+ optional string rowset_id_v2 = 6;
}
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index b30c7ed26a..f92f873fc4 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -198,6 +198,15 @@ struct TAlterInvertedIndexReq {
10: optional i64 expiration
}
+// One tablet entry of a binlog gc request: the tablet and the version passed for it.
+// NOTE(review): presumably binlogs up to this version may be collected; confirm against the BE handler.
+struct TTabletGcBinlogInfo {
+ 1: optional Types.TTabletId tablet_id
+ 2: optional i64 version
+}
+
+// Payload of a GC_BINLOG agent task: the list of per-tablet gc entries.
+struct TGcBinlogReq {
+ 1: optional list<TTabletGcBinlogInfo> tablet_gc_binlog_infos
+}
+
struct TStorageMigrationReqV2 {
1: optional Types.TTabletId base_tablet_id
2: optional Types.TTabletId new_tablet_id
@@ -449,6 +458,7 @@ struct TAgentTaskRequest {
30: optional TPushCooldownConfReq push_cooldown_conf
31: optional TPushStoragePolicyReq push_storage_policy_req
32: optional TAlterInvertedIndexReq alter_inverted_index_req
+ 33: optional TGcBinlogReq gc_binlog_req
}
struct TAgentResult {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index e4bff3a4fc..28b77a3f20 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1029,6 +1029,17 @@ struct TRestoreSnapshotResult {
1: optional Status.TStatus status
}
+// Request of getMasterToken: credentials used to authenticate the caller.
+// The password is sent as plain text in this struct.
+struct TGetMasterTokenRequest {
+ 1: optional string cluster
+ 2: optional string user
+ 3: optional string password
+}
+
+// Result of getMasterToken: the call status and, on success, the master token.
+struct TGetMasterTokenResult {
+ 1: optional Status.TStatus status
+ 2: optional string token
+}
+
service FrontendService {
TGetDbsResult getDbNames(1: TGetDbsParams params)
TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1089,4 +1100,6 @@ service FrontendService {
TQueryStatsResult getQueryStats(1: TGetQueryStatsRequest request)
TGetTabletReplicaInfosResult getTabletReplicaInfos(1: TGetTabletReplicaInfosRequest request)
+
+ TGetMasterTokenResult getMasterToken(1: TGetMasterTokenRequest request)
}
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 91074669e7..4949ca55f2 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -218,7 +218,8 @@ enum TTaskType {
NOTIFY_UPDATE_STORAGE_POLICY, // deprecated
PUSH_COOLDOWN_CONF,
PUSH_STORAGE_POLICY,
- ALTER_INVERTED_INDEX
+ ALTER_INVERTED_INDEX,
+ GC_BINLOG
}
enum TStmtType {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org