You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/04/24 14:22:36 UTC
[incubator-doris] branch master updated: [Tablet] A small refactor
on class Tablet (#3339)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 37fccd5 [Tablet] A small refactor on class Tablet (#3339)
37fccd5 is described below
commit 37fccd53c4aa60eb0e995aa4fe9111cff344356e
Author: Yingchun Lai <40...@qq.com>
AuthorDate: Fri Apr 24 22:22:26 2020 +0800
[Tablet] A small refactor on class Tablet (#3339)
There are no functional changes in this patch.
Key refactor points are:
- Remove meaningless return values from functions in class Tablet, and
from some related functions in other classes
- Allow callers of RowsetGraph::capture_consistent_versions to pass
nullptr as the output parameter when the version path is not needed
- Use CHECK instead of LOG(FATAL) to simplify code
---
be/src/http/action/compaction_action.cpp | 5 +-
be/src/olap/compaction.cpp | 25 +---
be/src/olap/compaction.h | 2 +-
be/src/olap/cumulative_compaction.cpp | 2 +-
be/src/olap/rowset_graph.cpp | 36 +++---
be/src/olap/schema_change.cpp | 47 ++-----
be/src/olap/tablet.cpp | 203 +++++++++++--------------------
be/src/olap/tablet.h | 33 +++--
be/src/olap/tablet_manager.cpp | 13 +-
9 files changed, 125 insertions(+), 241 deletions(-)
diff --git a/be/src/http/action/compaction_action.cpp b/be/src/http/action/compaction_action.cpp
index 11f5f09..489ba6d 100644
--- a/be/src/http/action/compaction_action.cpp
+++ b/be/src/http/action/compaction_action.cpp
@@ -56,10 +56,7 @@ Status CompactionAction::_handle_show_compaction(HttpRequest *req, std::string*
return Status::NotFound("Tablet not found");
}
- OLAPStatus s = tablet->get_compaction_status(json_result);
- if (s != OLAP_SUCCESS) {
- return Status::InternalError(strings::Substitute("failed to get tablet compaction status. res $0", s));
- }
+ tablet->get_compaction_status(json_result);
return Status::OK();
}
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 461de41..d203b63 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -90,7 +90,7 @@ OLAPStatus Compaction::do_compaction_impl() {
RETURN_NOT_OK(check_correctness(stats));
// 4. modify rowsets in memory
- RETURN_NOT_OK(modify_rowsets());
+ modify_rowsets();
// 5. update last success compaction time
int64_t now = UnixMillis();
@@ -140,30 +140,13 @@ OLAPStatus Compaction::construct_input_rowset_readers() {
return OLAP_SUCCESS;
}
-OLAPStatus Compaction::modify_rowsets() {
+void Compaction::modify_rowsets() {
std::vector<RowsetSharedPtr> output_rowsets;
output_rowsets.push_back(_output_rowset);
WriteLock wrlock(_tablet->get_header_lock_ptr());
- OLAPStatus res = _tablet->modify_rowsets(output_rowsets, _input_rowsets);
- if (res != OLAP_SUCCESS) {
- LOG(FATAL) << "fail to replace data sources. res" << res
- << ", tablet=" << _tablet->full_name()
- << ", compaction__version=" << _output_version.first
- << "-" << _output_version.second;
- return res;
- }
-
- res = _tablet->save_meta();
- if (res != OLAP_SUCCESS) {
- LOG(FATAL) << "fail to save tablet meta. res=" << res
- << ", tablet=" << _tablet->full_name()
- << ", compaction_version=" << _output_version.first
- << "-" << _output_version.second;
- return OLAP_ERR_BE_SAVE_HEADER_ERROR;
- }
-
- return OLAP_SUCCESS;
+ _tablet->modify_rowsets(output_rowsets, _input_rowsets);
+ _tablet->save_meta();
}
OLAPStatus Compaction::gc_unused_rowsets() {
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index aeccc8d..9837ac5 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -59,7 +59,7 @@ protected:
OLAPStatus do_compaction();
OLAPStatus do_compaction_impl();
- OLAPStatus modify_rowsets();
+ void modify_rowsets();
OLAPStatus gc_unused_rowsets();
OLAPStatus construct_output_rowset_writer();
diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp
index 4a32553..5aa6a8d 100755
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -40,7 +40,7 @@ OLAPStatus CumulativeCompaction::compact() {
}
// 1.calculate cumulative point
- RETURN_NOT_OK(_tablet->calculate_cumulative_point());
+ _tablet->calculate_cumulative_point();
// 2. pick rowsets to compact
RETURN_NOT_OK(pick_rowsets_to_compact());
diff --git a/be/src/olap/rowset_graph.cpp b/be/src/olap/rowset_graph.cpp
index cb59ea0..4b56ba9 100644
--- a/be/src/olap/rowset_graph.cpp
+++ b/be/src/olap/rowset_graph.cpp
@@ -150,11 +150,6 @@ OLAPStatus RowsetGraph::capture_consistent_versions(const Version& spec_version,
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}
- if (version_path == nullptr) {
- LOG(WARNING) << "param version_path is nullptr.";
- return OLAP_ERR_INPUT_PARAMETER_ERROR;
- }
-
// bfs_queue's element is vertex_index.
std::queue<int64_t> bfs_queue;
// predecessor[i] means the predecessor of vertex_index 'i'.
@@ -227,25 +222,26 @@ OLAPStatus RowsetGraph::capture_consistent_versions(const Version& spec_version,
reversed_path.push_back(tmp_vertex_index);
}
- // Make version_path from reversed_path.
- std::stringstream shortest_path_for_debug;
- for (size_t path_id = reversed_path.size() - 1; path_id > 0; --path_id) {
- int64_t tmp_start_vertex_value = _version_graph[reversed_path[path_id]].value;
- int64_t tmp_end_vertex_value = _version_graph[reversed_path[path_id - 1]].value;
+ if (version_path != nullptr) {
+ // Make version_path from reversed_path.
+ std::stringstream shortest_path_for_debug;
+ for (size_t path_id = reversed_path.size() - 1; path_id > 0; --path_id) {
+ int64_t tmp_start_vertex_value = _version_graph[reversed_path[path_id]].value;
+ int64_t tmp_end_vertex_value = _version_graph[reversed_path[path_id - 1]].value;
- // tmp_start_vertex_value mustn't be equal to tmp_end_vertex_value
- if (tmp_start_vertex_value <= tmp_end_vertex_value) {
- version_path->emplace_back(tmp_start_vertex_value, tmp_end_vertex_value - 1);
- } else {
- version_path->emplace_back(tmp_end_vertex_value, tmp_start_vertex_value - 1);
- }
+ // tmp_start_vertex_value mustn't be equal to tmp_end_vertex_value
+ if (tmp_start_vertex_value <= tmp_end_vertex_value) {
+ version_path->emplace_back(tmp_start_vertex_value, tmp_end_vertex_value - 1);
+ } else {
+ version_path->emplace_back(tmp_end_vertex_value, tmp_start_vertex_value - 1);
+ }
- shortest_path_for_debug << (*version_path)[version_path->size() - 1] << ' ';
+ shortest_path_for_debug << (*version_path)[version_path->size() - 1] << ' ';
+ }
+ VLOG(10) << "success to find path for spec_version. spec_version=" << spec_version
+ << ", path=" << shortest_path_for_debug.str();
}
- VLOG(10) << "success to find path for spec_version. spec_version=" << spec_version
- << ", path=" << shortest_path_for_debug.str();
-
return OLAP_SUCCESS;
}
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index e69d040..65725b7 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1322,11 +1322,7 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
// check if new_tablet.ce_point > base_tablet.ce_point?
new_tablet->set_cumulative_layer_point(-1);
// save tablet meta
- res = new_tablet->save_meta();
- if (res != OLAP_SUCCESS) {
- LOG(FATAL) << "fail to save tablet meta after remove rowset from new tablet"
- << new_tablet->full_name();
- }
+ new_tablet->save_meta();
for (auto& rowset : rowsets_to_delete) {
// do not call rowset.remove directly, using gc thread to delete it
StorageEngine::instance()->add_unused_rowset(rowset);
@@ -1399,10 +1395,7 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
if (res != OLAP_SUCCESS) {
break;
}
- res = new_tablet->save_meta();
- if (res != OLAP_SUCCESS) {
- break;
- }
+ new_tablet->save_meta();
} while(0);
if (res == OLAP_SUCCESS) {
@@ -1588,25 +1581,14 @@ OLAPStatus SchemaChangeHandler::_add_alter_task(
new_tablet->schema_hash(),
versions_to_be_changed,
alter_tablet_type);
- OLAPStatus res = base_tablet->save_meta();
- if (res != OLAP_SUCCESS) {
- LOG(FATAL) << "fail to save base tablet meta. res=" << res
- << ", tablet=" << base_tablet->full_name();
- return res;
- }
-
+ base_tablet->save_meta();
new_tablet->add_alter_task(base_tablet->tablet_id(),
base_tablet->schema_hash(),
vector<Version>(), // empty versions
alter_tablet_type);
- res = new_tablet->save_meta();
- if (res != OLAP_SUCCESS) {
- LOG(FATAL) << "fail to save new tablet meta. res=" << res
- << ", tablet=" << new_tablet->full_name();
- return res;
- }
+ new_tablet->save_meta();
LOG(INFO) << "successfully add alter task to both base and new";
- return res;
+ return OLAP_SUCCESS;
}
OLAPStatus SchemaChangeHandler::_save_alter_state(
@@ -1627,13 +1609,7 @@ OLAPStatus SchemaChangeHandler::_save_alter_state(
<< " res=" << res;
return res;
}
- res = base_tablet->save_meta();
- if (res != OLAP_SUCCESS) {
- LOG(FATAL) << "fail to save base tablet meta. res=" << res
- << ", base_tablet=" << base_tablet->full_name();
- return res;
- }
-
+ base_tablet->save_meta();
AlterTabletTaskSharedPtr new_alter_task = new_tablet->alter_task();
if (new_alter_task == nullptr) {
LOG(INFO) << "could not find alter task info from new tablet " << new_tablet->full_name();
@@ -1646,14 +1622,9 @@ OLAPStatus SchemaChangeHandler::_save_alter_state(
<< " res" << res;
return res;
}
- res = new_tablet->save_meta();
- if (res != OLAP_SUCCESS) {
- LOG(FATAL) << "fail to save new tablet meta. res=" << res
- << ", new_tablet=" << base_tablet->full_name();
- return res;
- }
+ new_tablet->save_meta();
- return res;
+ return OLAP_SUCCESS;
}
OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams& sc_params) {
@@ -1794,7 +1765,7 @@ PROCESS_ALTER_EXIT:
{
// save tablet meta here because rowset meta is not saved during add rowset
WriteLock new_wlock(sc_params.new_tablet->get_header_lock_ptr());
- res = sc_params.new_tablet->save_meta();
+ sc_params.new_tablet->save_meta();
}
if (res == OLAP_SUCCESS) {
Version test_version(0, end_version);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 8d96b9b..5119be0 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -70,21 +70,17 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) :
_last_cumu_compaction_failure_millis(0),
_last_base_compaction_failure_millis(0),
_last_cumu_compaction_success_millis(0),
- _last_base_compaction_success_millis(0) {
+ _last_base_compaction_success_millis(0),
+ _cumulative_point(kInvalidCumulativePoint) {
_gen_tablet_path();
_rs_graph.construct_rowset_graph(_tablet_meta->all_rs_metas());
}
-Tablet::~Tablet() {
- _rs_version_map.clear();
- _inc_rs_version_map.clear();
-}
-
OLAPStatus Tablet::_init_once_action() {
OLAPStatus res = OLAP_SUCCESS;
VLOG(3) << "begin to load tablet. tablet=" << full_name()
<< ", version_size=" << _tablet_meta->version_count();
- for (auto& rs_meta : _tablet_meta->all_rs_metas()) {
+ for (const auto& rs_meta : _tablet_meta->all_rs_metas()) {
Version version = rs_meta->version();
RowsetSharedPtr rowset;
res = RowsetFactory::create_rowset(&_schema, _tablet_path, rs_meta, &rowset);
@@ -115,7 +111,6 @@ OLAPStatus Tablet::_init_once_action() {
_inc_rs_version_map[version] = std::move(rowset);
}
- _cumulative_point = -1;
return res;
}
@@ -135,16 +130,12 @@ OLAPStatus Tablet::set_tablet_state(TabletState state) {
// should save tablet meta to remote meta store
// if it's a primary replica
-OLAPStatus Tablet::save_meta() {
- OLAPStatus res = _tablet_meta->save_meta(_data_dir);
- if (res != OLAP_SUCCESS) {
- LOG(FATAL) << "fail to save tablet_meta. res=" << res << ", root=" << _data_dir->path();
- }
+void Tablet::save_meta() {
+ auto res = _tablet_meta->save_meta(_data_dir);
+ CHECK_EQ(res, OLAP_SUCCESS) << "fail to save tablet_meta. res=" << res << ", root=" << _data_dir->path();
// User could directly update tablet schema by _tablet_meta,
// So we need to refetch schema again
_schema = _tablet_meta->tablet_schema();
-
- return res;
}
OLAPStatus Tablet::revise_tablet_meta(
@@ -169,19 +160,10 @@ OLAPStatus Tablet::revise_tablet_meta(
<< full_name() << ", version=" << version << "]";
}
- if (res != OLAP_SUCCESS) {
- break;
- }
-
for (auto& rs_meta : rowsets_to_clone) {
new_tablet_meta->add_rs_meta(rs_meta);
}
-
- if (res != OLAP_SUCCESS) {
- break;
- }
-
- VLOG(3) << "load rowsets successfully when clone. tablet=" << full_name()
+ VLOG(3) << "load rowsets successfully when clone. tablet=" << full_name()
<< ", added rowset size=" << rowsets_to_clone.size();
// save and reload tablet_meta
res = new_tablet_meta->save_meta(_data_dir);
@@ -194,6 +176,7 @@ OLAPStatus Tablet::revise_tablet_meta(
for (auto& version : versions_to_delete) {
auto it = _rs_version_map.find(version);
+ DCHECK(it != _rs_version_map.end());
StorageEngine::instance()->add_unused_rowset(it->second);
_rs_version_map.erase(it);
}
@@ -207,7 +190,7 @@ OLAPStatus Tablet::revise_tablet_meta(
RowsetSharedPtr rowset;
res = RowsetFactory::create_rowset(&_schema, _tablet_path, rs_meta, &rowset);
if (res != OLAP_SUCCESS) {
- LOG(WARNING) << "fail to init rowset. version=" << version.first << "-" << version.second;
+ LOG(WARNING) << "fail to init rowset. version=" << version;
return res;
}
_rs_version_map[version] = std::move(rowset);
@@ -241,14 +224,10 @@ OLAPStatus Tablet::add_rowset(RowsetSharedPtr rowset, bool need_persist) {
// but it should be removed in multi path version
for (auto& it : _rs_version_map) {
if (rowset->version().contains(it.first) && rowset->version() != it.first) {
- if (it.second == nullptr) {
- LOG(FATAL) << "there exist a version=" << it.first
- << " contains the input rs with version=" << rowset->version()
- << ", but the related rs is null";
- return OLAP_ERR_PUSH_ROWSET_NOT_FOUND;
- } else {
- rowsets_to_delete.push_back(it.second);
- }
+ CHECK(it.second != nullptr) << "there exist a version=" << it.first
+ << " contains the input rs with version=" << rowset->version()
+ << ", but the related rs is null";
+ rowsets_to_delete.push_back(it.second);
}
}
modify_rowsets(std::vector<RowsetSharedPtr>(), rowsets_to_delete);
@@ -266,31 +245,23 @@ OLAPStatus Tablet::add_rowset(RowsetSharedPtr rowset, bool need_persist) {
return OLAP_SUCCESS;
}
-OLAPStatus Tablet::modify_rowsets(const vector<RowsetSharedPtr>& to_add,
- const vector<RowsetSharedPtr>& to_delete) {
+void Tablet::modify_rowsets(const vector<RowsetSharedPtr>& to_add,
+ const vector<RowsetSharedPtr>& to_delete) {
vector<RowsetMetaSharedPtr> rs_metas_to_add;
for (auto& rs : to_add) {
rs_metas_to_add.push_back(rs->rowset_meta());
+ _rs_version_map[rs->version()] = rs;
+ ++_newly_created_rowset_num;
}
vector<RowsetMetaSharedPtr> rs_metas_to_delete;
for (auto& rs : to_delete) {
rs_metas_to_delete.push_back(rs->rowset_meta());
+ _rs_version_map.erase(rs->version());
}
_tablet_meta->modify_rs_metas(rs_metas_to_add, rs_metas_to_delete);
- for (auto& rs : to_delete) {
- auto it = _rs_version_map.find(rs->version());
- _rs_version_map.erase(it);
- }
-
- for (auto& rs : to_add) {
- _rs_version_map[rs->version()] = rs;
- ++_newly_created_rowset_num;
- }
-
_rs_graph.reconstruct_rowset_graph(_tablet_meta->all_rs_metas());
- return OLAP_SUCCESS;
}
// snapshot manager may call this api to check if version exists, so that
@@ -298,16 +269,14 @@ OLAPStatus Tablet::modify_rowsets(const vector<RowsetSharedPtr>& to_add,
const RowsetSharedPtr Tablet::get_rowset_by_version(const Version& version) const {
auto iter = _rs_version_map.find(version);
if (iter == _rs_version_map.end()) {
- VLOG(3) << "no rowset for version:" << version.first << "-" << version.second
- << ", tablet: " << full_name();
+ VLOG(3) << "no rowset for version:" << version << ", tablet: " << full_name();
return nullptr;
}
- RowsetSharedPtr rowset = iter->second;
- return rowset;
+ return iter->second;
}
// This function only be called by SnapshotManager to perform incremental clone.
-// It will be called under protected of _metalock(SnapshotManager will fetch it manually),
+// It will be called under protected of _meta_lock(SnapshotManager will fetch it manually),
// so it is no need to lock here.
const RowsetSharedPtr Tablet::get_inc_rowset_by_version(const Version& version) const {
auto iter = _inc_rs_version_map.find(version);
@@ -315,8 +284,7 @@ const RowsetSharedPtr Tablet::get_inc_rowset_by_version(const Version& version)
VLOG(3) << "no rowset for version:" << version << ", tablet: " << full_name();
return nullptr;
}
- RowsetSharedPtr rowset = iter->second;
- return rowset;
+ return iter->second;
}
// Already under _meta_lock
@@ -326,15 +294,10 @@ const RowsetSharedPtr Tablet::rowset_with_max_version() const {
return nullptr;
}
- DCHECK(_rs_version_map.find(max_version) != _rs_version_map.end())
- << "invalid version:" << max_version;
auto iter = _rs_version_map.find(max_version);
- if (iter == _rs_version_map.end()) {
- LOG(WARNING) << "no rowset for version:" << max_version;
- return nullptr;
- }
- RowsetSharedPtr rowset = iter->second;
- return rowset;
+ DCHECK(_rs_version_map.find(max_version) != _rs_version_map.end())
+ << "invalid version:" << max_version;
+ return iter->second;
}
RowsetSharedPtr Tablet::_rowset_with_largest_size() {
@@ -373,15 +336,12 @@ OLAPStatus Tablet::add_inc_rowset(const RowsetSharedPtr& rowset) {
void Tablet::_delete_inc_rowset_by_version(const Version& version,
const VersionHash& version_hash) {
// delete incremental rowset from map
- auto it = _inc_rs_version_map.find(version);
- if (it != _inc_rs_version_map.end()) {
- _inc_rs_version_map.erase(it);
- }
+ _inc_rs_version_map.erase(version);
+
RowsetMetaSharedPtr rowset_meta = _tablet_meta->acquire_inc_rs_meta_by_version(version);
if (rowset_meta == nullptr) {
return;
}
-
_tablet_meta->delete_inc_rs_meta_by_version(version);
VLOG(3) << "delete incremental rowset. tablet=" << full_name() << ", version=" << version;
}
@@ -412,10 +372,7 @@ void Tablet::delete_expired_inc_rowsets() {
<< ", version=" << pair.first;
}
- if (save_meta() != OLAP_SUCCESS) {
- LOG(FATAL) << "fail to save tablet_meta when delete expire incremental data."
- << "tablet=" << full_name();
- }
+ save_meta();
}
OLAPStatus Tablet::capture_consistent_versions(const Version& spec_version,
@@ -433,15 +390,13 @@ OLAPStatus Tablet::capture_consistent_versions(const Version& spec_version,
<< ", missed version for version:" << spec_version;
_print_missed_versions(missed_versions);
}
- return status;
}
return status;
}
OLAPStatus Tablet::check_version_integrity(const Version& version) {
- vector<Version> span_versions;
ReadLock rdlock(&_meta_lock);
- return capture_consistent_versions(version, &span_versions);
+ return capture_consistent_versions(version, nullptr);
}
// If any rowset contains the specific version, it means the version already exist
@@ -457,8 +412,9 @@ bool Tablet::check_version_exist(const Version& version) const {
void Tablet::list_versions(vector<Version>* versions) const {
DCHECK(versions != nullptr && versions->empty());
+ versions->reserve(_rs_version_map.size());
// versions vector is not sorted.
- for (auto& it : _rs_version_map) {
+ for (const auto& it : _rs_version_map) {
versions->push_back(it.first);
}
}
@@ -474,6 +430,7 @@ OLAPStatus Tablet::capture_consistent_rowsets(const Version& spec_version,
OLAPStatus Tablet::_capture_consistent_rowsets_unlocked(const vector<Version>& version_path,
vector<RowsetSharedPtr>* rowsets) const {
DCHECK(rowsets != nullptr && rowsets->empty());
+ rowsets->reserve(version_path.size());
for (auto& version : version_path) {
auto it = _rs_version_map.find(version);
if (it == _rs_version_map.end()) {
@@ -496,7 +453,7 @@ OLAPStatus Tablet::capture_rs_readers(const Version& spec_version,
OLAPStatus Tablet::capture_rs_readers(const vector<Version>& version_path,
vector<RowsetReaderSharedPtr>* rs_readers) const {
- DCHECK(rs_readers != NULL && rs_readers->empty());
+ DCHECK(rs_readers != nullptr && rs_readers->empty());
for (auto version : version_path) {
auto it = _rs_version_map.find(version);
if (it == _rs_version_map.end()) {
@@ -535,10 +492,10 @@ AlterTabletTaskSharedPtr Tablet::alter_task() {
return _tablet_meta->alter_task();
}
-OLAPStatus Tablet::add_alter_task(int64_t related_tablet_id,
- int32_t related_schema_hash,
- const vector<Version>& versions_to_alter,
- const AlterTabletType alter_type) {
+void Tablet::add_alter_task(int64_t related_tablet_id,
+ int32_t related_schema_hash,
+ const vector<Version>& versions_to_alter,
+ const AlterTabletType alter_type) {
AlterTabletTask alter_task;
alter_task.set_alter_state(ALTER_RUNNING);
alter_task.set_related_tablet_id(related_tablet_id);
@@ -550,7 +507,6 @@ OLAPStatus Tablet::add_alter_task(int64_t related_tablet_id,
<< ", related_tablet_id " << related_tablet_id
<< ", related_schema_hash " << related_schema_hash
<< ", alter_type " << alter_type;
- return OLAP_SUCCESS;
}
void Tablet::delete_alter_task() {
@@ -569,17 +525,16 @@ OLAPStatus Tablet::recover_tablet_until_specfic_version(const int64_t& spec_vers
bool Tablet::can_do_compaction() {
// 如果table正在做schema change,则通过选路判断数据是否转换完成
- // 如果选路成功,则转换完成,可以进行BE
- // 如果选路失败,则转换未完成,不能进行BE
+ // 如果选路成功,则转换完成,可以进行compaction
+ // 如果选路失败,则转换未完成,不能进行compaction
ReadLock rdlock(&_meta_lock);
const RowsetSharedPtr lastest_delta = rowset_with_max_version();
- if (lastest_delta == NULL) {
+ if (lastest_delta == nullptr) {
return false;
}
Version test_version = Version(0, lastest_delta->end_version());
- vector<Version> path_versions;
- if (OLAP_SUCCESS != capture_consistent_versions(test_version, &path_versions)) {
+ if (OLAP_SUCCESS != capture_consistent_versions(test_version, nullptr)) {
return false;
}
@@ -627,7 +582,7 @@ const uint32_t Tablet::calc_base_compaction_score() const {
}
void Tablet::compute_version_hash_from_rowsets(
- const std::vector<RowsetSharedPtr>& rowsets, VersionHash* version_hash) const {
+ const std::vector<RowsetSharedPtr>& rowsets, VersionHash* version_hash) {
DCHECK(version_hash != nullptr) << "invalid parameter, version_hash is nullptr";
int64_t v_hash = 0;
// version hash is useless since Doris version 0.11
@@ -637,7 +592,6 @@ void Tablet::compute_version_hash_from_rowsets(
*version_hash = v_hash;
}
-
void Tablet::calc_missed_versions(int64_t spec_version, vector<Version>* missed_versions) {
ReadLock rdlock(&_meta_lock);
calc_missed_versions_unlocked(spec_version, missed_versions);
@@ -666,7 +620,7 @@ void Tablet::calc_missed_versions_unlocked(int64_t spec_version,
for (const Version& version : existing_versions) {
if (version.first > last_version + 1) {
for (int64_t i = last_version + 1; i < version.first; ++i) {
- missed_versions->emplace_back(i, i);
+ missed_versions->emplace_back(Version(i, i));
}
}
last_version = version.second;
@@ -675,18 +629,18 @@ void Tablet::calc_missed_versions_unlocked(int64_t spec_version,
}
}
for (int64_t i = last_version + 1; i <= spec_version; ++i) {
- missed_versions->emplace_back(i, i);
+ missed_versions->emplace_back(Version(i, i));
}
}
-OLAPStatus Tablet::max_continuous_version_from_begining(Version* version,
- VersionHash* v_hash) {
+void Tablet::max_continuous_version_from_begining(Version* version,
+ VersionHash* v_hash) {
ReadLock rdlock(&_meta_lock);
- return _max_continuous_version_from_begining_unlocked(version, v_hash);
+ _max_continuous_version_from_begining_unlocked(version, v_hash);
}
-OLAPStatus Tablet::_max_continuous_version_from_begining_unlocked(Version* version,
- VersionHash* v_hash) const {
+void Tablet::_max_continuous_version_from_begining_unlocked(Version* version,
+ VersionHash* v_hash) const {
vector<pair<Version, VersionHash>> existing_versions;
for (auto& rs : _tablet_meta->all_rs_metas()) {
existing_versions.emplace_back(rs->version() , rs->version_hash());
@@ -710,15 +664,14 @@ OLAPStatus Tablet::_max_continuous_version_from_begining_unlocked(Version* versi
}
*version = max_continuous_version;
*v_hash = max_continuous_version_hash;
- return OLAP_SUCCESS;
}
-OLAPStatus Tablet::calculate_cumulative_point() {
+void Tablet::calculate_cumulative_point() {
WriteLock wrlock(&_meta_lock);
- if (_cumulative_point != -1) {
+ if (_cumulative_point != kInvalidCumulativePoint) {
// only calculate the point once.
// after that, cumulative point will be updated along with compaction process.
- return OLAP_SUCCESS;
+ return;
}
std::list<RowsetMetaSharedPtr> existing_rss;
@@ -755,21 +708,15 @@ OLAPStatus Tablet::calculate_cumulative_point() {
prev_version = rs->version().second;
_cumulative_point = prev_version + 1;
}
- return OLAP_SUCCESS;
}
OLAPStatus Tablet::split_range(const OlapTuple& start_key_strings,
const OlapTuple& end_key_strings,
uint64_t request_block_row_count,
vector<OlapTuple>* ranges) {
- if (ranges == nullptr) {
- LOG(WARNING) << "parameter end_row is null.";
- return OLAP_ERR_INPUT_PARAMETER_ERROR;
- }
+ DCHECK(ranges != nullptr);
RowCursor start_key;
- RowCursor end_key;
-
// 如果有startkey,用startkey初始化;反之则用minkey初始化
if (start_key_strings.size() > 0) {
if (start_key.init_scan_key(_schema, start_key_strings.values()) != OLAP_SUCCESS) {
@@ -791,6 +738,7 @@ OLAPStatus Tablet::split_range(const OlapTuple& start_key_strings,
start_key.build_min_key();
}
+ RowCursor end_key;
// 和startkey一样处理,没有则用maxkey初始化
if (end_key_strings.size() > 0) {
if (OLAP_SUCCESS != end_key.init_scan_key(_schema, end_key_strings.values())) {
@@ -832,17 +780,17 @@ void Tablet::delete_all_files() {
// we have to call list_versions first, or else error occurs when
// removing hash_map item and iterating hash_map concurrently.
ReadLock rdlock(&_meta_lock);
- for (auto it = _rs_version_map.begin(); it != _rs_version_map.end(); ++it) {
- it->second->remove();
+ for (auto it : _rs_version_map) {
+ it.second->remove();
}
_rs_version_map.clear();
- for (auto it = _inc_rs_version_map.begin(); it != _inc_rs_version_map.end(); ++it) {
- it->second->remove();
+ for (auto it : _inc_rs_version_map) {
+ it.second->remove();
}
_inc_rs_version_map.clear();
}
-bool Tablet::check_path(const std::string& path_to_check) {
+bool Tablet::check_path(const std::string& path_to_check) const {
ReadLock rdlock(&_meta_lock);
if (path_to_check == _tablet_path) {
return true;
@@ -911,14 +859,10 @@ OLAPStatus Tablet::_contains_version(const Version& version) {
// because the value type is std::shared_ptr, when will it be nullptr?
// In addition, in this class, there are many places that do not make this judgment
// when access _rs_version_map's value.
- if (it.second == nullptr) {
- LOG(FATAL) << "there exist a version=" << it.first
- << " contains the input rs with version=" << version
- << ", but the related rs is null";
- return OLAP_ERR_PUSH_ROWSET_NOT_FOUND;
- } else {
- return OLAP_ERR_PUSH_VERSION_ALREADY_EXIST;
- }
+ CHECK(it.second != nullptr) << "there exist a version=" << it.first
+ << " contains the input rs with version=" << version
+ << ", but the related rs is null";
+ return OLAP_ERR_PUSH_VERSION_ALREADY_EXIST;
}
}
@@ -954,7 +898,7 @@ void Tablet::pick_candicate_rowsets_to_base_compaction(vector<RowsetSharedPtr>*
}
// For http compaction action
-OLAPStatus Tablet::get_compaction_status(std::string* json_result) {
+void Tablet::get_compaction_status(std::string* json_result) {
rapidjson::Document root;
root.SetObject();
@@ -962,10 +906,13 @@ OLAPStatus Tablet::get_compaction_status(std::string* json_result) {
std::vector<bool> delete_flags;
{
ReadLock rdlock(&_meta_lock);
+ rowsets.reserve(_rs_version_map.size());
for (auto& it : _rs_version_map) {
rowsets.push_back(it.second);
}
std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator);
+
+ delete_flags.reserve(rowsets.size());
for (auto& rs : rowsets) {
delete_flags.push_back(version_for_delete_predicate(rs->version()));
}
@@ -1008,29 +955,28 @@ OLAPStatus Tablet::get_compaction_status(std::string* json_result) {
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
root.Accept(writer);
*json_result = std::string(strbuf.GetString());
-
- return OLAP_SUCCESS;
}
-OLAPStatus Tablet::do_tablet_meta_checkpoint() {
+void Tablet::do_tablet_meta_checkpoint() {
WriteLock store_lock(&_meta_store_lock);
if (_newly_created_rowset_num == 0) {
- return OLAP_SUCCESS;
+ return;
}
if (UnixMillis() - _last_checkpoint_time < config::tablet_meta_checkpoint_min_interval_secs * 1000
&& _newly_created_rowset_num < config::tablet_meta_checkpoint_min_new_rowsets_num) {
- return OLAP_SUCCESS;
+ return;
}
+
// hold read-lock other than write-lock, because it will not modify meta structure
ReadLock rdlock(&_meta_lock);
if (tablet_state() != TABLET_RUNNING) {
LOG(INFO) << "tablet is under state=" << tablet_state()
<< ", not running, skip do checkpoint"
<< ", tablet=" << full_name();
- return OLAP_SUCCESS;
+ return;
}
LOG(INFO) << "start to do tablet meta checkpoint, tablet=" << full_name();
- RETURN_NOT_OK(save_meta());
+ save_meta();
// if save meta successfully, then should remove the rowset meta existing in tablet
// meta from rowset meta store
for (auto& rs_meta : _tablet_meta->all_rs_metas()) {
@@ -1048,7 +994,6 @@ OLAPStatus Tablet::do_tablet_meta_checkpoint() {
}
_newly_created_rowset_num = 0;
_last_checkpoint_time = UnixMillis();
- return OLAP_SUCCESS;
}
bool Tablet::rowset_meta_is_useful(RowsetMetaSharedPtr rowset_meta) {
@@ -1131,7 +1076,7 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) {
// there are some rowset meta in local meta store and in in-memory tablet meta
// but not in tablet meta in local meta store
// TODO(lingbin): do we need _meta_lock?
-void Tablet::generate_tablet_meta_copy(TabletMetaSharedPtr new_tablet_meta) {
+void Tablet::generate_tablet_meta_copy(TabletMetaSharedPtr new_tablet_meta) const {
TabletMetaPB tablet_meta_pb;
_tablet_meta->to_meta_pb(&tablet_meta_pb);
new_tablet_meta->init_from_pb(tablet_meta_pb);
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index bb64f0f..8cb6565 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -51,7 +51,6 @@ public:
DataDir* data_dir = nullptr);
Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir);
- ~Tablet();
OLAPStatus init();
inline bool init_succeeded();
@@ -69,12 +68,11 @@ public:
// Property encapsulated in TabletMeta
inline const TabletMetaSharedPtr tablet_meta();
- OLAPStatus save_meta();
+ void save_meta();
// Used in clone task, to update local meta when finishing a clone job
OLAPStatus revise_tablet_meta(const std::vector<RowsetMetaSharedPtr>& rowsets_to_clone,
const std::vector<Version>& versions_to_delete);
-
inline TabletUid tablet_uid() const;
inline int64_t table_id() const;
// Returns a string can be used to uniquely identify a tablet.
@@ -111,8 +109,8 @@ public:
// operation in rowsets
OLAPStatus add_rowset(RowsetSharedPtr rowset, bool need_persist = true);
- OLAPStatus modify_rowsets(const vector<RowsetSharedPtr>& to_add,
- const vector<RowsetSharedPtr>& to_delete);
+ void modify_rowsets(const vector<RowsetSharedPtr>& to_add,
+ const vector<RowsetSharedPtr>& to_delete);
// _rs_version_map and _inc_rs_version_map should be protected by _meta_lock
// The caller must call hold _meta_lock when call this two function.
@@ -144,7 +142,7 @@ public:
// message for alter task
AlterTabletTaskSharedPtr alter_task();
- OLAPStatus add_alter_task(int64_t related_tablet_id, int32_t related_schema_hash,
+ void add_alter_task(int64_t related_tablet_id, int32_t related_schema_hash,
const vector<Version>& versions_to_alter,
const AlterTabletType alter_type);
void delete_alter_task();
@@ -177,8 +175,8 @@ public:
bool can_do_compaction();
const uint32_t calc_cumulative_compaction_score() const;
const uint32_t calc_base_compaction_score() const;
- void compute_version_hash_from_rowsets(const std::vector<RowsetSharedPtr>& rowsets,
- VersionHash* version_hash) const;
+ static void compute_version_hash_from_rowsets(const std::vector<RowsetSharedPtr>& rowsets,
+ VersionHash* version_hash);
// operation for clone
void calc_missed_versions(int64_t spec_version, vector<Version>* missed_versions);
@@ -187,7 +185,7 @@ public:
// This function finds the max continuous version from the beginning.
// For example: if versions 1, 2, 3, 5, 6, 7 belong to the tablet, then 3 is the target.
- OLAPStatus max_continuous_version_from_begining(Version* version, VersionHash* v_hash);
+ void max_continuous_version_from_begining(Version* version, VersionHash* v_hash);
// operation for query
OLAPStatus split_range(
@@ -217,7 +215,7 @@ public:
void delete_all_files();
- bool check_path(const std::string& check_path);
+ bool check_path(const std::string& check_path) const;
bool check_rowset_id(const RowsetId& rowset_id);
OLAPStatus set_partition_id(int64_t partition_id);
@@ -228,7 +226,7 @@ public:
std::vector<RowsetSharedPtr>* candidate_rowsets);
void pick_candicate_rowsets_to_base_compaction(std::vector<RowsetSharedPtr>* candidate_rowsets);
- OLAPStatus calculate_cumulative_point();
+ void calculate_cumulative_point();
// TODO(ygl):
inline bool is_primary_replica() { return false; }
@@ -237,24 +235,24 @@ public:
// eco mode also means save money in palo
inline bool in_eco_mode() { return false; }
- OLAPStatus do_tablet_meta_checkpoint();
+ void do_tablet_meta_checkpoint();
bool rowset_meta_is_useful(RowsetMetaSharedPtr rowset_meta);
void build_tablet_report_info(TTabletInfo* tablet_info);
- void generate_tablet_meta_copy(TabletMetaSharedPtr new_tablet_meta);
+ void generate_tablet_meta_copy(TabletMetaSharedPtr new_tablet_meta) const;
// return a json string to show the compaction status of this tablet
- OLAPStatus get_compaction_status(std::string* json_result);
+ void get_compaction_status(std::string* json_result);
private:
OLAPStatus _init_once_action();
void _print_missed_versions(const std::vector<Version>& missed_versions) const;
bool _contains_rowset(const RowsetId rowset_id);
OLAPStatus _contains_version(const Version& version);
- OLAPStatus _max_continuous_version_from_begining_unlocked(Version* version,
- VersionHash* v_hash) const ;
+ void _max_continuous_version_from_begining_unlocked(Version* version,
+ VersionHash* v_hash) const ;
void _gen_tablet_path();
RowsetSharedPtr _rowset_with_largest_size();
void _delete_inc_rowset_by_version(const Version& version, const VersionHash& version_hash);
@@ -262,6 +260,7 @@ private:
vector<RowsetSharedPtr>* rowsets) const;
private:
+ static const int64_t kInvalidCumulativePoint = -1;
TabletState _state;
TabletMetaSharedPtr _tablet_meta;
TabletSchema _schema;
@@ -281,7 +280,7 @@ private:
// TODO(lingbin): There is a _meta_lock in TabletMeta too; there should be a comment to
// explain how these two locks work together.
- RWMutex _meta_lock;
+ mutable RWMutex _meta_lock;
// A new load job will produce a new rowset, which will be inserted into both _rs_version_map
// and _inc_rs_version_map. Only the most recent rowsets are kept in _inc_rs_version_map to
// reduce the amount of data that needs to be copied during the clone task.
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index ca78d05..5239e1c 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -170,9 +170,7 @@ OLAPStatus TabletManager::_add_tablet_to_map_unlocked(TTabletId tablet_id, Schem
OLAPStatus res = OLAP_SUCCESS;
if (update_meta) {
// call tablet save meta in order to validate the meta
- RETURN_NOT_OK_LOG(tablet->save_meta(), Substitute(
- "failed to save new tablet's meta. tablet_id=$0, schema_hash=$1",
- tablet_id, schema_hash));
+ tablet->save_meta();
}
if (drop_old) {
// If the new tablet is fresher than the existing one, then replace
@@ -529,11 +527,7 @@ OLAPStatus TabletManager::_drop_tablet_unlocked(
&& related_alter_task->related_tablet_id() == tablet_id
&& related_alter_task->related_schema_hash() == schema_hash) {
related_tablet->delete_alter_task();
- res = related_tablet->save_meta();
- if (res != OLAP_SUCCESS) {
- LOG(FATAL) << "fail to save tablet header. res=" << res
- << ", tablet=" << related_tablet->full_name();
- }
+ related_tablet->save_meta();
}
related_tablet->release_header_lock();
res = _drop_tablet_directly_unlocked(tablet_id, schema_hash, keep_files);
@@ -1340,8 +1334,7 @@ OLAPStatus TabletManager::_drop_tablet_directly_unlocked(
// and the tablet will be loaded at restart time.
// To avoid this exception, we first set the state of the tablet to `SHUTDOWN`.
tablet->set_tablet_state(TABLET_SHUTDOWN);
- RETURN_NOT_OK_LOG(tablet->save_meta(), Substitute(
- "fail to drop tablet.tablet_id=$0, schema_hash=$1", tablet_id, schema_hash));
+ tablet->save_meta();
{
WriteLock wlock(&_shutdown_tablets_lock);
_shutdown_tablets.push_back(tablet);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org