You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/06/04 04:29:07 UTC
[incubator-doris] branch master updated: [feature] support convert alpha rowset (#9890)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3743f19369 [feature] support convert alpha rowset (#9890)
3743f19369 is described below
commit 3743f19369654682377c77b9c903ed417847874b
Author: jacktengg <18...@users.noreply.github.com>
AuthorDate: Sat Jun 4 12:29:03 2022 +0800
[feature] support convert alpha rowset (#9890)
Add an alpha-to-beta rowset converter that converts alpha rowsets automatically in a background thread. The alpha rowset code will be removed after 1.1.
---
be/src/common/config.h | 6 ++
be/src/olap/CMakeLists.txt | 1 +
be/src/olap/convert_rowset.cpp | 140 +++++++++++++++++++++++++++++++++++++++++
be/src/olap/convert_rowset.h | 42 +++++++++++++
be/src/olap/olap_server.cpp | 50 +++++++++++++++
be/src/olap/storage_engine.cpp | 6 ++
be/src/olap/storage_engine.h | 5 ++
be/src/olap/tablet.cpp | 9 +++
be/src/olap/tablet.h | 2 +
be/src/olap/tablet_manager.cpp | 14 +++++
be/src/olap/tablet_manager.h | 2 +
11 files changed, 277 insertions(+)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8205ac2537..fe40be22a8 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -294,6 +294,12 @@ CONF_mInt64(min_compaction_failure_interval_sec, "5"); // 5 seconds
// This config can be set to limit thread number in compaction thread pool.
CONF_mInt32(max_compaction_threads, "10");
+// Number of threads in the convert rowset thread pool. The pool and the alpha rowset scan thread are only started when this is greater than 0.
+CONF_mInt32(convert_rowset_thread_num, "0");
+
+// Initial (minimum) sleep interval, in seconds, between alpha rowset scans; it is doubled (capped at 3600) while no tablet with alpha rowsets is found.
+CONF_mInt32(scan_alpha_rowset_min_interval_sec, "3");
+
// Thread count to do tablet meta checkpoint, -1 means use the data directories count.
CONF_Int32(max_meta_checkpoint_threads, "-1");
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index 657ebe3518..2c02122883 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -38,6 +38,7 @@ add_library(Olap STATIC
compaction_permit_limiter.cpp
comparison_predicate.cpp
compress.cpp
+ convert_rowset.cpp
cumulative_compaction.cpp
cumulative_compaction_policy.cpp
delete_handler.cpp
diff --git a/be/src/olap/convert_rowset.cpp b/be/src/olap/convert_rowset.cpp
new file mode 100644
index 0000000000..c71e66c246
--- /dev/null
+++ b/be/src/olap/convert_rowset.cpp
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/convert_rowset.h"
+
+namespace doris {
+
+// Converts the alpha rowsets of _tablet one by one, each into a new rowset covering the
+// same version range, and swaps each converted rowset into the tablet.
+// Returns the first error encountered; rowsets converted before that error stay converted.
+// Stops early (with OK) once the accumulated input row count reaches the per-call limit.
+Status ConvertRowset::do_convert() {
+    if (!_tablet->init_succeeded()) {
+        return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
+    }
+
+    // Try to take both compaction locks without blocking; give up if either is busy.
+    std::unique_lock<std::mutex> base_lock(_tablet->get_base_compaction_lock(), std::try_to_lock);
+    std::unique_lock<std::mutex> cumu_lock(_tablet->get_cumulative_compaction_lock(),
+                                           std::try_to_lock);
+    const bool holds_both_locks = base_lock.owns_lock() && cumu_lock.owns_lock();
+    if (!holds_both_locks) {
+        LOG(INFO) << "The tablet is under compaction. tablet=" << _tablet->full_name();
+        return Status::OLAPInternalError(OLAP_ERR_CE_TRY_CE_LOCK_ERROR);
+    }
+
+    std::vector<RowsetSharedPtr> rowsets_to_convert;
+    _tablet->find_alpha_rowsets(&rowsets_to_convert);
+
+    // Upper bound on the number of input rows handled by a single call.
+    const size_t max_convert_row_count = 20000000;
+    size_t converted_row_count = 0;
+    Merger::Statistics stats;
+    for (const RowsetSharedPtr& input_rowset : rowsets_to_convert) {
+        // The output rowset covers exactly the version range of its input.
+        Version output_version(input_rowset->start_version(), input_rowset->end_version());
+
+        RowsetReaderSharedPtr input_rs_reader;
+        RETURN_NOT_OK(input_rowset->create_reader(&input_rs_reader));
+
+        std::unique_ptr<RowsetWriter> output_rs_writer;
+        RETURN_NOT_OK(_tablet->create_rowset_writer(output_version, VISIBLE, NONOVERLAPPING,
+                                                    &output_rs_writer));
+
+        Status res = Merger::merge_rowsets(_tablet, ReaderType::READER_BASE_COMPACTION,
+                                           {input_rs_reader}, output_rs_writer.get(), &stats);
+        if (!res.ok()) {
+            LOG(WARNING) << "fail to convert rowset. res=" << res
+                         << ", tablet=" << _tablet->full_name();
+            return res;
+        }
+
+        auto output_rowset = output_rs_writer->build();
+        if (output_rowset == nullptr) {
+            LOG(WARNING) << "rowset writer build failed"
+                         << ", tablet=" << _tablet->full_name();
+            return Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
+        }
+
+        // Verify row counts before the new rowset replaces the old one.
+        RETURN_NOT_OK(check_correctness(input_rowset, output_rowset, stats));
+
+        converted_row_count += input_rowset->num_rows();
+
+        RETURN_NOT_OK(_modify_rowsets(input_rowset, output_rowset));
+
+        LOG(INFO) << "succeed to convert rowset"
+                  << ". tablet=" << _tablet->full_name()
+                  << ", output_version=" << output_version
+                  << ", disk=" << _tablet->data_dir()->path();
+
+        // Leave the remaining rowsets for a later call once the row budget is used up.
+        if (converted_row_count >= max_convert_row_count) {
+            break;
+        }
+    }
+    return Status::OK();
+}
+
+// Checks that the converted rowset accounts for every input row, i.e. that
+//   input rows == output rows + merged rows + filtered rows.
+// If the row count in the input rowset meta does not match, the check is retried with the
+// row count summed from the input's segment groups.
+// Returns OK when either count matches, OLAP_ERR_CHECK_LINES_ERROR otherwise.
+Status ConvertRowset::check_correctness(RowsetSharedPtr input_rowset, RowsetSharedPtr output_rowset,
+                                        const Merger::Statistics& stats) {
+    // Rows the merge accounts for; computed once and used by both comparisons below.
+    const auto accounted_rows =
+            output_rowset->num_rows() + stats.merged_rows + stats.filtered_rows;
+
+    // 1. check row number
+    if (input_rowset->num_rows() == accounted_rows) {
+        return Status::OK();
+    }
+
+    LOG(WARNING) << "row_num does not match between input and output! "
+                 << "input_row_num=" << input_rowset->num_rows()
+                 << ", merged_row_num=" << stats.merged_rows
+                 << ", filtered_row_num=" << stats.filtered_rows
+                 << ", output_row_num=" << output_rowset->num_rows();
+
+    // ATTN(cmy): We found that the num_rows in some rowset meta may be set to the wrong value,
+    // but it is not known which version of the code has the problem. So when the convert
+    // result is inconsistent, we then try to verify by num_rows recorded in segment_groups.
+    // If that check passes, the mismatch above is ignored.
+    // Only handle alpha rowset because we only find this bug in alpha rowset
+    int64_t num_rows = _get_input_num_rows_from_seg_grps(input_rowset);
+    // NOTE(review): _get_input_num_rows_from_seg_grps only sums segment group counts and has
+    // no path that returns -1, so this guard is purely defensive.
+    if (num_rows == -1) {
+        return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR);
+    }
+    if (num_rows != accounted_rows) {
+        // If it is still incorrect, it may be another problem
+        LOG(WARNING) << "row_num got from seg groups does not match between input "
+                        "and output! "
+                     << "input_row_num=" << num_rows << ", merged_row_num=" << stats.merged_rows
+                     << ", filtered_row_num=" << stats.filtered_rows
+                     << ", output_row_num=" << output_rowset->num_rows();
+
+        return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR);
+    }
+    return Status::OK();
+}
+
+// Returns the sum of num_rows() over all segment groups recorded in the rowset's
+// alpha rowset extra meta (0 when there are no segment groups).
+int64_t ConvertRowset::_get_input_num_rows_from_seg_grps(RowsetSharedPtr rowset) {
+    const auto& segment_groups =
+            rowset->rowset_meta()->alpha_rowset_extra_meta_pb().segment_groups();
+    int64_t total_rows = 0;
+    for (const auto& segment_group : segment_groups) {
+        total_rows += segment_group.num_rows();
+    }
+    return total_rows;
+}
+// Passes the output rowset and the input rowset (in that order) to Tablet::modify_rowsets
+// and then saves the tablet meta, all while holding the tablet header lock exclusively.
+Status ConvertRowset::_modify_rowsets(RowsetSharedPtr input_rowset, RowsetSharedPtr output_rowset) {
+    // Tablet::modify_rowsets works on vectors, so wrap each rowset in a one-element vector.
+    std::vector<RowsetSharedPtr> new_rowsets {output_rowset};
+    std::vector<RowsetSharedPtr> old_rowsets {input_rowset};
+
+    std::lock_guard<std::shared_mutex> wrlock(_tablet->get_header_lock());
+    RETURN_NOT_OK(_tablet->modify_rowsets(new_rowsets, old_rowsets, true));
+    _tablet->save_meta();
+    return Status::OK();
+}
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/convert_rowset.h b/be/src/olap/convert_rowset.h
new file mode 100644
index 0000000000..a691d38624
--- /dev/null
+++ b/be/src/olap/convert_rowset.h
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/merger.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet.h"
+
+namespace doris {
+class DataDir;
+// Converts the alpha rowsets of a single tablet into newly written rowsets.
+// Create one instance per tablet and call do_convert(); the object is not copyable.
+class ConvertRowset {
+public:
+    // explicit: a tablet pointer must not silently convert into a ConvertRowset.
+    explicit ConvertRowset(TabletSharedPtr tablet) : _tablet(tablet) {}
+
+    // Converts the tablet's alpha rowsets; returns the first error encountered, if any.
+    Status do_convert();
+
+private:
+    // Verifies that the input row count equals output + merged + filtered rows.
+    Status check_correctness(RowsetSharedPtr input_rowset, RowsetSharedPtr output_rowset,
+                             const Merger::Statistics& stats);
+    // Sums the row counts recorded in the rowset's segment groups.
+    int64_t _get_input_num_rows_from_seg_grps(RowsetSharedPtr rowset);
+    // Swaps the input rowset for the output rowset in the tablet and saves the tablet meta.
+    Status _modify_rowsets(RowsetSharedPtr input_rowset, RowsetSharedPtr output_rowset);
+
+    // The tablet whose alpha rowsets are converted.
+    TabletSharedPtr _tablet;
+
+    DISALLOW_COPY_AND_ASSIGN(ConvertRowset);
+};
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index f4732cbe04..2ad264d2bc 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -28,6 +28,7 @@
#include "agent/cgroups_mgr.h"
#include "common/status.h"
#include "gutil/strings/substitute.h"
+#include "olap/convert_rowset.h"
#include "olap/cumulative_compaction.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
@@ -72,6 +73,21 @@ Status StorageEngine::start_bg_threads() {
.set_max_threads(max_thread_num)
.build(&_compaction_thread_pool);
+ int32_t convert_rowset_thread_num = config::convert_rowset_thread_num;
+ if (convert_rowset_thread_num > 0) {
+ ThreadPoolBuilder("ConvertRowsetTaskThreadPool")
+ .set_min_threads(convert_rowset_thread_num)
+ .set_max_threads(convert_rowset_thread_num)
+ .build(&_convert_rowset_thread_pool);
+
+ // alpha rowset scan thread
+ RETURN_IF_ERROR(Thread::create(
+ "StorageEngine", "alpha_rowset_scan_thread",
+ [this]() { this->_alpha_rowset_scan_thread_callback(); },
+ &_alpha_rowset_scan_thread));
+ LOG(INFO) << "alpha rowset scan thread started";
+ }
+
// compaction tasks producer thread
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "compaction_tasks_producer_thread",
@@ -304,6 +320,40 @@ void StorageEngine::_tablet_checkpoint_callback(const std::vector<DataDir*>& dat
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
}
+// Body of the alpha rowset scan thread.
+// Each round it collects the tablets that still have alpha rowsets, shuffles them, submits
+// up to (convert_rowset_thread_num * 2) conversion tasks to the convert rowset thread pool
+// and waits for the pool to drain. When no such tablet is found the sleep interval is doubled
+// (capped at 3600 seconds); otherwise it is reset to scan_alpha_rowset_min_interval_sec.
+// The loop ends when _stop_background_threads_latch is released.
+void StorageEngine::_alpha_rowset_scan_thread_callback() {
+    LOG(INFO) << "try to start alpha rowset scan thread!";
+
+    auto scan_interval_sec = config::scan_alpha_rowset_min_interval_sec;
+    // Clamp at 0 so a non-positive config value yields no tasks instead of wrapping around
+    // when converted to an unsigned count.
+    const size_t max_convert_task =
+            static_cast<size_t>(std::max(0, config::convert_rowset_thread_num * 2));
+
+    // Seed the shuffle engine once instead of hitting std::random_device on every round.
+    std::random_device rd;
+    std::mt19937 g(rd());
+
+    do {
+        std::vector<TabletSharedPtr> tablet_have_alpha_rowset;
+        _tablet_manager->find_tablet_have_alpha_rowset(tablet_have_alpha_rowset);
+
+        // Shuffle so that the same leading tablets are not picked on every round.
+        std::shuffle(tablet_have_alpha_rowset.begin(), tablet_have_alpha_rowset.end(), g);
+
+        const size_t task_num = std::min(max_convert_task, tablet_have_alpha_rowset.size());
+        for (size_t i = 0; i < task_num; ++i) {
+            auto tablet = tablet_have_alpha_rowset[i];
+            // Capture only the tablet: the task does not need `this`.
+            auto st = _convert_rowset_thread_pool->submit_func([tablet]() {
+                CgroupsMgr::apply_system_cgroup();
+                ConvertRowset convert_rowset(tablet);
+                auto convert_st = convert_rowset.do_convert();
+                if (!convert_st.ok()) {
+                    // Do not drop the result silently; the tablet is retried in a later round.
+                    LOG(WARNING) << "convert rowset task failed. res=" << convert_st
+                                 << ", tablet=" << tablet->full_name();
+                }
+            });
+            if (!st.ok()) {
+                LOG(WARNING) << "submit convert tablet tasks failed.";
+            }
+        }
+
+        if (tablet_have_alpha_rowset.empty()) {
+            // Nothing to convert: back off exponentially, up to one hour.
+            scan_interval_sec = std::min(3600, scan_interval_sec * 2);
+        } else {
+            // Let this round finish before scanning again, then return to the minimum interval.
+            _convert_rowset_thread_pool->wait();
+            scan_interval_sec = config::scan_alpha_rowset_min_interval_sec;
+        }
+    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(scan_interval_sec)));
+}
+
void StorageEngine::_compaction_tasks_producer_callback() {
#ifdef GOOGLE_PROFILER
ProfilerRegisterThread();
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 6a1871b8f1..35dc3dd1e1 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -160,6 +160,9 @@ StorageEngine::~StorageEngine() {
if (_compaction_thread_pool) {
_compaction_thread_pool->shutdown();
}
+ if (_convert_rowset_thread_pool) {
+ _convert_rowset_thread_pool->shutdown();
+ }
if (_tablet_meta_checkpoint_thread_pool) {
_tablet_meta_checkpoint_thread_pool->shutdown();
}
@@ -578,6 +581,9 @@ void StorageEngine::stop() {
}
THREAD_JOIN(_compaction_tasks_producer_thread);
+ if (_alpha_rowset_scan_thread) {
+ THREAD_JOIN(_alpha_rowset_scan_thread);
+ }
THREAD_JOIN(_unused_rowset_monitor_thread);
THREAD_JOIN(_garbage_sweeper_thread);
THREAD_JOIN(_disk_stat_monitor_thread);
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index b3768cccf3..ce33223c5a 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -253,6 +253,8 @@ private:
void _compaction_tasks_producer_callback();
+ void _alpha_rowset_scan_thread_callback();
+
std::vector<TabletSharedPtr> _generate_compaction_tasks(CompactionType compaction_type,
std::vector<DataDir*>& data_dirs,
bool check_score);
@@ -377,6 +379,9 @@ private:
std::unique_ptr<ThreadPool> _compaction_thread_pool;
+ scoped_refptr<Thread> _alpha_rowset_scan_thread;
+ std::unique_ptr<ThreadPool> _convert_rowset_thread_pool;
+
std::unique_ptr<ThreadPool> _tablet_meta_checkpoint_thread_pool;
CompactionPermitLimiter _permit_limiter;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index a457e3eb42..df7f02a1f9 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1051,6 +1051,15 @@ void Tablet::pick_candidate_rowsets_to_cumulative_compaction(
_cumulative_point, candidate_rowsets);
}
+// Appends to *rowsets every rowset in _rs_version_map whose meta reports ALPHA_ROWSET.
+// The map is read under a shared lock on _meta_lock.
+void Tablet::find_alpha_rowsets(std::vector<RowsetSharedPtr>* rowsets) const {
+    std::shared_lock rdlock(_meta_lock);
+    for (const auto& version_and_rowset : _rs_version_map) {
+        const auto& rowset = version_and_rowset.second;
+        if (rowset->rowset_meta()->rowset_type() != RowsetTypePB::ALPHA_ROWSET) {
+            continue;
+        }
+        rowsets->push_back(rowset);
+    }
+}
+
void Tablet::pick_candidate_rowsets_to_base_compaction(vector<RowsetSharedPtr>* candidate_rowsets) {
std::shared_lock rdlock(_meta_lock);
for (auto& it : _rs_version_map) {
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 8f08833c33..a7f1830ef1 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -260,6 +260,8 @@ public:
return _tablet_meta->all_beta();
}
+ void find_alpha_rowsets(std::vector<RowsetSharedPtr>* rowsets) const;
+
Status create_rowset_writer(const Version& version, const RowsetStatePB& rowset_state,
const SegmentsOverlapPB& overlap,
std::unique_ptr<RowsetWriter>* rowset_writer);
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index c3a682f283..06008d5c78 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -601,6 +601,20 @@ void TabletManager::get_tablet_stat(TTabletStatResult* result) {
result->__set_tablet_stat_list(*local_cache);
}
+// Appends to `tablets` every managed tablet that is not all-beta and for which
+// can_do_compaction(<its data dir path hash>, BASE_COMPACTION) returns true.
+// Each shard is scanned under a shared lock on that shard's lock.
+void TabletManager::find_tablet_have_alpha_rowset(std::vector<TabletSharedPtr>& tablets) {
+    for (const auto& shard : _tablets_shards) {
+        std::shared_lock rdlock(shard.lock);
+        for (const auto& entry : shard.tablet_map) {
+            const TabletSharedPtr& candidate = entry.second;
+            // Tablets made only of beta rowsets have nothing to convert.
+            if (candidate->all_beta()) {
+                continue;
+            }
+            if (!candidate->can_do_compaction(candidate->data_dir()->path_hash(),
+                                              BASE_COMPACTION)) {
+                continue;
+            }
+            tablets.push_back(candidate);
+        }
+    }
+}
+
TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
CompactionType compaction_type, DataDir* data_dir,
const std::unordered_set<TTabletId>& tablet_submitted_compaction, uint32_t* score,
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 85b4c644ae..1627ab741e 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -140,6 +140,8 @@ public:
void get_all_tablets_storage_format(TCheckStorageFormatResult* result);
+ void find_tablet_have_alpha_rowset(std::vector<TabletSharedPtr>& tablets);
+
private:
// Add a tablet pointer to StorageEngine
// If force, drop the existing tablet add this new one
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org