You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/25 07:25:02 UTC
[doris] 01/03: [enhancement](merge-on-write) add async publish task when version is discontinuous for merge on write table when clone (#21025)
This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit e5997114bd23fd4422da27b6d40ca46a1fa2fdad
Author: Xin Liao <li...@126.com>
AuthorDate: Thu Jun 22 21:50:14 2023 +0800
[enhancement](merge-on-write) add async publish task when version is discontinuous for merge on write table when clone (#21025)
Version discontinuity may occur on a merge-on-write table during clone. To handle this case, add an async publish task whenever the version to publish is discontinuous.
---
be/src/agent/task_worker_pool.cpp | 9 ++-
be/src/olap/data_dir.cpp | 15 ++++
be/src/olap/olap_server.cpp | 98 ++++++++++++++++++++++++
be/src/olap/rowset/rowset.h | 7 ++
be/src/olap/storage_engine.cpp | 4 +
be/src/olap/storage_engine.h | 12 +++
be/src/olap/tablet_meta_manager.cpp | 39 ++++++++++
be/src/olap/tablet_meta_manager.h | 12 +++
be/src/olap/task/engine_clone_task.cpp | 16 +++-
be/src/olap/task/engine_publish_version_task.cpp | 63 ++++++++++++++-
be/src/olap/task/engine_publish_version_task.h | 30 +++++++-
gensrc/proto/olap_file.proto | 5 ++
12 files changed, 302 insertions(+), 8 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index a91540b445..463845d2c7 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1448,13 +1448,15 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
std::vector<TTabletId> error_tablet_ids;
std::vector<TTabletId> succ_tablet_ids;
+ // partition_id, tablet_id, publish_version
+ std::vector<std::tuple<int64_t, int64_t, int64_t>> discontinuous_version_tablets;
uint32_t retry_time = 0;
Status status;
bool is_task_timeout = false;
while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
error_tablet_ids.clear();
EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids,
- &succ_tablet_ids);
+ &succ_tablet_ids, &discontinuous_version_tablets);
status = _env->storage_engine()->execute_task(&engine_task);
if (status.ok()) {
break;
@@ -1488,6 +1490,11 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
continue;
}
+ for (auto& item : discontinuous_version_tablets) {
+ StorageEngine::instance()->add_async_publish_task(
+ std::get<0>(item), std::get<1>(item), std::get<2>(item),
+ publish_version_req.transaction_id, false);
+ }
TFinishTaskRequest finish_task_request;
if (!status) {
DorisMetrics::instance()->publish_task_failed_total->increment(1);
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index b83ce43635..970138cd0e 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -452,6 +452,21 @@ Status DataDir::load() {
}
}
+ // Rebuilds one async publish task from a pending-publish record found in the meta store.
+ //   tablet_id / publish_version: decoded from the record key by the traversal.
+ //   info: serialized PendingPublishInfoPB holding the partition id and transaction id.
+ // Always returns true, the same value TabletMetaManager::traverse_pending_publish
+ // returns for keys it skips (presumably "keep iterating" -- confirm against
+ // OlapMeta::iterate).
+ auto load_pending_publish_info_func = [](int64_t tablet_id, int64_t publish_version,
+                                          const string& info) {
+     PendingPublishInfoPB pending_publish_info_pb;
+     if (!pending_publish_info_pb.ParseFromString(info)) {
+         LOG(WARNING) << "parse pending publish info failed, tablt_id: " << tablet_id
+                      << " publish_version: " << publish_version;
+         // An unparsable record carries no usable partition/transaction id; scheduling it
+         // would enqueue a task built from unset fields, so skip it and move on.
+         return true;
+     }
+     // is_recovery == true: the record is already persisted, do not write it again.
+     StorageEngine::instance()->add_async_publish_task(
+             pending_publish_info_pb.partition_id(), tablet_id, publish_version,
+             pending_publish_info_pb.transaction_id(), true);
+     return true;
+ };
+ TabletMetaManager::traverse_pending_publish(_meta, load_pending_publish_info_func);
+
// traverse rowset
// 1. add committed rowset to txn map
// 2. add visible rowset to tablet
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 2fab94a2bf..57bf40a147 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -16,6 +16,7 @@
// under the License.
#include <gen_cpp/Types_types.h>
+#include <gen_cpp/olap_file.pb.h>
#include <stdint.h>
#include <algorithm>
@@ -60,7 +61,9 @@
#include "olap/tablet.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_meta.h"
+#include "olap/tablet_meta_manager.h"
#include "olap/tablet_schema.h"
+#include "olap/task/engine_publish_version_task.h"
#include "olap/task/index_builder.h"
#include "runtime/client_cache.h"
#include "service/brpc.h"
@@ -245,6 +248,11 @@ Status StorageEngine::start_bg_threads() {
.set_max_threads(config::calc_delete_bitmap_max_thread)
.build(&_calc_delete_bitmap_thread_pool);
+ RETURN_IF_ERROR(Thread::create(
+ "StorageEngine", "aync_publish_version_thread",
+ [this]() { this->_async_publish_callback(); }, &_async_publish_thread));
+ LOG(INFO) << "async publish thread started";
+
LOG(INFO) << "all storage engine's background threads are started.";
return Status::OK();
}
@@ -1204,4 +1212,94 @@ void StorageEngine::_cache_file_cleaner_tasks_producer_callback() {
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
}
+/// Queues a publish that cannot be applied yet so that `_async_publish_callback` can
+/// pick it up later.
+///
+/// @param partition_id     partition the transaction belongs to
+/// @param tablet_id        tablet whose version is waiting to be published
+/// @param publish_version  version to publish
+/// @param transaction_id   transaction that produced the version
+/// @param is_recovery      true when the task is being reloaded from an already persisted
+///                         record; in that case nothing is written to the meta store
+void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_id,
+                                           int64_t publish_version, int64_t transaction_id,
+                                           bool is_recovery) {
+    if (!is_recovery) {
+        TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id);
+        if (tablet == nullptr) {
+            // The tablet may have been dropped in the meantime. Without it there is no
+            // data dir to persist into, and `_async_publish_callback` discards tasks of
+            // missing tablets anyway.
+            LOG(WARNING) << "tablet does not exist when add pending publish task, tablet_id: "
+                         << tablet_id << " version: " << publish_version
+                         << " txn_id:" << transaction_id;
+            return;
+        }
+        PendingPublishInfoPB pending_publish_info_pb;
+        pending_publish_info_pb.set_partition_id(partition_id);
+        pending_publish_info_pb.set_transaction_id(transaction_id);
+        Status save_status = TabletMetaManager::save_pending_publish_info(
+                tablet->data_dir(), tablet->tablet_id(), publish_version,
+                pending_publish_info_pb.SerializeAsString());
+        if (!save_status.ok()) {
+            // Best effort: the task is still queued in memory below, it just will not
+            // be reloaded from the meta store.
+            LOG(WARNING) << "failed to save pending publish info, tablet_id: " << tablet_id
+                         << " version: " << publish_version << " txn_id:" << transaction_id
+                         << ", res:" << save_status;
+        }
+    }
+    LOG(INFO) << "add pending publish task, tablet_id: " << tablet_id
+              << " version: " << publish_version << " txn_id:" << transaction_id
+              << " is_recovery: " << is_recovery;
+    std::lock_guard<std::mutex> lock(_async_publish_mutex);
+    _async_publish_tasks[tablet_id][publish_version] = {transaction_id, partition_id};
+}
+
+/// Returns the smallest publish version still queued for `tablet_id`, or INT64_MAX when
+/// the tablet has no queued async publish task.
+int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) {
+    std::lock_guard<std::mutex> guard(_async_publish_mutex);
+    const auto tablet_tasks = _async_publish_tasks.find(tablet_id);
+    const bool has_pending =
+            tablet_tasks != _async_publish_tasks.end() && !tablet_tasks->second.empty();
+    // The per-tablet map is ordered by publish version, so its first key is the minimum.
+    return has_pending ? tablet_tasks->second.begin()->first : INT64_MAX;
+}
+
+/// Body of the async publish background thread.
+///
+/// Loops with a 30ms wait on `_stop_background_threads_latch` for as long as that wait
+/// returns false. In every round it looks, for each tablet, at the queued task with the
+/// smallest publish version:
+///   - version <= tablet max version: nothing left to publish, drop the task;
+///   - version == tablet max version + 1: the gap is closed, submit the publish to the
+///     tablet publish txn thread pool;
+///   - otherwise: a gap remains, leave the task queued for a later round.
+/// At most one task per tablet is handled per round. The persisted pending-publish
+/// records of dropped and submitted tasks are removed after `_async_publish_mutex` has
+/// been released.
+void StorageEngine::_async_publish_callback() {
+    while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(30))) {
+        // (tablet, publish_version) pairs whose persisted record must be deleted
+        std::vector<std::pair<TabletSharedPtr, int64_t>> need_removed_tasks;
+        {
+            std::lock_guard<std::mutex> lock(_async_publish_mutex);
+            for (auto tablet_iter = _async_publish_tasks.begin();
+                 tablet_iter != _async_publish_tasks.end();) {
+                if (tablet_iter->second.empty()) {
+                    tablet_iter = _async_publish_tasks.erase(tablet_iter);
+                    continue;
+                }
+                int64_t tablet_id = tablet_iter->first;
+                TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id);
+                if (!tablet) {
+                    LOG(WARNING) << "tablet does not exist when async publush, tablet_id: "
+                                 << tablet_id;
+                    // TODO(liaoxin) remove pending publish info from db
+                    tablet_iter = _async_publish_tasks.erase(tablet_iter);
+                    continue;
+                }
+
+                // The per-tablet map is ordered by version: begin() is the smallest one.
+                auto task_iter = tablet_iter->second.begin();
+                int64_t version = task_iter->first;
+                int64_t transaction_id = task_iter->second.first;
+                int64_t partition_id = task_iter->second.second;
+                int64_t max_version;
+                {
+                    std::shared_lock rdlock(tablet->get_header_lock());
+                    max_version = tablet->max_version().second;
+                }
+
+                if (version <= max_version) {
+                    need_removed_tasks.emplace_back(tablet, version);
+                    tablet_iter->second.erase(task_iter);
+                    ++tablet_iter;
+                    continue;
+                }
+                if (version != max_version + 1) {
+                    ++tablet_iter;
+                    continue;
+                }
+
+                auto async_publish_task = std::make_shared<AsyncTabletPublishTask>(
+                        tablet, partition_id, transaction_id, version);
+                Status submit_status =
+                        StorageEngine::instance()->tablet_publish_txn_thread_pool()->submit_func(
+                                [async_publish_task]() { async_publish_task->handle(); });
+                if (!submit_status.ok()) {
+                    // Keep the queued task and its persisted record so the publish is
+                    // retried in a later round instead of being silently lost.
+                    LOG(WARNING) << "failed to submit async publish task, tablet_id: "
+                                 << tablet_id << " version: " << version
+                                 << " txn_id:" << transaction_id << ", res:" << submit_status;
+                    ++tablet_iter;
+                    continue;
+                }
+                tablet_iter->second.erase(task_iter);
+                need_removed_tasks.emplace_back(tablet, version);
+                ++tablet_iter;
+            }
+        }
+        for (auto& [tablet, publish_version] : need_removed_tasks) {
+            TabletMetaManager::remove_pending_publish_info(tablet->data_dir(), tablet->tablet_id(),
+                                                           publish_version);
+        }
+    }
+}
+
} // namespace doris
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 7160af3424..8829cc0770 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -299,6 +299,12 @@ public:
bool check_rowset_segment();
+ /// Tries to claim the publish of this rowset.
+ /// @return true if no publish was marked as running (the flag is now set and the caller
+ ///         owns it); false if the flag was already set.
+ bool start_publish() {
+     // exchange() yields the previous flag value: only the caller that saw `false`
+     // actually flipped it.
+     return !_is_publish_running.exchange(true);
+ }
+ /// Clears the flag set by a successful start_publish().
+ void finish_publish() { _is_publish_running.store(false); }
+
[[nodiscard]] virtual Status add_to_binlog() { return Status::OK(); }
protected:
@@ -337,6 +343,7 @@ protected:
// rowset state machine
RowsetStateMachine _rowset_state_machine;
std::atomic<uint64_t> _delayed_expired_timestamp = 0;
+ std::atomic<bool> _is_publish_running {false};
};
} // namespace doris
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 94f1bb5a87..d10c11b1c2 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -162,6 +162,9 @@ StorageEngine::~StorageEngine() {
if (_tablet_meta_checkpoint_thread_pool) {
_tablet_meta_checkpoint_thread_pool->shutdown();
}
+ if (_calc_delete_bitmap_thread_pool) {
+ _calc_delete_bitmap_thread_pool->shutdown();
+ }
_clear();
_s_instance = nullptr;
}
@@ -556,6 +559,7 @@ void StorageEngine::stop() {
THREAD_JOIN(_disk_stat_monitor_thread);
THREAD_JOIN(_fd_cache_clean_thread);
THREAD_JOIN(_tablet_checkpoint_tasks_producer_thread);
+ THREAD_JOIN(_async_publish_thread);
#undef THREAD_JOIN
#define THREADS_JOIN(threads) \
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 15b1a98a78..0113101565 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -220,6 +220,10 @@ public:
void gc_binlogs(const std::unordered_map<int64_t, int64_t>& gc_tablet_infos);
+ void add_async_publish_task(int64_t partition_id, int64_t tablet_id, int64_t publish_version,
+ int64_t transaction_id, bool is_recover);
+ int64_t get_pending_publish_min_version(int64_t tablet_id);
+
private:
// Instance should be inited from `static open()`
// MUST NOT be called in other circumstances.
@@ -318,6 +322,8 @@ private:
void _gc_binlogs(int64_t tablet_id, int64_t version);
+ void _async_publish_callback();
+
private:
struct CompactionCandidate {
CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_)
@@ -454,6 +460,12 @@ private:
std::mutex _running_cooldown_mutex;
std::unordered_set<int64_t> _running_cooldown_tablets;
+ // tablet_id, publish_version, transaction_id, partition_id
+ std::map<int64_t, std::map<int64_t, std::pair<int64_t, int64_t>>> _async_publish_tasks;
+ // async publish for discontinuous versions of merge_on_write table
+ scoped_refptr<Thread> _async_publish_thread;
+ std::mutex _async_publish_mutex;
+
DISALLOW_COPY_AND_ASSIGN(StorageEngine);
};
diff --git a/be/src/olap/tablet_meta_manager.cpp b/be/src/olap/tablet_meta_manager.cpp
index ac4deacd14..d3bc41fa19 100644
--- a/be/src/olap/tablet_meta_manager.cpp
+++ b/be/src/olap/tablet_meta_manager.cpp
@@ -175,4 +175,43 @@ Status TabletMetaManager::load_json_meta(DataDir* store, const std::string& meta
return save(store, tablet_id, schema_hash, meta_binary);
}
+/// Stores `meta_binary` in the meta store of `store` under the key
+/// PENDING_PUBLISH_INFO + "<tablet_id>_<publish_version>".
+/// @return the status of the underlying put.
+Status TabletMetaManager::save_pending_publish_info(DataDir* store, TTabletId tablet_id,
+                                                    int64_t publish_version,
+                                                    const std::string& meta_binary) {
+    const std::string meta_key =
+            fmt::format("{}{}_{}", PENDING_PUBLISH_INFO, tablet_id, publish_version);
+    OlapMeta* olap_meta = store->get_meta();
+    LOG(INFO) << "save pending publish rowset, key:" << meta_key
+              << " meta_size=" << meta_binary.length();
+    return olap_meta->put(META_COLUMN_FAMILY_INDEX, meta_key, meta_binary);
+}
+
+/// Deletes the pending-publish record keyed by
+/// PENDING_PUBLISH_INFO + "<tablet_id>_<publish_version>" from the meta store of `store`.
+/// @return the status of the underlying remove, which is also logged.
+Status TabletMetaManager::remove_pending_publish_info(DataDir* store, TTabletId tablet_id,
+                                                      int64_t publish_version) {
+    const std::string meta_key =
+            fmt::format("{}{}_{}", PENDING_PUBLISH_INFO, tablet_id, publish_version);
+    OlapMeta* olap_meta = store->get_meta();
+    Status remove_status = olap_meta->remove(META_COLUMN_FAMILY_INDEX, meta_key);
+    LOG(INFO) << "remove pending publish_info, key:" << meta_key << ", res:" << remove_status;
+    return remove_status;
+}
+
+/// Iterates the entries of `meta` with the PENDING_PUBLISH_INFO prefix and calls `func`
+/// with (tablet_id, publish_version, value) for every key that splits into three parts.
+/// Keys with a different shape are logged and skipped.
+/// @return the status of the underlying iterate call.
+Status TabletMetaManager::traverse_pending_publish(
+        OlapMeta* meta, std::function<bool(int64_t, int64_t, const std::string&)> const& func) {
+    auto visit_entry = [&func](const std::string& key, const std::string& value) -> bool {
+        // key format: "ppi_" + tablet_id + "_" + publish_version
+        std::vector<std::string> fields;
+        split_string<char>(key, '_', &fields);
+        if (fields.size() == 3) {
+            const int64_t tablet_id = std::stol(fields[1], nullptr, 10);
+            const int64_t publish_version = std::stol(fields[2], nullptr, 10);
+            return func(tablet_id, publish_version, value);
+        }
+        LOG(WARNING) << "invalid pending publish info key:" << key
+                     << ", split size:" << fields.size();
+        return true;
+    };
+    return meta->iterate(META_COLUMN_FAMILY_INDEX, PENDING_PUBLISH_INFO, visit_entry);
+}
+
} // namespace doris
diff --git a/be/src/olap/tablet_meta_manager.h b/be/src/olap/tablet_meta_manager.h
index 15d6763477..6ba1d76757 100644
--- a/be/src/olap/tablet_meta_manager.h
+++ b/be/src/olap/tablet_meta_manager.h
@@ -34,6 +34,8 @@ const std::string OLD_HEADER_PREFIX = "hdr_";
const std::string HEADER_PREFIX = "tabletmeta_";
+const std::string PENDING_PUBLISH_INFO = "ppi_";
+
// Helper Class for managing tablet headers of one root path.
class TabletMetaManager {
public:
@@ -57,6 +59,16 @@ public:
const string& header_prefix = "tabletmeta_");
static Status load_json_meta(DataDir* store, const std::string& meta_path);
+
+ static Status save_pending_publish_info(DataDir* store, TTabletId tablet_id,
+ int64_t publish_version,
+ const std::string& meta_binary);
+
+ static Status remove_pending_publish_info(DataDir* store, TTabletId tablet_id,
+ int64_t publish_version);
+
+ static Status traverse_pending_publish(
+ OlapMeta* meta, std::function<bool(int64_t, int64_t, const std::string&)> const& func);
};
} // namespace doris
diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp
index 912b3aa4e5..845cbe1117 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -130,7 +130,19 @@ Status EngineCloneTask::_do_clone() {
auto local_data_path = fmt::format("{}/{}", tablet->tablet_path(), CLONE_PREFIX);
bool allow_incremental_clone = false;
- tablet->calc_missed_versions(_clone_req.committed_version, &missed_versions);
+ int64_t specified_version = _clone_req.committed_version;
+ if (tablet->enable_unique_key_merge_on_write()) {
+ int64_t min_pending_ver =
+ StorageEngine::instance()->get_pending_publish_min_version(tablet->tablet_id());
+ if (min_pending_ver - 1 < specified_version) {
+ LOG(INFO) << "use min pending publish version for clone, min_pending_ver: "
+ << min_pending_ver
+ << " committed_version: " << _clone_req.committed_version;
+ specified_version = min_pending_ver - 1;
+ }
+ }
+
+ tablet->calc_missed_versions(specified_version, &missed_versions);
// if missed version size is 0, then it is useless to clone from remote be, it means local data is
// completed. Or remote be will just return header not the rowset files. clone will failed.
@@ -153,7 +165,7 @@ Status EngineCloneTask::_do_clone() {
RETURN_IF_ERROR(_make_and_download_snapshots(*(tablet->data_dir()), local_data_path,
&src_host, &src_file_path, missed_versions,
&allow_incremental_clone));
- RETURN_IF_ERROR(_finish_clone(tablet.get(), local_data_path, _clone_req.committed_version,
+ RETURN_IF_ERROR(_finish_clone(tablet.get(), local_data_path, specified_version,
allow_incremental_clone));
} else {
LOG(INFO) << "clone tablet not exist, begin clone a new tablet from remote be. "
diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index 748706e421..02af6bf674 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -70,11 +70,13 @@ void TabletPublishStatistics::record_in_bvar() {
EnginePublishVersionTask::EnginePublishVersionTask(
const TPublishVersionRequest& publish_version_req, std::vector<TTabletId>* error_tablet_ids,
- std::vector<TTabletId>* succ_tablet_ids)
+ std::vector<TTabletId>* succ_tablet_ids,
+ std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinuous_version_tablets)
: _total_task_num(0),
_publish_version_req(publish_version_req),
_error_tablet_ids(error_tablet_ids),
- _succ_tablet_ids(succ_tablet_ids) {}
+ _succ_tablet_ids(succ_tablet_ids),
+ _discontinuous_version_tablets(discontinuous_version_tablets) {}
void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) {
std::lock_guard<std::mutex> lck(_tablet_ids_mutex);
@@ -178,6 +180,8 @@ Status EnginePublishVersionTask::finish() {
// publish failed
if (!tablet->check_version_exist(version)) {
add_error_tablet_id(tablet_info.tablet_id);
+ _discontinuous_version_tablets->emplace_back(
+ partition_id, tablet_info.tablet_id, version.first);
res = Status::Error<PUBLISH_VERSION_NOT_CONTINUOUS>();
}
continue;
@@ -250,7 +254,14 @@ TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task
void TabletPublishTxnTask::handle() {
_stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
+ if (!_rowset->start_publish()) {
+ LOG(WARNING) << "publish is running. rowset_id=" << _rowset->rowset_id()
+ << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id;
+ _engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
+ return;
+ }
Defer defer {[&] {
+ _rowset->finish_publish();
if (_engine_publish_version_task->finish_task() == 1) {
_engine_publish_version_task->notify();
}
@@ -289,4 +300,52 @@ void TabletPublishTxnTask::handle() {
<< (cost_us > 500 * 1000 ? _stats.to_string() : "");
}
+/// Publishes `_version` of transaction `_transaction_id` on `_tablet`.
+///
+/// Steps: find the rowset the transaction holds for this tablet, claim it with
+/// start_publish(), call TxnManager::publish_txn, then add the rowset to the tablet.
+/// Returns early (after a warning, except when the rowset is not found) as soon as one
+/// step cannot proceed; the function has no return value.
+void AsyncTabletPublishTask::handle() {
+    _stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
+
+    // Rowsets of this transaction, keyed by tablet.
+    std::map<TabletInfo, RowsetSharedPtr> txn_rowsets;
+    StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
+            _transaction_id, _partition_id, &txn_rowsets);
+    const TabletInfo self_info(_tablet->tablet_id(), _tablet->schema_hash(),
+                               _tablet->tablet_uid());
+    const auto found = txn_rowsets.find(self_info);
+    if (found == txn_rowsets.end()) {
+        return;
+    }
+
+    RowsetSharedPtr rowset = found->second;
+    if (!rowset->start_publish()) {
+        LOG(WARNING) << "publish is running. rowset_id=" << rowset->rowset_id()
+                     << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id;
+        return;
+    }
+    // Release the publish claim on every exit path below.
+    Defer defer {[&] { rowset->finish_publish(); }};
+
+    Version version(_version, _version);
+    auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn(
+            _partition_id, _tablet, _transaction_id, version, &_stats);
+    if (publish_status != Status::OK()) {
+        LOG(WARNING) << "failed to publish version. rowset_id=" << rowset->rowset_id()
+                     << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id
+                     << ", res=" << publish_status;
+        return;
+    }
+
+    // add visible rowset to tablet
+    const int64_t add_begin_us = MonotonicMicros();
+    publish_status = _tablet->add_inc_rowset(rowset);
+    _stats.add_inc_rowset_us = MonotonicMicros() - add_begin_us;
+    // PUSH_VERSION_ALREADY_EXIST is not treated as a failure here.
+    if (publish_status != Status::OK() && !publish_status.is<PUSH_VERSION_ALREADY_EXIST>()) {
+        LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" << rowset->rowset_id()
+                     << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id
+                     << ", res=" << publish_status;
+        return;
+    }
+
+    const int64_t cost_us = MonotonicMicros() - _stats.submit_time_us;
+    g_tablet_publish_latency << cost_us;
+    _stats.record_in_bvar();
+    // print stats if publish cost > 500ms
+    LOG(INFO) << "async publish version successfully on tablet, table_id=" << _tablet->table_id()
+              << ", tablet=" << _tablet->full_name() << ", transaction_id=" << _transaction_id
+              << ", version=" << _version << ", num_rows=" << rowset->num_rows()
+              << ", res=" << publish_status << ", cost: " << cost_us << "(us) "
+              << (cost_us > 500 * 1000 ? _stats.to_string() : "");
+}
+
} // namespace doris
diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h
index bd6907b913..c8a68dedea 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -82,9 +82,10 @@ private:
class EnginePublishVersionTask : public EngineTask {
public:
- EnginePublishVersionTask(const TPublishVersionRequest& publish_version_req,
- vector<TTabletId>* error_tablet_ids,
- std::vector<TTabletId>* succ_tablet_ids = nullptr);
+ EnginePublishVersionTask(
+ const TPublishVersionRequest& publish_version_req, vector<TTabletId>* error_tablet_ids,
+ std::vector<TTabletId>* succ_tablet_ids,
+ std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinous_version_tablets);
~EnginePublishVersionTask() {}
virtual Status finish() override;
@@ -103,11 +104,34 @@ private:
std::mutex _tablet_ids_mutex;
vector<TTabletId>* _error_tablet_ids;
vector<TTabletId>* _succ_tablet_ids;
+ std::vector<std::tuple<int64_t, int64_t, int64_t>>* _discontinuous_version_tablets;
std::mutex _tablet_finish_mutex;
std::condition_variable _tablet_finish_cond;
};
+/// One deferred publish of a single tablet version, executed through handle().
+/// The submit timestamp is recorded at construction so handle() can derive scheduling
+/// delay and total cost from it.
+class AsyncTabletPublishTask {
+public:
+    /// @param tablet          tablet to publish on
+    /// @param partition_id    partition of the transaction
+    /// @param transaction_id  transaction whose rowset is published
+    /// @param version         version to publish
+    AsyncTabletPublishTask(TabletSharedPtr tablet, int64_t partition_id, int64_t transaction_id,
+                           int64_t version)
+            : _tablet {tablet},
+              _partition_id {partition_id},
+              _transaction_id {transaction_id},
+              _version {version} {
+        _stats.submit_time_us = MonotonicMicros();
+    }
+    ~AsyncTabletPublishTask() = default;
+
+    /// Performs the publish; see the definition in engine_publish_version_task.cpp.
+    void handle();
+
+private:
+    TabletSharedPtr _tablet;
+    int64_t _partition_id;
+    int64_t _transaction_id;
+    int64_t _version;
+    TabletPublishStatistics _stats;
+};
+
} // namespace doris
#endif // DORIS_BE_SRC_OLAP_TASK_ENGINE_PUBLISH_VERSION_TASK_H
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 6a982fb58c..fe894dba9d 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -333,3 +333,8 @@ message BinlogMetaEntryPB {
optional int64 creation_time = 5;
optional string rowset_id_v2 = 6;
}
+
+// Payload of a pending-publish record. StorageEngine::add_async_publish_task serializes
+// it and stores it through TabletMetaManager::save_pending_publish_info; DataDir::load
+// parses it back to re-register the async publish task. The tablet id and publish
+// version are not part of the payload: they are encoded in the record key.
+message PendingPublishInfoPB {
+ // partition the pending transaction belongs to
+ optional int64 partition_id = 1;
+ // transaction whose version is waiting to be published
+ optional int64 transaction_id = 2;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org