You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/06/04 04:29:07 UTC

[incubator-doris] branch master updated: [feature] support convert alpha rowset (#9890)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3743f19369 [feature] support convert alpha rowset (#9890)
3743f19369 is described below

commit 3743f19369654682377c77b9c903ed417847874b
Author: jacktengg <18...@users.noreply.github.com>
AuthorDate: Sat Jun 4 12:29:03 2022 +0800

    [feature] support convert alpha rowset (#9890)
    
    Add an alpha-to-beta rowset converter that automatically converts alpha rowsets to beta rowsets. The alpha rowset code will be removed after 1.1.
---
 be/src/common/config.h         |   6 ++
 be/src/olap/CMakeLists.txt     |   1 +
 be/src/olap/convert_rowset.cpp | 140 +++++++++++++++++++++++++++++++++++++++++
 be/src/olap/convert_rowset.h   |  42 +++++++++++++
 be/src/olap/olap_server.cpp    |  50 +++++++++++++++
 be/src/olap/storage_engine.cpp |   6 ++
 be/src/olap/storage_engine.h   |   5 ++
 be/src/olap/tablet.cpp         |   9 +++
 be/src/olap/tablet.h           |   2 +
 be/src/olap/tablet_manager.cpp |  14 +++++
 be/src/olap/tablet_manager.h   |   2 +
 11 files changed, 277 insertions(+)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8205ac2537..fe40be22a8 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -294,6 +294,12 @@ CONF_mInt64(min_compaction_failure_interval_sec, "5"); // 5 seconds
 // This config can be set to limit thread number in compaction thread pool.
 CONF_mInt32(max_compaction_threads, "10");
 
+// Thread count of the convert rowset thread pool; values <= 0 (default 0) disable alpha rowset conversion.
+CONF_mInt32(convert_rowset_thread_num, "0");
+
+// Initial interval (seconds) between alpha rowset scans; doubles up to 3600s while none are found.
+CONF_mInt32(scan_alpha_rowset_min_interval_sec, "3");
+
 // Thread count to do tablet meta checkpoint, -1 means use the data directories count.
 CONF_Int32(max_meta_checkpoint_threads, "-1");
 
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index 657ebe3518..2c02122883 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -38,6 +38,7 @@ add_library(Olap STATIC
     compaction_permit_limiter.cpp
     comparison_predicate.cpp
     compress.cpp
+    convert_rowset.cpp
     cumulative_compaction.cpp
     cumulative_compaction_policy.cpp
     delete_handler.cpp
diff --git a/be/src/olap/convert_rowset.cpp b/be/src/olap/convert_rowset.cpp
new file mode 100644
index 0000000000..c71e66c246
--- /dev/null
+++ b/be/src/olap/convert_rowset.cpp
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/convert_rowset.h"
+
+namespace doris {
+
+// Converts the alpha rowsets of `_tablet` into beta rowsets, one input rowset at a time.
+//
+// Both compaction locks are only tried: if either is already held (e.g. by a running
+// compaction), nothing is converted and OLAP_ERR_CE_TRY_CE_LOCK_ERROR is returned.
+// Conversion stops once at least max_convert_row_count input rows have been converted; the
+// check runs after each rowset, so the last rowset may overshoot the limit.
+// Each converted rowset is swapped into the tablet meta immediately, so an error part-way
+// through keeps the rowsets already converted and returns that error.
+Status ConvertRowset::do_convert() {
+    if (!_tablet->init_succeeded()) {
+        return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
+    }
+    std::unique_lock<std::mutex> base_compaction_lock(_tablet->get_base_compaction_lock(),
+                                                      std::try_to_lock);
+    std::unique_lock<std::mutex> cumulative_compaction_lock(
+            _tablet->get_cumulative_compaction_lock(), std::try_to_lock);
+    if (!base_compaction_lock.owns_lock() || !cumulative_compaction_lock.owns_lock()) {
+        LOG(INFO) << "The tablet is under compaction. tablet=" << _tablet->full_name();
+        return Status::OLAPInternalError(OLAP_ERR_CE_TRY_CE_LOCK_ERROR);
+    }
+
+    std::vector<RowsetSharedPtr> alpha_rowsets;
+    _tablet->find_alpha_rowsets(&alpha_rowsets);
+
+    // Caps the work (and so the time the compaction locks are held) of a single call.
+    const size_t max_convert_row_count = 20000000;
+    size_t row_count = 0;
+    for (const auto& input_rowset : alpha_rowsets) {
+        Version output_version =
+                Version(input_rowset->start_version(), input_rowset->end_version());
+
+        RowsetReaderSharedPtr input_rs_reader;
+        RETURN_NOT_OK(input_rowset->create_reader(&input_rs_reader));
+
+        std::unique_ptr<RowsetWriter> output_rs_writer;
+        RETURN_NOT_OK(_tablet->create_rowset_writer(output_version, VISIBLE, NONOVERLAPPING,
+                                                    &output_rs_writer));
+
+        // Merge statistics describe a single input rowset, so they are scoped to this iteration.
+        Merger::Statistics stats;
+        Status res = Merger::merge_rowsets(_tablet, ReaderType::READER_BASE_COMPACTION,
+                                           {input_rs_reader}, output_rs_writer.get(), &stats);
+        if (!res.ok()) {
+            LOG(WARNING) << "fail to convert rowset. res=" << res
+                         << ", tablet=" << _tablet->full_name();
+            return res;
+        }
+
+        auto output_rowset = output_rs_writer->build();
+        if (output_rowset == nullptr) {
+            LOG(WARNING) << "rowset writer build failed"
+                         << ", tablet=" << _tablet->full_name();
+            return Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
+        }
+
+        // NOTE(review): on failure the built output rowset is never added to the tablet;
+        // confirm its files are reclaimed by garbage collection.
+        RETURN_NOT_OK(check_correctness(input_rowset, output_rowset, stats));
+
+        row_count += input_rowset->num_rows();
+
+        RETURN_NOT_OK(_modify_rowsets(input_rowset, output_rowset));
+
+        LOG(INFO) << "succeed to convert rowset"
+                  << ". tablet=" << _tablet->full_name() << ", output_version=" << output_version
+                  << ", disk=" << _tablet->data_dir()->path();
+
+        if (row_count >= max_convert_row_count) {
+            break;
+        }
+    }
+    return Status::OK();
+}
+
+// Verifies that converting `input_rowset` into `output_rowset` lost no rows: the input row
+// count must equal the output row count plus the rows the merger reports as merged or filtered.
+Status ConvertRowset::check_correctness(RowsetSharedPtr input_rowset, RowsetSharedPtr output_rowset,
+                                        const Merger::Statistics& stats) {
+    // 1. check row number
+    if (input_rowset->num_rows() !=
+        output_rowset->num_rows() + stats.merged_rows + stats.filtered_rows) {
+        LOG(WARNING) << "row_num does not match between input and output! "
+                     << "input_row_num=" << input_rowset->num_rows()
+                     << ", merged_row_num=" << stats.merged_rows
+                     << ", filtered_row_num=" << stats.filtered_rows
+                     << ", output_row_num=" << output_rowset->num_rows();
+
+        // ATTN(cmy): We found that the num_rows in some rowset meta may be set to the wrong value,
+        // but it is not known which version of the code has the problem. So when the convert
+        // result is inconsistent, we then try to verify by num_rows recorded in segment_groups.
+        // If that check passes, the mismatch is attributed to the input rowset meta and ignored.
+        // Only alpha rowsets are handled because we only found this bug in alpha rowsets;
+        // _get_input_num_rows_from_seg_grps() returns -1 for any other rowset type.
+        int64_t num_rows = _get_input_num_rows_from_seg_grps(input_rowset);
+        if (num_rows == -1) {
+            return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR);
+        }
+        if (num_rows != output_rowset->num_rows() + stats.merged_rows + stats.filtered_rows) {
+            // If it is still incorrect, it may be another problem
+            LOG(WARNING) << "row_num got from seg groups does not match between convert input "
+                            "and output! "
+                         << "input_row_num=" << num_rows << ", merged_row_num=" << stats.merged_rows
+                         << ", filtered_row_num=" << stats.filtered_rows
+                         << ", output_row_num=" << output_rowset->num_rows();
+
+            return Status::OLAPInternalError(OLAP_ERR_CHECK_LINES_ERROR);
+        }
+    }
+    return Status::OK();
+}
+
+// Sums the num_rows recorded in the segment groups of an alpha rowset's meta.
+// Returns -1 if `rowset` is not an alpha rowset.
+int64_t ConvertRowset::_get_input_num_rows_from_seg_grps(RowsetSharedPtr rowset) {
+    if (rowset->rowset_meta()->rowset_type() != RowsetTypePB::ALPHA_ROWSET) {
+        return -1;
+    }
+    int64_t num_rows = 0;
+    for (const auto& seg_grp :
+         rowset->rowset_meta()->alpha_rowset_extra_meta_pb().segment_groups()) {
+        num_rows += seg_grp.num_rows();
+    }
+    return num_rows;
+}
+
+// Under the tablet's header write lock, replaces `input_rowset` with `output_rowset` in the
+// tablet and then saves the tablet meta.
+Status ConvertRowset::_modify_rowsets(RowsetSharedPtr input_rowset, RowsetSharedPtr output_rowset) {
+    std::vector<RowsetSharedPtr> input_rowsets = {input_rowset};
+    std::vector<RowsetSharedPtr> output_rowsets = {output_rowset};
+
+    std::lock_guard<std::shared_mutex> wrlock(_tablet->get_header_lock());
+    RETURN_NOT_OK(_tablet->modify_rowsets(output_rowsets, input_rowsets, true));
+    _tablet->save_meta();
+    return Status::OK();
+}
+} // namespace doris
diff --git a/be/src/olap/convert_rowset.h b/be/src/olap/convert_rowset.h
new file mode 100644
index 0000000000..a691d38624
--- /dev/null
+++ b/be/src/olap/convert_rowset.h
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/merger.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet.h"
+
+namespace doris {
+class DataDir;
+
+// Converts the alpha rowsets of a single tablet into beta rowsets.
+class ConvertRowset {
+public:
+    explicit ConvertRowset(TabletSharedPtr tablet) : _tablet(tablet) {}
+
+    // Converts the tablet's alpha rowsets one rowset at a time, swapping each result into the
+    // tablet meta. Returns an error status without converting if a compaction lock is held.
+    Status do_convert();
+
+private:
+    // Checks the output row count against the input row count and the merge statistics.
+    Status check_correctness(RowsetSharedPtr input_rowset, RowsetSharedPtr output_rowset,
+                             const Merger::Statistics& stats);
+    // Sums the num_rows recorded in the segment groups of the rowset's alpha meta.
+    int64_t _get_input_num_rows_from_seg_grps(RowsetSharedPtr rowset);
+    // Replaces input_rowset with output_rowset in the tablet and saves the tablet meta.
+    Status _modify_rowsets(RowsetSharedPtr input_rowset, RowsetSharedPtr output_rowset);
+
+private:
+    TabletSharedPtr _tablet;
+
+    DISALLOW_COPY_AND_ASSIGN(ConvertRowset);
+};
+} // namespace doris
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index f4732cbe04..2ad264d2bc 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -28,6 +28,7 @@
 #include "agent/cgroups_mgr.h"
 #include "common/status.h"
 #include "gutil/strings/substitute.h"
+#include "olap/convert_rowset.h"
 #include "olap/cumulative_compaction.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
@@ -72,6 +73,21 @@ Status StorageEngine::start_bg_threads() {
             .set_max_threads(max_thread_num)
             .build(&_compaction_thread_pool);
 
+    int32_t convert_rowset_thread_num = config::convert_rowset_thread_num;
+    if (convert_rowset_thread_num > 0) {
+        ThreadPoolBuilder("ConvertRowsetTaskThreadPool")
+                .set_min_threads(convert_rowset_thread_num)
+                .set_max_threads(convert_rowset_thread_num)
+                .build(&_convert_rowset_thread_pool);
+
+        // alpha rowset scan thread
+        RETURN_IF_ERROR(Thread::create(
+                "StorageEngine", "alpha_rowset_scan_thread",
+                [this]() { this->_alpha_rowset_scan_thread_callback(); },
+                &_alpha_rowset_scan_thread));
+        LOG(INFO) << "alpha rowset scan thread started";
+    }
+
     // compaction tasks producer thread
     RETURN_IF_ERROR(Thread::create(
             "StorageEngine", "compaction_tasks_producer_thread",
@@ -304,6 +320,52 @@ void StorageEngine::_tablet_checkpoint_callback(const std::vector<DataDir*>& dat
     } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
 }
 
+// Periodically collects the tablets reported by find_tablet_have_alpha_rowset() and submits up
+// to 2 * convert_rowset_thread_num of them, in random order, to the convert rowset thread pool.
+// After a non-empty scan it waits for the pool and resets the interval to the configured
+// minimum; after an empty scan the interval doubles, capped at 3600 seconds.
+void StorageEngine::_alpha_rowset_scan_thread_callback() {
+    LOG(INFO) << "try to start alpha rowset scan thread!";
+
+    // Clamp to at least 1s: with a non-positive config value the doubling backoff below would
+    // never grow and the thread would rescan all tablets without sleeping.
+    int32_t scan_interval_sec = std::max(1, config::scan_alpha_rowset_min_interval_sec);
+    const size_t max_convert_task = static_cast<size_t>(config::convert_rowset_thread_num) * 2;
+    std::random_device rd;
+    std::mt19937 rng(rd());
+    do {
+        std::vector<TabletSharedPtr> tablet_have_alpha_rowset;
+        _tablet_manager->find_tablet_have_alpha_rowset(tablet_have_alpha_rowset);
+
+        // Random order so that successive scans pick different tablets when more than
+        // max_convert_task of them still have alpha rowsets.
+        std::shuffle(tablet_have_alpha_rowset.begin(), tablet_have_alpha_rowset.end(), rng);
+
+        const size_t task_num = std::min(max_convert_task, tablet_have_alpha_rowset.size());
+        for (size_t i = 0; i < task_num; ++i) {
+            TabletSharedPtr tablet = tablet_have_alpha_rowset[i];
+            auto st = _convert_rowset_thread_pool->submit_func([tablet]() {
+                CgroupsMgr::apply_system_cgroup();
+                ConvertRowset convert_rowset(tablet);
+                convert_rowset.do_convert();
+            });
+            if (!st.ok()) {
+                LOG(WARNING) << "submit convert tablet tasks failed. tablet="
+                             << tablet->full_name() << ", status=" << st;
+            }
+        }
+
+        if (tablet_have_alpha_rowset.empty()) {
+            // Nothing to convert: back off exponentially, capped at one hour.
+            scan_interval_sec = std::min(3600, scan_interval_sec * 2);
+        } else {
+            // Wait for this batch to finish before the next scan.
+            _convert_rowset_thread_pool->wait();
+            scan_interval_sec = std::max(1, config::scan_alpha_rowset_min_interval_sec);
+        }
+    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(scan_interval_sec)));
+}
+
 void StorageEngine::_compaction_tasks_producer_callback() {
 #ifdef GOOGLE_PROFILER
     ProfilerRegisterThread();
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 6a1871b8f1..35dc3dd1e1 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -160,6 +160,9 @@ StorageEngine::~StorageEngine() {
     if (_compaction_thread_pool) {
         _compaction_thread_pool->shutdown();
     }
+    if (_convert_rowset_thread_pool) {
+        _convert_rowset_thread_pool->shutdown();
+    }
     if (_tablet_meta_checkpoint_thread_pool) {
         _tablet_meta_checkpoint_thread_pool->shutdown();
     }
@@ -578,6 +581,9 @@ void StorageEngine::stop() {
     }
 
     THREAD_JOIN(_compaction_tasks_producer_thread);
+    if (_alpha_rowset_scan_thread) {
+        THREAD_JOIN(_alpha_rowset_scan_thread);
+    }
     THREAD_JOIN(_unused_rowset_monitor_thread);
     THREAD_JOIN(_garbage_sweeper_thread);
     THREAD_JOIN(_disk_stat_monitor_thread);
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index b3768cccf3..ce33223c5a 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -253,6 +253,8 @@ private:
 
     void _compaction_tasks_producer_callback();
 
+    void _alpha_rowset_scan_thread_callback();
+
     std::vector<TabletSharedPtr> _generate_compaction_tasks(CompactionType compaction_type,
                                                             std::vector<DataDir*>& data_dirs,
                                                             bool check_score);
@@ -377,6 +379,9 @@ private:
 
     std::unique_ptr<ThreadPool> _compaction_thread_pool;
 
+    scoped_refptr<Thread> _alpha_rowset_scan_thread;
+    std::unique_ptr<ThreadPool> _convert_rowset_thread_pool;
+
     std::unique_ptr<ThreadPool> _tablet_meta_checkpoint_thread_pool;
 
     CompactionPermitLimiter _permit_limiter;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index a457e3eb42..df7f02a1f9 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1051,6 +1051,15 @@ void Tablet::pick_candidate_rowsets_to_cumulative_compaction(
                                                           _cumulative_point, candidate_rowsets);
 }
 
+void Tablet::find_alpha_rowsets(std::vector<RowsetSharedPtr>* rowsets) const {
+    std::shared_lock meta_rdlock(_meta_lock); // shared lock: _rs_version_map is only read
+    for (const auto& version_and_rowset : _rs_version_map) { // appends; never clears *rowsets
+        const auto& rowset = version_and_rowset.second;
+        if (rowset->rowset_meta()->rowset_type() != RowsetTypePB::ALPHA_ROWSET) continue;
+        rowsets->push_back(rowset);
+    }
+}
+
 void Tablet::pick_candidate_rowsets_to_base_compaction(vector<RowsetSharedPtr>* candidate_rowsets) {
     std::shared_lock rdlock(_meta_lock);
     for (auto& it : _rs_version_map) {
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 8f08833c33..a7f1830ef1 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -260,6 +260,8 @@ public:
         return _tablet_meta->all_beta();
     }
 
+    void find_alpha_rowsets(std::vector<RowsetSharedPtr>* rowsets) const;
+
     Status create_rowset_writer(const Version& version, const RowsetStatePB& rowset_state,
                                 const SegmentsOverlapPB& overlap,
                                 std::unique_ptr<RowsetWriter>* rowset_writer);
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index c3a682f283..06008d5c78 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -601,6 +601,24 @@ void TabletManager::get_tablet_stat(TTabletStatResult* result) {
     result->__set_tablet_stat_list(*local_cache);
 }
 
+// Appends to `tablets` every tablet whose all_beta() is false and for which
+// can_do_compaction(..., BASE_COMPACTION) holds; `tablets` is not cleared first.
+// Each shard is scanned under its own shared lock.
+void TabletManager::find_tablet_have_alpha_rowset(std::vector<TabletSharedPtr>& tablets) {
+    for (const auto& shard : _tablets_shards) {
+        std::shared_lock shard_rdlock(shard.lock);
+        for (const auto& id_and_tablet : shard.tablet_map) {
+            const TabletSharedPtr& tablet = id_and_tablet.second;
+            if (tablet->all_beta()) {
+                continue;
+            }
+            if (tablet->can_do_compaction(tablet->data_dir()->path_hash(), BASE_COMPACTION)) {
+                tablets.push_back(tablet);
+            }
+        }
+    }
+}
+
 TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
         CompactionType compaction_type, DataDir* data_dir,
         const std::unordered_set<TTabletId>& tablet_submitted_compaction, uint32_t* score,
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 85b4c644ae..1627ab741e 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -140,6 +140,8 @@ public:
 
     void get_all_tablets_storage_format(TCheckStorageFormatResult* result);
 
+    void find_tablet_have_alpha_rowset(std::vector<TabletSharedPtr>& tablets);
+
 private:
     // Add a tablet pointer to StorageEngine
     // If force, drop the existing tablet add this new one


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org